X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;ds=sidebyside;f=lustre%2Fosp%2Fosp_object.c;h=b2b550adb5bacecb5a75127061327e84fff5cf92;hb=e7554aac8fce48e1c84c67571f209db01f4f81fa;hp=efd2a3ddbcb23031214c8e6db4def39f1b92b7ef;hpb=60e07b972114df24105a3a1bfa7365892f72a4a7;p=fs%2Flustre-release.git diff --git a/lustre/osp/osp_object.c b/lustre/osp/osp_object.c index efd2a3d..b2b550a 100644 --- a/lustre/osp/osp_object.c +++ b/lustre/osp/osp_object.c @@ -45,13 +45,18 @@ #include "osp_internal.h" +static inline __u32 osp_dev2node(struct osp_device *osp) +{ + return osp->opd_storage->dd_lu_dev.ld_site->ld_seq_site->ss_node_id; +} + static inline bool is_ost_obj(struct lu_object *lo) { return !lu2osp_dev(lo->lo_dev)->opd_connect_mdt; } static void osp_object_assign_fid(const struct lu_env *env, - struct osp_device *d, struct osp_object *o) + struct osp_device *d, struct osp_object *o) { struct osp_thread_info *osi = osp_env_info(env); @@ -87,51 +92,52 @@ static int osp_oac_init(struct osp_object *obj) static struct osp_xattr_entry * osp_oac_xattr_find_locked(struct osp_object_attr *ooa, - const char *name, int namelen, bool unlink) + const char *name, size_t namelen) { struct osp_xattr_entry *oxe; list_for_each_entry(oxe, &ooa->ooa_xattr_list, oxe_list) { if (namelen == oxe->oxe_namelen && - strncmp(name, oxe->oxe_buf, namelen) == 0) { - if (unlink) - list_del_init(&oxe->oxe_list); - else - atomic_inc(&oxe->oxe_ref); - + strncmp(name, oxe->oxe_buf, namelen) == 0) return oxe; - } } return NULL; } static struct osp_xattr_entry *osp_oac_xattr_find(struct osp_object *obj, - const char *name) + const char *name, bool unlink) { struct osp_xattr_entry *oxe = NULL; spin_lock(&obj->opo_lock); - if (obj->opo_ooa != NULL) + if (obj->opo_ooa != NULL) { oxe = osp_oac_xattr_find_locked(obj->opo_ooa, name, - strlen(name), false); + strlen(name)); + if (oxe != NULL) { + if (unlink) + list_del_init(&oxe->oxe_list); + else + atomic_inc(&oxe->oxe_ref); + } + } spin_unlock(&obj->opo_lock); return oxe; } static struct osp_xattr_entry * -osp_oac_xattr_find_or_add(struct osp_object *obj, const char *name, int len) +osp_oac_xattr_find_or_add(struct osp_object *obj, const char *name, size_t len) { struct osp_object_attr *ooa = obj->opo_ooa; struct osp_xattr_entry *oxe; struct osp_xattr_entry *tmp = NULL; - int namelen = strlen(name); - int size = sizeof(*oxe) + namelen + 1 + len; + size_t namelen = strlen(name); + size_t size = sizeof(*oxe) + namelen + 1 + len; LASSERT(ooa != NULL); - oxe = osp_oac_xattr_find(obj, name); + oxe = osp_oac_xattr_find(obj, name, false); if (oxe != NULL) return oxe; @@ -148,9 +154,11 @@ osp_oac_xattr_find_or_add(struct osp_object *obj, const char *name, int len) atomic_set(&oxe->oxe_ref, 2); spin_lock(&obj->opo_lock); - tmp = osp_oac_xattr_find_locked(ooa, name, namelen, false); + tmp = osp_oac_xattr_find_locked(ooa, name, namelen); if (tmp == NULL) list_add_tail(&oxe->oxe_list, &ooa->ooa_xattr_list); + else + atomic_inc(&tmp->oxe_ref); spin_unlock(&obj->opo_lock); if (tmp != NULL) { @@ -163,14 +171,12 @@ osp_oac_xattr_find_or_add(struct osp_object *obj, const char *name, int len) static struct osp_xattr_entry * osp_oac_xattr_replace(struct osp_object *obj, - struct osp_xattr_entry **poxe, int len) + struct osp_xattr_entry **poxe, size_t len) { struct osp_object_attr *ooa = obj->opo_ooa; - struct osp_xattr_entry *old = *poxe; struct osp_xattr_entry *oxe; - struct osp_xattr_entry *tmp = NULL; - int namelen = old->oxe_namelen; - int size = sizeof(*oxe) + namelen + 1 + len; + size_t namelen = (*poxe)->oxe_namelen; + size_t size = sizeof(*oxe) + namelen + 1 + len; LASSERT(ooa != NULL); @@ -181,19 +187,19 @@ osp_oac_xattr_replace(struct osp_object *obj, INIT_LIST_HEAD(&oxe->oxe_list); oxe->oxe_buflen = size; oxe->oxe_namelen = namelen; - memcpy(oxe->oxe_buf, old->oxe_buf, namelen); + memcpy(oxe->oxe_buf, (*poxe)->oxe_buf, namelen); oxe->oxe_value = oxe->oxe_buf + namelen + 1; /* One ref is for the caller, the other is for the entry on the list. */ atomic_set(&oxe->oxe_ref, 2); spin_lock(&obj->opo_lock); - tmp = osp_oac_xattr_find_locked(ooa, oxe->oxe_buf, namelen, true); + *poxe = osp_oac_xattr_find_locked(ooa, oxe->oxe_buf, namelen); + LASSERT(*poxe != NULL); + + list_del_init(&(*poxe)->oxe_list); list_add_tail(&oxe->oxe_list, &ooa->ooa_xattr_list); spin_unlock(&obj->opo_lock); - *poxe = tmp; - LASSERT(tmp != NULL); - return oxe; } @@ -281,7 +287,6 @@ static int osp_declare_attr_get(const struct lu_env *env, struct dt_object *dt, { struct osp_object *obj = dt2osp_obj(dt); struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); - struct dt_update_request *update; int rc = 0; if (obj->opo_ooa == NULL) { @@ -291,14 +296,9 @@ static int osp_declare_attr_get(const struct lu_env *env, struct dt_object *dt, } mutex_lock(&osp->opd_async_requests_mutex); - update = osp_find_or_create_async_update_request(osp); - if (IS_ERR(update)) - rc = PTR_ERR(update); - else - rc = osp_insert_async_update(env, update, OUT_ATTR_GET, obj, 0, - NULL, NULL, - &obj->opo_ooa->ooa_attr, - osp_attr_get_interpterer); + rc = osp_insert_async_request(env, OUT_ATTR_GET, obj, 0, NULL, NULL, + &obj->opo_ooa->ooa_attr, + osp_attr_get_interpterer); mutex_unlock(&osp->opd_async_requests_mutex); return rc; @@ -381,14 +381,13 @@ out: return rc; } -static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt, - const struct lu_attr *attr, struct thandle *th) +static int __osp_attr_set(const struct lu_env *env, struct dt_object *dt, + const struct lu_attr *attr, struct thandle *th) { struct osp_device *d = lu2osp_dev(dt->do_lu.lo_dev); struct osp_object *o = dt2osp_obj(dt); struct lu_attr *la; int rc = 0; - ENTRY; /* @@ -423,10 +422,9 @@ static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt, RETURN(rc); } - if (o->opo_new) { + if (o->opo_new) /* no need in logging for new objects being created */ RETURN(0); - } if (!(attr->la_valid & (LA_UID | LA_GID))) RETURN(0); @@ -444,6 +442,7 @@ static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt, if (rc != 0 || o->opo_ooa == NULL) RETURN(rc); + /* Update the OSP object attributes cache. */ la = &o->opo_ooa->ooa_attr; spin_lock(&o->opo_lock); if (attr->la_valid & LA_UID) { @@ -460,15 +459,62 @@ static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt, RETURN(0); } +/** + * XXX: NOT prepare set_{attr,xattr} RPC for remote transaction. + * + * According to our current transaction/dt_object_lock framework (to make + * the cross-MDTs modification for DNE1 to be workable), the transaction + * sponsor will start the transaction firstly, then try to acquire related + * dt_object_lock if needed. Under such rules, if we want to prepare the + * set_{attr,xattr} RPC in the RPC declare phase, then related attr/xattr + * should be known without dt_object_lock. But such condition maybe not + * true for some remote transaction case. For example: + * + * For linkEA repairing (by LFSCK) case, before the LFSCK thread obtained + * the dt_object_lock on the target MDT-object, it cannot know whether + * the MDT-object has linkEA or not, neither invalid or not. + * + * Since the LFSCK thread cannot hold dt_object_lock before the (remote) + * transaction start (otherwise there will be some potential deadlock), + * it cannot prepare related RPC for repairing during the declare phase + * as other normal transactions do. + * + * To resolve the trouble, we will make OSP to prepare related RPC + * (set_attr/set_xattr/del_xattr) after remote transaction started, + * and trigger the remote updating (RPC sending) when trans_stop. + * Then the up layer users, such as LFSCK, can follow the general + * rule to handle trans_start/dt_object_lock for repairing linkEA + * inconsistency without distinguishing remote MDT-object. + * + * In fact, above solution for remote transaction should be the normal + * model without considering DNE1. The trouble brought by DNE1 will be + * resolved in DNE2. At that time, this patch can be removed. + */ +static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt, + const struct lu_attr *attr, struct thandle *th) +{ + int rc = 0; + + if (!is_only_remote_trans(th)) + rc = __osp_attr_set(env, dt, attr, th); + + return rc; +} + static int osp_attr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_attr *attr, struct thandle *th, struct lustre_capa *capa) { struct osp_object *o = dt2osp_obj(dt); int rc = 0; - ENTRY; + if (is_only_remote_trans(th)) { + rc = __osp_attr_set(env, dt, attr, th); + if (rc != 0) + RETURN(rc); + } + /* we're interested in uid/gid changes only */ if (!(attr->la_valid & (LA_UID | LA_GID))) RETURN(0); @@ -510,7 +556,7 @@ static int osp_xattr_get_interpterer(const struct lu_env *env, LASSERT(ooa != NULL); if (rc == 0) { - int len = sizeof(*oxe) + oxe->oxe_namelen + 1; + size_t len = sizeof(*oxe) + oxe->oxe_namelen + 1; rc = object_update_result_data_get(reply, rbuf, index); if (rc < 0 || rbuf->lb_len > (oxe->oxe_buflen - len)) { @@ -550,7 +596,6 @@ static int osp_declare_xattr_get(const struct lu_env *env, struct dt_object *dt, { struct osp_object *obj = dt2osp_obj(dt); struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); - struct dt_update_request *update; struct osp_xattr_entry *oxe; int namelen = strlen(name); int rc = 0; @@ -573,33 +618,28 @@ static int osp_declare_xattr_get(const struct lu_env *env, struct dt_object *dt, return -ENOMEM; mutex_lock(&osp->opd_async_requests_mutex); - update = osp_find_or_create_async_update_request(osp); - if (IS_ERR(update)) { - rc = PTR_ERR(update); + rc = osp_insert_async_request(env, OUT_XATTR_GET, obj, 1, + &namelen, &name, oxe, + osp_xattr_get_interpterer); + if (rc != 0) { mutex_unlock(&osp->opd_async_requests_mutex); osp_oac_xattr_put(oxe); } else { - rc = osp_insert_async_update(env, update, OUT_XATTR_GET, obj, - 1, &namelen, &name, oxe, - osp_xattr_get_interpterer); - if (rc != 0) { + struct dt_update_request *update; + + /* XXX: Currently, we trigger the batched async OUT + * RPC via dt_declare_xattr_get(). It is not + * perfect solution, but works well now. + * + * We will improve it in the future. */ + update = osp->opd_async_requests; + if (update != NULL && update->dur_req != NULL && + update->dur_req->ourq_count > 0) { + osp->opd_async_requests = NULL; mutex_unlock(&osp->opd_async_requests_mutex); - osp_oac_xattr_put(oxe); + rc = osp_unplug_async_request(env, osp, update); } else { - /* XXX: Currently, we trigger the batched async OUT - * RPC via dt_declare_xattr_get(). It is not - * perfect solution, but works well now. - * - * We will improve it in the future. */ - update = osp->opd_async_requests; - if (update != NULL && update->dur_req != NULL && - update->dur_req->ourq_count > 0) { - osp->opd_async_requests = NULL; - mutex_unlock(&osp->opd_async_requests_mutex); - rc = osp_unplug_async_update(env, osp, update); - } else { - mutex_unlock(&osp->opd_async_requests_mutex); - } + mutex_unlock(&osp->opd_async_requests_mutex); } } @@ -626,10 +666,15 @@ int osp_xattr_get(const struct lu_env *env, struct dt_object *dt, LASSERT(buf != NULL); LASSERT(name != NULL); + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_NETWORK) && + osp->opd_index == cfs_fail_val && + osp_dev2node(osp) == cfs_fail_val) + RETURN(-ENOTCONN); + if (unlikely(obj->opo_non_exist)) RETURN(-ENOENT); - oxe = osp_oac_xattr_find(obj, name); + oxe = osp_oac_xattr_find(obj, name, false); if (oxe != NULL) { spin_lock(&obj->opo_lock); if (oxe->oxe_ready) { @@ -778,18 +823,21 @@ out: return rc; } -int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt, - const struct lu_buf *buf, const char *name, - int flag, struct thandle *th) +static int __osp_xattr_set(const struct lu_env *env, struct dt_object *dt, + const struct lu_buf *buf, const char *name, + int flag, struct thandle *th) { - struct osp_object *o = dt2osp_obj(dt); struct dt_update_request *update; - struct lu_fid *fid; - struct osp_xattr_entry *oxe; - int sizes[3] = {strlen(name), buf->lb_len, - sizeof(int)}; - char *bufs[3] = {(char *)name, (char *)buf->lb_buf }; - int rc; + struct lu_fid *fid; + int sizes[3] = { strlen(name), + buf->lb_len, + sizeof(int) }; + char *bufs[3] = { (char *)name, + (char *)buf->lb_buf }; + struct osp_xattr_entry *oxe; + struct osp_object *o = dt2osp_obj(dt); + int rc; + ENTRY; LASSERT(buf->lb_len > 0 && buf->lb_buf != NULL); @@ -800,7 +848,7 @@ int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt, PFID(lu_object_fid(&dt->do_lu)), (int)PTR_ERR(update)); - return PTR_ERR(update); + RETURN(PTR_ERR(update)); } flag = cpu_to_le32(flag); @@ -810,15 +858,15 @@ int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt, rc = out_insert_update(env, update, OUT_XATTR_SET, fid, ARRAY_SIZE(sizes), sizes, (const char **)bufs); if (rc != 0 || o->opo_ooa == NULL) - return rc; + RETURN(rc); oxe = osp_oac_xattr_find_or_add(o, name, buf->lb_len); if (oxe == NULL) { - CWARN("%s: Fail to add xattr (%s) to cache for "DFID - ": rc = %d\n", dt->do_lu.lo_dev->ld_obd->obd_name, - name, PFID(lu_object_fid(&dt->do_lu)), rc); + CWARN("%s: Fail to add xattr (%s) to cache for "DFID, + dt->do_lu.lo_dev->ld_obd->obd_name, + name, PFID(lu_object_fid(&dt->do_lu))); - return 0; + RETURN(0); } if (oxe->oxe_buflen - oxe->oxe_namelen - 1 < buf->lb_len) { @@ -829,14 +877,14 @@ int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt, osp_oac_xattr_put(oxe); oxe = tmp; if (tmp == NULL) { - CWARN("%s: Fail to update xattr (%s) to cache for "DFID - ": rc = %d\n", dt->do_lu.lo_dev->ld_obd->obd_name, - name, PFID(lu_object_fid(&dt->do_lu)), rc); + CWARN("%s: Fail to update xattr (%s) to cache for "DFID, + dt->do_lu.lo_dev->ld_obd->obd_name, + name, PFID(lu_object_fid(&dt->do_lu))); spin_lock(&o->opo_lock); old->oxe_ready = 0; spin_unlock(&o->opo_lock); - return 0; + RETURN(0); } /* Drop the ref for entry on list. */ @@ -851,26 +899,49 @@ int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt, spin_unlock(&o->opo_lock); osp_oac_xattr_put(oxe); - return 0; + RETURN(0); +} + +int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt, + const struct lu_buf *buf, const char *name, + int flag, struct thandle *th) +{ + int rc = 0; + + /* Please check the comment in osp_attr_set() for handling + * remote transaction. */ + if (!is_only_remote_trans(th)) + rc = __osp_xattr_set(env, dt, buf, name, flag, th); + + return rc; } int osp_xattr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, const char *name, int fl, struct thandle *th, struct lustre_capa *capa) { + int rc = 0; + CDEBUG(D_INFO, "xattr %s set object "DFID"\n", name, PFID(&dt->do_lu.lo_header->loh_fid)); - return 0; + /* Please check the comment in osp_attr_set() for handling + * remote transaction. */ + if (is_only_remote_trans(th)) + rc = __osp_xattr_set(env, dt, buf, name, fl, th); + + return rc; } -int osp_declare_xattr_del(const struct lu_env *env, struct dt_object *dt, - const char *name, struct thandle *th) +static int __osp_xattr_del(const struct lu_env *env, struct dt_object *dt, + const char *name, struct thandle *th) { struct dt_update_request *update; const struct lu_fid *fid; - int size = strlen(name); - int rc; + struct osp_object *o = dt2osp_obj(dt); + struct osp_xattr_entry *oxe; + int size = strlen(name); + int rc; update = out_find_create_update_loc(th, dt); if (IS_ERR(update)) @@ -880,6 +951,26 @@ int osp_declare_xattr_del(const struct lu_env *env, struct dt_object *dt, rc = out_insert_update(env, update, OUT_XATTR_DEL, fid, 1, &size, (const char **)&name); + if (rc != 0 || o->opo_ooa == NULL) + return rc; + + oxe = osp_oac_xattr_find(o, name, true); + if (oxe != NULL) + /* Drop the ref for entry on list. */ + osp_oac_xattr_put(oxe); + + return 0; +} + +int osp_declare_xattr_del(const struct lu_env *env, struct dt_object *dt, + const char *name, struct thandle *th) +{ + int rc = 0; + + /* Please check the comment in osp_attr_set() for handling + * remote transaction. */ + if (!is_only_remote_trans(th)) + rc = __osp_xattr_del(env, dt, name, th); return rc; } @@ -888,10 +979,17 @@ int osp_xattr_del(const struct lu_env *env, struct dt_object *dt, const char *name, struct thandle *th, struct lustre_capa *capa) { + int rc = 0; + CDEBUG(D_INFO, "xattr %s del object "DFID"\n", name, PFID(&dt->do_lu.lo_header->loh_fid)); - return 0; + /* Please check the comment in osp_attr_set() for handling + * remote transaction. */ + if (is_only_remote_trans(th)) + rc = __osp_xattr_del(env, dt, name, th); + + return rc; } static int osp_declare_object_create(const struct lu_env *env, @@ -1249,8 +1347,7 @@ static int osp_it_fetch(const struct lu_env *env, struct osp_it *it) ii->ii_magic = IDX_INFO_MAGIC; ii->ii_count = npages * LU_PAGE_COUNT; ii->ii_hash_start = it->ooi_next; - ii->ii_attrs = - osp->opd_storage->dd_lu_dev.ld_site->ld_seq_site->ss_node_id; + ii->ii_attrs = osp_dev2node(osp); ptlrpc_at_set_req_timeout(req); @@ -1469,10 +1566,10 @@ __u64 osp_it_store(const struct lu_env *env, const struct dt_it *di) * \retval -ve: on error */ int osp_orphan_it_load(const struct lu_env *env, const struct dt_it *di, - __u64 hash) + __u64 hash) { struct osp_it *it = (struct osp_it *)di; - int rc; + int rc; it->ooi_next = hash; rc = osp_orphan_it_next(env, (struct dt_it *)di); @@ -1591,7 +1688,7 @@ static void osp_object_free(const struct lu_env *env, struct lu_object *o) count = atomic_read(&oxe->oxe_ref); LASSERTF(count == 1, "Still has %d users on the xattr entry %.*s\n", - count - 1, oxe->oxe_namelen, oxe->oxe_buf); + count-1, (int)oxe->oxe_namelen, oxe->oxe_buf); OBD_FREE(oxe, oxe->oxe_buflen); }