#include "osp_internal.h"
+static inline __u32 osp_dev2node(struct osp_device *osp)
+{
+ return osp->opd_storage->dd_lu_dev.ld_site->ld_seq_site->ss_node_id;
+}
+
static inline bool is_ost_obj(struct lu_object *lo)
{
return !lu2osp_dev(lo->lo_dev)->opd_connect_mdt;
}
static void osp_object_assign_fid(const struct lu_env *env,
- struct osp_device *d, struct osp_object *o)
+ struct osp_device *d, struct osp_object *o)
{
struct osp_thread_info *osi = osp_env_info(env);
static struct osp_xattr_entry *
osp_oac_xattr_find_locked(struct osp_object_attr *ooa,
- const char *name, int namelen, bool unlink)
+ const char *name, size_t namelen)
{
struct osp_xattr_entry *oxe;
list_for_each_entry(oxe, &ooa->ooa_xattr_list, oxe_list) {
if (namelen == oxe->oxe_namelen &&
- strncmp(name, oxe->oxe_buf, namelen) == 0) {
- if (unlink)
- list_del_init(&oxe->oxe_list);
- else
- atomic_inc(&oxe->oxe_ref);
-
+ strncmp(name, oxe->oxe_buf, namelen) == 0)
return oxe;
- }
}
return NULL;
}
static struct osp_xattr_entry *osp_oac_xattr_find(struct osp_object *obj,
- const char *name)
+ const char *name, bool unlink)
{
struct osp_xattr_entry *oxe = NULL;
spin_lock(&obj->opo_lock);
- if (obj->opo_ooa != NULL)
+ if (obj->opo_ooa != NULL) {
oxe = osp_oac_xattr_find_locked(obj->opo_ooa, name,
- strlen(name), false);
+ strlen(name));
+ if (oxe != NULL) {
+ if (unlink)
+ list_del_init(&oxe->oxe_list);
+ else
+ atomic_inc(&oxe->oxe_ref);
+ }
+ }
spin_unlock(&obj->opo_lock);
return oxe;
}
static struct osp_xattr_entry *
-osp_oac_xattr_find_or_add(struct osp_object *obj, const char *name, int len)
+osp_oac_xattr_find_or_add(struct osp_object *obj, const char *name, size_t len)
{
struct osp_object_attr *ooa = obj->opo_ooa;
struct osp_xattr_entry *oxe;
struct osp_xattr_entry *tmp = NULL;
- int namelen = strlen(name);
- int size = sizeof(*oxe) + namelen + 1 + len;
+ size_t namelen = strlen(name);
+ size_t size = sizeof(*oxe) + namelen + 1 + len;
LASSERT(ooa != NULL);
- oxe = osp_oac_xattr_find(obj, name);
+ oxe = osp_oac_xattr_find(obj, name, false);
if (oxe != NULL)
return oxe;
atomic_set(&oxe->oxe_ref, 2);
spin_lock(&obj->opo_lock);
- tmp = osp_oac_xattr_find_locked(ooa, name, namelen, false);
+ tmp = osp_oac_xattr_find_locked(ooa, name, namelen);
if (tmp == NULL)
list_add_tail(&oxe->oxe_list, &ooa->ooa_xattr_list);
+ else
+ atomic_inc(&tmp->oxe_ref);
spin_unlock(&obj->opo_lock);
if (tmp != NULL) {
static struct osp_xattr_entry *
osp_oac_xattr_replace(struct osp_object *obj,
- struct osp_xattr_entry **poxe, int len)
+ struct osp_xattr_entry **poxe, size_t len)
{
struct osp_object_attr *ooa = obj->opo_ooa;
- struct osp_xattr_entry *old = *poxe;
struct osp_xattr_entry *oxe;
- struct osp_xattr_entry *tmp = NULL;
- int namelen = old->oxe_namelen;
- int size = sizeof(*oxe) + namelen + 1 + len;
+ size_t namelen = (*poxe)->oxe_namelen;
+ size_t size = sizeof(*oxe) + namelen + 1 + len;
LASSERT(ooa != NULL);
INIT_LIST_HEAD(&oxe->oxe_list);
oxe->oxe_buflen = size;
oxe->oxe_namelen = namelen;
- memcpy(oxe->oxe_buf, old->oxe_buf, namelen);
+ memcpy(oxe->oxe_buf, (*poxe)->oxe_buf, namelen);
oxe->oxe_value = oxe->oxe_buf + namelen + 1;
/* One ref is for the caller, the other is for the entry on the list. */
atomic_set(&oxe->oxe_ref, 2);
spin_lock(&obj->opo_lock);
- tmp = osp_oac_xattr_find_locked(ooa, oxe->oxe_buf, namelen, true);
+ *poxe = osp_oac_xattr_find_locked(ooa, oxe->oxe_buf, namelen);
+ LASSERT(*poxe != NULL);
+
+ list_del_init(&(*poxe)->oxe_list);
list_add_tail(&oxe->oxe_list, &ooa->ooa_xattr_list);
spin_unlock(&obj->opo_lock);
- *poxe = tmp;
- LASSERT(tmp != NULL);
-
return oxe;
}
return rc;
}
-static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
- const struct lu_attr *attr, struct thandle *th)
+static int __osp_attr_set(const struct lu_env *env, struct dt_object *dt,
+ const struct lu_attr *attr, struct thandle *th)
{
struct osp_device *d = lu2osp_dev(dt->do_lu.lo_dev);
struct osp_object *o = dt2osp_obj(dt);
struct lu_attr *la;
int rc = 0;
-
ENTRY;
/*
RETURN(rc);
}
- if (o->opo_new) {
+ if (o->opo_new)
/* no need in logging for new objects being created */
RETURN(0);
- }
if (!(attr->la_valid & (LA_UID | LA_GID)))
RETURN(0);
if (rc != 0 || o->opo_ooa == NULL)
RETURN(rc);
+ /* Update the OSP object attributes cache. */
la = &o->opo_ooa->ooa_attr;
spin_lock(&o->opo_lock);
if (attr->la_valid & LA_UID) {
RETURN(0);
}
+/**
+ * XXX: NOT prepare set_{attr,xattr} RPC for remote transaction.
+ *
+ * According to our current transaction/dt_object_lock framework (to make
+ * the cross-MDTs modification for DNE1 to be workable), the transaction
+ * sponsor will start the transaction firstly, then try to acquire related
+ * dt_object_lock if needed. Under such rules, if we want to prepare the
+ * set_{attr,xattr} RPC in the RPC declare phase, then related attr/xattr
+ * should be known without dt_object_lock. But such condition maybe not
+ * true for some remote transaction case. For example:
+ *
+ * For linkEA repairing (by LFSCK) case, before the LFSCK thread obtained
+ * the dt_object_lock on the target MDT-object, it cannot know whether
+ * the MDT-object has linkEA or not, neither invalid or not.
+ *
+ * Since the LFSCK thread cannot hold dt_object_lock before the (remote)
+ * transaction start (otherwise there will be some potential deadlock),
+ * it cannot prepare related RPC for repairing during the declare phase
+ * as other normal transactions do.
+ *
+ * To resolve the trouble, we will make OSP to prepare related RPC
+ * (set_attr/set_xattr/del_xattr) after remote transaction started,
+ * and trigger the remote updating (RPC sending) when trans_stop.
+ * Then the up layer users, such as LFSCK, can follow the general
+ * rule to handle trans_start/dt_object_lock for repairing linkEA
+ * inconsistency without distinguishing remote MDT-object.
+ *
+ * In fact, above solution for remote transaction should be the normal
+ * model without considering DNE1. The trouble brought by DNE1 will be
+ * resolved in DNE2. At that time, this patch can be removed.
+ */
+static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
+ const struct lu_attr *attr, struct thandle *th)
+{
+ int rc = 0;
+
+ if (!is_only_remote_trans(th))
+ rc = __osp_attr_set(env, dt, attr, th);
+
+ return rc;
+}
+
static int osp_attr_set(const struct lu_env *env, struct dt_object *dt,
const struct lu_attr *attr, struct thandle *th,
struct lustre_capa *capa)
{
struct osp_object *o = dt2osp_obj(dt);
int rc = 0;
-
ENTRY;
+ if (is_only_remote_trans(th)) {
+ rc = __osp_attr_set(env, dt, attr, th);
+ if (rc != 0)
+ RETURN(rc);
+ }
+
/* we're interested in uid/gid changes only */
if (!(attr->la_valid & (LA_UID | LA_GID)))
RETURN(0);
LASSERT(ooa != NULL);
if (rc == 0) {
- int len = sizeof(*oxe) + oxe->oxe_namelen + 1;
+ size_t len = sizeof(*oxe) + oxe->oxe_namelen + 1;
rc = object_update_result_data_get(reply, rbuf, index);
if (rc < 0 || rbuf->lb_len > (oxe->oxe_buflen - len)) {
LASSERT(buf != NULL);
LASSERT(name != NULL);
+ if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_NETWORK) &&
+ osp->opd_index == cfs_fail_val &&
+ osp_dev2node(osp) == cfs_fail_val)
+ RETURN(-ENOTCONN);
+
if (unlikely(obj->opo_non_exist))
RETURN(-ENOENT);
- oxe = osp_oac_xattr_find(obj, name);
+ oxe = osp_oac_xattr_find(obj, name, false);
if (oxe != NULL) {
spin_lock(&obj->opo_lock);
if (oxe->oxe_ready) {
return rc;
}
-int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
- const struct lu_buf *buf, const char *name,
- int flag, struct thandle *th)
+static int __osp_xattr_set(const struct lu_env *env, struct dt_object *dt,
+ const struct lu_buf *buf, const char *name,
+ int flag, struct thandle *th)
{
- struct osp_object *o = dt2osp_obj(dt);
struct dt_update_request *update;
- struct lu_fid *fid;
- struct osp_xattr_entry *oxe;
- int sizes[3] = {strlen(name), buf->lb_len,
- sizeof(int)};
- char *bufs[3] = {(char *)name, (char *)buf->lb_buf };
- int rc;
+ struct lu_fid *fid;
+ int sizes[3] = { strlen(name),
+ buf->lb_len,
+ sizeof(int) };
+ char *bufs[3] = { (char *)name,
+ (char *)buf->lb_buf };
+ struct osp_xattr_entry *oxe;
+ struct osp_object *o = dt2osp_obj(dt);
+ int rc;
+ ENTRY;
LASSERT(buf->lb_len > 0 && buf->lb_buf != NULL);
PFID(lu_object_fid(&dt->do_lu)),
(int)PTR_ERR(update));
- return PTR_ERR(update);
+ RETURN(PTR_ERR(update));
}
flag = cpu_to_le32(flag);
rc = out_insert_update(env, update, OUT_XATTR_SET, fid,
ARRAY_SIZE(sizes), sizes, (const char **)bufs);
if (rc != 0 || o->opo_ooa == NULL)
- return rc;
+ RETURN(rc);
oxe = osp_oac_xattr_find_or_add(o, name, buf->lb_len);
if (oxe == NULL) {
- CWARN("%s: Fail to add xattr (%s) to cache for "DFID
- ": rc = %d\n", dt->do_lu.lo_dev->ld_obd->obd_name,
- name, PFID(lu_object_fid(&dt->do_lu)), rc);
+ CWARN("%s: Fail to add xattr (%s) to cache for "DFID,
+ dt->do_lu.lo_dev->ld_obd->obd_name,
+ name, PFID(lu_object_fid(&dt->do_lu)));
- return 0;
+ RETURN(0);
}
if (oxe->oxe_buflen - oxe->oxe_namelen - 1 < buf->lb_len) {
osp_oac_xattr_put(oxe);
oxe = tmp;
if (tmp == NULL) {
- CWARN("%s: Fail to update xattr (%s) to cache for "DFID
- ": rc = %d\n", dt->do_lu.lo_dev->ld_obd->obd_name,
- name, PFID(lu_object_fid(&dt->do_lu)), rc);
+ CWARN("%s: Fail to update xattr (%s) to cache for "DFID,
+ dt->do_lu.lo_dev->ld_obd->obd_name,
+ name, PFID(lu_object_fid(&dt->do_lu)));
spin_lock(&o->opo_lock);
old->oxe_ready = 0;
spin_unlock(&o->opo_lock);
- return 0;
+ RETURN(0);
}
/* Drop the ref for entry on list. */
spin_unlock(&o->opo_lock);
osp_oac_xattr_put(oxe);
- return 0;
+ RETURN(0);
+}
+
+int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
+ const struct lu_buf *buf, const char *name,
+ int flag, struct thandle *th)
+{
+ int rc = 0;
+
+ /* Please check the comment in osp_attr_set() for handling
+ * remote transaction. */
+ if (!is_only_remote_trans(th))
+ rc = __osp_xattr_set(env, dt, buf, name, flag, th);
+
+ return rc;
}
int osp_xattr_set(const struct lu_env *env, struct dt_object *dt,
const struct lu_buf *buf, const char *name, int fl,
struct thandle *th, struct lustre_capa *capa)
{
+ int rc = 0;
+
CDEBUG(D_INFO, "xattr %s set object "DFID"\n", name,
PFID(&dt->do_lu.lo_header->loh_fid));
- return 0;
+ /* Please check the comment in osp_attr_set() for handling
+ * remote transaction. */
+ if (is_only_remote_trans(th))
+ rc = __osp_xattr_set(env, dt, buf, name, fl, th);
+
+ return rc;
}
-int osp_declare_xattr_del(const struct lu_env *env, struct dt_object *dt,
- const char *name, struct thandle *th)
+static int __osp_xattr_del(const struct lu_env *env, struct dt_object *dt,
+ const char *name, struct thandle *th)
{
struct dt_update_request *update;
const struct lu_fid *fid;
- int size = strlen(name);
- int rc;
+ struct osp_object *o = dt2osp_obj(dt);
+ struct osp_xattr_entry *oxe;
+ int size = strlen(name);
+ int rc;
update = out_find_create_update_loc(th, dt);
if (IS_ERR(update))
rc = out_insert_update(env, update, OUT_XATTR_DEL, fid, 1, &size,
(const char **)&name);
+ if (rc != 0 || o->opo_ooa == NULL)
+ return rc;
+
+ oxe = osp_oac_xattr_find(o, name, true);
+ if (oxe != NULL)
+ /* Drop the ref for entry on list. */
+ osp_oac_xattr_put(oxe);
+
+ return 0;
+}
+
+int osp_declare_xattr_del(const struct lu_env *env, struct dt_object *dt,
+ const char *name, struct thandle *th)
+{
+ int rc = 0;
+
+ /* Please check the comment in osp_attr_set() for handling
+ * remote transaction. */
+ if (!is_only_remote_trans(th))
+ rc = __osp_xattr_del(env, dt, name, th);
return rc;
}
const char *name, struct thandle *th,
struct lustre_capa *capa)
{
+ int rc = 0;
+
CDEBUG(D_INFO, "xattr %s del object "DFID"\n", name,
PFID(&dt->do_lu.lo_header->loh_fid));
- return 0;
+ /* Please check the comment in osp_attr_set() for handling
+ * remote transaction. */
+ if (is_only_remote_trans(th))
+ rc = __osp_xattr_del(env, dt, name, th);
+
+ return rc;
}
static int osp_declare_object_create(const struct lu_env *env,
ii->ii_magic = IDX_INFO_MAGIC;
ii->ii_count = npages * LU_PAGE_COUNT;
ii->ii_hash_start = it->ooi_next;
- ii->ii_attrs =
- osp->opd_storage->dd_lu_dev.ld_site->ld_seq_site->ss_node_id;
+ ii->ii_attrs = osp_dev2node(osp);
ptlrpc_at_set_req_timeout(req);
* \retval -ve: on error
*/
int osp_orphan_it_load(const struct lu_env *env, const struct dt_it *di,
- __u64 hash)
+ __u64 hash)
{
struct osp_it *it = (struct osp_it *)di;
- int rc;
+ int rc;
it->ooi_next = hash;
rc = osp_orphan_it_next(env, (struct dt_it *)di);
count = atomic_read(&oxe->oxe_ref);
LASSERTF(count == 1,
"Still has %d users on the xattr entry %.*s\n",
- count - 1, oxe->oxe_namelen, oxe->oxe_buf);
+ count-1, (int)oxe->oxe_namelen, oxe->oxe_buf);
OBD_FREE(oxe, oxe->oxe_buflen);
}