Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / mdd / mdd_orphans.c
index f4ea74b..64294cf 100644 (file)
@@ -190,6 +190,10 @@ static int orph_index_insert(const struct lu_env *env,
         int rc;
         ENTRY;
 
+        LASSERT(mdd_write_locked(env, obj) != 0);
+        LASSERT(!(obj->mod_flags & ORPHAN_OBJ));
+        LASSERT(obj->mod_count > 0);
+
         mdd_orphan_write_lock(env, mdd);
 
         rc = mdd_orphan_insert_obj(env, mdd, obj, op, th);
@@ -215,6 +219,9 @@ static int orph_index_insert(const struct lu_env *env,
                                        dotdot, th, BYPASS_CAPA, 1);
 
 out:
+        if (rc == 0)
+                obj->mod_flags |= ORPHAN_OBJ;
+
         mdd_orphan_write_unlock(env, mdd);
 
         RETURN(rc);
@@ -235,28 +242,24 @@ static int orphan_object_kill(const struct lu_env *env,
                               struct thandle *th)
 {
         struct lu_attr *la = &mdd_env_info(env)->mti_la;
-        int rc;
+        int rc = 0;
+        ENTRY;
 
         /* No need to lock this object as its recovery phase, and
          * no other thread can access it. But we need to lock it
          * as its precondition for osd api we using. */
 
-        mdd_write_lock(env, obj, MOR_TGT_CHILD);
         mdo_ref_del(env, obj, th);
         if (S_ISDIR(mdd_object_type(obj))) {
                 mdo_ref_del(env, obj, th);
                 mdd_orphan_ref_del(env, mdd, th);
-                mdd_write_unlock(env, obj);
         } else {
                 /* regular file , cleanup linked ost objects */
                 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
-                mdd_write_unlock(env, obj);
-                if (rc)
-                        RETURN(rc);
-
-                mdd_lov_destroy(env, mdd, obj, la);
+                if (rc == 0)
+                        rc = mdd_lov_destroy(env, mdd, obj, la);
         }
-        return 0;
+        RETURN(rc);
 }
 
 static int orph_index_delete(const struct lu_env *env,
@@ -271,6 +274,10 @@ static int orph_index_delete(const struct lu_env *env,
 
         ENTRY;
 
+        LASSERT(mdd_write_locked(env, obj) != 0);
+        LASSERT(obj->mod_flags & ORPHAN_OBJ);
+        LASSERT(obj->mod_count == 0);
+
         LASSERT(dor);
 
         key = orph_key_fill(env, mdo2fid(obj), op);
@@ -290,10 +297,11 @@ static int orph_index_delete(const struct lu_env *env,
                         mdo_ref_del(env, obj, th);
                         mdd_orphan_ref_del(env, mdd, th);
                 }
-        } else
+                obj->mod_flags &= ~ORPHAN_OBJ;
+        } else {
                 CERROR("could not delete object: rc = %d\n",rc);
+        }
 
-        obj->mod_flags &= ~ORPHAN_OBJ;
         mdd_orphan_write_unlock(env, mdd);
         RETURN(rc);
 }
@@ -305,7 +313,7 @@ static int orphan_object_destroy(const struct lu_env *env,
 {
         struct thandle *th = NULL;
         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
-        int rc;
+        int rc = 0;
         ENTRY;
 
         mdd_txn_param_build(env, mdd, MDD_TXN_UNLINK_OP);
@@ -315,14 +323,17 @@ static int orphan_object_destroy(const struct lu_env *env,
                 RETURN(-ENOMEM);
         }
 
-        mdd_orphan_write_lock(env, mdd);
-        rc = mdd_orphan_delete_obj(env, mdd, key, th);
-        if (!rc)
-                orphan_object_kill(env, obj, mdd, th);
-        else
-                CERROR("could not delete object: rc = %d\n",rc);
-
-        mdd_orphan_write_unlock(env, mdd);
+        mdd_write_lock(env, obj, MOR_TGT_CHILD);
+        if (likely(obj->mod_count == 0)) {
+                mdd_orphan_write_lock(env, mdd);
+                rc = mdd_orphan_delete_obj(env, mdd, key, th);
+                if (!rc)
+                        orphan_object_kill(env, obj, mdd, th);
+                else
+                        CERROR("could not delete object: rc = %d\n",rc);
+                mdd_orphan_write_unlock(env, mdd);
+        }
+        mdd_write_unlock(env, obj);
         mdd_trans_stop(env, mdd, 0, th);
 
         RETURN(rc);
@@ -346,8 +357,13 @@ static int orph_key_test_and_del(const struct lu_env *env,
                 CWARN("Found orphan! Delete it\n");
                 rc = orphan_object_destroy(env, mdo, key);
         } else {
-                CDEBUG(D_HA, "Found orphan, open count = %d\n", mdo->mod_count);
-                mdo->mod_flags |= ORPHAN_OBJ;
+                mdd_write_lock(env, mdo, MOR_TGT_CHILD);
+                if (likely(mdo->mod_count > 0)) {
+                        CDEBUG(D_HA, "Found orphan, open count = %d\n",
+                               mdo->mod_count);
+                        mdo->mod_flags |= ORPHAN_OBJ;
+                }
+                mdd_write_unlock(env, mdo);
         }
 
         mdd_object_put(env, mdo);
@@ -380,8 +396,10 @@ static int orph_index_iterate(const struct lu_env *env,
                         do {
 
                                 key = (void *)iops->key(env, it);
-                                if (IS_ERR(key))
+                                if (IS_ERR(key)) {
+                                        CERROR("key failed when clean pending.\n");
                                         goto next;
+                                }
                                 key_sz = iops->key_size(env, it);
 
                                 /* filter out "." and ".." entries from
@@ -394,8 +412,10 @@ static int orph_index_iterate(const struct lu_env *env,
 
                                 if (orphan_key_to_fid(mti_key, &fid))
                                         goto next;
-                                if (!fid_is_sane(&fid))
+                                if (!fid_is_sane(&fid)) {
+                                        CERROR("fid is not sane when clean pending.\n");
                                         goto next;
+                                }
 
                                 /* kill orphan object */
                                 cookie =  iops->store(env, it);
@@ -413,13 +433,17 @@ next:
                                 result = iops->next(env, it);
                         } while (result == 0);
                         result = 0;
-                } else if (result == 0)
+                } else if (result == 0) {
+                        CERROR("Input/Output for clean pending.\n");
                         /* Index contains no zero key? */
                         result = -EIO;
+                }
                 iops->put(env, it);
                 iops->fini(env, it);
-        } else
+        } else {
+                CERROR("not enough memory for clean pending.\n");
                 result = -ENOMEM;
+        }
 
         RETURN(result);
 }