Whamcloud - gitweb
LU-5506 lfsck: skip orphan OST-object handling for failed OSTs
[fs/lustre-release.git] / lustre / osp / osp_object.c
index 966d751..b2b550a 100644 (file)
 
 #include "osp_internal.h"
 
+static inline __u32 osp_dev2node(struct osp_device *osp)
+{
+       return osp->opd_storage->dd_lu_dev.ld_site->ld_seq_site->ss_node_id;
+}
+
 static inline bool is_ost_obj(struct lu_object *lo)
 {
        return !lu2osp_dev(lo->lo_dev)->opd_connect_mdt;
 }
 
 static void osp_object_assign_fid(const struct lu_env *env,
-                                struct osp_device *d, struct osp_object *o)
+                                 struct osp_device *d, struct osp_object *o)
 {
        struct osp_thread_info *osi = osp_env_info(env);
 
@@ -87,34 +92,35 @@ static int osp_oac_init(struct osp_object *obj)
 
 static struct osp_xattr_entry *
 osp_oac_xattr_find_locked(struct osp_object_attr *ooa,
-                         const char *name, size_t namelen, bool unlink)
+                         const char *name, size_t namelen)
 {
        struct osp_xattr_entry *oxe;
 
        list_for_each_entry(oxe, &ooa->ooa_xattr_list, oxe_list) {
                if (namelen == oxe->oxe_namelen &&
-                   strncmp(name, oxe->oxe_buf, namelen) == 0) {
-                       if (unlink)
-                               list_del_init(&oxe->oxe_list);
-                       else
-                               atomic_inc(&oxe->oxe_ref);
-
+                   strncmp(name, oxe->oxe_buf, namelen) == 0)
                        return oxe;
-               }
        }
 
        return NULL;
 }
 
 static struct osp_xattr_entry *osp_oac_xattr_find(struct osp_object *obj,
-                                                 const char *name)
+                                                 const char *name, bool unlink)
 {
        struct osp_xattr_entry *oxe = NULL;
 
        spin_lock(&obj->opo_lock);
-       if (obj->opo_ooa != NULL)
+       if (obj->opo_ooa != NULL) {
                oxe = osp_oac_xattr_find_locked(obj->opo_ooa, name,
-                                               strlen(name), false);
+                                               strlen(name));
+               if (oxe != NULL) {
+                       if (unlink)
+                               list_del_init(&oxe->oxe_list);
+                       else
+                               atomic_inc(&oxe->oxe_ref);
+               }
+       }
        spin_unlock(&obj->opo_lock);
 
        return oxe;
@@ -131,7 +137,7 @@ osp_oac_xattr_find_or_add(struct osp_object *obj, const char *name, size_t len)
 
        LASSERT(ooa != NULL);
 
-       oxe = osp_oac_xattr_find(obj, name);
+       oxe = osp_oac_xattr_find(obj, name, false);
        if (oxe != NULL)
                return oxe;
 
@@ -148,9 +154,11 @@ osp_oac_xattr_find_or_add(struct osp_object *obj, const char *name, size_t len)
        atomic_set(&oxe->oxe_ref, 2);
 
        spin_lock(&obj->opo_lock);
-       tmp = osp_oac_xattr_find_locked(ooa, name, namelen, false);
+       tmp = osp_oac_xattr_find_locked(ooa, name, namelen);
        if (tmp == NULL)
                list_add_tail(&oxe->oxe_list, &ooa->ooa_xattr_list);
+       else
+               atomic_inc(&tmp->oxe_ref);
        spin_unlock(&obj->opo_lock);
 
        if (tmp != NULL) {
@@ -166,10 +174,8 @@ osp_oac_xattr_replace(struct osp_object *obj,
                      struct osp_xattr_entry **poxe, size_t len)
 {
        struct osp_object_attr *ooa     = obj->opo_ooa;
-       struct osp_xattr_entry *old     = *poxe;
        struct osp_xattr_entry *oxe;
-       struct osp_xattr_entry *tmp     = NULL;
-       size_t                  namelen = old->oxe_namelen;
+       size_t                  namelen = (*poxe)->oxe_namelen;
        size_t                  size    = sizeof(*oxe) + namelen + 1 + len;
 
        LASSERT(ooa != NULL);
@@ -181,19 +187,19 @@ osp_oac_xattr_replace(struct osp_object *obj,
        INIT_LIST_HEAD(&oxe->oxe_list);
        oxe->oxe_buflen = size;
        oxe->oxe_namelen = namelen;
-       memcpy(oxe->oxe_buf, old->oxe_buf, namelen);
+       memcpy(oxe->oxe_buf, (*poxe)->oxe_buf, namelen);
        oxe->oxe_value = oxe->oxe_buf + namelen + 1;
        /* One ref is for the caller, the other is for the entry on the list. */
        atomic_set(&oxe->oxe_ref, 2);
 
        spin_lock(&obj->opo_lock);
-       tmp = osp_oac_xattr_find_locked(ooa, oxe->oxe_buf, namelen, true);
+       *poxe = osp_oac_xattr_find_locked(ooa, oxe->oxe_buf, namelen);
+       LASSERT(*poxe != NULL);
+
+       list_del_init(&(*poxe)->oxe_list);
        list_add_tail(&oxe->oxe_list, &ooa->ooa_xattr_list);
        spin_unlock(&obj->opo_lock);
 
-       *poxe = tmp;
-       LASSERT(tmp != NULL);
-
        return oxe;
 }
 
@@ -375,14 +381,13 @@ out:
        return rc;
 }
 
-static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
-                               const struct lu_attr *attr, struct thandle *th)
+static int __osp_attr_set(const struct lu_env *env, struct dt_object *dt,
+                         const struct lu_attr *attr, struct thandle *th)
 {
        struct osp_device       *d = lu2osp_dev(dt->do_lu.lo_dev);
        struct osp_object       *o = dt2osp_obj(dt);
        struct lu_attr          *la;
        int                      rc = 0;
-
        ENTRY;
 
        /*
@@ -417,10 +422,9 @@ static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
                        RETURN(rc);
        }
 
-       if (o->opo_new) {
+       if (o->opo_new)
                /* no need in logging for new objects being created */
                RETURN(0);
-       }
 
        if (!(attr->la_valid & (LA_UID | LA_GID)))
                RETURN(0);
@@ -438,6 +442,7 @@ static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
        if (rc != 0 || o->opo_ooa == NULL)
                RETURN(rc);
 
+       /* Update the OSP object attributes cache. */
        la = &o->opo_ooa->ooa_attr;
        spin_lock(&o->opo_lock);
        if (attr->la_valid & LA_UID) {
@@ -454,15 +459,62 @@ static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
        RETURN(0);
 }
 
+/**
+ * XXX: NOT prepare set_{attr,xattr} RPC for remote transaction.
+ *
+ * According to our current transaction/dt_object_lock framework (to make
+ * the cross-MDTs modification for DNE1 to be workable), the transaction
+ * sponsor will start the transaction firstly, then try to acquire related
+ * dt_object_lock if needed. Under such rules, if we want to prepare the
+ * set_{attr,xattr} RPC in the RPC declare phase, then related attr/xattr
+ * should be known without dt_object_lock. But such condition maybe not
+ * true for some remote transaction case. For example:
+ *
+ * For linkEA repairing (by LFSCK) case, before the LFSCK thread obtained
+ * the dt_object_lock on the target MDT-object, it cannot know whether
+ * the MDT-object has linkEA or not, neither invalid or not.
+ *
+ * Since the LFSCK thread cannot hold dt_object_lock before the (remote)
+ * transaction start (otherwise there will be some potential deadlock),
+ * it cannot prepare related RPC for repairing during the declare phase
+ * as other normal transactions do.
+ *
+ * To resolve the trouble, we will make OSP to prepare related RPC
+ * (set_attr/set_xattr/del_xattr) after remote transaction started,
+ * and trigger the remote updating (RPC sending) when trans_stop.
+ * Then the up layer users, such as LFSCK, can follow the general
+ * rule to handle trans_start/dt_object_lock for repairing linkEA
+ * inconsistency without distinguishing remote MDT-object.
+ *
+ * In fact, above solution for remote transaction should be the normal
+ * model without considering DNE1. The trouble brought by DNE1 will be
+ * resolved in DNE2. At that time, this patch can be removed.
+ */
+static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
+                               const struct lu_attr *attr, struct thandle *th)
+{
+       int rc = 0;
+
+       if (!is_only_remote_trans(th))
+               rc = __osp_attr_set(env, dt, attr, th);
+
+       return rc;
+}
+
 static int osp_attr_set(const struct lu_env *env, struct dt_object *dt,
                        const struct lu_attr *attr, struct thandle *th,
                        struct lustre_capa *capa)
 {
        struct osp_object       *o = dt2osp_obj(dt);
        int                      rc = 0;
-
        ENTRY;
 
+       if (is_only_remote_trans(th)) {
+               rc = __osp_attr_set(env, dt, attr, th);
+               if (rc != 0)
+                       RETURN(rc);
+       }
+
        /* we're interested in uid/gid changes only */
        if (!(attr->la_valid & (LA_UID | LA_GID)))
                RETURN(0);
@@ -614,10 +666,15 @@ int osp_xattr_get(const struct lu_env *env, struct dt_object *dt,
        LASSERT(buf != NULL);
        LASSERT(name != NULL);
 
+       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_NETWORK) &&
+           osp->opd_index == cfs_fail_val &&
+           osp_dev2node(osp) == cfs_fail_val)
+               RETURN(-ENOTCONN);
+
        if (unlikely(obj->opo_non_exist))
                RETURN(-ENOENT);
 
-       oxe = osp_oac_xattr_find(obj, name);
+       oxe = osp_oac_xattr_find(obj, name, false);
        if (oxe != NULL) {
                spin_lock(&obj->opo_lock);
                if (oxe->oxe_ready) {
@@ -766,18 +823,21 @@ out:
        return rc;
 }
 
-int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
-                         const struct lu_buf *buf, const char *name,
-                         int flag, struct thandle *th)
+static int __osp_xattr_set(const struct lu_env *env, struct dt_object *dt,
+                          const struct lu_buf *buf, const char *name,
+                          int flag, struct thandle *th)
 {
-       struct osp_object       *o       = dt2osp_obj(dt);
        struct dt_update_request *update;
-       struct lu_fid           *fid;
-       struct osp_xattr_entry  *oxe;
-       int                     sizes[3] = {strlen(name), buf->lb_len,
-                                           sizeof(int)};
-       char                    *bufs[3] = {(char *)name, (char *)buf->lb_buf };
-       int                     rc;
+       struct lu_fid            *fid;
+       int                      sizes[3]       = { strlen(name),
+                                                   buf->lb_len,
+                                                   sizeof(int) };
+       char                     *bufs[3]       = { (char *)name,
+                                                   (char *)buf->lb_buf };
+       struct osp_xattr_entry   *oxe;
+       struct osp_object        *o             = dt2osp_obj(dt);
+       int                      rc;
+       ENTRY;
 
        LASSERT(buf->lb_len > 0 && buf->lb_buf != NULL);
 
@@ -788,7 +848,7 @@ int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
                       PFID(lu_object_fid(&dt->do_lu)),
                       (int)PTR_ERR(update));
 
-               return PTR_ERR(update);
+               RETURN(PTR_ERR(update));
        }
 
        flag = cpu_to_le32(flag);
@@ -798,15 +858,15 @@ int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
        rc = out_insert_update(env, update, OUT_XATTR_SET, fid,
                               ARRAY_SIZE(sizes), sizes, (const char **)bufs);
        if (rc != 0 || o->opo_ooa == NULL)
-               return rc;
+               RETURN(rc);
 
        oxe = osp_oac_xattr_find_or_add(o, name, buf->lb_len);
        if (oxe == NULL) {
-               CWARN("%s: Fail to add xattr (%s) to cache for "DFID
-                     ": rc = %d\n", dt->do_lu.lo_dev->ld_obd->obd_name,
-                     name, PFID(lu_object_fid(&dt->do_lu)), rc);
+               CWARN("%s: Fail to add xattr (%s) to cache for "DFID,
+                     dt->do_lu.lo_dev->ld_obd->obd_name,
+                     name, PFID(lu_object_fid(&dt->do_lu)));
 
-               return 0;
+               RETURN(0);
        }
 
        if (oxe->oxe_buflen - oxe->oxe_namelen - 1 < buf->lb_len) {
@@ -817,14 +877,14 @@ int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
                osp_oac_xattr_put(oxe);
                oxe = tmp;
                if (tmp == NULL) {
-                       CWARN("%s: Fail to update xattr (%s) to cache for "DFID
-                             ": rc = %d\n", dt->do_lu.lo_dev->ld_obd->obd_name,
-                             name, PFID(lu_object_fid(&dt->do_lu)), rc);
+                       CWARN("%s: Fail to update xattr (%s) to cache for "DFID,
+                             dt->do_lu.lo_dev->ld_obd->obd_name,
+                             name, PFID(lu_object_fid(&dt->do_lu)));
                        spin_lock(&o->opo_lock);
                        old->oxe_ready = 0;
                        spin_unlock(&o->opo_lock);
 
-                       return 0;
+                       RETURN(0);
                }
 
                /* Drop the ref for entry on list. */
@@ -839,26 +899,49 @@ int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
        spin_unlock(&o->opo_lock);
        osp_oac_xattr_put(oxe);
 
-       return 0;
+       RETURN(0);
+}
+
+int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
+                         const struct lu_buf *buf, const char *name,
+                         int flag, struct thandle *th)
+{
+       int rc = 0;
+
+       /* Please check the comment in osp_attr_set() for handling
+        * remote transaction. */
+       if (!is_only_remote_trans(th))
+               rc = __osp_xattr_set(env, dt, buf, name, flag, th);
+
+       return rc;
 }
 
 int osp_xattr_set(const struct lu_env *env, struct dt_object *dt,
                  const struct lu_buf *buf, const char *name, int fl,
                  struct thandle *th, struct lustre_capa *capa)
 {
+       int rc = 0;
+
        CDEBUG(D_INFO, "xattr %s set object "DFID"\n", name,
               PFID(&dt->do_lu.lo_header->loh_fid));
 
-       return 0;
+       /* Please check the comment in osp_attr_set() for handling
+        * remote transaction. */
+       if (is_only_remote_trans(th))
+               rc = __osp_xattr_set(env, dt, buf, name, fl, th);
+
+       return rc;
 }
 
-int osp_declare_xattr_del(const struct lu_env *env, struct dt_object *dt,
-                         const char *name, struct thandle *th)
+static int __osp_xattr_del(const struct lu_env *env, struct dt_object *dt,
+                          const char *name, struct thandle *th)
 {
        struct dt_update_request *update;
        const struct lu_fid      *fid;
-       int                      size = strlen(name);
-       int                      rc;
+       struct osp_object        *o     = dt2osp_obj(dt);
+       struct osp_xattr_entry   *oxe;
+       int                       size  = strlen(name);
+       int                       rc;
 
        update = out_find_create_update_loc(th, dt);
        if (IS_ERR(update))
@@ -868,6 +951,26 @@ int osp_declare_xattr_del(const struct lu_env *env, struct dt_object *dt,
 
        rc = out_insert_update(env, update, OUT_XATTR_DEL, fid, 1, &size,
                               (const char **)&name);
+       if (rc != 0 || o->opo_ooa == NULL)
+               return rc;
+
+       oxe = osp_oac_xattr_find(o, name, true);
+       if (oxe != NULL)
+               /* Drop the ref for entry on list. */
+               osp_oac_xattr_put(oxe);
+
+       return 0;
+}
+
+int osp_declare_xattr_del(const struct lu_env *env, struct dt_object *dt,
+                         const char *name, struct thandle *th)
+{
+       int rc = 0;
+
+       /* Please check the comment in osp_attr_set() for handling
+        * remote transaction. */
+       if (!is_only_remote_trans(th))
+               rc = __osp_xattr_del(env, dt, name, th);
 
        return rc;
 }
@@ -876,10 +979,17 @@ int osp_xattr_del(const struct lu_env *env, struct dt_object *dt,
                  const char *name, struct thandle *th,
                  struct lustre_capa *capa)
 {
+       int rc = 0;
+
        CDEBUG(D_INFO, "xattr %s del object "DFID"\n", name,
               PFID(&dt->do_lu.lo_header->loh_fid));
 
-       return 0;
+       /* Please check the comment in osp_attr_set() for handling
+        * remote transaction. */
+       if (is_only_remote_trans(th))
+               rc = __osp_xattr_del(env, dt, name, th);
+
+       return rc;
 }
 
 static int osp_declare_object_create(const struct lu_env *env,
@@ -1237,8 +1347,7 @@ static int osp_it_fetch(const struct lu_env *env, struct osp_it *it)
        ii->ii_magic = IDX_INFO_MAGIC;
        ii->ii_count = npages * LU_PAGE_COUNT;
        ii->ii_hash_start = it->ooi_next;
-       ii->ii_attrs =
-               osp->opd_storage->dd_lu_dev.ld_site->ld_seq_site->ss_node_id;
+       ii->ii_attrs = osp_dev2node(osp);
 
        ptlrpc_at_set_req_timeout(req);
 
@@ -1457,10 +1566,10 @@ __u64 osp_it_store(const struct lu_env *env, const struct dt_it *di)
  * \retval     -ve: on error
  */
 int osp_orphan_it_load(const struct lu_env *env, const struct dt_it *di,
-               __u64 hash)
+                      __u64 hash)
 {
        struct osp_it   *it     = (struct osp_it *)di;
-       int                      rc;
+       int              rc;
 
        it->ooi_next = hash;
        rc = osp_orphan_it_next(env, (struct dt_it *)di);