return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &cmm_lu_ops);
}
-int cmm_root_get(const struct lu_context *ctx, struct md_device *md,
+static int cmm_root_get(const struct lu_context *ctx, struct md_device *md,
struct lu_fid *fid)
{
struct cmm_device *cmm_dev = md2cmm_dev(md);
cmm_dev->cmm_child, fid);
}
-int cmm_config(const struct lu_context *ctxt, struct md_device *md,
+static int cmm_config(const struct lu_context *ctxt, struct md_device *md,
const char *name, void *buf, int size, int mode)
{
struct cmm_device *cmm_dev = md2cmm_dev(md);
RETURN(rc);
}
-int cmm_statfs(const struct lu_context *ctxt, struct md_device *md,
+static int cmm_statfs(const struct lu_context *ctxt, struct md_device *md,
struct kstatfs *sfs) {
struct cmm_device *cmm_dev = md2cmm_dev(md);
int rc;
extern struct lu_device_type mdc_device_type;
+/* --- cmm_lu_operations --- */
/* add new MDC to the CMM, create MDC lu_device and connect it to mdc_obd */
static int cmm_add_mdc(const struct lu_context *ctx,
struct cmm_device * cm, struct lustre_cfg *cfg)
{
struct lu_device_type *ldt = &mdc_device_type;
struct lu_device *ld;
+ struct mdc_device *mc;
+#ifdef CMM_CODE
+ struct mdc_device *tmp;
+ __u32 mdc_num;
+#endif
int rc;
ENTRY;
- /*TODO check this MDC exists already */
-
+#ifdef CMM_CODE
+ /* find out that there is no such mdc */
+ LASSERT(lustre_cfg_string(cfg, 2));
+ mdc_num = simple_strtol(lustre_cfg_string(cfg, 2), NULL, 10);
+ spin_lock(&cm->cmm_tgt_guard);
+ list_for_each_entry_safe(mc, tmp, &cm->cmm_targets,
+ mc_linkage) {
+ if (mc->mc_num == mdc_num)
+ RETURN(-EEXIST);
+ }
+ spin_unlock(&cm->cmm_tgt_guard);
+#endif
ld = ldt->ldt_ops->ldto_device_alloc(ctx, ldt, cfg);
-
ld->ld_site = cmm2lu_dev(cm)->ld_site;
rc = ldt->ldt_ops->ldto_device_init(ctx, ld, NULL);
/* pass config to the just created MDC */
rc = ld->ld_ops->ldo_process_config(ctx, ld, cfg);
if (rc == 0) {
- struct mdc_device *mc = lu2mdc_dev(ld);
+ mc = lu2mdc_dev(ld);
+#ifdef CMM_CODE
+ spin_lock(&cm->cmm_tgt_guard);
+#endif
list_add_tail(&mc->mc_linkage, &cm->cmm_targets);
- lu_device_get(cmm2lu_dev(cm));
cm->cmm_tgt_count++;
+#ifdef CMM_CODE
+ spin_unlock(&cm->cmm_tgt_guard);
+#endif
+ lu_device_get(cmm2lu_dev(cm));
}
RETURN(rc);
}
static struct lu_device_operations cmm_lu_ops = {
.ldo_object_alloc = cmm_object_alloc,
-
.ldo_process_config = cmm_process_config
};
/* --- lu_device_type operations --- */
-struct lu_device *cmm_device_alloc(const struct lu_context *ctx,
+static struct lu_device *cmm_device_alloc(const struct lu_context *ctx,
struct lu_device_type *t,
struct lustre_cfg *cfg)
{
return l;
}
-void cmm_device_free(const struct lu_context *ctx, struct lu_device *d)
+static void cmm_device_free(const struct lu_context *ctx, struct lu_device *d)
{
struct cmm_device *m = lu2cmm_dev(d);
OBD_FREE_PTR(m);
}
-int cmm_type_init(struct lu_device_type *t)
+static int cmm_type_init(struct lu_device_type *t)
{
return 0;
}
-void cmm_type_fini(struct lu_device_type *t)
+static void cmm_type_fini(struct lu_device_type *t)
{
return;
}
int err = 0;
ENTRY;
-
+
+#ifdef CMM_CODE
+ spin_lock_init(&m->cmm_tgt_guard);
+#endif
INIT_LIST_HEAD(&m->cmm_targets);
m->cmm_tgt_count = 0;
m->cmm_child = lu2md_dev(next);
#include "cmm_internal.h"
#include "mdc_internal.h"
+#ifdef CMM_CODE
+static int cmm_fld_lookup(const struct lu_fid *fid)
+{
+ int rc;
+ /* temporary hack for proto mkdir */
+ rc = (unsigned long)fid_seq(fid) / LUSTRE_SEQ_RANGE;
+ CWARN("Get MDS %d for sequence: "LPU64"\n", rc, fid_seq(fid));
+ RETURN(rc);
+}
+
+static struct md_object_operations cml_mo_ops;
+static struct md_dir_operations cml_dir_ops;
+static struct lu_object_operations cml_obj_ops;
+
+static struct md_object_operations cmr_mo_ops;
+static struct md_dir_operations cmr_dir_ops;
+static struct lu_object_operations cmr_obj_ops;
+
+struct lu_object *cmm_object_alloc(const struct lu_context *ctx,
+ const struct lu_object_header *loh,
+ struct lu_device *ld)
+{
+ struct lu_object *lo = NULL;
+ const struct lu_fid *fid = loh->loh_fid;
+ int mdsnum, rc;
+ ENTRY;
+
+ /* get object location */
+ mdsnum = cmm_fld_lookup(fid);
+
+ /* select the proper set of operations based on object location */
+ if (mdsnum == lu2cmm_dev(ld)->cmm_local_num) {
+ struct cml_object *clo;
+
+ OBD_ALLOC_PTR(clo);
+ if (clo != NULL) {
+ lo = &clo->cmm_obj.cmo_obj.mo_lu;
+ lu_object_init(lo, NULL, ld);
+ clo->cmm_obj.cmo_obj.mo_ops = &cml_mo_ops;
+ clo->cmm_obj.cmo_obj.mo_dir_ops = &cml_dir_ops;
+ lo->lo_ops = &cml_obj_ops;
+ }
+ } else {
+ struct cmr_object *cro;
+
+ OBD_ALLOC_PTR(cro);
+ if (cro != NULL) {
+ lo = &cro->cmm_obj.cmo_obj.mo_lu;
+ lu_object_init(lo, NULL, ld);
+ cro->cmm_obj.cmo_obj.mo_ops = &cmr_mo_ops;
+ cro->cmm_obj.cmo_obj.mo_dir_ops = &cmr_dir_ops;
+ lo->lo_ops = &cmr_obj_ops;
+ cro->cmo_num = mdsnum;
+ }
+ }
+ RETURN(lo);
+}
+
+/*
+ * CMM has two types of objects - local and remote. They have different set
+ * of operations so we are avoiding multiple checks in code.
+ */
+
+/*
+ * local CMM object operations. cml_...
+ */
+static inline struct cml_object *lu2cml_obj(struct lu_object *o)
+{
+ return container_of0(o, struct cml_object, cmm_obj.cmo_obj.mo_lu);
+}
+static inline struct cml_object *md2cml_obj(struct md_object *mo)
+{
+ return container_of0(mo, struct cml_object, cmm_obj.cmo_obj);
+}
+static inline struct cml_object *cmm2cml_obj(struct cmm_object *co)
+{
+ return container_of0(co, struct cml_object, cmm_obj);
+}
+/* get local child device */
+static struct lu_device *cml_child_dev(struct cmm_device *d)
+{
+ return next = &d->cmm_child->md_lu_dev;
+}
+
+/* lu_object operations */
+static void cml_object_free(const struct lu_context *ctx,
+ struct lu_object *lo)
+{
+ struct cml_object *clo = lu2cml_obj(lo);
+ lu_object_fini(lo);
+ OBD_FREE_PTR(clo);
+}
+
+static int cml_object_init(const struct lu_context *ctx, struct lu_object *lo)
+{
+ struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
+ struct lu_device *c_dev;
+ struct lu_object *c_obj;
+ int rc;
+
+ ENTRY;
+
+ c_dev = cml_child_dev(cd);
+ if (c_dev == NULL) {
+ rc = -ENOENT;
+ } else {
+ c_obj = c_dev->ld_ops->ldo_object_alloc(ctx,
+ lo->lo_header, c_dev);
+ if (c_obj != NULL) {
+ lu_object_add(lo, c_obj);
+ rc = 0;
+ } else {
+ rc = -ENOMEM;
+ }
+ }
+
+ RETURN(rc);
+}
+
+static int cml_object_exists(const struct lu_context *ctx,
+ struct lu_object *lo)
+{
+ return lu_object_exists(ctx, lu_object_next(lo));
+}
+
+static int cml_object_print(const struct lu_context *ctx,
+ struct seq_file *f, const struct lu_object *lo)
+{
+ return seq_printf(f, LUSTRE_CMM0_NAME"-object@%p", lo);
+}
+
+static struct lu_object_operations cml_obj_ops = {
+ .loo_object_init = cml_object_init,
+ .loo_object_free = cml_object_free,
+ .loo_object_print = cml_object_print,
+ .loo_object_exists = cml_object_exists
+};
+
+/* CMM local md_object operations */
+static int cml_object_create(const struct lu_context *ctx,
+ struct md_object *mo,
+ struct lu_attr *attr)
+{
+ int rc;
+ ENTRY;
+ rc = mo_object_create(ctx, cmm2child_obj(md2cmm_obj(mo)), attr);
+ RETURN(rc);
+}
+
+static int cml_attr_get(const struct lu_context *ctx, struct md_object *mo,
+ struct lu_attr *attr)
+{
+ int rc;
+ ENTRY;
+ rc = mo_attr_get(ctx, cmm2child_obj(md2cmm_obj(mo)), attr);
+ RETURN(rc);
+}
+
+static int cml_attr_set(const struct lu_context *ctx, struct md_object *mo,
+ struct lu_attr *attr)
+{
+ int rc;
+ ENTRY;
+ rc = mo_attr_set(ctx, cmm2child_obj(md2cmm_obj(mo)), attr);
+ RETURN(rc);
+}
+
+static int cml_xattr_get(const struct lu_context *ctx, struct md_object *mo,
+ void *buf, int buflen, const char *name)
+{
+ int rc;
+ ENTRY;
+ rc = mo_xattr_get(ctx, cmm2child_obj(md2cmm_obj(mo)),
+ buf, buflen, name);
+ RETURN(rc);
+}
+
+static int cml_xattr_set(const struct lu_context *ctx, struct md_object *mo,
+ void *buf, int buflen, const char *name)
+{
+ int rc;
+ ENTRY;
+ rc = mo_xattr_set(ctx, cmm2child_obj(md2cmm_obj(mo)),
+ buf, buflen, name);
+ RETURN(rc);
+}
+
+static int cml_ref_add(const struct lu_context *ctx, struct md_object *mo)
+{
+ int rc;
+ ENTRY;
+ rc = mo_ref_add(ctx, cmm2child_obj(md2cmm_obj(mo)));
+ RETURN(rc);
+}
+
+static int cml_ref_del(const struct lu_context *ctx, struct md_object *mo)
+{
+ int rc;
+ ENTRY;
+ rc = mo_ref_del(ctx, cmm2child_obj(md2cmm_obj(mo)));
+ RETURN(rc);
+}
+
+static int cml_open(const struct lu_context *ctx, struct md_object *mo)
+{
+ int rc;
+ ENTRY;
+ rc = mo_open(ctx, cmm2child_obj(md2cmm_obj(mo)));
+ RETURN(rc);
+}
+
+static int cml_close(const struct lu_context *ctx, struct md_object *mo)
+{
+ int rc;
+ ENTRY;
+ rc = mo_close(ctx, cmm2child_obj(md2cmm_obj(mo)));
+ RETURN(rc);
+}
+
+static struct md_object_operations cml_mo_ops = {
+ .moo_attr_get = cml_attr_get,
+ .moo_attr_set = cml_attr_set,
+ .moo_xattr_get = cml_xattr_get,
+ .moo_xattr_set = cml_xattr_set,
+ .moo_object_create = cml_object_create,
+ .moo_ref_add = cml_ref_add,
+ .moo_ref_del = cml_ref_del,
+ .moo_open = cml_open,
+ .moo_close = cml_close
+};
+
+/* md_dir operations */
+static int cml_lookup(const struct lu_context *ctx, struct md_object *mo_p,
+ const char *name, struct lu_fid *lf)
+{
+ int rc;
+ ENTRY;
+ rc = mdo_lookup(ctx, cmm2child_obj(md2cmm_obj(mo_p)), name, lf);
+ RETURN(rc);
+
+}
+
+static int cml_create(const struct lu_context *ctx,
+ struct md_object *mo_p, const char *name,
+ struct md_object *mo_c, struct lu_attr *attr)
+{
+ int rc;
+ ENTRY;
+ rc = mdo_create(ctx, cmm2child_obj(md2cmm_obj(mo_p)), name,
+ cmm2child_obj(md2cmm_obj(mo_c)), attr);
+ RETURN(rc);
+}
+
+static int cml_link(const struct lu_context *ctx, struct md_object *mo_p,
+ struct md_object *mo_s, const char *name)
+{
+ int rc;
+ ENTRY;
+ rc = mdo_link(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
+ cmm2child_obj(md2cmm_obj(mo_s)), name);
+ RETURN(rc);
+}
+
+static int cml_unlink(const struct lu_context *ctx, struct md_object *mo_p,
+ struct md_object *mo_c, const char *name)
+{
+ int rc;
+ ENTRY;
+ rc = mdo_unlink(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
+ cmm2child_obj(md2cmm_obj(mo_c)), name);
+ RETURN(rc);
+}
+
+static int cml_rename(const struct lu_context *ctx, struct md_object *mo_po,
+ struct md_object *mo_pn, struct md_object *mo_s,
+ const char *s_name, struct md_object *mo_t,
+ const char *t_name)
+{
+ int rc;
+ ENTRY;
+ rc = mdo_rename(ctx, cmm2child_obj(md2cmm_obj(mo_po)),
+ cmm2child_obj(md2cmm_obj(mo_pn)),
+ cmm2child_obj(md2cmm_obj(mo_s)), s_name,
+ cmm2child_obj(md2cmm_obj(mo_t)), t_name);
+ RETURN(rc);
+}
+
+static int cml_rename_tgt(const struct lu_context *ctx,
+ struct md_object *mo_p,
+ struct md_object *mo_s, struct md_object *mo_t,
+ const char *name)
+{
+ int rc;
+ ENTRY;
+ rc = mdo_rename_tgt(ctx, cmm2child_obj(md2cmm_obj(mo_po)),
+ cmm2child_obj(md2cmm_obj(mo_s)),
+ cmm2child_obj(md2cmm_obj(mo_t)), name);
+ RETURN(rc);
+}
+
+static struct md_dir_operations cmm_dir_ops = {
+ .mdo_lookup = cml_lookup,
+ .mdo_create = cml_create,
+ .mdo_link = cml_link,
+ .mdo_unlink = cml_unlink,
+ .mdo_rename = cml_rename,
+ .mdo_rename_tgt = cml_rename_tgt,
+};
+
+/* -------------------------------------------------------------------
+ * remote CMM object operations. cmr_...
+ */
+static inline struct cmr_object *lu2cmr_obj(struct lu_object *o)
+{
+ return container_of0(o, struct cmr_object, cmm_obj.cmo_obj.mo_lu);
+}
+static inline struct cmr_object *md2cmr_obj(struct md_object *mo)
+{
+ return container_of0(mo, struct cmr_object, cmm_obj.cmo_obj);
+}
+static inline struct cmr_object *cmm2cmr_obj(struct cmm_object *co)
+{
+ return container_of0(co, struct cmr_object, cmm_obj);
+}
+
+/* get local child device */
+static struct lu_device *cmr_child_dev(struct cmm_device *d, __u32 num)
+{
+ struct lu_device *next = NULL;
+ struct mdc_device *mdc;
+
+ spin_lock(&d->cmm_tgt_guard);
+ list_for_each_entry(mdc, &d->cmm_targets, mc_linkage) {
+ if (mdc->mc_num == num) {
+ next = mdc2lu_dev(mdc);
+ break;
+ }
+ }
+ spin_unlock(&d->cmm_tgt_guard);
+ return next;
+}
+
+/* lu_object operations */
+static void cmr_object_free(const struct lu_context *ctx,
+ struct lu_object *lo)
+{
+ struct cmr_object *cro = lu2cmr_obj(lo);
+ lu_object_fini(lo);
+ OBD_FREE_PTR(cro);
+}
+
+static int cmr_object_init(const struct lu_context *ctx, struct lu_object *lo)
+{
+ struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
+ struct lu_device *c_dev;
+ struct lu_object *c_obj;
+ const struct lu_fid *fid = lu_object_fid(lo);
+ int rc;
+
+ ENTRY;
+
+ c_dev = cmr_child_dev(cd, lu2cmr_obj(lo)->cmo_num);
+ if (c_dev == NULL) {
+ rc = -ENOENT;
+ } else {
+ c_obj = c_dev->ld_ops->ldo_object_alloc(ctx,
+ lo->lo_header, c_dev);
+ if (c_obj != NULL) {
+ lu_object_add(lo, c_obj);
+ rc = 0;
+ } else {
+ rc = -ENOMEM;
+ }
+ }
+
+ RETURN(rc);
+}
+
+
+static int cmr_object_exists(const struct lu_context *ctx,
+ struct lu_object *lo)
+{
+ return lu_object_exists(ctx, lu_object_next(lo));
+}
+
+static int cmr_object_print(const struct lu_context *ctx,
+ struct seq_file *f, const struct lu_object *lo)
+{
+ return seq_printf(f, LUSTRE_CMM0_NAME"-object@%p", lo);
+}
+
+static struct lu_object_operations cml_obj_ops = {
+ .loo_object_init = cmr_object_init,
+ .loo_object_free = cmr_object_free,
+ .loo_object_print = cmr_object_print,
+ .loo_object_exists = cmr_object_exists
+};
+
+/* CMM remote md_object operations. All are invalid */
+static int cmr_object_create(const struct lu_context *ctx,
+ struct md_object *mo,
+ struct lu_attr *attr)
+{
+ RETURN(-EFAULT);
+}
+
+static int cmr_attr_get(const struct lu_context *ctx, struct md_object *mo,
+ struct lu_attr *attr)
+{
+ RETURN(-EFAULT);
+}
+
+static int cmr_attr_set(const struct lu_context *ctx, struct md_object *mo,
+ struct lu_attr *attr)
+{
+ RETURN(-EFAULT);
+}
+
+static int cmr_xattr_get(const struct lu_context *ctx, struct md_object *mo,
+ void *buf, int buflen, const char *name)
+{
+ RETURN(-EFAULT);
+}
+
+static int cmr_xattr_set(const struct lu_context *ctx, struct md_object *mo,
+ void *buf, int buflen, const char *name)
+{
+ RETURN(-EFAULT);
+}
+
+static int cmr_ref_add(const struct lu_context *ctx, struct md_object *mo)
+{
+ RETURN(-EFAULT);
+}
+
+static int cmr_ref_del(const struct lu_context *ctx, struct md_object *mo)
+{
+ RETURN(-EFAULT);
+}
+
+static int cmr_open(const struct lu_context *ctx, struct md_object *mo)
+{
+ RETURN(-EFAULT);
+}
+
+static int cmr_close(const struct lu_context *ctx, struct md_object *mo)
+{
+ RETURN(-EFAULT);
+}
+
+static struct md_object_operations cml_mo_ops = {
+ .moo_attr_get = cmr_attr_get,
+ .moo_attr_set = cmr_attr_set,
+ .moo_xattr_get = cmr_xattr_get,
+ .moo_xattr_set = cmr_xattr_set,
+ .moo_object_create = cmr_object_create,
+ .moo_ref_add = cmr_ref_add,
+ .moo_ref_del = cmr_ref_del,
+ .moo_open = cmr_open,
+ .moo_close = cmr_close
+};
+
+/* remote part of md_dir operations */
+static int cmr_lookup(const struct lu_context *ctx, struct md_object *mo_p,
+ const char *name, struct lu_fid *lf)
+{
+ RETURN(-EFAULT);
+}
+
+static int cmr_create(const struct lu_context *ctx,
+ struct md_object *mo_p, const char *name,
+ struct md_object *mo_c, struct lu_attr *attr)
+{
+ int rc;
+
+ ENTRY;
+
+ /* remote object creation and local name insert */
+ rc = mo_object_create(ctx, cmm2child_obj(md2cmm_obj(mo_c)), attr);
+ if (rc == 0) {
+ rc = mdo_name_insert(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
+ name, lu_object_fid(&mo_c->mo_lu));
+ if (rc)
+ mo_ref_del(ctx, cmm2child_obj(md2cmm_obj(mo_c)));
+ }
+
+ RETURN(rc);
+}
+
+static int cmr_link(const struct lu_context *ctx, struct md_object *mo_p,
+ struct md_object *mo_s, const char *name)
+{
+ int rc;
+ ENTRY;
+
+ rc = mo_ref_add(ctx, cmm2child_obj(md2cmm_obj(mo_s)));
+ if (rc == 0) {
+ rc = mdo_name_insert(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
+ name, lu_object_fid(&mo_s->mo_lu));
+ if (rc)
+ mo_ref_del(ctx, cmm2child_obj(md2cmm_obj(mo_s)));
+ }
+
+ RETURN(rc);
+}
+
+static int cmr_unlink(const struct lu_context *ctx, struct md_object *mo_p,
+ struct md_object *mo_c, const char *name)
+{
+ int rc;
+ ENTRY;
+
+ rc = mo_ref_del(ctx, cmm2child_obj(md2cmm_obj(mo_c)));
+ if (rc == 0) {
+ rc = mdo_name_remove(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
+ name, lu_object_fid(&mo_c->mo_lu));
+ if (rc)
+ mo_ref_add(ctx, cmm2child_obj(md2cmm_obj(mo_c)));
+ }
+
+ RETURN(rc);
+}
+
+static int cmr_rename(const struct lu_context *ctx, struct md_object *mo_po,
+ struct md_object *mo_pn, struct md_object *mo_s,
+ const char *s_name, struct md_object *mo_t,
+ const char *t_name)
+{
+ int rc;
+ ENTRY;
+ rc = mdo_rename(ctx, cmm2child_obj(md2cmm_obj(mo_po)),
+ cmm2child_obj(md2cmm_obj(mo_pn)),
+ cmm2child_obj(md2cmm_obj(mo_s)), s_name,
+ cmm2child_obj(md2cmm_obj(mo_t)), t_name);
+
+ RETURN(rc);
+}
+
+static int cmr_rename_tgt(const struct lu_context *ctx,
+ struct md_object *mo_p,
+ struct md_object *mo_s, struct md_object *mo_t,
+ const char *name)
+{
+ int rc;
+ ENTRY;
+ rc = mdo_rename_tgt(ctx, cmm2child_obj(md2cmm_obj(mo_po)),
+ cmm2child_obj(md2cmm_obj(mo_s)),
+ cmm2child_obj(md2cmm_obj(mo_t)), name);
+ RETURN(rc);
+}
+
+static struct md_dir_operations cmm_dir_ops = {
+ .mdo_lookup = cmr_lookup,
+ .mdo_create = cmr_create,
+ .mdo_link = cmr_link,
+ .mdo_unlink = cmr_unlink,
+ .mdo_rename = cmr_rename,
+ .mdo_rename_tgt = cmr_rename_tgt,
+};
+
+#else /* CMM_CODE */
static struct md_object_operations cmm_mo_ops;
static struct md_dir_operations cmm_dir_ops;
static struct lu_object_operations cmm_obj_ops;
/* remote object creation and local name insert */
rc = mo_object_create(ctx, ch_c, attr);
if (rc == 0) {
- rc = mdo_name_insert(ctx, ch_p, name, lf, attr);
+ rc = mdo_name_insert(ctx, ch_p, name, lf);
}
}
/* remote object creation and local name insert */
rc = mo_object_create(ctx, ch_c, attr);
if (rc == 0) {
- rc = mdo_name_insert(ctx, ch_p, name, lf, attr);
+ rc = mdo_name_insert(ctx, ch_p, name, lf);
}
}
.mdo_mkdir = cmm_mkdir,
.mdo_create = cmm_create
};
-
+#endif /* CMM_CODE */