Whamcloud - gitweb
revert mdd_create() back to original order of operations
authornikita <nikita>
Thu, 29 Jun 2006 15:39:08 +0000 (15:39 +0000)
committernikita <nikita>
Thu, 29 Jun 2006 15:39:08 +0000 (15:39 +0000)
lustre/mdd/mdd_handler.c
lustre/osd/osd_handler.c

index 696f411..d29f0bf 100644 (file)
@@ -156,7 +156,7 @@ static int mdd_xattr_get(const struct lu_context *ctxt, struct md_object *obj,
         RETURN(rc);
 }
 
-static int __mdd_object_destroy(const struct lu_context *ctxt, 
+static int __mdd_object_destroy(const struct lu_context *ctxt,
                                 struct mdd_object *obj,
                                 struct thandle *handle)
 {
@@ -250,8 +250,8 @@ static void mdd_txn_param_build(const struct lu_context *ctx,
         mdd_ctx_info(ctx)->mti_param.tp_credits = opd->mod_credits;
 }
 
-static void mdd_object_release(const struct lu_context *ctxt, 
-                              struct lu_object *lo)
+static void mdd_object_release(const struct lu_context *ctxt,
+                               struct lu_object *lo)
 {
        struct mdd_device *mdd = lu2mdd_dev(lo->lo_dev);
         struct mdd_object *mdd_obj = lu2mdd_obj(lo);
@@ -260,6 +260,9 @@ static void mdd_object_release(const struct lu_context *ctxt,
         int rc = 0;
         ENTRY;
 
+        if (!lu_object_exists(ctxt, lo))
+                GOTO(out, rc);
+
         rc = mdd_attr_get(ctxt, &mdd_obj->mod_obj, lu_attr);
         if (rc < 0)
                 GOTO(out, rc);
@@ -304,7 +307,7 @@ static int mdd_mount(const struct lu_context *ctx, struct mdd_device *mdd)
         int result;
         struct dt_object *root;
         ENTRY;
-        
+
         root = dt_store_open(ctx, mdd->mdd_child, mdd_root_dir_name,
                              &mdd->mdd_root_fid);
         if (!IS_ERR(root)) {
@@ -453,8 +456,8 @@ static void mdd_trans_stop(const struct lu_context *ctxt,
         mdd_child_ops(mdd)->dt_trans_stop(ctxt, handle);
 }
 
-static int __mdd_object_create(const struct lu_context *ctxt, 
-                               struct mdd_object *obj, struct lu_attr *attr, 
+static int __mdd_object_create(const struct lu_context *ctxt,
+                               struct mdd_object *obj, struct lu_attr *attr,
                                struct thandle *handle)
 {
         struct dt_object *next;
@@ -535,7 +538,7 @@ static int __mdd_xattr_set(const struct lu_context *ctxt,struct mdd_device *mdd,
                                           handle);
 }
 
-static int mdd_xattr_set(const struct lu_context *ctxt, struct md_object *obj, 
+static int mdd_xattr_set(const struct lu_context *ctxt, struct md_object *obj,
                          const void *buf, int buf_len, const char *name)
 {
         struct mdd_device *mdd = mdo2mdd(obj);
@@ -556,25 +559,25 @@ static int mdd_xattr_set(const struct lu_context *ctxt, struct md_object *obj,
         RETURN(rc);
 }
 
-static int __mdd_index_insert(const struct lu_context *ctxt, 
-                              struct mdd_object *pobj, const struct lu_fid *lf, 
+static int __mdd_index_insert(const struct lu_context *ctxt,
+                              struct mdd_object *pobj, const struct lu_fid *lf,
                               const char *name, struct thandle *handle)
 {
         int rc;
         struct dt_object *next = mdd_object_child(pobj);
         ENTRY;
-        
+
         if (next->do_index_ops != NULL)
-                rc = next->do_index_ops->dio_insert(ctxt, next, 
-                                         (struct dt_rec *)lf, 
+                rc = next->do_index_ops->dio_insert(ctxt, next,
+                                         (struct dt_rec *)lf,
                                          (struct dt_key *)name, handle);
         else
                 rc = -ENOTDIR;
         RETURN(rc);
 }
 
-static int __mdd_index_delete(const struct lu_context *ctxt, 
-                              struct mdd_device *mdd, struct mdd_object *pobj, 
+static int __mdd_index_delete(const struct lu_context *ctxt,
+                              struct mdd_device *mdd, struct mdd_object *pobj,
                               const char *name, struct thandle *handle)
 {
         int rc;
@@ -582,7 +585,7 @@ static int __mdd_index_delete(const struct lu_context *ctxt,
         ENTRY;
 
         if (next->do_index_ops != NULL)
-                rc = next->do_index_ops->dio_delete(ctxt, next, 
+                rc = next->do_index_ops->dio_delete(ctxt, next,
                                         (struct dt_key *)name, handle);
         else
                 rc = -ENOTDIR;
@@ -655,7 +658,7 @@ static void mdd_rename_lock(struct mdd_device *mdd,
                             struct mdd_object *tgt_pobj,
                             struct mdd_object *tobj)
 {
-        
+
         return;
 }
 
@@ -669,7 +672,7 @@ static void mdd_rename_unlock(struct mdd_device *mdd,
 
 static int mdd_rename(const struct lu_context *ctxt, struct md_object *src_pobj,
                       struct md_object *tgt_pobj, const struct lu_fid *lf,
-                      const char *sname, struct md_object *tobj, 
+                      const char *sname, struct md_object *tobj,
                       const char *tname)
 {
         struct mdd_device *mdd = mdo2mdd(src_pobj);
@@ -723,7 +726,7 @@ static int mdd_lookup(const struct lu_context *ctxt, struct md_object *pobj,
         const struct dt_key *key = (const struct dt_key *)name;
         int result;
         ENTRY;
-        
+
         if (dir->do_index_ops != NULL)
                 result = dir->do_index_ops->dio_lookup(ctxt, dir, rec, key);
         else
@@ -731,12 +734,16 @@ static int mdd_lookup(const struct lu_context *ctxt, struct md_object *pobj,
         RETURN(result);
 }
 
+/*
+ * Create object and insert it into namespace.
+ */
 static int mdd_create(const struct lu_context *ctxt,
                       struct md_object *pobj, const char *name,
                       struct md_object *child, struct lu_attr* attr)
 {
         struct mdd_device *mdd = mdo2mdd(pobj);
         struct mdd_object *mdo = md2mdd_obj(pobj);
+        struct mdd_object *son = md2mdd_obj(child);
         struct thandle *handle;
         int rc = 0;
         ENTRY;
@@ -748,14 +755,58 @@ static int mdd_create(const struct lu_context *ctxt,
 
         mdd_lock(ctxt, mdo, DT_WRITE_LOCK);
 
-        rc = __mdd_index_insert(ctxt, mdo, lu_object_fid(&child->mo_lu),
-                                name, handle);
+        /*
+         * Two operations have to be performed:
+         *
+         *  - allocation of new object (->do_object_create()), and
+         *
+         *  - insertion into parent index (->dio_insert()).
+         *
+         * Due to locking, operation order is not important, when both are
+         * successful, *but* error handling cases are quite different:
+         *
+         *  - if insertion is done first, and following object creation fails,
+         *  insertion has to be rolled back, but this operation might fail
+         *  also leaving us with dangling index entry.
+         *
+         *  - if creation is done first, is has to be undone if insertion
+         *  fails, leaving us with leaked space, which is neither good, nor
+         *  fatal.
+         *
+         * It seems that creation-first is simplest solution, but it is
+         * sub-optimal in the frequent
+         *
+         *         $ mkdir foo
+         *         $ mkdir foo
+         *
+         * case, because second mkdir is bound to create object, only to
+         * destroy it immediately.
+         *
+         * Note that local file systems do
+         *
+         *     0. lookup -> -EEXIST
+         *
+         *     1. create
+         *
+         *     2. insert
+         *
+         * Maybe we should do the same. For now: creation-first.
+         */
+
+        rc = __mdd_object_create(ctxt, son, attr, handle);
         if (rc)
                 GOTO(cleanup, rc);
 
-        rc = __mdd_object_create(ctxt, md2mdd_obj(child), attr, handle);
-        if (rc)
-                GOTO(cleanup, rc);
+        rc = __mdd_index_insert(ctxt, mdo, lu_object_fid(&child->mo_lu),
+                                name, handle);
+        if (rc) {
+                int rc2;
+
+                rc2 = __mdd_object_destroy(ctxt, son, handle);
+                if (rc2)
+                        CERROR("Cannot cleanup insertion failure: %d/%d\n",
+                               rc, rc2);
+        }
 
 cleanup:
         mdd_unlock(ctxt, mdo, DT_WRITE_LOCK);
index 8023249..3829919 100644 (file)
@@ -318,7 +318,7 @@ static void osd_object_delete(const struct lu_context *ctx, struct lu_object *l)
                 if (o->oo_container.ic_object == o->oo_inode)
                         iam_container_fini(&o->oo_container);
                 iput(o->oo_inode);
-                o->oo_inode = (void *)0xdeaddead;
+                o->oo_inode = NULL;
         }
 }
 
@@ -691,6 +691,26 @@ static int osd_object_create(const struct lu_context *ctx, struct dt_object *dt,
         return result;
 }
 
+/*
+ * Destroy existing object.
+ *
+ * precondition: lu_object_exists(ctxt, &dt->do_lu);
+ * postcondition: ergo(result == 0,
+ *                     !lu_object_exists(ctxt, &dt->do_lu));
+ */
+int osd_object_destroy(const struct lu_context *ctxt,
+                       struct dt_object *dt, struct thandle *th)
+{
+        /*
+         * Stub for now, just drop inode.
+         */
+        LASSERT(lu_object_exists(ctxt, &dt->do_lu));
+        CWARN("Stub!\n");
+        osd_object_delete(ctxt, &dt->do_lu);
+        LASSERT(!lu_object_exists(ctxt, &dt->do_lu));
+        return 0;
+}
+
 static void osd_inode_inc_link(const struct lu_context *ctxt,
                                struct inode *inode, struct thandle *th)
 {
@@ -735,6 +755,7 @@ static struct dt_object_operations osd_obj_ops = {
         .do_object_unlock    = osd_object_unlock,
         .do_attr_get         = osd_attr_get,
         .do_object_create    = osd_object_create,
+        .do_object_destroy   = osd_object_destroy,
         .do_object_index_try = osd_index_try,
         .do_object_ref_add   = osd_object_ref_add,
         .do_object_ref_del   = osd_object_ref_del