Whamcloud - gitweb
orphan list iterator. prototype
authortappro <tappro>
Sun, 3 Sep 2006 19:46:25 +0000 (19:46 +0000)
committertappro <tappro>
Sun, 3 Sep 2006 19:46:25 +0000 (19:46 +0000)
lustre/mdd/mdd_handler.c
lustre/mdd/mdd_internal.h
lustre/mdd/mdd_orphans.c

index 3280f6c..d0aef07 100644 (file)
@@ -91,7 +91,7 @@ static struct lu_object *mdd_object_alloc(const struct lu_context *ctxt,
                 lu_object_init(o, NULL, d);
                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
-                atomic_set(&mdd_obj->mod_count, 0);
+                mdd_obj->mod_count = 0;
                 o->lo_ops = &mdd_lu_obj_ops;
                 return o;
         } else {
@@ -142,21 +142,15 @@ struct mdd_object *mdd_object_find(const struct lu_context *ctxt,
         struct mdd_object *m;
         ENTRY;
 
-        o = lu_object_find(ctxt, d->mdd_md_dev.md_lu_dev.ld_site, f);
+        o = lu_object_find(ctxt, mdd2lu_dev(d)->ld_site, f);
         if (IS_ERR(o))
                 m = (struct mdd_object *)o;
         else
                 m = lu2mdd_obj(lu_object_locate(o->lo_header,
-                                 d->mdd_md_dev.md_lu_dev.ld_type));
+                               mdd2lu_dev(d)->ld_type));
         RETURN(m);
 }
 
-static inline void mdd_object_put(const struct lu_context *ctxt,
-                                  struct mdd_object *o)
-{
-        lu_object_put(ctxt, &o->mod_obj.mo_lu);
-}
-
 static inline int mdd_is_immutable(struct mdd_object *obj)
 {
         return obj->mod_flags & IMMUTE_OBJ;
@@ -1140,9 +1134,9 @@ static int mdd_dir_is_empty(const struct lu_context *ctx,
 
 /* return md_attr back,
  * if it is last unlink then return lov ea + llog cookie*/
-static inline int __mdd_object_kill(const struct lu_context *ctxt,
-                                    struct mdd_object *obj,
-                                    struct md_attr *ma)
+int __mdd_object_kill(const struct lu_context *ctxt,
+                      struct mdd_object *obj,
+                      struct md_attr *ma)
 {
         int rc = 0;
 
@@ -1165,7 +1159,7 @@ static int __mdd_finish_unlink(const struct lu_context *ctxt,
 
         rc = __mdd_iattr_get(ctxt, obj, ma);
         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
-                if (atomic_read(&obj->mod_count) == 0) {
+                if (obj->mod_count == 0) {
                         rc = __mdd_object_kill(ctxt, obj, ma);
                 } else {
                         /* add new orphan */
@@ -2039,13 +2033,17 @@ static int mdd_open(const struct lu_context *ctxt, struct md_object *obj,
                     int flags)
 {
         int mode = accmode(md2mdd_obj(obj), flags);
+        
+        mdd_write_lock(ctxt, md2mdd_obj(obj));
 
         if (mode & MAY_WRITE) {
                 if (mdd_is_immutable(md2mdd_obj(obj)))
                         RETURN(-EACCES);
         }
+        
+        md2mdd_obj(obj)->mod_count ++;
 
-        atomic_inc(&md2mdd_obj(obj)->mod_count);
+        mdd_write_unlock(ctxt, md2mdd_obj(obj));
         return 0;
 }
 
@@ -2058,32 +2056,23 @@ static int mdd_close(const struct lu_context *ctxt, struct md_object *obj,
         ENTRY;
 
         mdd_obj = md2mdd_obj(obj);
-        mdd_read_lock(ctxt, mdd_obj);
-        rc = __mdd_iattr_get(ctxt, mdd_obj, ma);
-        if (rc)
-                GOTO(out_locked, rc);
+        mdd_txn_param_build(ctxt, &MDD_TXN_MKDIR);
+        handle = mdd_trans_start(ctxt, mdo2mdd(obj));
+        if (IS_ERR(handle))
+                RETURN(-ENOMEM);
 
-        if (atomic_dec_and_test(&mdd_obj->mod_count)) {
+        mdd_write_lock(ctxt, mdd_obj);
+        rc = __mdd_iattr_get(ctxt, mdd_obj, ma);
+        if (rc == 0 && (-- mdd_obj->mod_count) == 0) {
                 if (ma->ma_attr.la_nlink == 0) {
                         rc = __mdd_object_kill(ctxt, mdd_obj, ma);
-                        if (rc)
-                                GOTO(out_locked, rc);
-                        mdd_read_unlock(ctxt, mdd_obj);
-                        /* let's remove obj from the orphan list */
-                        mdd_txn_param_build(ctxt, &MDD_TXN_MKDIR);
-                        handle = mdd_trans_start(ctxt, mdo2mdd(obj));
-                        if (IS_ERR(handle))
-                                GOTO(out, rc = -ENOMEM);
-
-                        rc = __mdd_orphan_del(ctxt, mdd_obj, handle);
-
-                        mdd_trans_stop(ctxt, mdo2mdd(obj), rc, handle);
-                        GOTO(out, rc);
+                        if (rc == 0)
+                                /* let's remove obj from the orphan list */
+                                rc = __mdd_orphan_del(ctxt, mdd_obj, handle);
                 }
         }
-out_locked:
-        mdd_read_unlock(ctxt, mdd_obj);
-out:
+        mdd_write_unlock(ctxt, mdd_obj);
+        mdd_trans_stop(ctxt, mdo2mdd(obj), rc, handle);
         RETURN(rc);
 }
 
index fc08bb3..6f0e552 100644 (file)
@@ -55,7 +55,7 @@ enum mod_flags {
 struct mdd_object {
         struct md_object  mod_obj;
         /* open count */
-        atomic_t          mod_count;
+        __u32             mod_count;
         __u32             mod_valid;
         unsigned long     mod_flags;
 };
@@ -71,6 +71,7 @@ struct mdd_thread_info {
         struct txn_param  mti_param;
         struct lu_fid     mti_fid;
         struct lu_attr    mti_la;
+        struct md_attr    mti_ma;
         struct lu_attr    mti_la_for_fix;
         struct lov_mds_md mti_lmm;
         struct obd_info   mti_oi;
@@ -118,6 +119,16 @@ int __mdd_orphan_del(const struct lu_context *, struct mdd_object *,
                             struct thandle *);
 int orph_index_init(const struct lu_context *ctx, struct mdd_device *mdd);
 void orph_index_fini(const struct lu_context *ctx, struct mdd_device *mdd);
+int __mdd_object_kill(const struct lu_context *, struct mdd_object *,
+                      struct md_attr *);
+struct mdd_object *mdd_object_find(const struct lu_context *,
+                                   struct mdd_device *,
+                                   const struct lu_fid *);
+static inline void mdd_object_put(const struct lu_context *ctxt,
+                                  struct mdd_object *o)
+{
+        lu_object_put(ctxt, &o->mod_obj.mo_lu);
+}
 
 extern struct lu_device_operations mdd_lu_ops;
 static inline int lu_device_is_mdd(struct lu_device *d)
index dc49514..6adce75 100644 (file)
@@ -95,23 +95,74 @@ static int orph_index_delete(const struct lu_context *ctx,
         RETURN(rc);
 
 }
-#if 0
-static int orph_index_iterate(struct lu_server_orph *orph,
-                       const struct lu_context *ctx,
-                       seqno_t seq, mdsno_t *mds)
+
+static inline struct orph_key *orph_key_empty(const struct lu_context *ctx,
+                                              __u32 op)
 {
-        struct dt_object *dt_obj = orph->orph_obj;
-        struct dt_rec    *rec = orph_rec(ctx, 0);
-        int rc;
+        struct orph_key *key = &mdd_ctx_info(ctx)->mti_orph_key;
+        LASSERT(key);
+        key->ok_fid.f_seq = 0;
+        key->ok_fid.f_oid = 0;
+        key->ok_fid.f_ver = 0;
+        key->ok_op = cpu_to_be32(op);
+        return key;
+}
+
+static void orph_key_test_and_del(const struct lu_context *ctx,
+                                  struct mdd_device *mdd,
+                                  const struct orph_key *key)
+{
+        struct mdd_object *mdo;
+
+        mdo = mdd_object_find(ctx, mdd, &key->ok_fid);
+        if (IS_ERR(mdo))
+                CERROR("Invalid orphan!\n");
+        else {
+                if (mdo->mod_count == 0) {
+                        /* non-opened orphan, let's delete it */
+                        struct md_attr *ma = &mdd_ctx_info(ctx)->mti_ma;
+                        __mdd_object_kill(ctx, mdo, ma);
+                        /* TODO: now handle OST objects */
+                        //mdd_ost_objects_destroy(ctx, ma);
+                        /* TODO: destroy index entry */
+                }
+                mdd_object_put(ctx, mdo);
+        }        
+}
+
+static int orph_index_iterate(const struct lu_context *ctx,
+                              struct mdd_device *mdd)
+{
+        struct dt_object *dt_obj = mdd->mdd_orphans;
+        struct dt_it     *it;
+        struct dt_it_ops *iops;
+        struct orph_key  *key = orph_key_empty(ctx, 0);
+        int result;
         ENTRY;
 
-        rc = dt_obj->do_index_ops->dio_lookup(ctx, dt_obj, rec,
-                                              orph_key(ctx, seq));
-        if (rc == 0)
-                *mds = be64_to_cpu(*(__u64 *)rec);
-        RETURN(rc);
+        iops = &dt_obj->do_index_ops->dio_it;
+        it = iops->init(ctx, dt_obj, 1);
+        if (it != NULL) {
+                result = iops->get(ctx, it, (const void *)key);
+                if (result > 0) {
+                        int i;
+                        /* main cycle */
+                        for (result = 0, i = 0; result == +1; ++i) {
+                                key = (void *)iops->key(ctx, it);
+                                orph_key_test_and_del(ctx, mdd, key);
+                                result = iops->next(ctx, it);
+                        }
+                        iops->put(ctx, it);
+                } else if (result == 0)
+                        /* Index contains no zero key? */
+                        result = -EIO;
+                iops->fini(ctx, it);
+        } else
+                result = -ENOMEM;
+
+        RETURN(result);
 }
-#endif
+
 int orph_index_init(const struct lu_context *ctx, struct mdd_device *mdd)
 {
         struct lu_fid fid;