Whamcloud - gitweb
Branch HEAD
authorfanyong <fanyong>
Fri, 19 Jun 2009 16:47:22 +0000 (16:47 +0000)
committerfanyong <fanyong>
Fri, 19 Jun 2009 16:47:22 +0000 (16:47 +0000)
b=19149
i=tappro
i=tom.wang

Cleanup pending files after client eviction.

lustre/include/dt_object.h
lustre/include/lustre/lustre_idl.h
lustre/mdd/mdd_dir.c
lustre/mdd/mdd_internal.h
lustre/mdd/mdd_lock.c
lustre/mdd/mdd_lov.c
lustre/mdd/mdd_object.c
lustre/mdd/mdd_orphans.c
lustre/mdt/mdt_handler.c
lustre/osd/osd_handler.c

index 160f47a..242473e 100644 (file)
@@ -257,6 +257,8 @@ struct dt_object_operations {
                                 struct dt_object *dt);
         void  (*do_write_unlock)(const struct lu_env *env,
                                  struct dt_object *dt);
                                 struct dt_object *dt);
         void  (*do_write_unlock)(const struct lu_env *env,
                                  struct dt_object *dt);
+        int  (*do_write_locked)(const struct lu_env *env,
+                                struct dt_object *dt);
         /**
          * Note: following ->do_{x,}attr_{set,get}() operations are very
          * similar to ->moo_{x,}attr_{set,get}() operations in struct
         /**
          * Note: following ->do_{x,}attr_{set,get}() operations are very
          * similar to ->moo_{x,}attr_{set,get}() operations in struct
index e648bda..099a552 100644 (file)
@@ -1595,12 +1595,13 @@ extern void lustre_swab_mdt_rec_setattr (struct mdt_rec_setattr *sa);
 #define MAY_RGETFACL    (1 << 14)
 
 enum {
 #define MAY_RGETFACL    (1 << 14)
 
 enum {
-        MDS_CHECK_SPLIT  = 1 << 0,
-        MDS_CROSS_REF    = 1 << 1,
-        MDS_VTX_BYPASS   = 1 << 2,
-        MDS_PERM_BYPASS  = 1 << 3,
-        MDS_SOM          = 1 << 4,
-        MDS_QUOTA_IGNORE = 1 << 5
+        MDS_CHECK_SPLIT   = 1 << 0,
+        MDS_CROSS_REF     = 1 << 1,
+        MDS_VTX_BYPASS    = 1 << 2,
+        MDS_PERM_BYPASS   = 1 << 3,
+        MDS_SOM           = 1 << 4,
+        MDS_QUOTA_IGNORE  = 1 << 5,
+        MDS_CLOSE_CLEANUP = 1 << 6
 };
 
 struct mds_rec_create {
 };
 
 struct mds_rec_create {
index f5bfade..9ed0144 100644 (file)
@@ -766,23 +766,27 @@ int mdd_finish_unlink(const struct lu_env *env,
         int reset = 1;
         ENTRY;
 
         int reset = 1;
         ENTRY;
 
+        LASSERT(mdd_write_locked(env, obj) != 0);
+
         rc = mdd_iattr_get(env, obj, ma);
         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
         rc = mdd_iattr_get(env, obj, ma);
         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
+                obj->mod_flags |= DEAD_OBJ;
                 /* add new orphan and the object
                  * will be deleted during mdd_close() */
                 if (obj->mod_count) {
                         rc = __mdd_orphan_add(env, obj, th);
                 /* add new orphan and the object
                  * will be deleted during mdd_close() */
                 if (obj->mod_count) {
                         rc = __mdd_orphan_add(env, obj, th);
-                        if (rc == 0) {
-                                obj->mod_flags |= ORPHAN_OBJ;
-                                CDEBUG(D_HA, "Object "DFID" is going to be "
-                                        "an orphan, open count = %d\n",
+                        if (rc == 0)
+                                CDEBUG(D_HA, "Object "DFID" is inserted into "
+                                        "orphan list, open count = %d\n",
                                         PFID(mdd_object_fid(obj)),
                                         obj->mod_count);
                                         PFID(mdd_object_fid(obj)),
                                         obj->mod_count);
-                        }
-                }
-
-                obj->mod_flags |= DEAD_OBJ;
-                if (!(obj->mod_flags & ORPHAN_OBJ)) {
+                        else
+                                CERROR("Object "DFID" fail to be an orphan, "
+                                       "open count = %d, maybe cause failed "
+                                       "open replay\n",
+                                        PFID(mdd_object_fid(obj)),
+                                        obj->mod_count);
+                } else {
                         rc = mdd_object_kill(env, obj, ma);
                         if (rc == 0)
                                 reset = 0;
                         rc = mdd_object_kill(env, obj, ma);
                         if (rc == 0)
                                 reset = 0;
index fc67d61..c905c97 100644 (file)
@@ -270,6 +270,7 @@ void mdd_read_lock(const struct lu_env *env, struct mdd_object *obj,
                    enum mdd_object_role role);
 void mdd_write_unlock(const struct lu_env *env, struct mdd_object *obj);
 void mdd_read_unlock(const struct lu_env *env, struct mdd_object *obj);
                    enum mdd_object_role role);
 void mdd_write_unlock(const struct lu_env *env, struct mdd_object *obj);
 void mdd_read_unlock(const struct lu_env *env, struct mdd_object *obj);
+int mdd_write_locked(const struct lu_env *env, struct mdd_object *obj);
 
 void mdd_pdlock_init(struct mdd_object *obj);
 unsigned long mdd_name2hash(const char *name);
 
 void mdd_pdlock_init(struct mdd_object *obj);
 unsigned long mdd_name2hash(const char *name);
index b84d4c2..d5b6730 100644 (file)
@@ -113,6 +113,12 @@ void mdd_read_unlock(const struct lu_env *env, struct mdd_object *obj)
         next->do_ops->do_read_unlock(env, next);
 }
 
         next->do_ops->do_read_unlock(env, next);
 }
 
+int mdd_write_locked(const struct lu_env *env, struct mdd_object *obj)
+{
+        struct dt_object  *next = mdd_object_child(obj);
+
+        return next->do_ops->do_write_locked(env, next);
+}
 
 /* Methods for parallel directory locking */
 
 
 /* Methods for parallel directory locking */
 
index eddfebc..bb2e1a8 100644 (file)
@@ -603,9 +603,8 @@ int mdd_lovobj_unlink(const struct lu_env *env, struct mdd_device *mdd,
 }
 
 /*
 }
 
 /*
- * called with obj not locked. 
+ * called with obj locked. 
  */
  */
-
 int mdd_lov_destroy(const struct lu_env *env, struct mdd_device *mdd,
                     struct mdd_object *obj, struct lu_attr *la)
 {
 int mdd_lov_destroy(const struct lu_env *env, struct mdd_device *mdd,
                     struct mdd_object *obj, struct lu_attr *la)
 {
@@ -613,6 +612,11 @@ int mdd_lov_destroy(const struct lu_env *env, struct mdd_device *mdd,
         int                rc;
         ENTRY;
 
         int                rc;
         ENTRY;
 
+        LASSERT(mdd_write_locked(env, obj) != 0);
+
+        if (unlikely(!S_ISREG(mdd_object_type(obj))))
+                RETURN(0);
+
         if (unlikely(la->la_nlink != 0)) {
                 CWARN("Attempt to destroy OSS object when nlink == %d\n",
                       la->la_nlink);
         if (unlikely(la->la_nlink != 0)) {
                 CWARN("Attempt to destroy OSS object when nlink == %d\n",
                       la->la_nlink);
@@ -628,8 +632,8 @@ int mdd_lov_destroy(const struct lu_env *env, struct mdd_device *mdd,
 
         /* get lov ea */
 
 
         /* get lov ea */
 
-        rc = mdd_get_md_locked(env, obj, ma->ma_lmm, &ma->ma_lmm_size,
-                               XATTR_NAME_LOV);
+        rc = mdd_get_md(env, obj, ma->ma_lmm, &ma->ma_lmm_size,
+                        XATTR_NAME_LOV);
 
         if (rc <= 0) {
                 CWARN("Get lov ea failed for "DFID" rc = %d\n",
 
         if (rc <= 0) {
                 CWARN("Get lov ea failed for "DFID" rc = %d\n",
index 79166fc..c2b1d7e 100644 (file)
@@ -1846,6 +1846,7 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj,
                      struct md_attr *ma)
 {
         struct mdd_object *mdd_obj = md2mdd_obj(obj);
                      struct md_attr *ma)
 {
         struct mdd_object *mdd_obj = md2mdd_obj(obj);
+        struct mdd_device *mdd = mdo2mdd(obj);
         struct thandle    *handle;
         int rc;
         int reset = 1;
         struct thandle    *handle;
         int rc;
         int reset = 1;
@@ -1869,27 +1870,53 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj,
         /* release open count */
         mdd_obj->mod_count --;
 
         /* release open count */
         mdd_obj->mod_count --;
 
-        if (mdd_obj->mod_count == 0) {
+        if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
                 /* remove link to object from orphan index */
                 /* remove link to object from orphan index */
-                if (mdd_obj->mod_flags & ORPHAN_OBJ)
-                        __mdd_orphan_del(env, mdd_obj, handle);
+                rc = __mdd_orphan_del(env, mdd_obj, handle);
+                if (rc == 0) {
+                        CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
+                               "list, OSS objects to be destroyed.\n",
+                               PFID(mdd_object_fid(mdd_obj)));
+                } else {
+                        CERROR("Object "DFID" can not be deleted from orphan "
+                                "list, maybe cause OST objects can not be "
+                                "destroyed (err: %d).\n",
+                                PFID(mdd_object_fid(mdd_obj)), rc);
+                        /* If object was not deleted from orphan list, do not
+                         * destroy OSS objects, which will be done when next
+                         * recovery. */
+                        GOTO(out, rc);
+                }
         }
 
         rc = mdd_iattr_get(env, mdd_obj, ma);
         }
 
         rc = mdd_iattr_get(env, mdd_obj, ma);
-        if (rc == 0) {
-                if (mdd_obj->mod_count == 0 && ma->ma_attr.la_nlink == 0) {
-                        rc = mdd_object_kill(env, mdd_obj, ma);
+        /* Object maybe not in orphan list originally, it is rare case for
+         * mdd_finish_unlink() failure. */
+        if (rc == 0 && ma->ma_attr.la_nlink == 0) {
 #ifdef HAVE_QUOTA_SUPPORT
 #ifdef HAVE_QUOTA_SUPPORT
-                        if (mds->mds_quota) {
-                                quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
-                                mdd_quota_wrapper(&ma->ma_attr, qids);
-                        }
+                if (mds->mds_quota) {
+                        quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
+                        mdd_quota_wrapper(&ma->ma_attr, qids);
+                }
 #endif
 #endif
-                        if (rc == 0)
-                                reset = 0;
+                /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
+                if (ma->ma_valid & MA_FLAGS &&
+                    ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
+                        rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
+                } else {
+                        rc = mdd_object_kill(env, mdd_obj, ma);
+                                if (rc == 0)
+                                        reset = 0;
                 }
                 }
+
+                if (rc != 0)
+                        CERROR("Error when prepare to delete Object "DFID" , "
+                               "which will cause OST objects can not be "
+                               "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
         }
         }
+        EXIT;
 
 
+out:
         if (reset)
                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
 
         if (reset)
                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
 
@@ -1902,7 +1929,7 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj,
                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
                               quota_opc);
 #endif
                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
                               quota_opc);
 #endif
-        RETURN(rc);
+        return rc;
 }
 
 /*
 }
 
 /*
index f4ea74b..64294cf 100644 (file)
@@ -190,6 +190,10 @@ static int orph_index_insert(const struct lu_env *env,
         int rc;
         ENTRY;
 
         int rc;
         ENTRY;
 
+        LASSERT(mdd_write_locked(env, obj) != 0);
+        LASSERT(!(obj->mod_flags & ORPHAN_OBJ));
+        LASSERT(obj->mod_count > 0);
+
         mdd_orphan_write_lock(env, mdd);
 
         rc = mdd_orphan_insert_obj(env, mdd, obj, op, th);
         mdd_orphan_write_lock(env, mdd);
 
         rc = mdd_orphan_insert_obj(env, mdd, obj, op, th);
@@ -215,6 +219,9 @@ static int orph_index_insert(const struct lu_env *env,
                                        dotdot, th, BYPASS_CAPA, 1);
 
 out:
                                        dotdot, th, BYPASS_CAPA, 1);
 
 out:
+        if (rc == 0)
+                obj->mod_flags |= ORPHAN_OBJ;
+
         mdd_orphan_write_unlock(env, mdd);
 
         RETURN(rc);
         mdd_orphan_write_unlock(env, mdd);
 
         RETURN(rc);
@@ -235,28 +242,24 @@ static int orphan_object_kill(const struct lu_env *env,
                               struct thandle *th)
 {
         struct lu_attr *la = &mdd_env_info(env)->mti_la;
                               struct thandle *th)
 {
         struct lu_attr *la = &mdd_env_info(env)->mti_la;
-        int rc;
+        int rc = 0;
+        ENTRY;
 
         /* No need to lock this object as its recovery phase, and
          * no other thread can access it. But we need to lock it
          * as its precondition for osd api we using. */
 
 
         /* No need to lock this object as its recovery phase, and
          * no other thread can access it. But we need to lock it
          * as its precondition for osd api we using. */
 
-        mdd_write_lock(env, obj, MOR_TGT_CHILD);
         mdo_ref_del(env, obj, th);
         if (S_ISDIR(mdd_object_type(obj))) {
                 mdo_ref_del(env, obj, th);
                 mdd_orphan_ref_del(env, mdd, th);
         mdo_ref_del(env, obj, th);
         if (S_ISDIR(mdd_object_type(obj))) {
                 mdo_ref_del(env, obj, th);
                 mdd_orphan_ref_del(env, mdd, th);
-                mdd_write_unlock(env, obj);
         } else {
                 /* regular file , cleanup linked ost objects */
                 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
         } else {
                 /* regular file , cleanup linked ost objects */
                 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
-                mdd_write_unlock(env, obj);
-                if (rc)
-                        RETURN(rc);
-
-                mdd_lov_destroy(env, mdd, obj, la);
+                if (rc == 0)
+                        rc = mdd_lov_destroy(env, mdd, obj, la);
         }
         }
-        return 0;
+        RETURN(rc);
 }
 
 static int orph_index_delete(const struct lu_env *env,
 }
 
 static int orph_index_delete(const struct lu_env *env,
@@ -271,6 +274,10 @@ static int orph_index_delete(const struct lu_env *env,
 
         ENTRY;
 
 
         ENTRY;
 
+        LASSERT(mdd_write_locked(env, obj) != 0);
+        LASSERT(obj->mod_flags & ORPHAN_OBJ);
+        LASSERT(obj->mod_count == 0);
+
         LASSERT(dor);
 
         key = orph_key_fill(env, mdo2fid(obj), op);
         LASSERT(dor);
 
         key = orph_key_fill(env, mdo2fid(obj), op);
@@ -290,10 +297,11 @@ static int orph_index_delete(const struct lu_env *env,
                         mdo_ref_del(env, obj, th);
                         mdd_orphan_ref_del(env, mdd, th);
                 }
                         mdo_ref_del(env, obj, th);
                         mdd_orphan_ref_del(env, mdd, th);
                 }
-        } else
+                obj->mod_flags &= ~ORPHAN_OBJ;
+        } else {
                 CERROR("could not delete object: rc = %d\n",rc);
                 CERROR("could not delete object: rc = %d\n",rc);
+        }
 
 
-        obj->mod_flags &= ~ORPHAN_OBJ;
         mdd_orphan_write_unlock(env, mdd);
         RETURN(rc);
 }
         mdd_orphan_write_unlock(env, mdd);
         RETURN(rc);
 }
@@ -305,7 +313,7 @@ static int orphan_object_destroy(const struct lu_env *env,
 {
         struct thandle *th = NULL;
         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
 {
         struct thandle *th = NULL;
         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
-        int rc;
+        int rc = 0;
         ENTRY;
 
         mdd_txn_param_build(env, mdd, MDD_TXN_UNLINK_OP);
         ENTRY;
 
         mdd_txn_param_build(env, mdd, MDD_TXN_UNLINK_OP);
@@ -315,14 +323,17 @@ static int orphan_object_destroy(const struct lu_env *env,
                 RETURN(-ENOMEM);
         }
 
                 RETURN(-ENOMEM);
         }
 
-        mdd_orphan_write_lock(env, mdd);
-        rc = mdd_orphan_delete_obj(env, mdd, key, th);
-        if (!rc)
-                orphan_object_kill(env, obj, mdd, th);
-        else
-                CERROR("could not delete object: rc = %d\n",rc);
-
-        mdd_orphan_write_unlock(env, mdd);
+        mdd_write_lock(env, obj, MOR_TGT_CHILD);
+        if (likely(obj->mod_count == 0)) {
+                mdd_orphan_write_lock(env, mdd);
+                rc = mdd_orphan_delete_obj(env, mdd, key, th);
+                if (!rc)
+                        orphan_object_kill(env, obj, mdd, th);
+                else
+                        CERROR("could not delete object: rc = %d\n",rc);
+                mdd_orphan_write_unlock(env, mdd);
+        }
+        mdd_write_unlock(env, obj);
         mdd_trans_stop(env, mdd, 0, th);
 
         RETURN(rc);
         mdd_trans_stop(env, mdd, 0, th);
 
         RETURN(rc);
@@ -346,8 +357,13 @@ static int orph_key_test_and_del(const struct lu_env *env,
                 CWARN("Found orphan! Delete it\n");
                 rc = orphan_object_destroy(env, mdo, key);
         } else {
                 CWARN("Found orphan! Delete it\n");
                 rc = orphan_object_destroy(env, mdo, key);
         } else {
-                CDEBUG(D_HA, "Found orphan, open count = %d\n", mdo->mod_count);
-                mdo->mod_flags |= ORPHAN_OBJ;
+                mdd_write_lock(env, mdo, MOR_TGT_CHILD);
+                if (likely(mdo->mod_count > 0)) {
+                        CDEBUG(D_HA, "Found orphan, open count = %d\n",
+                               mdo->mod_count);
+                        mdo->mod_flags |= ORPHAN_OBJ;
+                }
+                mdd_write_unlock(env, mdo);
         }
 
         mdd_object_put(env, mdo);
         }
 
         mdd_object_put(env, mdo);
@@ -380,8 +396,10 @@ static int orph_index_iterate(const struct lu_env *env,
                         do {
 
                                 key = (void *)iops->key(env, it);
                         do {
 
                                 key = (void *)iops->key(env, it);
-                                if (IS_ERR(key))
+                                if (IS_ERR(key)) {
+                                        CERROR("key failed when clean pending.\n");
                                         goto next;
                                         goto next;
+                                }
                                 key_sz = iops->key_size(env, it);
 
                                 /* filter out "." and ".." entries from
                                 key_sz = iops->key_size(env, it);
 
                                 /* filter out "." and ".." entries from
@@ -394,8 +412,10 @@ static int orph_index_iterate(const struct lu_env *env,
 
                                 if (orphan_key_to_fid(mti_key, &fid))
                                         goto next;
 
                                 if (orphan_key_to_fid(mti_key, &fid))
                                         goto next;
-                                if (!fid_is_sane(&fid))
+                                if (!fid_is_sane(&fid)) {
+                                        CERROR("fid is not sane when clean pending.\n");
                                         goto next;
                                         goto next;
+                                }
 
                                 /* kill orphan object */
                                 cookie =  iops->store(env, it);
 
                                 /* kill orphan object */
                                 cookie =  iops->store(env, it);
@@ -413,13 +433,17 @@ next:
                                 result = iops->next(env, it);
                         } while (result == 0);
                         result = 0;
                                 result = iops->next(env, it);
                         } while (result == 0);
                         result = 0;
-                } else if (result == 0)
+                } else if (result == 0) {
+                        CERROR("Input/Output for clean pending.\n");
                         /* Index contains no zero key? */
                         result = -EIO;
                         /* Index contains no zero key? */
                         result = -EIO;
+                }
                 iops->put(env, it);
                 iops->fini(env, it);
                 iops->put(env, it);
                 iops->fini(env, it);
-        } else
+        } else {
+                CERROR("not enough memory for clean pending.\n");
                 result = -ENOMEM;
                 result = -ENOMEM;
+        }
 
         RETURN(result);
 }
 
         RETURN(result);
 }
index c299486..ca8d64a 100644 (file)
@@ -5147,10 +5147,11 @@ static int mdt_mfd_cleanup(struct obd_export *exp)
                 int cookie_size;
 
                 lmm_size = mdt->mdt_max_mdsize;
                 int cookie_size;
 
                 lmm_size = mdt->mdt_max_mdsize;
-                cookie_size = mdt->mdt_max_cookiesize;
                 OBD_ALLOC(ma->ma_lmm, lmm_size);
                 if (ma->ma_lmm == NULL)
                         GOTO(out_lmm, rc = -ENOMEM);
                 OBD_ALLOC(ma->ma_lmm, lmm_size);
                 if (ma->ma_lmm == NULL)
                         GOTO(out_lmm, rc = -ENOMEM);
+
+                cookie_size = mdt->mdt_max_cookiesize;
                 OBD_ALLOC(ma->ma_cookie, cookie_size);
                 if (ma->ma_cookie == NULL)
                         GOTO(out_cookie, rc = -ENOMEM);
                 OBD_ALLOC(ma->ma_cookie, cookie_size);
                 if (ma->ma_cookie == NULL)
                         GOTO(out_cookie, rc = -ENOMEM);
@@ -5158,13 +5159,14 @@ static int mdt_mfd_cleanup(struct obd_export *exp)
                 /* Close any open files (which may also cause orphan unlinking). */
                 list_for_each_entry_safe(mfd, n, &closing_list, mfd_list) {
                         list_del_init(&mfd->mfd_list);
                 /* Close any open files (which may also cause orphan unlinking). */
                 list_for_each_entry_safe(mfd, n, &closing_list, mfd_list) {
                         list_del_init(&mfd->mfd_list);
-                        /* TODO: if we close the unlinked file,
-                         * we need to remove its objects from OST */
                         memset(&ma->ma_attr, 0, sizeof(ma->ma_attr));
                         ma->ma_lmm_size = lmm_size;
                         ma->ma_cookie_size = cookie_size;
                         memset(&ma->ma_attr, 0, sizeof(ma->ma_attr));
                         ma->ma_lmm_size = lmm_size;
                         ma->ma_cookie_size = cookie_size;
-                        ma->ma_need = MA_LOV | MA_COOKIE;
-                        ma->ma_valid = 0;
+                        ma->ma_need = 0;
+                        /* It is not for setattr, just tell MDD to send
+                         * DESTROY RPC to OSS if needed */
+                        ma->ma_attr_flags = MDS_CLOSE_CLEANUP;
+                        ma->ma_valid = MA_FLAGS;
                         mdt_mfd_close(info, mfd);
                 }
                 info->mti_mdt = NULL;
                         mdt_mfd_close(info, mfd);
                 }
                 info->mti_mdt = NULL;
index c584859..0794d1e 100644 (file)
@@ -1099,6 +1099,16 @@ static void osd_object_write_unlock(const struct lu_env *env,
         up_write(&obj->oo_sem);
 }
 
         up_write(&obj->oo_sem);
 }
 
+static int osd_object_write_locked(const struct lu_env *env,
+                                   struct dt_object *dt)
+{
+        struct osd_object *obj = osd_dt_obj(dt);
+
+        LINVRNT(osd_invariant(obj));
+
+        return obj->oo_owner == env;
+}
+
 static int capa_is_sane(const struct lu_env *env,
                         struct osd_device *dev,
                         struct lustre_capa *capa,
 static int capa_is_sane(const struct lu_env *env,
                         struct osd_device *dev,
                         struct lustre_capa *capa,
@@ -2136,6 +2146,7 @@ static const struct dt_object_operations osd_obj_ops = {
         .do_write_lock   = osd_object_write_lock,
         .do_read_unlock  = osd_object_read_unlock,
         .do_write_unlock = osd_object_write_unlock,
         .do_write_lock   = osd_object_write_lock,
         .do_read_unlock  = osd_object_read_unlock,
         .do_write_unlock = osd_object_write_unlock,
+        .do_write_locked = osd_object_write_locked,
         .do_attr_get     = osd_attr_get,
         .do_attr_set     = osd_attr_set,
         .do_ah_init      = osd_ah_init,
         .do_attr_get     = osd_attr_get,
         .do_attr_set     = osd_attr_set,
         .do_ah_init      = osd_ah_init,
@@ -2163,6 +2174,7 @@ static const struct dt_object_operations osd_obj_ea_ops = {
         .do_write_lock   = osd_object_write_lock,
         .do_read_unlock  = osd_object_read_unlock,
         .do_write_unlock = osd_object_write_unlock,
         .do_write_lock   = osd_object_write_lock,
         .do_read_unlock  = osd_object_read_unlock,
         .do_write_unlock = osd_object_write_unlock,
+        .do_write_locked = osd_object_write_locked,
         .do_attr_get     = osd_attr_get,
         .do_attr_set     = osd_attr_set,
         .do_ah_init      = osd_ah_init,
         .do_attr_get     = osd_attr_get,
         .do_attr_set     = osd_attr_set,
         .do_ah_init      = osd_ah_init,