Whamcloud - gitweb
LU-5506 lfsck: skip orphan OST-object handling for failed OSTs
[fs/lustre-release.git] / lustre / osp / osp_object.c
index efd2a3d..b2b550a 100644 (file)
 
 #include "osp_internal.h"
 
+static inline __u32 osp_dev2node(struct osp_device *osp)
+{
+       return osp->opd_storage->dd_lu_dev.ld_site->ld_seq_site->ss_node_id;
+}
+
 static inline bool is_ost_obj(struct lu_object *lo)
 {
        return !lu2osp_dev(lo->lo_dev)->opd_connect_mdt;
 }
 
 static void osp_object_assign_fid(const struct lu_env *env,
-                                struct osp_device *d, struct osp_object *o)
+                                 struct osp_device *d, struct osp_object *o)
 {
        struct osp_thread_info *osi = osp_env_info(env);
 
@@ -87,51 +92,52 @@ static int osp_oac_init(struct osp_object *obj)
 
 static struct osp_xattr_entry *
 osp_oac_xattr_find_locked(struct osp_object_attr *ooa,
-                         const char *name, int namelen, bool unlink)
+                         const char *name, size_t namelen)
 {
        struct osp_xattr_entry *oxe;
 
        list_for_each_entry(oxe, &ooa->ooa_xattr_list, oxe_list) {
                if (namelen == oxe->oxe_namelen &&
-                   strncmp(name, oxe->oxe_buf, namelen) == 0) {
-                       if (unlink)
-                               list_del_init(&oxe->oxe_list);
-                       else
-                               atomic_inc(&oxe->oxe_ref);
-
+                   strncmp(name, oxe->oxe_buf, namelen) == 0)
                        return oxe;
-               }
        }
 
        return NULL;
 }
 
 static struct osp_xattr_entry *osp_oac_xattr_find(struct osp_object *obj,
-                                                 const char *name)
+                                                 const char *name, bool unlink)
 {
        struct osp_xattr_entry *oxe = NULL;
 
        spin_lock(&obj->opo_lock);
-       if (obj->opo_ooa != NULL)
+       if (obj->opo_ooa != NULL) {
                oxe = osp_oac_xattr_find_locked(obj->opo_ooa, name,
-                                               strlen(name), false);
+                                               strlen(name));
+               if (oxe != NULL) {
+                       if (unlink)
+                               list_del_init(&oxe->oxe_list);
+                       else
+                               atomic_inc(&oxe->oxe_ref);
+               }
+       }
        spin_unlock(&obj->opo_lock);
 
        return oxe;
 }
 
 static struct osp_xattr_entry *
-osp_oac_xattr_find_or_add(struct osp_object *obj, const char *name, int len)
+osp_oac_xattr_find_or_add(struct osp_object *obj, const char *name, size_t len)
 {
        struct osp_object_attr *ooa     = obj->opo_ooa;
        struct osp_xattr_entry *oxe;
        struct osp_xattr_entry *tmp     = NULL;
-       int                     namelen = strlen(name);
-       int                     size    = sizeof(*oxe) + namelen + 1 + len;
+       size_t                  namelen = strlen(name);
+       size_t                  size    = sizeof(*oxe) + namelen + 1 + len;
 
        LASSERT(ooa != NULL);
 
-       oxe = osp_oac_xattr_find(obj, name);
+       oxe = osp_oac_xattr_find(obj, name, false);
        if (oxe != NULL)
                return oxe;
 
@@ -148,9 +154,11 @@ osp_oac_xattr_find_or_add(struct osp_object *obj, const char *name, int len)
        atomic_set(&oxe->oxe_ref, 2);
 
        spin_lock(&obj->opo_lock);
-       tmp = osp_oac_xattr_find_locked(ooa, name, namelen, false);
+       tmp = osp_oac_xattr_find_locked(ooa, name, namelen);
        if (tmp == NULL)
                list_add_tail(&oxe->oxe_list, &ooa->ooa_xattr_list);
+       else
+               atomic_inc(&tmp->oxe_ref);
        spin_unlock(&obj->opo_lock);
 
        if (tmp != NULL) {
@@ -163,14 +171,12 @@ osp_oac_xattr_find_or_add(struct osp_object *obj, const char *name, int len)
 
 static struct osp_xattr_entry *
 osp_oac_xattr_replace(struct osp_object *obj,
-                     struct osp_xattr_entry **poxe, int len)
+                     struct osp_xattr_entry **poxe, size_t len)
 {
        struct osp_object_attr *ooa     = obj->opo_ooa;
-       struct osp_xattr_entry *old     = *poxe;
        struct osp_xattr_entry *oxe;
-       struct osp_xattr_entry *tmp     = NULL;
-       int                     namelen = old->oxe_namelen;
-       int                     size    = sizeof(*oxe) + namelen + 1 + len;
+       size_t                  namelen = (*poxe)->oxe_namelen;
+       size_t                  size    = sizeof(*oxe) + namelen + 1 + len;
 
        LASSERT(ooa != NULL);
 
@@ -181,19 +187,19 @@ osp_oac_xattr_replace(struct osp_object *obj,
        INIT_LIST_HEAD(&oxe->oxe_list);
        oxe->oxe_buflen = size;
        oxe->oxe_namelen = namelen;
-       memcpy(oxe->oxe_buf, old->oxe_buf, namelen);
+       memcpy(oxe->oxe_buf, (*poxe)->oxe_buf, namelen);
        oxe->oxe_value = oxe->oxe_buf + namelen + 1;
        /* One ref is for the caller, the other is for the entry on the list. */
        atomic_set(&oxe->oxe_ref, 2);
 
        spin_lock(&obj->opo_lock);
-       tmp = osp_oac_xattr_find_locked(ooa, oxe->oxe_buf, namelen, true);
+       *poxe = osp_oac_xattr_find_locked(ooa, oxe->oxe_buf, namelen);
+       LASSERT(*poxe != NULL);
+
+       list_del_init(&(*poxe)->oxe_list);
        list_add_tail(&oxe->oxe_list, &ooa->ooa_xattr_list);
        spin_unlock(&obj->opo_lock);
 
-       *poxe = tmp;
-       LASSERT(tmp != NULL);
-
        return oxe;
 }
 
@@ -281,7 +287,6 @@ static int osp_declare_attr_get(const struct lu_env *env, struct dt_object *dt,
 {
        struct osp_object       *obj    = dt2osp_obj(dt);
        struct osp_device       *osp    = lu2osp_dev(dt->do_lu.lo_dev);
-       struct dt_update_request *update;
        int                      rc     = 0;
 
        if (obj->opo_ooa == NULL) {
@@ -291,14 +296,9 @@ static int osp_declare_attr_get(const struct lu_env *env, struct dt_object *dt,
        }
 
        mutex_lock(&osp->opd_async_requests_mutex);
-       update = osp_find_or_create_async_update_request(osp);
-       if (IS_ERR(update))
-               rc = PTR_ERR(update);
-       else
-               rc = osp_insert_async_update(env, update, OUT_ATTR_GET, obj, 0,
-                                            NULL, NULL,
-                                            &obj->opo_ooa->ooa_attr,
-                                            osp_attr_get_interpterer);
+       rc = osp_insert_async_request(env, OUT_ATTR_GET, obj, 0, NULL, NULL,
+                                     &obj->opo_ooa->ooa_attr,
+                                     osp_attr_get_interpterer);
        mutex_unlock(&osp->opd_async_requests_mutex);
 
        return rc;
@@ -381,14 +381,13 @@ out:
        return rc;
 }
 
-static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
-                               const struct lu_attr *attr, struct thandle *th)
+static int __osp_attr_set(const struct lu_env *env, struct dt_object *dt,
+                         const struct lu_attr *attr, struct thandle *th)
 {
        struct osp_device       *d = lu2osp_dev(dt->do_lu.lo_dev);
        struct osp_object       *o = dt2osp_obj(dt);
        struct lu_attr          *la;
        int                      rc = 0;
-
        ENTRY;
 
        /*
@@ -423,10 +422,9 @@ static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
                        RETURN(rc);
        }
 
-       if (o->opo_new) {
+       if (o->opo_new)
                /* no need in logging for new objects being created */
                RETURN(0);
-       }
 
        if (!(attr->la_valid & (LA_UID | LA_GID)))
                RETURN(0);
@@ -444,6 +442,7 @@ static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
        if (rc != 0 || o->opo_ooa == NULL)
                RETURN(rc);
 
+       /* Update the OSP object attributes cache. */
        la = &o->opo_ooa->ooa_attr;
        spin_lock(&o->opo_lock);
        if (attr->la_valid & LA_UID) {
@@ -460,15 +459,62 @@ static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
        RETURN(0);
 }
 
+/**
+ * XXX: NOT prepare set_{attr,xattr} RPC for remote transaction.
+ *
+ * According to our current transaction/dt_object_lock framework (to make
+ * the cross-MDTs modification for DNE1 to be workable), the transaction
+ * sponsor will start the transaction firstly, then try to acquire related
+ * dt_object_lock if needed. Under such rules, if we want to prepare the
+ * set_{attr,xattr} RPC in the RPC declare phase, then related attr/xattr
+ * should be known without dt_object_lock. But such condition maybe not
+ * true for some remote transaction case. For example:
+ *
+ * For linkEA repairing (by LFSCK) case, before the LFSCK thread obtained
+ * the dt_object_lock on the target MDT-object, it cannot know whether
+ * the MDT-object has linkEA or not, neither invalid or not.
+ *
+ * Since the LFSCK thread cannot hold dt_object_lock before the (remote)
+ * transaction start (otherwise there will be some potential deadlock),
+ * it cannot prepare related RPC for repairing during the declare phase
+ * as other normal transactions do.
+ *
+ * To resolve the trouble, we will make OSP to prepare related RPC
+ * (set_attr/set_xattr/del_xattr) after remote transaction started,
+ * and trigger the remote updating (RPC sending) when trans_stop.
+ * Then the up layer users, such as LFSCK, can follow the general
+ * rule to handle trans_start/dt_object_lock for repairing linkEA
+ * inconsistency without distinguishing remote MDT-object.
+ *
+ * In fact, above solution for remote transaction should be the normal
+ * model without considering DNE1. The trouble brought by DNE1 will be
+ * resolved in DNE2. At that time, this patch can be removed.
+ */
+static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
+                               const struct lu_attr *attr, struct thandle *th)
+{
+       int rc = 0;
+
+       if (!is_only_remote_trans(th))
+               rc = __osp_attr_set(env, dt, attr, th);
+
+       return rc;
+}
+
 static int osp_attr_set(const struct lu_env *env, struct dt_object *dt,
                        const struct lu_attr *attr, struct thandle *th,
                        struct lustre_capa *capa)
 {
        struct osp_object       *o = dt2osp_obj(dt);
        int                      rc = 0;
-
        ENTRY;
 
+       if (is_only_remote_trans(th)) {
+               rc = __osp_attr_set(env, dt, attr, th);
+               if (rc != 0)
+                       RETURN(rc);
+       }
+
        /* we're interested in uid/gid changes only */
        if (!(attr->la_valid & (LA_UID | LA_GID)))
                RETURN(0);
@@ -510,7 +556,7 @@ static int osp_xattr_get_interpterer(const struct lu_env *env,
        LASSERT(ooa != NULL);
 
        if (rc == 0) {
-               int len = sizeof(*oxe) + oxe->oxe_namelen + 1;
+               size_t len = sizeof(*oxe) + oxe->oxe_namelen + 1;
 
                rc = object_update_result_data_get(reply, rbuf, index);
                if (rc < 0 || rbuf->lb_len > (oxe->oxe_buflen - len)) {
@@ -550,7 +596,6 @@ static int osp_declare_xattr_get(const struct lu_env *env, struct dt_object *dt,
 {
        struct osp_object       *obj     = dt2osp_obj(dt);
        struct osp_device       *osp     = lu2osp_dev(dt->do_lu.lo_dev);
-       struct dt_update_request *update;
        struct osp_xattr_entry  *oxe;
        int                      namelen = strlen(name);
        int                      rc      = 0;
@@ -573,33 +618,28 @@ static int osp_declare_xattr_get(const struct lu_env *env, struct dt_object *dt,
                return -ENOMEM;
 
        mutex_lock(&osp->opd_async_requests_mutex);
-       update = osp_find_or_create_async_update_request(osp);
-       if (IS_ERR(update)) {
-               rc = PTR_ERR(update);
+       rc = osp_insert_async_request(env, OUT_XATTR_GET, obj, 1,
+                                     &namelen, &name, oxe,
+                                     osp_xattr_get_interpterer);
+       if (rc != 0) {
                mutex_unlock(&osp->opd_async_requests_mutex);
                osp_oac_xattr_put(oxe);
        } else {
-               rc = osp_insert_async_update(env, update, OUT_XATTR_GET, obj,
-                                            1, &namelen, &name, oxe,
-                                            osp_xattr_get_interpterer);
-               if (rc != 0) {
+               struct dt_update_request *update;
+
+               /* XXX: Currently, we trigger the batched async OUT
+                *      RPC via dt_declare_xattr_get(). It is not
+                *      perfect solution, but works well now.
+                *
+                *      We will improve it in the future. */
+               update = osp->opd_async_requests;
+               if (update != NULL && update->dur_req != NULL &&
+                   update->dur_req->ourq_count > 0) {
+                       osp->opd_async_requests = NULL;
                        mutex_unlock(&osp->opd_async_requests_mutex);
-                       osp_oac_xattr_put(oxe);
+                       rc = osp_unplug_async_request(env, osp, update);
                } else {
-                       /* XXX: Currently, we trigger the batched async OUT
-                        *      RPC via dt_declare_xattr_get(). It is not
-                        *      perfect solution, but works well now.
-                        *
-                        *      We will improve it in the future. */
-                       update = osp->opd_async_requests;
-                       if (update != NULL && update->dur_req != NULL &&
-                           update->dur_req->ourq_count > 0) {
-                               osp->opd_async_requests = NULL;
-                               mutex_unlock(&osp->opd_async_requests_mutex);
-                               rc = osp_unplug_async_update(env, osp, update);
-                       } else {
-                               mutex_unlock(&osp->opd_async_requests_mutex);
-                       }
+                       mutex_unlock(&osp->opd_async_requests_mutex);
                }
        }
 
@@ -626,10 +666,15 @@ int osp_xattr_get(const struct lu_env *env, struct dt_object *dt,
        LASSERT(buf != NULL);
        LASSERT(name != NULL);
 
+       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_NETWORK) &&
+           osp->opd_index == cfs_fail_val &&
+           osp_dev2node(osp) == cfs_fail_val)
+               RETURN(-ENOTCONN);
+
        if (unlikely(obj->opo_non_exist))
                RETURN(-ENOENT);
 
-       oxe = osp_oac_xattr_find(obj, name);
+       oxe = osp_oac_xattr_find(obj, name, false);
        if (oxe != NULL) {
                spin_lock(&obj->opo_lock);
                if (oxe->oxe_ready) {
@@ -778,18 +823,21 @@ out:
        return rc;
 }
 
-int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
-                         const struct lu_buf *buf, const char *name,
-                         int flag, struct thandle *th)
+static int __osp_xattr_set(const struct lu_env *env, struct dt_object *dt,
+                          const struct lu_buf *buf, const char *name,
+                          int flag, struct thandle *th)
 {
-       struct osp_object       *o       = dt2osp_obj(dt);
        struct dt_update_request *update;
-       struct lu_fid           *fid;
-       struct osp_xattr_entry  *oxe;
-       int                     sizes[3] = {strlen(name), buf->lb_len,
-                                           sizeof(int)};
-       char                    *bufs[3] = {(char *)name, (char *)buf->lb_buf };
-       int                     rc;
+       struct lu_fid            *fid;
+       int                      sizes[3]       = { strlen(name),
+                                                   buf->lb_len,
+                                                   sizeof(int) };
+       char                     *bufs[3]       = { (char *)name,
+                                                   (char *)buf->lb_buf };
+       struct osp_xattr_entry   *oxe;
+       struct osp_object        *o             = dt2osp_obj(dt);
+       int                      rc;
+       ENTRY;
 
        LASSERT(buf->lb_len > 0 && buf->lb_buf != NULL);
 
@@ -800,7 +848,7 @@ int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
                       PFID(lu_object_fid(&dt->do_lu)),
                       (int)PTR_ERR(update));
 
-               return PTR_ERR(update);
+               RETURN(PTR_ERR(update));
        }
 
        flag = cpu_to_le32(flag);
@@ -810,15 +858,15 @@ int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
        rc = out_insert_update(env, update, OUT_XATTR_SET, fid,
                               ARRAY_SIZE(sizes), sizes, (const char **)bufs);
        if (rc != 0 || o->opo_ooa == NULL)
-               return rc;
+               RETURN(rc);
 
        oxe = osp_oac_xattr_find_or_add(o, name, buf->lb_len);
        if (oxe == NULL) {
-               CWARN("%s: Fail to add xattr (%s) to cache for "DFID
-                     ": rc = %d\n", dt->do_lu.lo_dev->ld_obd->obd_name,
-                     name, PFID(lu_object_fid(&dt->do_lu)), rc);
+               CWARN("%s: Fail to add xattr (%s) to cache for "DFID,
+                     dt->do_lu.lo_dev->ld_obd->obd_name,
+                     name, PFID(lu_object_fid(&dt->do_lu)));
 
-               return 0;
+               RETURN(0);
        }
 
        if (oxe->oxe_buflen - oxe->oxe_namelen - 1 < buf->lb_len) {
@@ -829,14 +877,14 @@ int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
                osp_oac_xattr_put(oxe);
                oxe = tmp;
                if (tmp == NULL) {
-                       CWARN("%s: Fail to update xattr (%s) to cache for "DFID
-                             ": rc = %d\n", dt->do_lu.lo_dev->ld_obd->obd_name,
-                             name, PFID(lu_object_fid(&dt->do_lu)), rc);
+                       CWARN("%s: Fail to update xattr (%s) to cache for "DFID,
+                             dt->do_lu.lo_dev->ld_obd->obd_name,
+                             name, PFID(lu_object_fid(&dt->do_lu)));
                        spin_lock(&o->opo_lock);
                        old->oxe_ready = 0;
                        spin_unlock(&o->opo_lock);
 
-                       return 0;
+                       RETURN(0);
                }
 
                /* Drop the ref for entry on list. */
@@ -851,26 +899,49 @@ int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
        spin_unlock(&o->opo_lock);
        osp_oac_xattr_put(oxe);
 
-       return 0;
+       RETURN(0);
+}
+
+int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
+                         const struct lu_buf *buf, const char *name,
+                         int flag, struct thandle *th)
+{
+       int rc = 0;
+
+       /* Please check the comment in osp_attr_set() for handling
+        * remote transaction. */
+       if (!is_only_remote_trans(th))
+               rc = __osp_xattr_set(env, dt, buf, name, flag, th);
+
+       return rc;
 }
 
 int osp_xattr_set(const struct lu_env *env, struct dt_object *dt,
                  const struct lu_buf *buf, const char *name, int fl,
                  struct thandle *th, struct lustre_capa *capa)
 {
+       int rc = 0;
+
        CDEBUG(D_INFO, "xattr %s set object "DFID"\n", name,
               PFID(&dt->do_lu.lo_header->loh_fid));
 
-       return 0;
+       /* Please check the comment in osp_attr_set() for handling
+        * remote transaction. */
+       if (is_only_remote_trans(th))
+               rc = __osp_xattr_set(env, dt, buf, name, fl, th);
+
+       return rc;
 }
 
-int osp_declare_xattr_del(const struct lu_env *env, struct dt_object *dt,
-                         const char *name, struct thandle *th)
+static int __osp_xattr_del(const struct lu_env *env, struct dt_object *dt,
+                          const char *name, struct thandle *th)
 {
        struct dt_update_request *update;
        const struct lu_fid      *fid;
-       int                      size = strlen(name);
-       int                      rc;
+       struct osp_object        *o     = dt2osp_obj(dt);
+       struct osp_xattr_entry   *oxe;
+       int                       size  = strlen(name);
+       int                       rc;
 
        update = out_find_create_update_loc(th, dt);
        if (IS_ERR(update))
@@ -880,6 +951,26 @@ int osp_declare_xattr_del(const struct lu_env *env, struct dt_object *dt,
 
        rc = out_insert_update(env, update, OUT_XATTR_DEL, fid, 1, &size,
                               (const char **)&name);
+       if (rc != 0 || o->opo_ooa == NULL)
+               return rc;
+
+       oxe = osp_oac_xattr_find(o, name, true);
+       if (oxe != NULL)
+               /* Drop the ref for entry on list. */
+               osp_oac_xattr_put(oxe);
+
+       return 0;
+}
+
+int osp_declare_xattr_del(const struct lu_env *env, struct dt_object *dt,
+                         const char *name, struct thandle *th)
+{
+       int rc = 0;
+
+       /* Please check the comment in osp_attr_set() for handling
+        * remote transaction. */
+       if (!is_only_remote_trans(th))
+               rc = __osp_xattr_del(env, dt, name, th);
 
        return rc;
 }
@@ -888,10 +979,17 @@ int osp_xattr_del(const struct lu_env *env, struct dt_object *dt,
                  const char *name, struct thandle *th,
                  struct lustre_capa *capa)
 {
+       int rc = 0;
+
        CDEBUG(D_INFO, "xattr %s del object "DFID"\n", name,
               PFID(&dt->do_lu.lo_header->loh_fid));
 
-       return 0;
+       /* Please check the comment in osp_attr_set() for handling
+        * remote transaction. */
+       if (is_only_remote_trans(th))
+               rc = __osp_xattr_del(env, dt, name, th);
+
+       return rc;
 }
 
 static int osp_declare_object_create(const struct lu_env *env,
@@ -1249,8 +1347,7 @@ static int osp_it_fetch(const struct lu_env *env, struct osp_it *it)
        ii->ii_magic = IDX_INFO_MAGIC;
        ii->ii_count = npages * LU_PAGE_COUNT;
        ii->ii_hash_start = it->ooi_next;
-       ii->ii_attrs =
-               osp->opd_storage->dd_lu_dev.ld_site->ld_seq_site->ss_node_id;
+       ii->ii_attrs = osp_dev2node(osp);
 
        ptlrpc_at_set_req_timeout(req);
 
@@ -1469,10 +1566,10 @@ __u64 osp_it_store(const struct lu_env *env, const struct dt_it *di)
  * \retval     -ve: on error
  */
 int osp_orphan_it_load(const struct lu_env *env, const struct dt_it *di,
-               __u64 hash)
+                      __u64 hash)
 {
        struct osp_it   *it     = (struct osp_it *)di;
-       int                      rc;
+       int              rc;
 
        it->ooi_next = hash;
        rc = osp_orphan_it_next(env, (struct dt_it *)di);
@@ -1591,7 +1688,7 @@ static void osp_object_free(const struct lu_env *env, struct lu_object *o)
                        count = atomic_read(&oxe->oxe_ref);
                        LASSERTF(count == 1,
                                 "Still has %d users on the xattr entry %.*s\n",
-                                count - 1, oxe->oxe_namelen, oxe->oxe_buf);
+                                count-1, (int)oxe->oxe_namelen, oxe->oxe_buf);
 
                        OBD_FREE(oxe, oxe->oxe_buflen);
                }