Whamcloud - gitweb
- orphan handling code update
authortappro <tappro>
Tue, 26 Sep 2006 23:47:51 +0000 (23:47 +0000)
committertappro <tappro>
Tue, 26 Sep 2006 23:47:51 +0000 (23:47 +0000)
- move txn_callback_{add,del} to the {mount,shutdown} in mdd
- regroup mdd code a bit by functionality

lustre/include/lu_object.h
lustre/mdd/mdd_handler.c
lustre/mdd/mdd_internal.h
lustre/mdd/mdd_orphans.c

index e6feb14..8f3018a 100644 (file)
@@ -409,6 +409,7 @@ enum lu_object_header_flags {
          * once set.
          */
         LU_OBJECT_HEARD_BANSHEE = 0,
+        LU_OBJECT_ORPHAN        = 1,
 };
 
 enum lu_object_header_attr {
index 17b7c81..2e4ca88 100644 (file)
@@ -76,6 +76,66 @@ static const char *mdd_root_dir_name = "root";
 static const char dot[] = ".";
 static const char dotdot[] = "..";
 
+enum mdd_txn_op {
+        MDD_TXN_OBJECT_DESTROY_OP,
+        MDD_TXN_OBJECT_CREATE_OP,
+        MDD_TXN_ATTR_SET_OP,
+        MDD_TXN_XATTR_SET_OP,
+        MDD_TXN_INDEX_INSERT_OP,
+        MDD_TXN_INDEX_DELETE_OP,
+        MDD_TXN_LINK_OP,
+        MDD_TXN_UNLINK_OP,
+        MDD_TXN_RENAME_OP,
+        MDD_TXN_CREATE_DATA_OP,
+        MDD_TXN_MKDIR_OP
+};
+
+struct mdd_txn_op_descr {
+        enum mdd_txn_op mod_op;
+        unsigned int    mod_credits;
+};
+
+enum {
+        MDD_TXN_OBJECT_DESTROY_CREDITS = 20,
+        MDD_TXN_OBJECT_CREATE_CREDITS  = 20,
+        MDD_TXN_ATTR_SET_CREDITS       = 20,
+        MDD_TXN_XATTR_SET_CREDITS      = 20,
+        MDD_TXN_INDEX_INSERT_CREDITS   = 20,
+        MDD_TXN_INDEX_DELETE_CREDITS   = 20,
+        MDD_TXN_LINK_CREDITS           = 20,
+        MDD_TXN_UNLINK_CREDITS         = 20,
+        MDD_TXN_RENAME_CREDITS         = 20,
+        MDD_TXN_CREATE_DATA_CREDITS    = 20,
+        MDD_TXN_MKDIR_CREDITS          = 20
+};
+
+#define DEFINE_MDD_TXN_OP_DESC(opname)          \
+static const struct mdd_txn_op_descr opname = { \
+        .mod_op      = opname ## _OP,           \
+        .mod_credits = opname ## _CREDITS,      \
+}
+
+/*
+ * number of blocks to reserve for particular operations. Should be function
+ * of ... something. Stub for now.
+ */
+DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_DESTROY);
+DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_CREATE);
+DEFINE_MDD_TXN_OP_DESC(MDD_TXN_ATTR_SET);
+DEFINE_MDD_TXN_OP_DESC(MDD_TXN_XATTR_SET);
+DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_INSERT);
+DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_DELETE);
+DEFINE_MDD_TXN_OP_DESC(MDD_TXN_LINK);
+DEFINE_MDD_TXN_OP_DESC(MDD_TXN_UNLINK);
+DEFINE_MDD_TXN_OP_DESC(MDD_TXN_RENAME);
+DEFINE_MDD_TXN_OP_DESC(MDD_TXN_CREATE_DATA);
+DEFINE_MDD_TXN_OP_DESC(MDD_TXN_MKDIR);
+
+static void mdd_txn_param_build(const struct lu_context *ctx,
+                                const struct mdd_txn_op_descr *opd)
+{
+        mdd_ctx_info(ctx)->mti_param.tp_credits = opd->mod_credits;
+}
 
 #define mdd_get_group_info(group_info) do {             \
         atomic_inc(&(group_info)->usage);               \
@@ -211,6 +271,47 @@ static void mdd_object_free(const struct lu_context *ctxt, struct lu_object *o)
         OBD_FREE_PTR(mdd);
 }
 
+static int mdd_object_print(const struct lu_context *ctxt, void *cookie,
+                            lu_printer_t p, const struct lu_object *o)
+{
+        return (*p)(ctxt, cookie, LUSTRE_MDD_NAME"-object@%p", o);
+}
+
+/* orphan handling is here */
+static void mdd_object_delete(const struct lu_context *ctxt,
+                               struct lu_object *o)
+{
+        struct mdd_object *mdd_obj = lu2mdd_obj(o);
+        struct thandle *handle = NULL;
+        ENTRY;
+
+        if (lu2mdd_dev(o->lo_dev)->mdd_orphans == NULL)
+                return;
+
+        if (test_bit(LU_OBJECT_ORPHAN, &o->lo_header->loh_flags)) {
+                mdd_txn_param_build(ctxt, &MDD_TXN_MKDIR);
+                handle = mdd_trans_start(ctxt, lu2mdd_dev(o->lo_dev));
+                if (IS_ERR(handle))
+                        CERROR("Cannot get thandle\n");
+                else {
+                        mdd_write_lock(ctxt, mdd_obj);
+                        /* let's remove obj from the orphan list */
+                        __mdd_orphan_del(ctxt, mdd_obj, handle);
+                        mdd_write_unlock(ctxt, mdd_obj);
+                        mdd_trans_stop(ctxt, lu2mdd_dev(o->lo_dev),
+                                       0, handle);
+                }
+        }
+}
+
+static struct lu_object_operations mdd_lu_obj_ops = {
+       .loo_object_init    = mdd_object_init,
+       .loo_object_start   = mdd_object_start,
+       .loo_object_free    = mdd_object_free,
+       .loo_object_print   = mdd_object_print,
+        .loo_object_delete  = mdd_object_delete
+};
+
 struct mdd_object *mdd_object_find(const struct lu_context *ctxt,
                                    struct mdd_device *d,
                                    const struct lu_fid *f)
@@ -548,91 +649,6 @@ static int mdd_xattr_list(const struct lu_context *ctxt, struct md_object *obj,
         RETURN(rc);
 }
 
-enum mdd_txn_op {
-        MDD_TXN_OBJECT_DESTROY_OP,
-        MDD_TXN_OBJECT_CREATE_OP,
-        MDD_TXN_ATTR_SET_OP,
-        MDD_TXN_XATTR_SET_OP,
-        MDD_TXN_INDEX_INSERT_OP,
-        MDD_TXN_INDEX_DELETE_OP,
-        MDD_TXN_LINK_OP,
-        MDD_TXN_UNLINK_OP,
-        MDD_TXN_RENAME_OP,
-        MDD_TXN_CREATE_DATA_OP,
-        MDD_TXN_MKDIR_OP
-};
-
-struct mdd_txn_op_descr {
-        enum mdd_txn_op mod_op;
-        unsigned int    mod_credits;
-};
-
-enum {
-        MDD_TXN_OBJECT_DESTROY_CREDITS = 20,
-        MDD_TXN_OBJECT_CREATE_CREDITS  = 20,
-        MDD_TXN_ATTR_SET_CREDITS       = 20,
-        MDD_TXN_XATTR_SET_CREDITS      = 20,
-        MDD_TXN_INDEX_INSERT_CREDITS   = 20,
-        MDD_TXN_INDEX_DELETE_CREDITS   = 20,
-        MDD_TXN_LINK_CREDITS           = 20,
-        MDD_TXN_UNLINK_CREDITS         = 20,
-        MDD_TXN_RENAME_CREDITS         = 20,
-        MDD_TXN_CREATE_DATA_CREDITS    = 20,
-        MDD_TXN_MKDIR_CREDITS          = 20
-};
-
-#define DEFINE_MDD_TXN_OP_DESC(opname)          \
-static const struct mdd_txn_op_descr opname = { \
-        .mod_op      = opname ## _OP,           \
-        .mod_credits = opname ## _CREDITS,      \
-}
-
-/*
- * number of blocks to reserve for particular operations. Should be function
- * of ... something. Stub for now.
- */
-DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_DESTROY);
-DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_CREATE);
-DEFINE_MDD_TXN_OP_DESC(MDD_TXN_ATTR_SET);
-DEFINE_MDD_TXN_OP_DESC(MDD_TXN_XATTR_SET);
-DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_INSERT);
-DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_DELETE);
-DEFINE_MDD_TXN_OP_DESC(MDD_TXN_LINK);
-DEFINE_MDD_TXN_OP_DESC(MDD_TXN_UNLINK);
-DEFINE_MDD_TXN_OP_DESC(MDD_TXN_RENAME);
-DEFINE_MDD_TXN_OP_DESC(MDD_TXN_CREATE_DATA);
-DEFINE_MDD_TXN_OP_DESC(MDD_TXN_MKDIR);
-
-static void mdd_txn_param_build(const struct lu_context *ctx,
-                                const struct mdd_txn_op_descr *opd)
-{
-        mdd_ctx_info(ctx)->mti_param.tp_credits = opd->mod_credits;
-}
-
-static int mdd_object_print(const struct lu_context *ctxt, void *cookie,
-                            lu_printer_t p, const struct lu_object *o)
-{
-        return (*p)(ctxt, cookie, LUSTRE_MDD_NAME"-object@%p", o);
-}
-
-static int mdd_mount(const struct lu_context *ctx, struct mdd_device *mdd)
-{
-        int rc;
-        struct dt_object *root;
-        ENTRY;
-
-        root = dt_store_open(ctx, mdd->mdd_child, mdd_root_dir_name,
-                             &mdd->mdd_root_fid);
-        if (!IS_ERR(root)) {
-                LASSERT(root != NULL);
-                lu_object_put(ctx, &root->do_lu);
-                rc = orph_index_init(ctx, mdd);
-        } else
-                rc = PTR_ERR(root);
-
-        RETURN(rc);
-}
-
 static int mdd_txn_start_cb(const struct lu_context *ctx,
                             struct txn_param *param, void *cookie)
 {
@@ -671,8 +687,6 @@ static int mdd_device_init(const struct lu_context *ctx,
         mdd->mdd_txn_cb.dtc_txn_commit = mdd_txn_commit_cb;
         mdd->mdd_txn_cb.dtc_cookie = mdd;
 
-        dt_txn_callback_add(dt, &mdd->mdd_txn_cb);
-
         RETURN(rc);
 }
 
@@ -681,15 +695,33 @@ static struct lu_device *mdd_device_fini(const struct lu_context *ctx,
 {
        struct mdd_device *mdd = lu2mdd_dev(d);
         struct lu_device *next = &mdd->mdd_child->dd_lu_dev;
+        return next;
+}
 
-        dt_txn_callback_del(mdd->mdd_child, &mdd->mdd_txn_cb);
+static int mdd_mount(const struct lu_context *ctx, struct mdd_device *mdd)
+{
+        int rc;
+        struct dt_object *root;
+        ENTRY;
 
-        return next;
+        dt_txn_callback_add(mdd->mdd_child, &mdd->mdd_txn_cb);
+        root = dt_store_open(ctx, mdd->mdd_child, mdd_root_dir_name,
+                             &mdd->mdd_root_fid);
+        if (!IS_ERR(root)) {
+                LASSERT(root != NULL);
+                lu_object_put(ctx, &root->do_lu);
+                rc = orph_index_init(ctx, mdd);
+        } else
+                rc = PTR_ERR(root);
+
+        RETURN(rc);
 }
 
 static void mdd_device_shutdown(const struct lu_context *ctxt,
                                 struct mdd_device *m)
 {
+        dt_txn_callback_del(m->mdd_child, &m->mdd_txn_cb);
         if (m->mdd_obd_dev)
                 mdd_fini_obd(ctxt, m);
         orph_index_fini(ctxt, m);
@@ -750,13 +782,14 @@ static int mdd_recovery_complete(const struct lu_context *ctxt,
         obd_notify(obd->u.mds.mds_osc_obd, NULL,
                    obd->obd_async_recov ? OBD_NOTIFY_SYNC_NONBLOCK :
                    OBD_NOTIFY_SYNC, NULL);
-*/
+        */
         LASSERT(mdd);
         LASSERT(obd);
 
         obd->obd_recovering = 0;
         obd->obd_type->typ_dt_ops->o_postrecov(obd);
         /* TODO: orphans handling */
+        __mdd_orphan_cleanup(ctxt, mdd);
         rc = next->ld_ops->ldo_recovery_complete(ctxt, next);
 
         RETURN(rc);
@@ -768,13 +801,6 @@ struct lu_device_operations mdd_lu_ops = {
         .ldo_recovery_complete = mdd_recovery_complete
 };
 
-static struct lu_object_operations mdd_lu_obj_ops = {
-       .loo_object_init    = mdd_object_init,
-       .loo_object_start   = mdd_object_start,
-       .loo_object_free    = mdd_object_free,
-       .loo_object_print   = mdd_object_print
-};
-
 void mdd_write_lock(const struct lu_context *ctxt, struct mdd_object *obj)
 {
         struct dt_object  *next = mdd_object_child(obj);
@@ -1454,7 +1480,9 @@ int __mdd_object_kill(const struct lu_context *ctxt,
                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
                  * Caller must be ready for that. */
                 rc = __mdd_lmm_get(ctxt, obj, ma);
-                rc = mdd_unlink_log(ctxt, mdo2mdd(&obj->mod_obj), obj, ma);
+                if ((ma->ma_valid & MA_LOV))
+                        rc = mdd_unlink_log(ctxt, mdo2mdd(&obj->mod_obj),
+                                            obj, ma);
         }
         RETURN(rc);
 }
@@ -1469,12 +1497,14 @@ static int __mdd_finish_unlink(const struct lu_context *ctxt,
 
         rc = __mdd_iattr_get(ctxt, obj, ma);
         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
-                if (obj->mod_count == 0) {
+                /* add new orphan and the object
+                 * will be deleted during the object_put() */
+                if (__mdd_orphan_add(ctxt, obj, th) == 0)
+                        set_bit(LU_OBJECT_ORPHAN,
+                                &mdd2lu_obj(obj)->lo_header->loh_flags);
+                
+                if (obj->mod_count == 0)
                         rc = __mdd_object_kill(ctxt, obj, ma);
-                } else {
-                        /* add new orphan */
-                        rc = __mdd_orphan_add(ctxt, obj, th);
-                }
         }
         RETURN(rc);
 }
@@ -2692,13 +2722,13 @@ static int mdd_open(const struct lu_context *ctxt, struct md_object *obj,
         struct mdd_object *mdd_obj = md2mdd_obj(obj);
         int rc = 0;
 
-        mdd_write_lock(ctxt, md2mdd_obj(obj));
+        mdd_write_lock(ctxt, mdd_obj);
 
         rc = mdd_open_sanity_check(ctxt, mdd_obj, flags, uc);
         if (rc == 0)
-                md2mdd_obj(obj)->mod_count ++;
+                mdd_obj->mod_count ++;
 
-        mdd_write_unlock(ctxt, md2mdd_obj(obj));
+        mdd_write_unlock(ctxt, mdd_obj);
         return rc;
 }
 
@@ -2709,28 +2739,19 @@ static int mdd_close(const struct lu_context *ctxt, struct md_object *obj,
                      struct md_attr *ma, struct md_ucred *uc)
 {
         int rc;
-        struct mdd_object *mdd_obj;
-        struct thandle *handle = NULL;
+        struct mdd_object *mdd_obj = md2mdd_obj(obj);
         ENTRY;
 
-        mdd_obj = md2mdd_obj(obj);
-        mdd_txn_param_build(ctxt, &MDD_TXN_MKDIR);
-        handle = mdd_trans_start(ctxt, mdo2mdd(obj));
-        if (IS_ERR(handle))
-                RETURN(-ENOMEM);
-
         mdd_write_lock(ctxt, mdd_obj);
+        /* release open count */
+        mdd_obj->mod_count --;
+
         rc = __mdd_iattr_get(ctxt, mdd_obj, ma);
-        if (rc == 0 && (-- mdd_obj->mod_count) == 0) {
-                if (ma->ma_attr.la_nlink == 0) {
+        if (rc == 0 && mdd_obj->mod_count == 0) {
+                if (ma->ma_attr.la_nlink == 0)
                         rc = __mdd_object_kill(ctxt, mdd_obj, ma);
-                        if (rc == 0)
-                                /* let's remove obj from the orphan list */
-                                rc = __mdd_orphan_del(ctxt, mdd_obj, handle);
-                }
         }
         mdd_write_unlock(ctxt, mdd_obj);
-        mdd_trans_stop(ctxt, mdo2mdd(obj), rc, handle);
         RETURN(rc);
 }
 
index e92dd44..cd8a6d3 100644 (file)
@@ -121,10 +121,11 @@ void mdd_read_unlock(const struct lu_context *ctxt, struct mdd_object *obj);
 void mdd_write_lock(const struct lu_context *ctxt, struct mdd_object *obj);
 void mdd_write_unlock(const struct lu_context *ctxt, struct mdd_object *obj);
 
+int __mdd_orphan_cleanup(const struct lu_context *ctx, struct mdd_device *d);
 int __mdd_orphan_add(const struct lu_context *, struct mdd_object *,
-                            struct thandle *);
+                     struct thandle *);
 int __mdd_orphan_del(const struct lu_context *, struct mdd_object *,
-                            struct thandle *);
+                     struct thandle *);
 int orph_index_init(const struct lu_context *ctx, struct mdd_device *mdd);
 void orph_index_fini(const struct lu_context *ctx, struct mdd_device *mdd);
 int __mdd_object_kill(const struct lu_context *, struct mdd_object *,
index 5209cc7..323ac5b 100644 (file)
@@ -89,7 +89,7 @@ static int orph_index_delete(const struct lu_context *ctx,
         struct orph_key *key = orph_key_fill(ctx, mdo2fid(obj), op);
         int rc;
         ENTRY;
-
+        LASSERT(dor);
         rc = dor->do_index_ops->dio_delete(ctx, dor,
                                            (struct dt_key *)key, th);
         RETURN(rc);
@@ -122,6 +122,7 @@ static void orph_key_test_and_del(const struct lu_context *ctx,
                 if (mdo->mod_count == 0) {
                         /* non-opened orphan, let's delete it */
                         struct md_attr *ma = &mdd_ctx_info(ctx)->mti_ma;
+                        CWARN("Found orphan!\n");
                         __mdd_object_kill(ctx, mdo, ma);
                         /* TODO: now handle OST objects */
                         //mdd_ost_objects_destroy(ctx, ma);
@@ -194,26 +195,81 @@ void orph_index_fini(const struct lu_context *ctx, struct mdd_device *mdd)
 {
         ENTRY;
         if (mdd->mdd_orphans != NULL) {
-                if (!IS_ERR(mdd->mdd_orphans))
-                        lu_object_put(ctx, &mdd->mdd_orphans->do_lu);
+                lu_object_put(ctx, &mdd->mdd_orphans->do_lu);
                 mdd->mdd_orphans = NULL;
         }
         EXIT;
 }
 
+int __mdd_orphan_cleanup(const struct lu_context *ctx, struct mdd_device *d)
+{
+        return orph_index_iterate(ctx, d);
+}
+
 int __mdd_orphan_add(const struct lu_context *ctx,
-                            struct mdd_object *obj,
-                            struct thandle *th)
+                     struct mdd_object *obj, struct thandle *th)
 {
         loff_t offset = 0;
         return orph_index_insert(ctx, obj, ORPH_OP_UNLINK, &offset, th);
 }
 
 int __mdd_orphan_del(const struct lu_context *ctx,
-                            struct mdd_object *obj,
-                            struct thandle *th)
+                     struct mdd_object *obj, struct thandle *th)
 {
         return orph_index_delete(ctx, obj, ORPH_OP_UNLINK, th);
 }
 
+/*
+ * used when destroying orphanes and from mds_reint_unlink() when MDS wants to
+ * destroy objects on OSS.
+ */
+/*
+int mdd_objects_destroy(struct mds_obd *mds, struct inode *inode,
+                  struct lov_mds_md *lmm, int lmm_size,
+                  struct llog_cookie *logcookies,
+                  int log_unlink, int async)
+{
+        struct lov_stripe_md *lsm = NULL;
+        struct obd_trans_info oti = { 0 };
+        struct obdo *oa;
+        int rc;
+        ENTRY;
+
+        if (lmm_size == 0)
+                RETURN(0);
 
+        rc = obd_unpackmd(mds->mds_dt_exp, &lsm, lmm, lmm_size);
+        if (rc < 0) {
+                CERROR("Error unpack md %p\n", lmm);
+                RETURN(rc);
+        } else {
+                LASSERT(rc >= sizeof(*lsm));
+                rc = 0;
+        }
+
+        oa = obdo_alloc();
+        if (oa == NULL)
+                GOTO(out_free_memmd, rc = -ENOMEM);
+        oa->o_id = lsm->lsm_object_id;
+        oa->o_gr = FILTER_GROUP_MDS0 + mds->mds_num;
+        oa->o_mode = inode->i_mode & S_IFMT;
+        oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLGROUP;
+
+        if (log_unlink && logcookies) {
+                oa->o_valid |= OBD_MD_FLCOOKIE;
+                oti.oti_logcookies = logcookies;
+        }
+
+        CDEBUG(D_INODE, "destroy OSS object %d/%d\n",
+               (int)oa->o_id, (int)oa->o_gr);
+
+        if (async)
+                oti.oti_flags |= OBD_MODE_ASYNC;
+        
+        rc = obd_destroy(mds->mds_dt_exp, oa, lsm, &oti);
+        obdo_free(oa);
+out_free_memmd:
+        obd_free_memmd(mds->mds_dt_exp, &lsm);
+        RETURN(rc);
+}
+*/