Whamcloud - gitweb
LU-5508 osp: RPC adjustment for remote transaction 82/11382/12
authorFan Yong <fan.yong@intel.com>
Sat, 26 Jul 2014 22:15:42 +0000 (06:15 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Mon, 15 Sep 2014 18:16:37 +0000 (18:16 +0000)
1) For remote transaction, the set_attr/set_xattr RPC should not be
   prepared in declare phase. According to our current transaction/
   dt_object_lock framework, the transaction sponsor will start the
   transaction firstly, then try to acquire related dt_object_lock
   if needed. That is a general rule, and the LFSCK needs to follow
   such rule when repair inconsistent linkEA, in spite of local or
   remote MDT-object.

   For linkEA repairing case, before the LFSCK thread obtained the
   dt_object_lock on the target MDT-object, it cannot know whether
   the MDT-object has linkEA or not, neither invalid or not.

   Since the LFSCK cannot hold dt_object_lock before the (remote)
   transaction start (otherwise there will be potential deadlock),
   it cannot prepare related RPC for repairing during the declare
   phase as other normal transactions do.

   To resolve the trouble, we will make OSP to prepare related RPC
   (set_attr/set_xattr/del_xattr) after remote transaction started,
   and trigger the remote updating (RPC sending) when trans_stop.
   Then the up layer users, such as LFSCK, can follow the general
   rule to handle trans_start/dt_object_lock for repairing linkEA
   inconsistency without distinguishing remote MDT-object.

2) Some adjustment for OSP object attributes cache maintainig to make
   the logic more clear and reasonable.
2.1) Update the cached attribute in osp_attr_set(), but not in the
     osp_declare_attr_set().
2.2) Update the cached extended attribute in osp_xattr_set(), but not
     in the osp_declare_xattr_set().
2.3) Drop the cached extended attribute in osp_xattr_del().

3) Typo fixing and code cleanup.

Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: I43c88a8fd3b184c91a4b3cbd4104e35f9915ee24
Reviewed-on: http://review.whamcloud.com/11382
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/osp/osp_md_object.c
lustre/osp/osp_object.c

index 2b924cd..237e8ae 100644 (file)
@@ -194,6 +194,7 @@ int osp_md_object_create(const struct lu_env *env, struct dt_object *dt,
         * if creation reaches here, it means the object has been created
         * successfully */
        dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | (attr->la_mode & S_IFMT);
         * if creation reaches here, it means the object has been created
         * successfully */
        dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | (attr->la_mode & S_IFMT);
+       dt2osp_obj(dt)->opo_non_exist = 0;
 
        return 0;
 }
 
        return 0;
 }
index 966d751..6776c87 100644 (file)
@@ -51,7 +51,7 @@ static inline bool is_ost_obj(struct lu_object *lo)
 }
 
 static void osp_object_assign_fid(const struct lu_env *env,
 }
 
 static void osp_object_assign_fid(const struct lu_env *env,
-                                struct osp_device *d, struct osp_object *o)
+                                 struct osp_device *d, struct osp_object *o)
 {
        struct osp_thread_info *osi = osp_env_info(env);
 
 {
        struct osp_thread_info *osi = osp_env_info(env);
 
@@ -87,34 +87,35 @@ static int osp_oac_init(struct osp_object *obj)
 
 static struct osp_xattr_entry *
 osp_oac_xattr_find_locked(struct osp_object_attr *ooa,
 
 static struct osp_xattr_entry *
 osp_oac_xattr_find_locked(struct osp_object_attr *ooa,
-                         const char *name, size_t namelen, bool unlink)
+                         const char *name, size_t namelen)
 {
        struct osp_xattr_entry *oxe;
 
        list_for_each_entry(oxe, &ooa->ooa_xattr_list, oxe_list) {
                if (namelen == oxe->oxe_namelen &&
 {
        struct osp_xattr_entry *oxe;
 
        list_for_each_entry(oxe, &ooa->ooa_xattr_list, oxe_list) {
                if (namelen == oxe->oxe_namelen &&
-                   strncmp(name, oxe->oxe_buf, namelen) == 0) {
-                       if (unlink)
-                               list_del_init(&oxe->oxe_list);
-                       else
-                               atomic_inc(&oxe->oxe_ref);
-
+                   strncmp(name, oxe->oxe_buf, namelen) == 0)
                        return oxe;
                        return oxe;
-               }
        }
 
        return NULL;
 }
 
 static struct osp_xattr_entry *osp_oac_xattr_find(struct osp_object *obj,
        }
 
        return NULL;
 }
 
 static struct osp_xattr_entry *osp_oac_xattr_find(struct osp_object *obj,
-                                                 const char *name)
+                                                 const char *name, bool unlink)
 {
        struct osp_xattr_entry *oxe = NULL;
 
        spin_lock(&obj->opo_lock);
 {
        struct osp_xattr_entry *oxe = NULL;
 
        spin_lock(&obj->opo_lock);
-       if (obj->opo_ooa != NULL)
+       if (obj->opo_ooa != NULL) {
                oxe = osp_oac_xattr_find_locked(obj->opo_ooa, name,
                oxe = osp_oac_xattr_find_locked(obj->opo_ooa, name,
-                                               strlen(name), false);
+                                               strlen(name));
+               if (oxe != NULL) {
+                       if (unlink)
+                               list_del_init(&oxe->oxe_list);
+                       else
+                               atomic_inc(&oxe->oxe_ref);
+               }
+       }
        spin_unlock(&obj->opo_lock);
 
        return oxe;
        spin_unlock(&obj->opo_lock);
 
        return oxe;
@@ -131,7 +132,7 @@ osp_oac_xattr_find_or_add(struct osp_object *obj, const char *name, size_t len)
 
        LASSERT(ooa != NULL);
 
 
        LASSERT(ooa != NULL);
 
-       oxe = osp_oac_xattr_find(obj, name);
+       oxe = osp_oac_xattr_find(obj, name, false);
        if (oxe != NULL)
                return oxe;
 
        if (oxe != NULL)
                return oxe;
 
@@ -148,9 +149,11 @@ osp_oac_xattr_find_or_add(struct osp_object *obj, const char *name, size_t len)
        atomic_set(&oxe->oxe_ref, 2);
 
        spin_lock(&obj->opo_lock);
        atomic_set(&oxe->oxe_ref, 2);
 
        spin_lock(&obj->opo_lock);
-       tmp = osp_oac_xattr_find_locked(ooa, name, namelen, false);
+       tmp = osp_oac_xattr_find_locked(ooa, name, namelen);
        if (tmp == NULL)
                list_add_tail(&oxe->oxe_list, &ooa->ooa_xattr_list);
        if (tmp == NULL)
                list_add_tail(&oxe->oxe_list, &ooa->ooa_xattr_list);
+       else
+               atomic_inc(&tmp->oxe_ref);
        spin_unlock(&obj->opo_lock);
 
        if (tmp != NULL) {
        spin_unlock(&obj->opo_lock);
 
        if (tmp != NULL) {
@@ -166,10 +169,8 @@ osp_oac_xattr_replace(struct osp_object *obj,
                      struct osp_xattr_entry **poxe, size_t len)
 {
        struct osp_object_attr *ooa     = obj->opo_ooa;
                      struct osp_xattr_entry **poxe, size_t len)
 {
        struct osp_object_attr *ooa     = obj->opo_ooa;
-       struct osp_xattr_entry *old     = *poxe;
        struct osp_xattr_entry *oxe;
        struct osp_xattr_entry *oxe;
-       struct osp_xattr_entry *tmp     = NULL;
-       size_t                  namelen = old->oxe_namelen;
+       size_t                  namelen = (*poxe)->oxe_namelen;
        size_t                  size    = sizeof(*oxe) + namelen + 1 + len;
 
        LASSERT(ooa != NULL);
        size_t                  size    = sizeof(*oxe) + namelen + 1 + len;
 
        LASSERT(ooa != NULL);
@@ -181,19 +182,19 @@ osp_oac_xattr_replace(struct osp_object *obj,
        INIT_LIST_HEAD(&oxe->oxe_list);
        oxe->oxe_buflen = size;
        oxe->oxe_namelen = namelen;
        INIT_LIST_HEAD(&oxe->oxe_list);
        oxe->oxe_buflen = size;
        oxe->oxe_namelen = namelen;
-       memcpy(oxe->oxe_buf, old->oxe_buf, namelen);
+       memcpy(oxe->oxe_buf, (*poxe)->oxe_buf, namelen);
        oxe->oxe_value = oxe->oxe_buf + namelen + 1;
        /* One ref is for the caller, the other is for the entry on the list. */
        atomic_set(&oxe->oxe_ref, 2);
 
        spin_lock(&obj->opo_lock);
        oxe->oxe_value = oxe->oxe_buf + namelen + 1;
        /* One ref is for the caller, the other is for the entry on the list. */
        atomic_set(&oxe->oxe_ref, 2);
 
        spin_lock(&obj->opo_lock);
-       tmp = osp_oac_xattr_find_locked(ooa, oxe->oxe_buf, namelen, true);
+       *poxe = osp_oac_xattr_find_locked(ooa, oxe->oxe_buf, namelen);
+       LASSERT(*poxe != NULL);
+
+       list_del_init(&(*poxe)->oxe_list);
        list_add_tail(&oxe->oxe_list, &ooa->ooa_xattr_list);
        spin_unlock(&obj->opo_lock);
 
        list_add_tail(&oxe->oxe_list, &ooa->ooa_xattr_list);
        spin_unlock(&obj->opo_lock);
 
-       *poxe = tmp;
-       LASSERT(tmp != NULL);
-
        return oxe;
 }
 
        return oxe;
 }
 
@@ -375,14 +376,13 @@ out:
        return rc;
 }
 
        return rc;
 }
 
-static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
-                               const struct lu_attr *attr, struct thandle *th)
+static int __osp_attr_set(const struct lu_env *env, struct dt_object *dt,
+                         const struct lu_attr *attr, struct thandle *th)
 {
        struct osp_device       *d = lu2osp_dev(dt->do_lu.lo_dev);
        struct osp_object       *o = dt2osp_obj(dt);
        struct lu_attr          *la;
        int                      rc = 0;
 {
        struct osp_device       *d = lu2osp_dev(dt->do_lu.lo_dev);
        struct osp_object       *o = dt2osp_obj(dt);
        struct lu_attr          *la;
        int                      rc = 0;
-
        ENTRY;
 
        /*
        ENTRY;
 
        /*
@@ -417,10 +417,9 @@ static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
                        RETURN(rc);
        }
 
                        RETURN(rc);
        }
 
-       if (o->opo_new) {
+       if (o->opo_new)
                /* no need in logging for new objects being created */
                RETURN(0);
                /* no need in logging for new objects being created */
                RETURN(0);
-       }
 
        if (!(attr->la_valid & (LA_UID | LA_GID)))
                RETURN(0);
 
        if (!(attr->la_valid & (LA_UID | LA_GID)))
                RETURN(0);
@@ -438,6 +437,7 @@ static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
        if (rc != 0 || o->opo_ooa == NULL)
                RETURN(rc);
 
        if (rc != 0 || o->opo_ooa == NULL)
                RETURN(rc);
 
+       /* Update the OSP object attributes cache. */
        la = &o->opo_ooa->ooa_attr;
        spin_lock(&o->opo_lock);
        if (attr->la_valid & LA_UID) {
        la = &o->opo_ooa->ooa_attr;
        spin_lock(&o->opo_lock);
        if (attr->la_valid & LA_UID) {
@@ -454,15 +454,62 @@ static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
        RETURN(0);
 }
 
        RETURN(0);
 }
 
+/**
+ * XXX: NOT prepare set_{attr,xattr} RPC for remote transaction.
+ *
+ * According to our current transaction/dt_object_lock framework (to make
+ * the cross-MDTs modification for DNE1 to be workable), the transaction
+ * sponsor will start the transaction firstly, then try to acquire related
+ * dt_object_lock if needed. Under such rules, if we want to prepare the
+ * set_{attr,xattr} RPC in the RPC declare phase, then related attr/xattr
+ * should be known without dt_object_lock. But such condition maybe not
+ * true for some remote transaction case. For example:
+ *
+ * For linkEA repairing (by LFSCK) case, before the LFSCK thread obtained
+ * the dt_object_lock on the target MDT-object, it cannot know whether
+ * the MDT-object has linkEA or not, neither invalid or not.
+ *
+ * Since the LFSCK thread cannot hold dt_object_lock before the (remote)
+ * transaction start (otherwise there will be some potential deadlock),
+ * it cannot prepare related RPC for repairing during the declare phase
+ * as other normal transactions do.
+ *
+ * To resolve the trouble, we will make OSP to prepare related RPC
+ * (set_attr/set_xattr/del_xattr) after remote transaction started,
+ * and trigger the remote updating (RPC sending) when trans_stop.
+ * Then the up layer users, such as LFSCK, can follow the general
+ * rule to handle trans_start/dt_object_lock for repairing linkEA
+ * inconsistency without distinguishing remote MDT-object.
+ *
+ * In fact, above solution for remote transaction should be the normal
+ * model without considering DNE1. The trouble brought by DNE1 will be
+ * resolved in DNE2. At that time, this patch can be removed.
+ */
+static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
+                               const struct lu_attr *attr, struct thandle *th)
+{
+       int rc = 0;
+
+       if (!is_only_remote_trans(th))
+               rc = __osp_attr_set(env, dt, attr, th);
+
+       return rc;
+}
+
 static int osp_attr_set(const struct lu_env *env, struct dt_object *dt,
                        const struct lu_attr *attr, struct thandle *th,
                        struct lustre_capa *capa)
 {
        struct osp_object       *o = dt2osp_obj(dt);
        int                      rc = 0;
 static int osp_attr_set(const struct lu_env *env, struct dt_object *dt,
                        const struct lu_attr *attr, struct thandle *th,
                        struct lustre_capa *capa)
 {
        struct osp_object       *o = dt2osp_obj(dt);
        int                      rc = 0;
-
        ENTRY;
 
        ENTRY;
 
+       if (is_only_remote_trans(th)) {
+               rc = __osp_attr_set(env, dt, attr, th);
+               if (rc != 0)
+                       RETURN(rc);
+       }
+
        /* we're interested in uid/gid changes only */
        if (!(attr->la_valid & (LA_UID | LA_GID)))
                RETURN(0);
        /* we're interested in uid/gid changes only */
        if (!(attr->la_valid & (LA_UID | LA_GID)))
                RETURN(0);
@@ -617,7 +664,7 @@ int osp_xattr_get(const struct lu_env *env, struct dt_object *dt,
        if (unlikely(obj->opo_non_exist))
                RETURN(-ENOENT);
 
        if (unlikely(obj->opo_non_exist))
                RETURN(-ENOENT);
 
-       oxe = osp_oac_xattr_find(obj, name);
+       oxe = osp_oac_xattr_find(obj, name, false);
        if (oxe != NULL) {
                spin_lock(&obj->opo_lock);
                if (oxe->oxe_ready) {
        if (oxe != NULL) {
                spin_lock(&obj->opo_lock);
                if (oxe->oxe_ready) {
@@ -766,18 +813,21 @@ out:
        return rc;
 }
 
        return rc;
 }
 
-int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
-                         const struct lu_buf *buf, const char *name,
-                         int flag, struct thandle *th)
+static int __osp_xattr_set(const struct lu_env *env, struct dt_object *dt,
+                          const struct lu_buf *buf, const char *name,
+                          int flag, struct thandle *th)
 {
 {
-       struct osp_object       *o       = dt2osp_obj(dt);
        struct dt_update_request *update;
        struct dt_update_request *update;
-       struct lu_fid           *fid;
-       struct osp_xattr_entry  *oxe;
-       int                     sizes[3] = {strlen(name), buf->lb_len,
-                                           sizeof(int)};
-       char                    *bufs[3] = {(char *)name, (char *)buf->lb_buf };
-       int                     rc;
+       struct lu_fid            *fid;
+       int                      sizes[3]       = { strlen(name),
+                                                   buf->lb_len,
+                                                   sizeof(int) };
+       char                     *bufs[3]       = { (char *)name,
+                                                   (char *)buf->lb_buf };
+       struct osp_xattr_entry   *oxe;
+       struct osp_object        *o             = dt2osp_obj(dt);
+       int                      rc;
+       ENTRY;
 
        LASSERT(buf->lb_len > 0 && buf->lb_buf != NULL);
 
 
        LASSERT(buf->lb_len > 0 && buf->lb_buf != NULL);
 
@@ -788,7 +838,7 @@ int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
                       PFID(lu_object_fid(&dt->do_lu)),
                       (int)PTR_ERR(update));
 
                       PFID(lu_object_fid(&dt->do_lu)),
                       (int)PTR_ERR(update));
 
-               return PTR_ERR(update);
+               RETURN(PTR_ERR(update));
        }
 
        flag = cpu_to_le32(flag);
        }
 
        flag = cpu_to_le32(flag);
@@ -798,15 +848,15 @@ int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
        rc = out_insert_update(env, update, OUT_XATTR_SET, fid,
                               ARRAY_SIZE(sizes), sizes, (const char **)bufs);
        if (rc != 0 || o->opo_ooa == NULL)
        rc = out_insert_update(env, update, OUT_XATTR_SET, fid,
                               ARRAY_SIZE(sizes), sizes, (const char **)bufs);
        if (rc != 0 || o->opo_ooa == NULL)
-               return rc;
+               RETURN(rc);
 
        oxe = osp_oac_xattr_find_or_add(o, name, buf->lb_len);
        if (oxe == NULL) {
 
        oxe = osp_oac_xattr_find_or_add(o, name, buf->lb_len);
        if (oxe == NULL) {
-               CWARN("%s: Fail to add xattr (%s) to cache for "DFID
-                     ": rc = %d\n", dt->do_lu.lo_dev->ld_obd->obd_name,
-                     name, PFID(lu_object_fid(&dt->do_lu)), rc);
+               CWARN("%s: Fail to add xattr (%s) to cache for "DFID,
+                     dt->do_lu.lo_dev->ld_obd->obd_name,
+                     name, PFID(lu_object_fid(&dt->do_lu)));
 
 
-               return 0;
+               RETURN(0);
        }
 
        if (oxe->oxe_buflen - oxe->oxe_namelen - 1 < buf->lb_len) {
        }
 
        if (oxe->oxe_buflen - oxe->oxe_namelen - 1 < buf->lb_len) {
@@ -817,14 +867,14 @@ int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
                osp_oac_xattr_put(oxe);
                oxe = tmp;
                if (tmp == NULL) {
                osp_oac_xattr_put(oxe);
                oxe = tmp;
                if (tmp == NULL) {
-                       CWARN("%s: Fail to update xattr (%s) to cache for "DFID
-                             ": rc = %d\n", dt->do_lu.lo_dev->ld_obd->obd_name,
-                             name, PFID(lu_object_fid(&dt->do_lu)), rc);
+                       CWARN("%s: Fail to update xattr (%s) to cache for "DFID,
+                             dt->do_lu.lo_dev->ld_obd->obd_name,
+                             name, PFID(lu_object_fid(&dt->do_lu)));
                        spin_lock(&o->opo_lock);
                        old->oxe_ready = 0;
                        spin_unlock(&o->opo_lock);
 
                        spin_lock(&o->opo_lock);
                        old->oxe_ready = 0;
                        spin_unlock(&o->opo_lock);
 
-                       return 0;
+                       RETURN(0);
                }
 
                /* Drop the ref for entry on list. */
                }
 
                /* Drop the ref for entry on list. */
@@ -839,26 +889,49 @@ int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
        spin_unlock(&o->opo_lock);
        osp_oac_xattr_put(oxe);
 
        spin_unlock(&o->opo_lock);
        osp_oac_xattr_put(oxe);
 
-       return 0;
+       RETURN(0);
+}
+
+int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
+                         const struct lu_buf *buf, const char *name,
+                         int flag, struct thandle *th)
+{
+       int rc = 0;
+
+       /* Please check the comment in osp_attr_set() for handling
+        * remote transaction. */
+       if (!is_only_remote_trans(th))
+               rc = __osp_xattr_set(env, dt, buf, name, flag, th);
+
+       return rc;
 }
 
 int osp_xattr_set(const struct lu_env *env, struct dt_object *dt,
                  const struct lu_buf *buf, const char *name, int fl,
                  struct thandle *th, struct lustre_capa *capa)
 {
 }
 
 int osp_xattr_set(const struct lu_env *env, struct dt_object *dt,
                  const struct lu_buf *buf, const char *name, int fl,
                  struct thandle *th, struct lustre_capa *capa)
 {
+       int rc = 0;
+
        CDEBUG(D_INFO, "xattr %s set object "DFID"\n", name,
               PFID(&dt->do_lu.lo_header->loh_fid));
 
        CDEBUG(D_INFO, "xattr %s set object "DFID"\n", name,
               PFID(&dt->do_lu.lo_header->loh_fid));
 
-       return 0;
+       /* Please check the comment in osp_attr_set() for handling
+        * remote transaction. */
+       if (is_only_remote_trans(th))
+               rc = __osp_xattr_set(env, dt, buf, name, fl, th);
+
+       return rc;
 }
 
 }
 
-int osp_declare_xattr_del(const struct lu_env *env, struct dt_object *dt,
-                         const char *name, struct thandle *th)
+static int __osp_xattr_del(const struct lu_env *env, struct dt_object *dt,
+                          const char *name, struct thandle *th)
 {
        struct dt_update_request *update;
        const struct lu_fid      *fid;
 {
        struct dt_update_request *update;
        const struct lu_fid      *fid;
-       int                      size = strlen(name);
-       int                      rc;
+       struct osp_object        *o     = dt2osp_obj(dt);
+       struct osp_xattr_entry   *oxe;
+       int                       size  = strlen(name);
+       int                       rc;
 
        update = out_find_create_update_loc(th, dt);
        if (IS_ERR(update))
 
        update = out_find_create_update_loc(th, dt);
        if (IS_ERR(update))
@@ -868,6 +941,26 @@ int osp_declare_xattr_del(const struct lu_env *env, struct dt_object *dt,
 
        rc = out_insert_update(env, update, OUT_XATTR_DEL, fid, 1, &size,
                               (const char **)&name);
 
        rc = out_insert_update(env, update, OUT_XATTR_DEL, fid, 1, &size,
                               (const char **)&name);
+       if (rc != 0 || o->opo_ooa == NULL)
+               return rc;
+
+       oxe = osp_oac_xattr_find(o, name, true);
+       if (oxe != NULL)
+               /* Drop the ref for entry on list. */
+               osp_oac_xattr_put(oxe);
+
+       return 0;
+}
+
+int osp_declare_xattr_del(const struct lu_env *env, struct dt_object *dt,
+                         const char *name, struct thandle *th)
+{
+       int rc = 0;
+
+       /* Please check the comment in osp_attr_set() for handling
+        * remote transaction. */
+       if (!is_only_remote_trans(th))
+               rc = __osp_xattr_del(env, dt, name, th);
 
        return rc;
 }
 
        return rc;
 }
@@ -876,10 +969,17 @@ int osp_xattr_del(const struct lu_env *env, struct dt_object *dt,
                  const char *name, struct thandle *th,
                  struct lustre_capa *capa)
 {
                  const char *name, struct thandle *th,
                  struct lustre_capa *capa)
 {
+       int rc = 0;
+
        CDEBUG(D_INFO, "xattr %s del object "DFID"\n", name,
               PFID(&dt->do_lu.lo_header->loh_fid));
 
        CDEBUG(D_INFO, "xattr %s del object "DFID"\n", name,
               PFID(&dt->do_lu.lo_header->loh_fid));
 
-       return 0;
+       /* Please check the comment in osp_attr_set() for handling
+        * remote transaction. */
+       if (is_only_remote_trans(th))
+               rc = __osp_xattr_del(env, dt, name, th);
+
+       return rc;
 }
 
 static int osp_declare_object_create(const struct lu_env *env,
 }
 
 static int osp_declare_object_create(const struct lu_env *env,
@@ -1457,10 +1557,10 @@ __u64 osp_it_store(const struct lu_env *env, const struct dt_it *di)
  * \retval     -ve: on error
  */
 int osp_orphan_it_load(const struct lu_env *env, const struct dt_it *di,
  * \retval     -ve: on error
  */
 int osp_orphan_it_load(const struct lu_env *env, const struct dt_it *di,
-               __u64 hash)
+                      __u64 hash)
 {
        struct osp_it   *it     = (struct osp_it *)di;
 {
        struct osp_it   *it     = (struct osp_it *)di;
-       int                      rc;
+       int              rc;
 
        it->ooi_next = hash;
        rc = osp_orphan_it_next(env, (struct dt_it *)di);
 
        it->ooi_next = hash;
        rc = osp_orphan_it_next(env, (struct dt_it *)di);