#include "filter_internal.h"
+/* Group 0 is no longer a legal group, to catch uninitialized IDs */
+#define FILTER_MIN_GROUPS 3
static struct lvfs_callback_ops filter_lvfs_ops;
kmem_cache_t *ll_fmd_cachep;
* need to be set up like real exports as filter_connect() does.
*/
exp = class_new_export(obd, (struct obd_uuid *)fcd->fcd_uuid);
+
CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
- " srv lr: "LPU64"\n", fcd->fcd_uuid, cl_idx,
- last_rcvd, le64_to_cpu(fsd->lsd_last_transno));
+ " srv lr: "LPU64" fcd_group %d\n", fcd->fcd_uuid, cl_idx,
+ last_rcvd, le64_to_cpu(fsd->lsd_last_transno),
+ le32_to_cpu(fcd->fcd_group));
if (IS_ERR(exp)) {
if (PTR_ERR(exp) == -EALREADY) {
/* export already exists, zero out this one */
} else {
fed = &exp->exp_filter_data;
fed->fed_fcd = fcd;
+ fed->fed_group = le32_to_cpu(fcd->fcd_group);
rc = filter_client_add(obd, filter, fed, cl_idx);
LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */
struct filter_obd *filter = &obd->u.filter;
struct file *filp;
struct dentry *dentry;
- int i;
+ int i, j;
ENTRY;
if (filter->fo_dentry_O_groups != NULL) {
- for (i = 0; i < FILTER_GROUPS; i++) {
+ for (i = 0; i < filter->fo_group_count; i++) {
dentry = filter->fo_dentry_O_groups[i];
if (dentry != NULL)
f_dput(dentry);
}
OBD_FREE(filter->fo_dentry_O_groups,
- FILTER_GROUPS * sizeof(*filter->fo_dentry_O_groups));
+ filter->fo_group_count *
+ sizeof(*filter->fo_dentry_O_groups));
filter->fo_dentry_O_groups = NULL;
}
if (filter->fo_last_objid_files != NULL) {
- for (i = 0; i < FILTER_GROUPS; i++) {
+ for (i = 0; i < filter->fo_group_count; i++) {
filp = filter->fo_last_objid_files[i];
if (filp != NULL)
filp_close(filp, 0);
}
OBD_FREE(filter->fo_last_objid_files,
- FILTER_GROUPS * sizeof(*filter->fo_last_objid_files));
+ filter->fo_group_count *
+ sizeof(*filter->fo_last_objid_files));
filter->fo_last_objid_files = NULL;
}
if (filter->fo_dentry_O_sub != NULL) {
- for (i = 0; i < filter->fo_subdir_count; i++) {
- dentry = filter->fo_dentry_O_sub[i];
- if (dentry != NULL)
- f_dput(dentry);
+ for (i = 0; i < filter->fo_group_count; i++) {
+ for (j = 0; j < filter->fo_subdir_count; j++) {
+ dentry = filter->fo_dentry_O_sub[i].dentry[j];
+ if (dentry != NULL)
+ f_dput(dentry);
+ }
}
OBD_FREE(filter->fo_dentry_O_sub,
filter->fo_subdir_count *
}
if (filter->fo_last_objids != NULL) {
OBD_FREE(filter->fo_last_objids,
- FILTER_GROUPS * sizeof(*filter->fo_last_objids));
+ filter->fo_group_count *
+ sizeof(*filter->fo_last_objids));
filter->fo_last_objids = NULL;
}
if (filter->fo_dentry_O != NULL) {
RETURN(0);
}
+static int filter_update_last_group(struct obd_device *obd, int group)
+{
+ struct filter_obd *filter = &obd->u.filter;
+ struct file *filp = NULL;
+ int last_group = 0, rc;
+ loff_t off = 0;
+ ENTRY;
+
+ if (group <= filter->fo_committed_group)
+ RETURN(0);
+
+ filp = filp_open("LAST_GROUP", O_RDWR, 0700);
+ if (IS_ERR(filp)) {
+ rc = PTR_ERR(filp);
+ filp = NULL;
+ CERROR("cannot open LAST_GROUP: rc = %d\n", rc);
+ GOTO(cleanup, rc);
+ }
+
+ rc = fsfilt_read_record(obd, filp, &last_group, sizeof(__u32), &off);
+ if (rc) {
+ CDEBUG(D_INODE, "error reading LAST_GROUP: rc %d\n",rc);
+ GOTO(cleanup, rc);
+ }
+ LASSERT(off == 0 || last_group >= FILTER_MIN_GROUPS);
+ CDEBUG(D_INODE, "%s: previous %d, new %d\n",
+ obd->obd_name, last_group, group);
+
+ off = 0;
+ last_group = group;
+ /* must be sync: bXXXX */
+ rc = fsfilt_write_record(obd, filp, &last_group, sizeof(__u32), &off, 1);
+ if (rc) {
+ CDEBUG(D_INODE, "error updating LAST_GROUP: rc %d\n", rc);
+ GOTO(cleanup, rc);
+ }
+
+ filter->fo_committed_group = group;
+cleanup:
+ if (filp)
+ filp_close(filp, 0);
+ RETURN(rc);
+}
+
+static int filter_read_group_internal(struct obd_device *obd, int group,
+ int create)
+{
+ struct filter_obd *filter = &obd->u.filter;
+ __u64 *new_objids = NULL;
+ struct filter_subdirs *new_subdirs = NULL, *tmp_subdirs = NULL;
+ struct dentry **new_groups = NULL;
+ struct file **new_files = NULL;
+ struct dentry *dentry;
+ struct file *filp;
+ int old_count = filter->fo_group_count, rc, stage = 0, i;
+ char name[25];
+ __u64 last_objid;
+ loff_t off = 0;
+
+ snprintf(name, 24, "%d", group);
+ name[24] = '\0';
+
+ if (!create) {
+ dentry = ll_lookup_one_len(name, filter->fo_dentry_O,
+ strlen(name));
+ if (IS_ERR(dentry)) {
+ CERROR("Cannot lookup expected object group %d: %ld\n",
+ group, PTR_ERR(dentry));
+ RETURN(PTR_ERR(dentry));
+ }
+ } else {
+ dentry = simple_mkdir(filter->fo_dentry_O, name, 0700, 1);
+ if (IS_ERR(dentry)) {
+ CERROR("cannot lookup/create O/%s: rc = %ld\n", name,
+ PTR_ERR(dentry));
+ RETURN(PTR_ERR(dentry));
+ }
+ }
+ stage = 1;
+
+ snprintf(name, 24, "O/%d/LAST_ID", group);
+ name[24] = '\0';
+ filp = filp_open(name, O_CREAT | O_RDWR, 0700);
+ if (IS_ERR(filp)) {
+ CERROR("cannot create %s: rc = %ld\n", name, PTR_ERR(filp));
+ GOTO(cleanup, rc = PTR_ERR(filp));
+ }
+ stage = 2;
+
+ rc = fsfilt_read_record(obd, filp, &last_objid, sizeof(__u64), &off);
+ if (rc) {
+ CDEBUG(D_INODE, "error reading %s: rc %d\n", name, rc);
+ GOTO(cleanup, rc);
+ }
+
+ if (filter->fo_subdir_count) {
+ OBD_ALLOC(tmp_subdirs, sizeof(*tmp_subdirs));
+ if (tmp_subdirs == NULL)
+ GOTO(cleanup, rc = -ENOMEM);
+ stage = 3;
+
+ for (i = 0; i < filter->fo_subdir_count; i++) {
+ char dir[20];
+ snprintf(dir, sizeof(dir), "d%u", i);
+
+ tmp_subdirs->dentry[i] = simple_mkdir(dentry, dir, 0700, 1);
+ if (IS_ERR(tmp_subdirs->dentry[i])) {
+ rc = PTR_ERR(tmp_subdirs->dentry[i]);
+ CERROR("can't lookup/create O/%d/%s: rc = %d\n",
+ group, dir, rc);
+ GOTO(cleanup, rc);
+ }
+
+ CDEBUG(D_INODE, "got/created O/%d/%s: %p\n", group, dir,
+ tmp_subdirs->dentry[i]);
+ }
+ }
+
+ /* 'group' is an index; we need an array of length 'group + 1' */
+ if (group + 1 > old_count) {
+ int len = group + 1;
+ OBD_ALLOC(new_objids, len * sizeof(*new_objids));
+ OBD_ALLOC(new_subdirs, len * sizeof(*new_subdirs));
+ OBD_ALLOC(new_groups, len * sizeof(*new_groups));
+ OBD_ALLOC(new_files, len * sizeof(*new_files));
+ stage = 4;
+ if (new_objids == NULL || new_subdirs == NULL ||
+ new_groups == NULL || new_files == NULL)
+ GOTO(cleanup, rc = -ENOMEM);
+
+ memcpy(new_objids, filter->fo_last_objids,
+ old_count * sizeof(*new_objids));
+ memcpy(new_subdirs, filter->fo_dentry_O_sub,
+ old_count * sizeof(*new_subdirs));
+ memcpy(new_groups, filter->fo_dentry_O_groups,
+ old_count * sizeof(*new_groups));
+ memcpy(new_files, filter->fo_last_objid_files,
+ old_count * sizeof(*new_files));
+
+ if (old_count) {
+ OBD_FREE(filter->fo_last_objids,
+ old_count * sizeof(*new_objids));
+ OBD_FREE(filter->fo_dentry_O_sub,
+ old_count * sizeof(*new_subdirs));
+ OBD_FREE(filter->fo_dentry_O_groups,
+ old_count * sizeof(*new_groups));
+ OBD_FREE(filter->fo_last_objid_files,
+ old_count * sizeof(*new_files));
+ }
+ filter->fo_last_objids = new_objids;
+ filter->fo_dentry_O_sub = new_subdirs;
+ filter->fo_dentry_O_groups = new_groups;
+ filter->fo_last_objid_files = new_files;
+ filter->fo_group_count = len;
+ }
+
+ filter->fo_dentry_O_groups[group] = dentry;
+ filter->fo_last_objid_files[group] = filp;
+ if (filter->fo_subdir_count) {
+ filter->fo_dentry_O_sub[group] = *tmp_subdirs;
+ OBD_FREE(tmp_subdirs, sizeof(*tmp_subdirs));
+ }
+
+ filter_update_last_group(obd, group);
+
+ if (filp->f_dentry->d_inode->i_size == 0) {
+ filter->fo_last_objids[group] = FILTER_INIT_OBJID;
+ rc = filter_update_last_objid(obd, group, 1);
+ RETURN(rc);
+ }
+
+ filter->fo_last_objids[group] = le64_to_cpu(last_objid);
+ CDEBUG(D_INODE, "%s: server last_objid group %d: "LPU64"\n",
+ obd->obd_name, group, last_objid);
+ RETURN(0);
+ cleanup:
+ switch (stage) {
+ case 4:
+ if (new_objids != NULL)
+ OBD_FREE(new_objids, group * sizeof(*new_objids));
+ if (new_subdirs != NULL)
+ OBD_FREE(new_subdirs, group * sizeof(*new_subdirs));
+ if (new_groups != NULL)
+ OBD_FREE(new_groups, group * sizeof(*new_groups));
+ if (new_files != NULL)
+ OBD_FREE(new_files, group * sizeof(*new_files));
+ case 3:
+ if (filter->fo_subdir_count) {
+ for (i = 0; i < filter->fo_subdir_count; i++) {
+ if (tmp_subdirs->dentry[i] != NULL)
+ dput(tmp_subdirs->dentry[i]);
+ }
+ OBD_FREE(tmp_subdirs, sizeof(*tmp_subdirs));
+ }
+ case 2:
+ filp_close(filp, 0);
+ case 1:
+ dput(dentry);
+ }
+ RETURN(rc);
+}
+
+static int filter_read_groups(struct obd_device *obd, int last_group,
+ int create)
+{
+ struct filter_obd *filter = &obd->u.filter;
+ int old_count, group, rc = 0;
+
+ down(&filter->fo_init_lock);
+ old_count = filter->fo_group_count;
+ for (group = old_count; group <= last_group; group++) {
+ if (group == 0)
+ continue; /* no group zero */
+
+ rc = filter_read_group_internal(obd, group, create);
+ if (rc != 0)
+ break;
+ }
+ up(&filter->fo_init_lock);
+ return rc;
+}
+
/* FIXME: object groups */
static int filter_prep_groups(struct obd_device *obd)
{
struct filter_obd *filter = &obd->u.filter;
struct dentry *dentry, *O_dentry;
struct file *filp;
- int i, rc = 0, cleanup_phase = 0;
+ int last_group, rc = 0, cleanup_phase = 0;
+ loff_t off = 0;
ENTRY;
O_dentry = simple_mkdir(current->fs->pwd, "O", 0700, 1);
f_dput(dentry);
}
+ cleanup_phase = 2; /* groups */
+
+ /* we have to initialize all groups before first connections from
+ * clients because they may send create/destroy for any group -bzzz */
+ filp = filp_open("LAST_GROUP", O_CREAT | O_RDWR, 0700);
+ if (IS_ERR(filp)) {
+ CERROR("cannot create LAST_GROUP: rc = %ld\n", PTR_ERR(filp));
+ GOTO(cleanup, rc = PTR_ERR(filp));
+ }
+ cleanup_phase = 3; /* filp */
+
+ rc = fsfilt_read_record(obd, filp, &last_group, sizeof(__u32), &off);
+ if (rc) {
+ CDEBUG(D_INODE, "error reading LAST_GROUP: rc %d\n", rc);
+ GOTO(cleanup, rc);
+ }
+ if (off == 0) {
+ last_group = FILTER_MIN_GROUPS;
+ } else {
+ LASSERT(last_group >= FILTER_MIN_GROUPS);
+ }
+
+ CWARN("%s: initialize groups [%d,%d]\n", obd->obd_name,
+ FILTER_MIN_GROUPS, last_group);
+ filter->fo_committed_group = last_group;
+ rc = filter_read_groups(obd, last_group, 1);
+ if (rc)
+ GOTO(cleanup, rc);
+
+ filp_close(filp, 0);
+ RETURN(0);
+
+ cleanup:
+ switch (cleanup_phase) {
+ case 3:
+ filp_close(filp, 0);
+ case 2:
+ filter_cleanup_groups(obd);
+ case 1:
+ f_dput(filter->fo_dentry_O);
+ filter->fo_dentry_O = NULL;
+ default:
+ break;
+ }
+ return rc;
+
+#if 0
OBD_ALLOC(filter->fo_last_objids, FILTER_GROUPS * sizeof(__u64));
if (filter->fo_last_objids == NULL)
GOTO(cleanup, rc = -ENOMEM);
cleanup:
filter_cleanup_groups(obd);
return rc;
+#endif
}
/* setup the object store with correct subdirectories */
if (rc)
CERROR("error writing server data: rc = %d\n", rc);
- for (i = 0; i < FILTER_GROUPS; i++) {
- rc = filter_update_last_objid(obd, i, (i == FILTER_GROUPS - 1));
+ for (i = 0; i < filter->fo_group_count; i++) {
+ rc = filter_update_last_objid(obd, i,
+ (i == filter->fo_group_count - 1));
if (rc)
CERROR("error writing group %d lastobjid: rc = %d\n",
i, rc);
obd_id id, obd_gr group)
{
LASSERT(filter->fo_fsd != NULL);
- LASSERT(group <= FILTER_GROUPS);
+ LASSERT(group <= filter->fo_group_count);
spin_lock(&filter->fo_objidlock);
filter->fo_last_objids[group] = id;
{
obd_id id;
LASSERT(filter->fo_fsd != NULL);
- LASSERT(group <= FILTER_GROUPS);
+ LASSERT(group <= filter->fo_group_count);
/* FIXME: object groups */
spin_lock(&filter->fo_objidlock);
struct dentry *filter_parent(struct obd_device *obd, obd_gr group, obd_id objid)
{
struct filter_obd *filter = &obd->u.filter;
- LASSERT(group < FILTER_GROUPS); /* FIXME: object groups */
+ struct filter_subdirs *subdirs;
+ LASSERT(group < filter->fo_group_count); /* FIXME: object groups */
if (group > 0 || filter->fo_subdir_count == 0)
return filter->fo_dentry_O_groups[group];
- return filter->fo_dentry_O_sub[objid & (filter->fo_subdir_count - 1)];
+ subdirs = &filter->fo_dentry_O_sub[group];
+ return subdirs->dentry[objid & (filter->fo_subdir_count - 1)];
}
/* We never dget the object parent, so DON'T dput it either */
RETURN(dchild);
}
-static int filter_prepare_destroy(struct obd_device *obd, obd_id objid)
+static int filter_prepare_destroy(struct obd_device *obd, obd_id objid,
+ obd_id group)
{
struct lustre_handle lockh;
int flags = LDLM_AST_DISCARD_DATA, rc;
- struct ldlm_res_id res_id = { .name = { objid } };
+ struct ldlm_res_id res_id = { .name = { objid, 0, group, 0} };
ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
ENTRY;
obd->obd_lvfs_ctxt.pwd = mnt->mnt_root;
obd->obd_lvfs_ctxt.fs = get_ds();
obd->obd_lvfs_ctxt.cb_ops = filter_lvfs_ops;
-
+
+ sema_init(&filter->fo_init_lock, 1);
+ filter->fo_committed_group = 0;
+
rc = filter_prep(obd);
if (rc)
GOTO(err_ops, rc);
struct obd_uuid *cluuid,
struct obd_connect_data *data)
{
+ struct lvfs_run_ctxt saved;
struct obd_export *exp;
struct filter_export_data *fed;
struct filter_client_data *fcd = NULL;
struct filter_obd *filter = &obd->u.filter;
+ __u32 group;
int rc;
ENTRY;
if (rc)
GOTO(cleanup, rc);
- if (!obd->obd_replayable)
- GOTO(cleanup, rc = 0);
+ group = data->ocd_group;
+ if (obd->obd_replayable) {
+ OBD_ALLOC(fcd, sizeof(*fcd));
+ if (!fcd) {
+ CERROR("filter: out of memory for client data\n");
+ GOTO(cleanup, rc = -ENOMEM);
+ }
- OBD_ALLOC(fcd, sizeof(*fcd));
- if (!fcd) {
- CERROR("filter: out of memory for client data\n");
- GOTO(cleanup, rc = -ENOMEM);
+ memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid));
+ fed->fed_fcd = fcd;
+ fed->fed_fcd->fcd_group = group;
+ rc = filter_client_add(obd, filter, fed, -1);
+ if (rc)
+ GOTO(cleanup, rc);
+ }
+ CWARN("%s: Received MDS connection ("LPX64"); group %d\n",
+ obd->obd_name, exp->exp_handle.h_cookie, group);
+ if (group == 0)
+ GOTO(cleanup, rc);
+
+ if (fed->fed_group != 0 && fed->fed_group != group) {
+ CERROR("!!! This export (nid %s) used object group %d "
+ "earlier; now it's trying to use group %d! This could "
+ "be a bug in the MDS. Tell CFS.\n",
+ obd_export_nid2str(exp), fed->fed_group, group);
+ GOTO(cleanup, rc = -EPROTO);
}
+ fed->fed_group = group;
- memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid));
- fed->fed_fcd = fcd;
-
- rc = filter_client_add(obd, filter, fed, -1);
+ push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ rc = filter_read_groups(obd, group, 1);
+ pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ if (rc != 0) {
+ CERROR("can't read group %u\n", group);
+ GOTO(cleanup, rc);
+ }
GOTO(cleanup, rc);
const char *what, int quiet)
{
struct dentry *dchild = NULL;
+ obd_gr group = 0;
- if (!(oa->o_valid & OBD_MD_FLGROUP))
- oa->o_gr = 0;
+ if (oa->o_valid & OBD_MD_FLGROUP)
+ group = oa->o_gr;
- dchild = filter_fid2dentry(obd, NULL, oa->o_gr, oa->o_id);
+ dchild = filter_fid2dentry(obd, NULL, group, oa->o_id);
if (IS_ERR(dchild)) {
CERROR("%s error looking up object: "LPU64"\n",
if (!(oa->o_valid & OBD_MD_FLGROUP))
oa->o_gr = 0;
-
/* packing fid and converting it to LE for storing into EA.
* Here ->o_stripe_idx should be filled by LOV and rest of
* fields - by client. */
int filter_setattr(struct obd_export *exp, struct obd_info *oinfo,
struct obd_trans_info *oti)
{
- struct ldlm_res_id res_id = { .name = { oinfo->oi_oa->o_id } };
+ struct ldlm_res_id res_id = { .name = { oinfo->oi_oa->o_id, 0,
+ oinfo->oi_oa->o_gr, 0 } };
struct ldlm_valblock_ops *ns_lvbo;
struct filter_mod_data *fmd;
struct lvfs_run_ctxt saved;
LASSERT(down_trylock(&filter->fo_create_lock) != 0);
memset(&doa, 0, sizeof(doa));
- if (oa->o_valid & OBD_MD_FLGROUP) {
- doa.o_valid |= OBD_MD_FLGROUP;
- doa.o_gr = oa->o_gr;
- } else {
- doa.o_gr = 0;
- }
+
+ LASSERT(oa->o_gr != 0);
+ LASSERT(oa->o_valid & OBD_MD_FLGROUP);
+
+ doa.o_valid |= OBD_MD_FLGROUP;
+ doa.o_gr = oa->o_gr;
doa.o_mode = S_IFREG;
if (!filter->fo_destroy_in_progress) {
static int filter_create(struct obd_export *exp, struct obdo *oa,
struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
+ struct filter_export_data *fed;
struct obd_device *obd = NULL;
+ struct filter_obd *filter;
struct lvfs_run_ctxt saved;
struct lov_stripe_md *lsm = NULL;
- int rc = 0, diff;
+ int rc = 0, diff, group = oa->o_gr;
ENTRY;
+ if (!(oa->o_valid & OBD_MD_FLGROUP) || group == 0) {
+ CERROR("!!! nid %s sent invalid object group %d\n",
+ obd_export_nid2str(exp), group);
+ RETURN(-EINVAL);
+ }
+
+ obd = exp->exp_obd;
+ fed = &exp->exp_filter_data;
+ filter = &obd->u.filter;
+
+ if (fed->fed_group != group) {
+ CERROR("!!! this export (nid %s) used object group %d "
+ "earlier; now it's trying to use group %d! This could "
+ "be a bug in the MDS. Tell CFS.\n",
+ obd_export_nid2str(exp), fed->fed_group, group);
+ RETURN(-ENOTUNIQ);
+ }
+
+#if 0
if (!(oa->o_valid & OBD_MD_FLGROUP))
oa->o_gr = 0;
-
+#endif
CDEBUG(D_INFO, "filter_create(od->o_gr="LPU64",od->o_id="LPU64")\n",
oa->o_gr, oa->o_id);
if (ea != NULL) {
if ((oa->o_valid & OBD_MD_FLFLAGS) &&
(oa->o_flags & OBD_FL_RECREATE_OBJS)) {
- if (oa->o_id > filter_last_id(&obd->u.filter, oa->o_gr)) {
+ if (oa->o_id > filter_last_id(filter, oa->o_gr)) {
CERROR("recreate objid "LPU64" > last id "LPU64"\n",
- oa->o_id, filter_last_id(&obd->u.filter,
+ oa->o_id, filter_last_id(filter,
oa->o_gr));
rc = -EINVAL;
} else {
- struct filter_obd *filter = &obd->u.filter;
-
diff = 1;
down(&filter->fo_create_lock);
rc = filter_precreate(obd, oa, oa->o_gr, &diff);
struct iattr iattr;
ENTRY;
+ LASSERT(oa->o_valid & OBD_MD_FLGROUP);
+#if 0
if (!(oa->o_valid & OBD_MD_FLGROUP))
oa->o_gr = 0;
-
+#endif
obd = exp->exp_obd;
filter = &obd->u.filter;
GOTO(cleanup, rc = -ENOENT);
}
- filter_prepare_destroy(obd, oa->o_id);
+ filter_prepare_destroy(obd, oa->o_id, oa->o_gr);
/* Our MDC connection is established by the MDS to us */
if (oa->o_valid & OBD_MD_FLCOOKIE) {