#include "cmm_internal.h"
#include "mdc_internal.h"
-/* XXX: fix later this hack. It exists because OSD produces fids with like this:
- seq = ROOT_SEQ + 1, etc. */
-static int cmm_special_fid(const struct lu_fid *fid)
-{
- struct lu_range *space = (struct lu_range *)&LUSTRE_SEQ_SPACE_RANGE;
- return !range_within(space, fid_seq(fid));
-}
-
static int cmm_fld_lookup(struct cmm_device *cm,
const struct lu_fid *fid, mdsno_t *mds)
{
LASSERT(fid_is_sane(fid));
- /* XXX: is this correct? We need this to prevent FLD lookups while CMM
- * did not initialized yet all MDCs. */
- if (cmm_special_fid(fid))
- *mds = 0;
- else {
- rc = fld_client_lookup(&cm->cmm_fld, fid_seq(fid), mds);
- if (rc) {
- CERROR("can't find mds by seq "LPU64", rc %d\n",
- fid_seq(fid), rc);
- RETURN(rc);
- }
+ rc = fld_client_lookup(&cm->cmm_fld, fid_seq(fid), mds);
+ if (rc) {
+ CERROR("can't find mds by seq "LPU64", rc %d\n",
+ fid_seq(fid), rc);
+ RETURN(rc);
}
if (*mds >= cm->cmm_tgt_count) {
- CERROR("Got invalid mdsno: %u (max: %u)\n",
+ CERROR("Got invalid mdsno: "LPU64" (max: %u)\n",
*mds, cm->cmm_tgt_count);
rc = -EINVAL;
} else {
- CDEBUG(D_INFO, "CMM: got MDS %u for sequence: "LPU64"\n",
+ CDEBUG(D_INFO, "CMM: got MDS "LPU64" for sequence: "LPU64"\n",
*mds, fid_seq(fid));
}
{
int rc;
ENTRY;
- rc = mo_object_create(ctx, cmm2child_obj(md2cmm_obj(mo)), attr);
+ rc = mo_object_create(ctx, md_object_next(mo), attr);
RETURN(rc);
}
{
int rc;
ENTRY;
- rc = mo_attr_get(ctx, cmm2child_obj(md2cmm_obj(mo)), attr);
+ rc = mo_attr_get(ctx, md_object_next(mo), attr);
RETURN(rc);
}
static int cml_attr_set(const struct lu_context *ctx, struct md_object *mo,
- struct lu_attr *attr)
+ const struct lu_attr *attr)
{
int rc;
ENTRY;
- rc = mo_attr_set(ctx, cmm2child_obj(md2cmm_obj(mo)), attr);
+ rc = mo_attr_set(ctx, md_object_next(mo), attr);
RETURN(rc);
}
{
int rc;
ENTRY;
- rc = mo_xattr_get(ctx, cmm2child_obj(md2cmm_obj(mo)),
+ rc = mo_xattr_get(ctx, md_object_next(mo),
buf, buflen, name);
RETURN(rc);
}
static int cml_xattr_set(const struct lu_context *ctx, struct md_object *mo,
- void *buf, int buflen, const char *name)
+ const void *buf, int buflen, const char *name)
{
int rc;
ENTRY;
- rc = mo_xattr_set(ctx, cmm2child_obj(md2cmm_obj(mo)),
+ rc = mo_xattr_set(ctx, md_object_next(mo),
buf, buflen, name);
RETURN(rc);
}
{
int rc;
ENTRY;
- rc = mo_ref_add(ctx, cmm2child_obj(md2cmm_obj(mo)));
+ rc = mo_ref_add(ctx, md_object_next(mo));
RETURN(rc);
}
{
int rc;
ENTRY;
- rc = mo_ref_del(ctx, cmm2child_obj(md2cmm_obj(mo)));
+ rc = mo_ref_del(ctx, md_object_next(mo));
RETURN(rc);
}
{
int rc;
ENTRY;
- rc = mo_open(ctx, cmm2child_obj(md2cmm_obj(mo)));
+ rc = mo_open(ctx, md_object_next(mo));
RETURN(rc);
}
{
int rc;
ENTRY;
- rc = mo_close(ctx, cmm2child_obj(md2cmm_obj(mo)));
+ rc = mo_close(ctx, md_object_next(mo));
RETURN(rc);
}
{
int rc;
ENTRY;
- rc = mdo_lookup(ctx, cmm2child_obj(md2cmm_obj(mo_p)), name, lf);
+ rc = mdo_lookup(ctx, md_object_next(mo_p), name, lf);
RETURN(rc);
}
{
int rc;
ENTRY;
- rc = mdo_create(ctx, cmm2child_obj(md2cmm_obj(mo_p)), name,
- cmm2child_obj(md2cmm_obj(mo_c)), attr);
+ rc = mdo_create(ctx, md_object_next(mo_p), name,
+ md_object_next(mo_c), attr);
RETURN(rc);
}
{
int rc;
ENTRY;
- rc = mdo_link(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
- cmm2child_obj(md2cmm_obj(mo_s)), name);
+ rc = mdo_link(ctx, md_object_next(mo_p),
+ md_object_next(mo_s), name);
RETURN(rc);
}
{
int rc;
ENTRY;
- rc = mdo_unlink(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
- cmm2child_obj(md2cmm_obj(mo_c)), name);
+ rc = mdo_unlink(ctx, md_object_next(mo_p),
+ md_object_next(mo_c), name);
RETURN(rc);
}
+/* rename is split to local/remote by location of new parent dir */
static int cml_rename(const struct lu_context *ctx, struct md_object *mo_po,
struct md_object *mo_pn, const struct lu_fid *lf,
const char *s_name, struct md_object *mo_t,
ENTRY;
if (mo_t && !cmm_is_local_obj(md2cmm_obj(mo_t))) {
- /* remote object */
- rc = mo_ref_del(ctx, cmm2child_obj(md2cmm_obj(mo_t)));
+ /* mo_t is remote object and there is RPC to unlink it */
+ rc = mo_ref_del(ctx, md_object_next(mo_t));
if (rc)
RETURN(rc);
mo_t = NULL;
}
-
- rc = mdo_rename(ctx, cmm2child_obj(md2cmm_obj(mo_po)),
- cmm2child_obj(md2cmm_obj(mo_pn)), lf, s_name,
- cmm2child_obj(md2cmm_obj(mo_t)), t_name);
+ /* local rename, mo_t can be NULL */
+ rc = mdo_rename(ctx, md_object_next(mo_po),
+ md_object_next(mo_pn), lf, s_name,
+ md_object_next(mo_t), t_name);
RETURN(rc);
}
int rc;
ENTRY;
- rc = mdo_rename_tgt(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
- cmm2child_obj(md2cmm_obj(mo_t)), lf, name);
+ rc = mdo_rename_tgt(ctx, md_object_next(mo_p),
+ md_object_next(mo_t), lf, name);
RETURN(rc);
}
}
static int cmr_attr_set(const struct lu_context *ctx, struct md_object *mo,
- struct lu_attr *attr)
+ const struct lu_attr *attr)
{
RETURN(-EFAULT);
}
}
static int cmr_xattr_set(const struct lu_context *ctx, struct md_object *mo,
- void *buf, int buflen, const char *name)
+ const void *buf, int buflen, const char *name)
{
RETURN(-EFAULT);
}
RETURN(-EREMOTE);
}
+/*
+ * All methods below are cross-ref by nature. They consist of remote call and
+ * local operation. Due to future rollback functionality there are several
+ * limitations for such methods:
+ * 1) remote call should be done at first to do epoch negotiation between all
+ * MDS involved and to avoid the RPC inside transaction.
+ * 2) only one RPC can be sent - also due to epoch negotiation.
+ * For more details see rollback HLD/DLD.
+ *
+ */
static int cmr_create(const struct lu_context *ctx,
struct md_object *mo_p, const char *name,
struct md_object *mo_c, struct lu_attr *attr)
//XXX: make sure that MDT checks name isn't exist
/* remote object creation and local name insert */
- rc = mo_object_create(ctx, cmm2child_obj(md2cmm_obj(mo_c)), attr);
+ rc = mo_object_create(ctx, md_object_next(mo_c), attr);
if (rc == 0) {
- rc = mdo_name_insert(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
+ rc = mdo_name_insert(ctx, md_object_next(mo_p),
name, lu_object_fid(&mo_c->mo_lu));
}
//XXX: make sure that MDT checks name isn't exist
- rc = mo_ref_add(ctx, cmm2child_obj(md2cmm_obj(mo_s)));
+ rc = mo_ref_add(ctx, md_object_next(mo_s));
if (rc == 0) {
- rc = mdo_name_insert(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
+ rc = mdo_name_insert(ctx, md_object_next(mo_p),
name, lu_object_fid(&mo_s->mo_lu));
}
int rc;
ENTRY;
- rc = mo_ref_del(ctx, cmm2child_obj(md2cmm_obj(mo_c)));
+ rc = mo_ref_del(ctx, md_object_next(mo_c));
if (rc == 0) {
- rc = mdo_name_remove(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
+ rc = mdo_name_remove(ctx, md_object_next(mo_p),
name);
}
* lookup and process this further */
LASSERT(mo_t == NULL);
- rc = mdo_rename_tgt(ctx, cmm2child_obj(md2cmm_obj(mo_pn)),
+ rc = mdo_rename_tgt(ctx, md_object_next(mo_pn),
NULL/* mo_t */, lf, t_name);
/* only old name is removed localy */
if (rc == 0)
- rc = mdo_name_remove(ctx, cmm2child_obj(md2cmm_obj(mo_po)),
+ rc = mdo_name_remove(ctx, md_object_next(mo_po),
s_name);
RETURN(rc);
}
+/* part of cross-ref rename(). Used to insert new name in new parent
+ * and unlink target with same name if it exists */
static int cmr_rename_tgt(const struct lu_context *ctx,
struct md_object *mo_p, struct md_object *mo_t,
const struct lu_fid *lf, const char *name)
int rc;
ENTRY;
/* target object is remote one */
- rc = mo_ref_del(ctx, cmm2child_obj(md2cmm_obj(mo_t)));
+ rc = mo_ref_del(ctx, md_object_next(mo_t));
/* continue locally with name handling only */
if (rc == 0)
- rc = mdo_rename_tgt(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
+ rc = mdo_rename_tgt(ctx, md_object_next(mo_p),
NULL, lf, name);
RETURN(rc);
}