obd_transno_commit_cb(obd, transno, error);
}
-
/* Assumes caller has already pushed us into the kernel context. */
int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti,
int rc)
fcd->fcd_last_xid = 0;
off = fed->fed_lr_off;
- fsfilt_add_journal_cb(exp->exp_obd, filter->fo_sb, last_rcvd,
- oti->oti_handle, filter_commit_cb, NULL);
+ fsfilt_add_journal_cb(exp->exp_obd, last_rcvd, oti->oti_handle,
+ filter_commit_cb, NULL);
err = fsfilt_write_record(exp->exp_obd, filter->fo_rcvd_filp, fcd,
sizeof(*fcd), &off, 0);
if (err) {
fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
if (new_client) {
- struct lvfs_run_ctxt saved;
+ struct obd_run_ctxt saved;
loff_t off = fed->fed_lr_off;
int err;
void *handle;
CDEBUG(D_INFO, "writing client fcd at idx %u (%llu) (len %u)\n",
fed->fed_lr_idx,off,(unsigned int)sizeof(*fed->fed_fcd));
- push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ push_ctxt(&saved, &obd->obd_ctxt, NULL);
/* Transaction needed to fix bug 1403 */
handle = fsfilt_start(obd,
filter->fo_rcvd_filp->f_dentry->d_inode,
filter->fo_rcvd_filp->f_dentry->d_inode,
handle, 1);
}
- pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ pop_ctxt(&saved, &obd->obd_ctxt, NULL);
if (err) {
CERROR("error writing %s client idx %u: rc %d\n",
struct filter_obd *filter = &exp->exp_obd->u.filter;
struct obd_device *obd = exp->exp_obd;
struct filter_client_data zero_fcd;
- struct lvfs_run_ctxt saved;
+ struct obd_run_ctxt saved;
int rc;
loff_t off;
ENTRY;
}
memset(&zero_fcd, 0, sizeof zero_fcd);
- push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ push_ctxt(&saved, &obd->obd_ctxt, NULL);
rc = fsfilt_write_record(obd, filter->fo_rcvd_filp, &zero_fcd,
sizeof(zero_fcd), &off, 1);
- pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ pop_ctxt(&saved, &obd->obd_ctxt, NULL);
CDEBUG(rc == 0 ? D_INFO : D_ERROR,
"zeroing disconnecting client %s at idx %u (%llu) in %s rc %d\n",
int rc;
ENTRY;
- CDEBUG(D_INODE, "%s: server last_objid for group "LPU64": "LPU64"\n",
- obd->obd_name, group, filter->fo_last_objids[group]);
+ if (filter->fo_last_objid_files[group] == NULL) {
+ CERROR("Object group "LPU64" not fully setup; not updating "
+ "last_objid\n", group);
+ RETURN(0);
+ }
+
+ CDEBUG(D_INODE, "server last_objid for group "LPU64": "LPU64"\n",
+ group, filter->fo_last_objids[group]);
tmp = cpu_to_le64(filter->fo_last_objids[group]);
rc = fsfilt_write_record(obd, filter->fo_last_objid_files[group],
CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
obd->obd_name, le16_to_cpu(fsd->fsd_subdir_count));
CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
- last_rcvd_size <= le32_to_cpu(fsd->fsd_client_start) ? 0 :
- (last_rcvd_size - le32_to_cpu(fsd->fsd_client_start)) /
- le16_to_cpu(fsd->fsd_client_size));
+ last_rcvd_size <= FILTER_LR_CLIENT_START ? 0 :
+ (last_rcvd_size-FILTER_LR_CLIENT_START) /FILTER_LR_CLIENT_SIZE);
if (!obd->obd_replayable) {
CWARN("%s: recovery support OFF\n", obd->obd_name);
if (obd->obd_recoverable_clients) {
CWARN("RECOVERY: %d recoverable clients, last_rcvd "
- LPU64"\n", obd->obd_recoverable_clients,
- le64_to_cpu(fsd->fsd_last_transno));
+ LPU64"\n", obd->obd_recoverable_clients,
+ le64_to_cpu(fsd->fsd_last_transno));
obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
obd->obd_recovering = 1;
}
static int filter_cleanup_groups(struct obd_device *obd)
{
struct filter_obd *filter = &obd->u.filter;
- struct file *filp;
struct dentry *dentry;
- int i;
+ int i, k;
ENTRY;
- if (filter->fo_dentry_O_groups != NULL) {
- for (i = 0; i < FILTER_GROUPS; i++) {
- dentry = filter->fo_dentry_O_groups[i];
- if (dentry != NULL)
+ for (i = 0; i < filter->fo_group_count; i++) {
+ if (filter->fo_subdirs != NULL) {
+ for (k = 0; k < filter->fo_subdir_count; k++) {
+ dentry = filter->fo_subdirs[i].dentry[k];
+ if (dentry == NULL)
+ continue;
f_dput(dentry);
+ filter->fo_subdirs[i].dentry[k] = NULL;
+ }
}
- OBD_FREE(filter->fo_dentry_O_groups,
- FILTER_GROUPS * sizeof(*filter->fo_dentry_O_groups));
- filter->fo_dentry_O_groups = NULL;
- }
- if (filter->fo_last_objid_files != NULL) {
- for (i = 0; i < FILTER_GROUPS; i++) {
- filp = filter->fo_last_objid_files[i];
- if (filp != NULL)
- filp_close(filp, 0);
+ if (filter->fo_last_objid_files[i] != NULL) {
+ filp_close(filter->fo_last_objid_files[i], 0);
+ filter->fo_last_objid_files[i] = NULL;
}
+ if (filter->fo_groups[i] != NULL) {
+ dput(filter->fo_groups[i]);
+ filter->fo_groups[i] = NULL;
+ }
+ }
+ if (filter->fo_subdirs != NULL)
+ OBD_FREE(filter->fo_subdirs,
+ filter->fo_group_count * sizeof(*filter->fo_subdirs));
+ if (filter->fo_groups != NULL)
+ OBD_FREE(filter->fo_groups,
+ filter->fo_group_count * sizeof(*filter->fo_groups));
+ if (filter->fo_last_objids != NULL)
+ OBD_FREE(filter->fo_last_objids,
+ filter->fo_group_count * sizeof(__u64));
+ if (filter->fo_last_objid_files != NULL)
OBD_FREE(filter->fo_last_objid_files,
- FILTER_GROUPS * sizeof(*filter->fo_last_objid_files));
- filter->fo_last_objid_files = NULL;
+ filter->fo_group_count * sizeof(struct file *));
+ RETURN(0);
+}
+
+static int filter_read_group_internal(struct obd_device *obd, int group,
+ int create)
+{
+ struct filter_obd *filter = &obd->u.filter;
+ __u64 *new_objids = NULL;
+ struct filter_subdirs *new_subdirs = NULL, *tmp_subdirs;
+ struct dentry **new_groups = NULL;
+ struct file **new_files = NULL;
+ struct dentry *dentry;
+ struct file *filp;
+ int old_count = filter->fo_group_count, rc, stage = 0, i;
+ char name[25];
+ __u64 last_objid;
+ loff_t off = 0;
+
+ snprintf(name, 24, "%d", group);
+ name[24] = '\0';
+
+ if (!create) {
+ dentry = ll_lookup_one_len(name, filter->fo_dentry_O,
+ strlen(name));
+ if (IS_ERR(dentry)) {
+ CERROR("Cannot lookup expected object group %d: %ld\n",
+ group, PTR_ERR(dentry));
+ RETURN(PTR_ERR(dentry));
+ }
+ } else {
+ dentry = simple_mkdir(filter->fo_dentry_O, name, 0700, 1);
+ if (IS_ERR(dentry)) {
+ CERROR("cannot lookup/create O/%s: rc = %ld\n", name,
+ PTR_ERR(dentry));
+ RETURN(PTR_ERR(dentry));
+ }
}
- if (filter->fo_dentry_O_sub != NULL) {
+ stage = 1;
+
+ snprintf(name, 24, "O/%d/LAST_ID", group);
+ name[24] = '\0';
+ filp = filp_open(name, O_CREAT | O_RDWR, 0700);
+ if (IS_ERR(filp)) {
+ CERROR("cannot create %s: rc = %ld\n", name, PTR_ERR(filp));
+ GOTO(cleanup, rc = PTR_ERR(filp));
+ }
+ stage = 2;
+
+ rc = fsfilt_read_record(obd, filp, &last_objid, sizeof(__u64), &off);
+ if (rc) {
+ CDEBUG(D_INODE, "error reading %s: rc %d\n", name, rc);
+ GOTO(cleanup, rc);
+ }
+
+ if (filter->fo_subdir_count) {
+ OBD_ALLOC(tmp_subdirs, sizeof(*tmp_subdirs));
+ if (tmp_subdirs == NULL)
+ GOTO(cleanup, rc = -ENOMEM);
+ stage = 3;
+
for (i = 0; i < filter->fo_subdir_count; i++) {
- dentry = filter->fo_dentry_O_sub[i];
- if (dentry != NULL)
- f_dput(dentry);
+ char dir[20];
+ snprintf(dir, sizeof(dir), "d%u", i);
+
+ tmp_subdirs->dentry[i] =
+ simple_mkdir(dentry, dir, 0700, 1);
+ if (IS_ERR(tmp_subdirs->dentry[i])) {
+ rc = PTR_ERR(tmp_subdirs->dentry[i]);
+ CERROR("can't lookup/create O/%d/%s: rc = %d\n",
+ group, dir, rc);
+ GOTO(cleanup, rc);
+ }
+ CDEBUG(D_INODE, "got/created O/%d/%s: %p\n", group, dir,
+ tmp_subdirs->dentry[i]);
}
- OBD_FREE(filter->fo_dentry_O_sub,
- filter->fo_subdir_count *
- sizeof(*filter->fo_dentry_O_sub));
- filter->fo_dentry_O_sub = NULL;
}
- if (filter->fo_last_objids != NULL) {
- OBD_FREE(filter->fo_last_objids,
- FILTER_GROUPS * sizeof(*filter->fo_last_objids));
- filter->fo_last_objids = NULL;
+
+ /* 'group' is an index; we need an array of length 'group + 1' */
+ if (group + 1 > old_count) {
+ int len = group + 1;
+ OBD_ALLOC(new_objids, len * sizeof(*new_objids));
+ OBD_ALLOC(new_subdirs, len * sizeof(*new_subdirs));
+ OBD_ALLOC(new_groups, len * sizeof(*new_groups));
+ OBD_ALLOC(new_files, len * sizeof(*new_files));
+ stage = 4;
+ if (new_objids == NULL || new_subdirs == NULL ||
+ new_groups == NULL || new_files == NULL)
+ GOTO(cleanup, rc = -ENOMEM);
+
+ memcpy(new_objids, filter->fo_last_objids,
+ old_count * sizeof(*new_objids));
+ memcpy(new_subdirs, filter->fo_subdirs,
+ old_count * sizeof(*new_subdirs));
+ memcpy(new_groups, filter->fo_groups,
+ old_count * sizeof(*new_groups));
+ memcpy(new_files, filter->fo_last_objid_files,
+ old_count * sizeof(*new_files));
+
+ if (old_count) {
+ OBD_FREE(filter->fo_last_objids,
+ old_count * sizeof(*new_objids));
+ OBD_FREE(filter->fo_subdirs,
+ old_count * sizeof(*new_subdirs));
+ OBD_FREE(filter->fo_groups,
+ old_count * sizeof(*new_groups));
+ OBD_FREE(filter->fo_last_objid_files,
+ old_count * sizeof(*new_files));
+ }
+ filter->fo_last_objids = new_objids;
+ filter->fo_subdirs = new_subdirs;
+ filter->fo_groups = new_groups;
+ filter->fo_last_objid_files = new_files;
+ filter->fo_group_count = len;
}
- if (filter->fo_dentry_O != NULL) {
- f_dput(filter->fo_dentry_O);
- filter->fo_dentry_O = NULL;
+
+ filter->fo_groups[group] = dentry;
+ filter->fo_last_objid_files[group] = filp;
+ if (filter->fo_subdir_count) {
+ filter->fo_subdirs[group] = *tmp_subdirs;
+ OBD_FREE(tmp_subdirs, sizeof(*tmp_subdirs));
+ }
+
+ if (filp->f_dentry->d_inode->i_size == 0) {
+ filter->fo_last_objids[group] = FILTER_INIT_OBJID;
+ RETURN(0);
}
+
+ filter->fo_last_objids[group] = le64_to_cpu(last_objid);
+ CDEBUG(D_INODE, "%s: server last_objid group %d: "LPU64"\n",
+ obd->obd_name, group, last_objid);
RETURN(0);
+ cleanup:
+ switch (stage) {
+ case 4:
+ if (new_objids != NULL)
+ OBD_FREE(new_objids, group * sizeof(*new_objids));
+ if (new_subdirs != NULL)
+ OBD_FREE(new_subdirs, group * sizeof(*new_subdirs));
+ if (new_groups != NULL)
+ OBD_FREE(new_groups, group * sizeof(*new_groups));
+ if (new_files != NULL)
+ OBD_FREE(new_files, group * sizeof(*new_files));
+ case 3:
+ if (filter->fo_subdir_count) {
+ for (i = 0; i < filter->fo_subdir_count; i++) {
+ if (tmp_subdirs->dentry[i] != NULL)
+ dput(tmp_subdirs->dentry[i]);
+ }
+ OBD_FREE(tmp_subdirs, sizeof(*tmp_subdirs));
+ }
+ case 2:
+ filp_close(filp, 0);
+ case 1:
+ dput(dentry);
+ }
+ RETURN(rc);
+}
+
+static int filter_read_groups(struct obd_device *obd, int last_group,
+ int create)
+{
+ struct filter_obd *filter = &obd->u.filter;
+ int old_count = filter->fo_group_count, group = old_count, rc = 0;
+
+ for (group = old_count; group <= last_group; group++) {
+ if (group == 0)
+ continue; /* no group zero */
+
+ rc = filter_read_group_internal(obd, group, create);
+ if (rc != 0)
+ break;
+ }
+ return rc;
}
-/* FIXME: object groups */
static int filter_prep_groups(struct obd_device *obd)
{
struct filter_obd *filter = &obd->u.filter;
struct dentry *dentry, *O_dentry;
- struct file *filp;
- int i, rc = 0, cleanup_phase = 0;
+ int rc = 0, cleanup_phase = 0;
ENTRY;
O_dentry = simple_mkdir(current->fs->pwd, "O", 0700, 1);
GOTO(cleanup_O0, rc);
cleanup_O0:
- f_dput(O0_dentry);
+ dput(O0_dentry);
cleanup_R:
- f_dput(dentry);
+ dput(dentry);
if (rc)
GOTO(cleanup, rc);
} else {
- f_dput(dentry);
+ dput(dentry);
}
- OBD_ALLOC(filter->fo_last_objids, FILTER_GROUPS * sizeof(__u64));
- if (filter->fo_last_objids == NULL)
- GOTO(cleanup, rc = -ENOMEM);
cleanup_phase = 2; /* groups */
- OBD_ALLOC(filter->fo_dentry_O_groups, FILTER_GROUPS * sizeof(dentry));
- if (filter->fo_dentry_O_groups == NULL)
- GOTO(cleanup, rc = -ENOMEM);
- OBD_ALLOC(filter->fo_last_objid_files, FILTER_GROUPS * sizeof(filp));
- if (filter->fo_last_objid_files == NULL)
- GOTO(cleanup, rc = -ENOMEM);
-
- for (i = 0; i < FILTER_GROUPS; i++) {
- char name[25];
- loff_t off = 0;
-
- sprintf(name, "%d", i);
- dentry = simple_mkdir(O_dentry, name, 0700, 1);
- CDEBUG(D_INODE, "got/created O/%s: %p\n", name, dentry);
- if (IS_ERR(dentry)) {
- rc = PTR_ERR(dentry);
- CERROR("cannot lookup/create O/%s: rc = %d\n",
- name, rc);
- GOTO(cleanup, rc);
- }
- filter->fo_dentry_O_groups[i] = dentry;
-
- sprintf(name, "O/%d/LAST_ID", i);
- filp = filp_open(name, O_CREAT | O_RDWR, 0700);
- if (IS_ERR(filp)) {
- rc = PTR_ERR(filp);
- CERROR("cannot create %s: rc = %d\n", name, rc);
- GOTO(cleanup, rc);
- }
- filter->fo_last_objid_files[i] = filp;
-
- if (filp->f_dentry->d_inode->i_size == 0) {
- if (i == 0 && filter->fo_fsd->fsd_unused != 0) {
- /* OST conversion, remove sometime post 1.0 */
- filter->fo_last_objids[i] =
- le64_to_cpu(filter->fo_fsd->fsd_unused);
- CWARN("saving old objid "LPU64" to LAST_ID\n",
- filter->fo_last_objids[i]);
- rc = filter_update_last_objid(obd, 0, 1);
- if (rc)
- GOTO(cleanup, rc);
- } else {
- filter->fo_last_objids[i] = FILTER_INIT_OBJID;
- }
- continue;
- }
-
- rc = fsfilt_read_record(obd, filp, &filter->fo_last_objids[i],
- sizeof(__u64), &off);
- if (rc) {
- CDEBUG(D_INODE,"OBD filter: error reading %s: rc %d\n",
- name, rc);
- GOTO(cleanup, rc);
- }
- filter->fo_last_objids[i] =
- le64_to_cpu(filter->fo_last_objids[i]);
- CDEBUG(D_HA, "%s: server last_objid group %d: "LPU64"\n",
- obd->obd_name, i, filter->fo_last_objids[i]);
- }
-
- if (filter->fo_subdir_count) {
- O_dentry = filter->fo_dentry_O_groups[0];
- OBD_ALLOC(filter->fo_dentry_O_sub,
- filter->fo_subdir_count * sizeof(dentry));
- if (filter->fo_dentry_O_sub == NULL)
- GOTO(cleanup, rc = -ENOMEM);
-
- for (i = 0; i < filter->fo_subdir_count; i++) {
- char dir[20];
- snprintf(dir, sizeof(dir), "d%u", i);
+ /* Group 0 is no longer a legal group, to catch uninitialized IDs */
+#define FILTER_MIN_GROUPS 3
+ rc = filter_read_groups(obd, FILTER_MIN_GROUPS, 1);
+ if (rc)
+ GOTO(cleanup, rc);
- dentry = simple_mkdir(O_dentry, dir, 0700, 1);
- CDEBUG(D_INODE, "got/created O/0/%s: %p\n", dir,dentry);
- if (IS_ERR(dentry)) {
- rc = PTR_ERR(dentry);
- CERROR("can't lookup/create O/0/%s: rc = %d\n",
- dir, rc);
- GOTO(cleanup, rc);
- }
- filter->fo_dentry_O_sub[i] = dentry;
- }
- }
RETURN(0);
cleanup:
- filter_cleanup_groups(obd);
+ switch (cleanup_phase) {
+ case 2:
+ filter_cleanup_groups(obd);
+ case 1:
+ f_dput(filter->fo_dentry_O);
+ filter->fo_dentry_O = NULL;
+ default:
+ break;
+ }
return rc;
}
/* setup the object store with correct subdirectories */
static int filter_prep(struct obd_device *obd)
{
- struct lvfs_run_ctxt saved;
+ struct obd_run_ctxt saved;
struct filter_obd *filter = &obd->u.filter;
struct file *file;
struct inode *inode;
int rc = 0;
ENTRY;
- push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ push_ctxt(&saved, &obd->obd_ctxt, NULL);
file = filp_open(LAST_RCVD, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
if (!file || IS_ERR(file)) {
rc = PTR_ERR(file);
GOTO(err_server_data, rc);
out:
- pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ pop_ctxt(&saved, &obd->obd_ctxt, NULL);
return(rc);
/* cleanup the filter: write last used object id to status file */
static void filter_post(struct obd_device *obd)
{
- struct lvfs_run_ctxt saved;
+ struct obd_run_ctxt saved;
struct filter_obd *filter = &obd->u.filter;
int rc, i;
* best to start a transaction with h_sync, because we removed this
* from lastobjid */
- push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ push_ctxt(&saved, &obd->obd_ctxt, NULL);
rc = filter_update_server_data(obd, filter->fo_rcvd_filp,
filter->fo_fsd, 0);
if (rc)
CERROR("error writing server data: rc = %d\n", rc);
- for (i = 0; i < FILTER_GROUPS; i++) {
- rc = filter_update_last_objid(obd, i, (i == FILTER_GROUPS - 1));
+ for (i = 1; i < filter->fo_group_count; i++) {
+ rc = filter_update_last_objid(obd, i,
+ (i == filter->fo_group_count - 1));
if (rc)
CERROR("error writing group %d lastobjid: rc = %d\n",
i, rc);
}
- rc = filp_close(filter->fo_rcvd_filp, 0);
+ filp_close(filter->fo_rcvd_filp, 0);
filter->fo_rcvd_filp = NULL;
if (rc)
CERROR("error closing %s: rc = %d\n", LAST_RCVD, rc);
filter_cleanup_groups(obd);
+ f_dput(filter->fo_dentry_O);
filter_free_server_data(filter);
- pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ pop_ctxt(&saved, &obd->obd_ctxt, NULL);
}
-static void filter_set_last_id(struct filter_obd *filter, struct obdo *oa,
- obd_id id)
+static void filter_set_last_id(struct filter_obd *filter, int group, obd_id id)
{
- obd_gr group = 0;
LASSERT(filter->fo_fsd != NULL);
-
- if (oa != NULL) {
- LASSERT(oa->o_gr <= FILTER_GROUPS);
- group = oa->o_gr;
- }
+ LASSERT(group > 0);
+ LASSERT(group < filter->fo_group_count);
spin_lock(&filter->fo_objidlock);
filter->fo_last_objids[group] = id;
spin_unlock(&filter->fo_objidlock);
}
-__u64 filter_last_id(struct filter_obd *filter, struct obdo *oa)
+__u64 filter_last_id(struct filter_obd *filter, int group)
{
obd_id id;
- obd_gr group = 0;
LASSERT(filter->fo_fsd != NULL);
+ LASSERT(group > 0);
+ LASSERT(group < filter->fo_group_count);
- if (oa != NULL) {
- LASSERT(oa->o_gr <= FILTER_GROUPS);
- group = oa->o_gr;
- }
-
- /* FIXME: object groups */
spin_lock(&filter->fo_objidlock);
id = filter->fo_last_objids[group];
spin_unlock(&filter->fo_objidlock);
struct dentry *filter_parent(struct obd_device *obd, obd_gr group, obd_id objid)
{
struct filter_obd *filter = &obd->u.filter;
- LASSERT(group < FILTER_GROUPS); /* FIXME: object groups */
+ LASSERT(group < filter->fo_group_count);
+ LASSERT(group > 0);
- if (group > 0 || filter->fo_subdir_count == 0)
- return filter->fo_dentry_O_groups[group];
+ if (/*group > 0 || */filter->fo_subdir_count == 0)
+ return filter->fo_groups[group];
- return filter->fo_dentry_O_sub[objid & (filter->fo_subdir_count - 1)];
+ return filter->fo_subdirs[group].dentry[objid & (filter->fo_subdir_count - 1)];
}
/* We never dget the object parent, so DON'T dput it either */
if (IS_ERR(dparent))
return dparent;
+ LASSERT(dparent);
+ LASSERT(dparent->d_inode);
+
rc = filter_lock_dentry(obd, dparent);
if (time_after(jiffies, now + 15 * HZ))
CERROR("slow parent lock %lus\n", (jiffies - now) / HZ);
RETURN(dchild);
}
-static int filter_prepare_destroy(struct obd_device *obd, obd_id objid)
+static int filter_prepare_destroy(struct obd_device *obd, obd_id objid,
+ obd_id group)
{
struct lustre_handle lockh;
int flags = LDLM_AST_DISCARD_DATA, rc;
- struct ldlm_res_id res_id = { .name = { objid } };
+ struct ldlm_res_id res_id = { .name = { objid, 0, group, 0 } };
ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
ENTRY;
res_lvb = res->lr_lvb_data;
LASSERT(res_lvb != NULL);
reply_lvb->lvb_size = res_lvb->lvb_size;
- reply_lvb->lvb_blocks = res_lvb->lvb_blocks;
up(&res->lr_lvb_sem);
list_for_each(tmp, &res->lr_granted) {
if (tmplock->l_granted_mode == LCK_PR)
continue;
- if (tmplock->l_policy_data.l_extent.end <= reply_lvb->lvb_size)
+ if (tmplock->l_policy_data.l_extent.end <=
+ reply_lvb->lvb_size)
continue;
if (l == NULL) {
down(&res->lr_lvb_sem);
reply_lvb->lvb_size = res_lvb->lvb_size;
- reply_lvb->lvb_blocks = res_lvb->lvb_blocks;
up(&res->lr_lvb_sem);
LDLM_LOCK_PUT(l);
}
/* mount the file system (secretly) */
-int filter_common_setup(struct obd_device *obd, obd_count len,
- void *buf, char *option)
+int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
+ char *option)
{
struct lustre_cfg* lcfg = buf;
struct filter_obd *filter = &obd->u.filter;
struct vfsmount *mnt;
- char name[32] = "CATLIST";
int rc = 0;
ENTRY;
filter->fo_fstype = mnt->mnt_sb->s_type->name;
CDEBUG(D_SUPER, "%s: mnt = %p\n", filter->fo_fstype, mnt);
- OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
- obd->obd_lvfs_ctxt.pwdmnt = mnt;
- obd->obd_lvfs_ctxt.pwd = mnt->mnt_root;
- obd->obd_lvfs_ctxt.fs = get_ds();
- obd->obd_lvfs_ctxt.cb_ops = filter_lvfs_ops;
-
- rc = fsfilt_setup(obd, mnt->mnt_sb);
- if (rc)
- GOTO(err_mntput, rc);
+ OBD_SET_CTXT_MAGIC(&obd->obd_ctxt);
+ obd->obd_ctxt.pwdmnt = mnt;
+ obd->obd_ctxt.pwd = mnt->mnt_root;
+ obd->obd_ctxt.fs = get_ds();
+ obd->obd_ctxt.cb_ops = filter_lvfs_ops;
rc = filter_prep(obd);
if (rc)
GOTO(err_mntput, rc);
-
- filter->fo_destroy_in_progress = 0;
- sema_init(&filter->fo_create_lock, 1);
-
spin_lock_init(&filter->fo_translock);
spin_lock_init(&filter->fo_objidlock);
INIT_LIST_HEAD(&filter->fo_export_list);
spin_lock_init(&filter->fo_w_discont_blocks.oh_lock);
filter->fo_readcache_max_filesize = FILTER_MAX_CACHE_SIZE;
+ INIT_LIST_HEAD(&filter->fo_llog_list);
+ spin_lock_init(&filter->fo_llog_list_lock);
+
obd->obd_namespace = ldlm_namespace_new("filter-tgt",
LDLM_NAMESPACE_SERVER);
if (obd->obd_namespace == NULL)
ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
"filter_ldlm_cb_client", &obd->obd_ldlm_client);
- rc = obd_llog_cat_initialize(obd, 1, name);
+ rc = llog_cat_initialize(obd, &obd->obd_llogs, 1);
if (rc) {
CERROR("failed to setup llogging subsystems\n");
GOTO(err_post, rc);
static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
{
- struct lprocfs_static_vars lvars;
struct lustre_cfg* lcfg = buf;
+ const char *str = NULL;
+ char *option = NULL;
+ int n = 0;
int rc;
- /* all mount options including errors=remount-ro and asyncdel are passed
- * using 4th lcfg param. And it is good, finally we have got rid of
- * hardcoded fs types in the code. */
-
- rc = filter_common_setup(obd, len, buf, lcfg->lcfg_inlbuf4);
-
- lprocfs_init_vars(filter, &lvars);
- if (rc == 0 && lprocfs_obd_setup(obd, lvars.obd_vars) == 0 &&
- lprocfs_alloc_obd_stats(obd, LPROC_FILTER_LAST) == 0) {
- /* Init obdfilter private stats here */
- lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_READ_BYTES,
- LPROCFS_CNTR_AVGMINMAX,
- "read_bytes", "bytes");
- lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
- LPROCFS_CNTR_AVGMINMAX,
- "write_bytes", "bytes");
-
- lproc_filter_attach_seqstat(obd);
+ if (!strcmp(lcfg->lcfg_inlbuf2, "ext3")) {
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
+ /* bug 1577: implement async-delete for 2.5 */
+ str = "errors=remount-ro,asyncdel";
+#else
+ str = "errors=remount-ro";
+#endif
+ n = strlen(str) + 1;
+ OBD_ALLOC(option, n);
+ if (option == NULL)
+ RETURN(-ENOMEM);
+ strcpy(option, str);
}
+ rc = filter_common_setup(obd, len, buf, option);
+ if (option)
+ OBD_FREE(option, n);
return rc;
}
if (filter->fo_sb == NULL)
RETURN(0);
- lprocfs_free_obd_stats(obd);
- lprocfs_obd_cleanup(obd);
-
filter_post(obd);
shrink_dcache_parent(filter->fo_sb->s_root);
RETURN(0);
}
+static int filter_attach(struct obd_device *obd, obd_count len, void *data)
+{
+ struct lprocfs_static_vars lvars;
+ int rc;
+
+ lprocfs_init_vars(filter, &lvars);
+ rc = lprocfs_obd_attach(obd, lvars.obd_vars);
+ if (rc != 0)
+ return rc;
+
+ rc = lprocfs_alloc_obd_stats(obd, LPROC_FILTER_LAST);
+ if (rc != 0)
+ return rc;
+
+ /* Init obdfilter private stats here */
+ lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_READ_BYTES,
+ LPROCFS_CNTR_AVGMINMAX, "read_bytes", "bytes");
+ lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
+ LPROCFS_CNTR_AVGMINMAX, "write_bytes", "bytes");
+
+ return lproc_filter_attach_seqstat(obd);
+}
+
+static int filter_detach(struct obd_device *dev)
+{
+ lprocfs_free_obd_stats(dev);
+ return lprocfs_obd_detach(dev);
+}
+
/* nearly identical to mds_connect */
static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
struct obd_uuid *cluuid)
static int filter_precleanup(struct obd_device *obd, int flags)
{
+ struct filter_group_llog *log;
+ struct filter_obd *filter;
int rc = 0;
ENTRY;
- rc = obd_llog_finish(obd, 0);
+ filter = &obd->u.filter;
+
+ spin_lock(&filter->fo_llog_list_lock);
+ while (!list_empty(&filter->fo_llog_list)) {
+ log = list_entry(filter->fo_llog_list.next,
+ struct filter_group_llog, list);
+ list_del(&log->list);
+ spin_unlock(&filter->fo_llog_list_lock);
+
+ rc = obd_llog_finish(obd, log->llogs, 0);
+ if (rc)
+ CERROR("failed to cleanup llogging subsystem for %u\n",
+ log->group);
+ OBD_FREE(log->llogs, sizeof(*(log->llogs)));
+ OBD_FREE(log, sizeof(*log));
+ spin_lock(&filter->fo_llog_list_lock);
+ }
+ spin_unlock(&filter->fo_llog_list_lock);
+
+ rc = obd_llog_finish(obd, &obd->obd_llogs, 0);
if (rc)
CERROR("failed to cleanup llogging subsystem\n");
/* Do extra sanity checks for grant accounting. We do this at connect,
* disconnect, and statfs RPC time, so it shouldn't be too bad. We can
* always get rid of it or turn it off when we know accounting is good. */
-static void filter_grant_sanity_check(struct obd_device *obd, const char *func)
+static void filter_grant_sanity_check(struct obd_device *obd, char *func)
{
struct filter_export_data *fed;
struct obd_export *exp;
RETURN(0);
}
+static void filter_sync_llogs(struct obd_export *dexp)
+{
+ struct filter_group_llog *fglog, *nlog;
+ struct obd_device *obd = dexp->exp_obd;
+ struct filter_obd *filter;
+ int worked = 0, group;
+ struct llog_ctxt *ctxt;
+
+ filter = &obd->u.filter;
+
+ /* we can't sync log holding spinlock. also, we do not want to get
+ * into livelock. so we do following: loop over MDS's exports in
+ * group order and skip already synced llogs -bzzz */
+ do {
+ /* look for group with min. number, but > worked */
+ fglog = NULL;
+ group = 1 << 30;
+ spin_lock(&filter->fo_llog_list_lock);
+ list_for_each_entry(nlog, &filter->fo_llog_list, list) {
+
+ if (nlog->group <= worked) {
+ /* this group is already synced */
+ continue;
+ }
+
+ if (group < nlog->group) {
+ /* we have group with smaller number to sync */
+ continue;
+ }
+
+ /* store current minimal group */
+ fglog = nlog;
+ group = nlog->group;
+ }
+ spin_lock(&filter->fo_llog_list_lock);
+
+ if (fglog) {
+ worked = fglog->group;
+ ctxt = llog_get_context(fglog->llogs,
+ LLOG_UNLINK_REPL_CTXT);
+ llog_sync(ctxt, dexp);
+ }
+ } while (fglog != NULL);
+}
+
/* also incredibly similar to mds_disconnect */
static int filter_disconnect(struct obd_export *exp, int flags)
{
struct obd_device *obd = exp->exp_obd;
unsigned long irqflags;
- struct llog_ctxt *ctxt;
int rc;
ENTRY;
fsfilt_sync(obd, obd->u.filter.fo_sb);
/* flush any remaining cancel messages out to the target */
- ctxt = llog_get_context(obd, LLOG_UNLINK_REPL_CTXT);
- llog_sync(ctxt, exp);
+ filter_sync_llogs(exp);
class_export_put(exp);
RETURN(rc);
}
if (dchild->d_inode == NULL) {
- CERROR("%s: %s on non-existent object: "LPU64"\n",
- obd->obd_name, what, oa->o_id);
+ CERROR("%s on non-existent object: "LPU64"\n", what, oa->o_id);
f_dput(dchild);
RETURN(ERR_PTR(-ENOENT));
}
static int filter_setattr(struct obd_export *exp, struct obdo *oa,
struct lov_stripe_md *md, struct obd_trans_info *oti)
{
- struct lvfs_run_ctxt saved;
+ struct obd_run_ctxt saved;
struct filter_obd *filter;
struct dentry *dentry;
struct iattr iattr;
- struct ldlm_res_id res_id = { .name = { oa->o_id } };
+ struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
struct ldlm_resource *res;
void *handle;
int rc, rc2;
iattr_from_obdo(&iattr, oa, oa->o_valid);
- push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+ push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
lock_kernel();
if (iattr.ia_valid & ATTR_SIZE)
if (iattr.ia_valid & ATTR_SIZE)
up(&dentry->d_inode->i_sem);
unlock_kernel();
- pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+ pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
f_dput(dentry);
RETURN(rc);
LASSERT(oa);
memset(&doa, 0, sizeof(doa));
- if (oa->o_valid & OBD_MD_FLGROUP) {
- doa.o_valid |= OBD_MD_FLGROUP;
- doa.o_gr = oa->o_gr;
- } else {
- doa.o_gr = 0;
- }
doa.o_mode = S_IFREG;
-
- filter->fo_destroy_in_progress = 1;
- down(&filter->fo_create_lock);
- if (!filter->fo_destroy_in_progress) {
- CERROR("%s: destroy_in_progress already cleared\n",
- exp->exp_obd->obd_name);
- up(&filter->fo_create_lock);
- EXIT;
- return;
- }
-
- last = filter_last_id(filter, &doa);
- CWARN("%s: deleting orphan objects from "LPU64" to "LPU64"\n",
- exp->exp_obd->obd_name, oa->o_id + 1, last);
+ doa.o_gr = oa->o_gr;
+ doa.o_valid = oa->o_valid & OBD_MD_FLGROUP;
+ last = filter_last_id(filter, oa->o_gr);
+ CWARN("deleting orphan objects from "LPU64" to "LPU64"\n",
+ oa->o_id + 1, last);
for (id = oa->o_id + 1; id <= last; id++) {
doa.o_id = id;
filter_destroy(exp, &doa, NULL, NULL);
}
-
- CDEBUG(D_HA, "%s: after destroy: set last_objids["LPU64"] = "LPU64"\n",
- exp->exp_obd->obd_name, doa.o_gr, oa->o_id);
-
spin_lock(&filter->fo_objidlock);
filter->fo_last_objids[doa.o_gr] = oa->o_id;
spin_unlock(&filter->fo_objidlock);
-
- filter->fo_destroy_in_progress = 0;
- up(&filter->fo_create_lock);
-
EXIT;
}
/* returns a negative error or a nonnegative number of files to create */
static int filter_should_precreate(struct obd_export *exp, struct obdo *oa,
- obd_gr group)
+ int group)
{
struct obd_device *obd = exp->exp_obd;
struct filter_obd *filter = &obd->u.filter;
int diff, rc;
ENTRY;
- diff = oa->o_id - filter_last_id(filter, oa);
+ diff = oa->o_id - filter_last_id(filter, oa->o_gr);
CDEBUG(D_INFO, "filter_last_id() = "LPU64" -> diff = %d\n",
- filter_last_id(filter, oa), diff);
+ filter_last_id(filter, oa->o_gr), diff);
/* delete orphans request */
if ((oa->o_valid & OBD_MD_FLFLAGS) &&
if (-diff > 10000) { /* XXX make this smarter */
CERROR("ignoring bogus orphan destroy request: obdid "
LPU64" last_id "LPU64"\n",
- oa->o_id, filter_last_id(filter, oa));
+ oa->o_id, filter_last_id(filter, oa->o_gr));
RETURN(-EINVAL);
}
filter_destroy_precreated(exp, oa, filter);
static int filter_precreate(struct obd_device *obd, struct obdo *oa,
obd_gr group, int *num)
{
- struct dentry *dchild = NULL, *dparent = NULL;
+ struct dentry *dchild = NULL;
struct filter_obd *filter;
- int err = 0, rc = 0, recreate_obj = 0, i;
+ struct dentry *dparent;
+ int err = 0, rc = 0, i;
__u64 next_id;
+ int recreate_obj = 0;
void *handle = NULL;
ENTRY;
recreate_obj = 1;
}
- CDEBUG(D_HA, "%s: precreating %d objects\n", obd->obd_name, *num);
-
- down(&filter->fo_create_lock);
-
for (i = 0; i < *num && err == 0; i++) {
int cleanup_phase = 0;
- if (filter->fo_destroy_in_progress) {
- CWARN("%s: precreate aborted by destroy\n",
- obd->obd_name);
- break;
- }
-
if (recreate_obj) {
__u64 last_id;
next_id = oa->o_id;
- last_id = filter_last_id(filter, oa);
+ last_id = filter_last_id(filter, group);
if (next_id > last_id) {
CERROR("Error: Trying to recreate obj greater"
"than last id "LPD64" > "LPD64"\n",
next_id, last_id);
- GOTO(cleanup, rc = -EINVAL);
+ RETURN(-EINVAL);
}
- } else
- next_id = filter_last_id(filter, oa) + 1;
+ } else {
+ next_id = filter_last_id(filter, group) + 1;
+ }
CDEBUG(D_INFO, "precreate objid "LPU64"\n", next_id);
* already exists
*/
if (recreate_obj) {
- CERROR("%s: Serious error: recreating obj %*s "
- "but obj already exists \n",
- obd->obd_name, dchild->d_name.len,
- dchild->d_name.name);
- LBUG();
+ CERROR("Serious error: recreating obj %*s but "
+ "obj already exists \n",
+ dchild->d_name.len, dchild->d_name.name);
} else {
- CERROR("%s: Serious error: objid %*s already "
+ CERROR("Serious error: objid %*s already "
"exists; is this filesystem corrupt?\n",
- obd->obd_name, dchild->d_name.len,
- dchild->d_name.name);
- LBUG();
+ dchild->d_name.len, dchild->d_name.name);
}
GOTO(cleanup, rc = -EEXIST);
}
}
if (!recreate_obj) {
- filter_set_last_id(filter, oa, next_id);
+ filter_set_last_id(filter, group, next_id);
err = filter_update_last_objid(obd, group, 0);
if (err)
CERROR("unable to write lastobjid "
}
*num = i;
- up(&filter->fo_create_lock);
-
- CDEBUG(D_HA, "%s: server last_objid for group "LPU64": "LPU64"\n",
- obd->obd_name, group, filter->fo_last_objids[group]);
-
- CDEBUG(D_HA, "%s: filter_precreate() created %d objects\n",
- obd->obd_name, i);
+ CDEBUG(D_INFO, "filter_precreate() created %d objects\n", i);
RETURN(rc);
}
struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
struct obd_device *obd = NULL;
- struct lvfs_run_ctxt saved;
+ struct filter_obd *filter;
+ struct obd_run_ctxt saved;
struct lov_stripe_md *lsm = NULL;
- obd_gr group = 0;
+ struct filter_export_data *fed;
+ int group = oa->o_gr;
+ char str[PTL_NALFMT_SIZE];
int rc = 0, diff;
ENTRY;
- if (oa->o_valid & OBD_MD_FLGROUP)
- group = oa->o_gr;
+ if (!(oa->o_valid & OBD_MD_FLGROUP) || group == 0) {
+ portals_nid2str(exp->exp_connection->c_peer.peer_ni->pni_number,
+ exp->exp_connection->c_peer.peer_nid, str);
+ CERROR("!!! nid "LPX64"/%s sent invalid object group %d\n",
+ exp->exp_connection->c_peer.peer_nid, str, group);
+ RETURN(-EINVAL);
+ }
+
+ obd = exp->exp_obd;
+ fed = &exp->exp_filter_data;
+ filter = &obd->u.filter;
+
+ if (fed->fed_group != group) {
+ portals_nid2str(exp->exp_connection->c_peer.peer_ni->pni_number,
+ exp->exp_connection->c_peer.peer_nid, str);
+ CERROR("!!! This export (nid "LPX64"/%s) used object group %d "
+ "earlier; now it's trying to use group %d! This could "
+ "be a bug in the MDS. Tell CFS.\n",
+ exp->exp_connection->c_peer.peer_nid, str,
+ fed->fed_group, group);
+ RETURN(-ENOTUNIQ);
+ }
- CDEBUG(D_INFO, "filter_create(od->o_gr="LPU64",od->o_id="LPU64")\n",
+ CDEBUG(D_INFO, "filter_create(od->o_gr=%d,od->o_id="LPU64")\n",
group, oa->o_id);
if (ea != NULL) {
lsm = *ea;
}
}
- obd = exp->exp_obd;
- push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ push_ctxt(&saved, &obd->obd_ctxt, NULL);
if ((oa->o_valid & OBD_MD_FLFLAGS) &&
(oa->o_flags & OBD_FL_RECREATE_OBJS)) {
- if (oa->o_id > filter_last_id(&obd->u.filter, oa)) {
+ if (oa->o_id > filter_last_id(&obd->u.filter, group)) {
CERROR("recreate objid "LPU64" > last id "LPU64"\n",
- oa->o_id, filter_last_id(&obd->u.filter, oa));
+ oa->o_id, filter_last_id(&obd->u.filter, group));
rc = -EINVAL;
} else {
diff = 1;
} else {
diff = filter_should_precreate(exp, oa, group);
if (diff > 0) {
- oa->o_id = filter_last_id(&obd->u.filter, oa);
+ oa->o_id = filter_last_id(&obd->u.filter, group);
rc = filter_precreate(obd, oa, group, &diff);
oa->o_id += diff;
oa->o_valid = OBD_MD_FLID;
}
}
- pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ pop_ctxt(&saved, &obd->obd_ctxt, NULL);
if (rc && ea != NULL && *ea != lsm) {
obd_free_memmd(exp, &lsm);
} else if (rc == 0 && ea != NULL) {
* don't go touching that. This needs to be fixed in a
* big way. */
lsm->lsm_object_id = oa->o_id;
+ lsm->lsm_object_gr = oa->o_gr;
*ea = lsm;
}
struct obd_device *obd;
struct filter_obd *filter;
struct dentry *dchild = NULL, *dparent = NULL;
- struct lvfs_run_ctxt saved;
+ struct obd_run_ctxt saved;
void *handle = NULL;
struct llog_cookie *fcc = NULL;
int rc, rc2, cleanup_phase = 0, have_prepared = 0;
- obd_gr group = 0;
ENTRY;
- if (oa->o_valid & OBD_MD_FLGROUP)
- group = oa->o_gr;
+ LASSERT(oa->o_valid & OBD_MD_FLGROUP);
obd = exp->exp_obd;
filter = &obd->u.filter;
- push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ push_ctxt(&saved, &obd->obd_ctxt, NULL);
acquire_locks:
- dparent = filter_parent_lock(obd, group, oa->o_id);
+ dparent = filter_parent_lock(obd, oa->o_gr, oa->o_id);
if (IS_ERR(dparent))
GOTO(cleanup, rc = PTR_ERR(dparent));
cleanup_phase = 1;
- dchild = filter_fid2dentry(obd, dparent, group, oa->o_id);
+ dchild = filter_fid2dentry(obd, dparent, oa->o_gr, oa->o_id);
if (IS_ERR(dchild))
GOTO(cleanup, rc = -ENOENT);
cleanup_phase = 2;
f_dput(dchild);
filter_parent_unlock(dparent);
- filter_prepare_destroy(obd, oa->o_id);
+ filter_prepare_destroy(obd, oa->o_id, oa->o_gr);
have_prepared = 1;
goto acquire_locks;
}
- handle = fsfilt_start_log(obd, dparent->d_inode, FSFILT_OP_UNLINK, oti, 1);
+ handle = fsfilt_start_log(obd, dparent->d_inode,FSFILT_OP_UNLINK,oti,1);
if (IS_ERR(handle))
GOTO(cleanup, rc = PTR_ERR(handle));
-
cleanup_phase = 3;
/* Our MDC connection is established by the MDS to us */
case 3:
if (fcc != NULL) {
if (oti != NULL)
- fsfilt_add_journal_cb(obd, filter->fo_sb, 0,
- oti->oti_handle,
+ fsfilt_add_journal_cb(obd, 0, oti->oti_handle,
filter_cancel_cookies_cb,
fcc);
else
- fsfilt_add_journal_cb(obd, filter->fo_sb, 0,
- handle,
+ fsfilt_add_journal_cb(obd, 0, handle,
filter_cancel_cookies_cb,
fcc);
}
case 1:
filter_parent_unlock(dparent);
case 0:
- pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ pop_ctxt(&saved, &obd->obd_ctxt, NULL);
break;
default:
CERROR("invalid cleanup_phase %d\n", cleanup_phase);
static int filter_sync(struct obd_export *exp, struct obdo *oa,
struct lov_stripe_md *lsm, obd_off start, obd_off end)
{
- struct lvfs_run_ctxt saved;
+ struct obd_device *obd = exp->exp_obd;
+ struct obd_run_ctxt saved;
struct filter_obd *filter;
struct dentry *dentry;
struct llog_ctxt *ctxt;
int rc, rc2;
ENTRY;
- filter = &exp->exp_obd->u.filter;
+ filter = &obd->u.filter;
/* an objid of zero is taken to mean "sync whole filesystem" */
if (!oa || !(oa->o_valid & OBD_MD_FLID)) {
- rc = fsfilt_sync(exp->exp_obd, filter->fo_sb);
+ rc = fsfilt_sync(obd, filter->fo_sb);
/* flush any remaining cancel messages out to the target */
- ctxt = llog_get_context(exp->exp_obd, LLOG_UNLINK_REPL_CTXT);
+ ctxt = llog_get_context(&obd->obd_llogs, LLOG_UNLINK_REPL_CTXT);
llog_sync(ctxt, exp);
RETURN(rc);
}
- dentry = filter_oa2dentry(exp->exp_obd, oa);
+ dentry = filter_oa2dentry(obd, oa);
if (IS_ERR(dentry))
RETURN(PTR_ERR(dentry));
- push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+ push_ctxt(&saved, &obd->obd_ctxt, NULL);
down(&dentry->d_inode->i_sem);
rc = filemap_fdatasync(dentry->d_inode->i_mapping);
oa->o_valid = OBD_MD_FLID;
obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
- pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+ pop_ctxt(&saved, &obd->obd_ctxt, NULL);
f_dput(dentry);
RETURN(rc);
static int filter_get_info(struct obd_export *exp, __u32 keylen,
void *key, __u32 *vallen, void *val)
{
+ struct filter_export_data *fed = &exp->exp_filter_data;
struct obd_device *obd;
ENTRY;
if (keylen >= strlen("last_id") && memcmp(key, "last_id", 7) == 0) {
obd_id *last_id = val;
- /* FIXME: object groups */
- *last_id = filter_last_id(&obd->u.filter, 0);
+ *last_id = filter_last_id(&obd->u.filter, fed->fed_group);
RETURN(0);
}
CDEBUG(D_IOCTL, "invalid key\n");
RETURN(-EINVAL);
}
+struct obd_llogs * filter_grab_llog_for_group(struct obd_device *obd, int group)
+{
+ struct filter_group_llog *fglog, *nlog;
+ struct filter_obd *filter;
+ struct list_head *cur;
+ int rc;
+
+ filter = &obd->u.filter;
+
+ spin_lock(&filter->fo_llog_list_lock);
+ list_for_each(cur, &filter->fo_llog_list) {
+ fglog = list_entry(cur, struct filter_group_llog, list);
+ if (fglog->group == group) {
+ spin_unlock(&filter->fo_llog_list_lock);
+ RETURN(fglog->llogs);
+ }
+ }
+ spin_unlock(&filter->fo_llog_list_lock);
+
+ OBD_ALLOC(fglog, sizeof(*fglog));
+ if (fglog == NULL)
+ RETURN(NULL);
+ fglog->group = group;
+
+ OBD_ALLOC(fglog->llogs, sizeof(struct obd_llogs));
+ if (fglog->llogs == NULL) {
+ OBD_FREE(fglog, sizeof(*fglog));
+ RETURN(NULL);
+ }
+
+ spin_lock(&filter->fo_llog_list_lock);
+ list_for_each(cur, &filter->fo_llog_list) {
+ nlog = list_entry(cur, struct filter_group_llog, list);
+ LASSERT(nlog->group != group);
+ }
+ list_add(&fglog->list, &filter->fo_llog_list);
+ spin_unlock(&filter->fo_llog_list_lock);
+
+ rc = llog_cat_initialize(obd, fglog->llogs, 1);
+ if (rc) {
+ OBD_FREE(fglog->llogs, sizeof(*(fglog->llogs)));
+ OBD_FREE(fglog, sizeof(*fglog));
+ RETURN(NULL);
+ }
+
+ CDEBUG(D_OTHER, "%s: new llog 0x%p for group %u\n", obd->obd_name,
+ fglog->llogs, group);
+
+ RETURN(fglog->llogs);
+}
+
static int filter_set_info(struct obd_export *exp, __u32 keylen,
void *key, __u32 vallen, void *val)
{
+ struct obd_run_ctxt saved;
+ struct filter_export_data *fed = &exp->exp_filter_data;
struct obd_device *obd;
struct lustre_handle conn;
+ struct obd_llogs *llog;
struct llog_ctxt *ctxt;
+ __u32 group;
int rc = 0;
ENTRY;
memcmp(key, "mds_conn", keylen) != 0)
RETURN(-EINVAL);
- CWARN("Received MDS connection ("LPX64")\n", conn.cookie);
- memcpy(&obd->u.filter.fo_mdc_conn, &conn, sizeof(conn));
- ctxt = llog_get_context(obd, LLOG_UNLINK_REPL_CTXT);
+ group = *((__u32 *)val);
+ if (fed->fed_group != 0 && fed->fed_group != group) {
+ char str[PTL_NALFMT_SIZE];
+ portals_nid2str(exp->exp_connection->c_peer.peer_ni->pni_number,
+ exp->exp_connection->c_peer.peer_nid, str);
+ CERROR("!!! This export (nid "LPX64"/%s) used object group %d "
+ "earlier; now it's trying to use group %d! This could "
+ "be a bug in the MDS. Tell CFS.\n",
+ exp->exp_connection->c_peer.peer_nid, str,
+ fed->fed_group, group);
+ RETURN(-EPROTO);
+ }
+ fed->fed_group = group;
+ CWARN("Received MDS connection ("LPX64"); group %d\n", conn.cookie,
+ group);
+
+ LASSERT(rc == 0);
+
+ push_ctxt(&saved, &obd->obd_ctxt, NULL);
+ rc = filter_read_groups(obd, group, 1);
+ pop_ctxt(&saved, &obd->obd_ctxt, NULL);
+ if (rc != 0) {
+ CERROR("can't read group %u\n", group);
+ RETURN(rc);
+ }
+
+ llog = filter_grab_llog_for_group(obd, group);
+ LASSERT(llog != NULL);
+
+ ctxt = llog_get_context(llog, LLOG_UNLINK_REPL_CTXT);
+ LASSERT(ctxt != NULL);
rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse);
RETURN(rc);
}
}
case OBD_IOC_CATLOGLIST: {
- rc = llog_catalog_list(obd, 1, data);
+ rc = llog_catlog_list(obd, 1, data);
RETURN(rc);
}
/*
struct llog_ctxt *ctxt = NULL;
- push_ctxt(&saved, &ctxt->loc_ctxt, NULL);
+ push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL);
rc = llog_ioctl(ctxt, cmd, data);
- pop_ctxt(&saved, &ctxt->loc_ctxt, NULL);
+ pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL);
RETURN(rc);
*/
static struct llog_operations filter_unlink_repl_logops;
static struct llog_operations filter_size_orig_logops = {
lop_setup: llog_obd_origin_setup,
- lop_cleanup: llog_catalog_cleanup,
- lop_add: llog_catalog_add,
+ lop_cleanup: llog_obd_origin_cleanup,
+ lop_add: llog_obd_origin_add
};
-static int filter_llog_init(struct obd_device *obd, struct obd_device *tgt,
- int count, struct llog_catid *catid)
+static int filter_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+ struct obd_device *tgt, int count,
+ struct llog_catid *logid)
{
struct llog_ctxt *ctxt;
int rc;
filter_unlink_repl_logops.lop_connect = llog_repl_connect;
filter_unlink_repl_logops.lop_sync = llog_obd_repl_sync;
- rc = obd_llog_setup(obd, LLOG_UNLINK_REPL_CTXT, tgt, 0, NULL,
- &filter_unlink_repl_logops);
+ rc = llog_setup(obd, llogs, LLOG_UNLINK_REPL_CTXT, tgt, 0, NULL,
+ &filter_unlink_repl_logops);
if (rc)
RETURN(rc);
/* FIXME - assign unlink_cb for filter's recovery */
- ctxt = llog_get_context(obd, LLOG_UNLINK_REPL_CTXT);
- ctxt->loc_proc_cb = filter_recov_log_unlink_cb;
+ ctxt = llog_get_context(llogs, LLOG_UNLINK_REPL_CTXT);
+ ctxt->llog_proc_cb = filter_recov_log_unlink_cb;
- /* FIXME - count should be 1 to setup size log */
- rc = obd_llog_setup(obd, LLOG_SIZE_ORIG_CTXT, tgt, 0, &catid->lci_logid,
- &filter_size_orig_logops);
+ rc = llog_setup(obd, llogs, LLOG_SIZE_ORIG_CTXT, tgt, 0, NULL,
+ &filter_size_orig_logops);
RETURN(rc);
}
-static int filter_llog_finish(struct obd_device *obd, int count)
+static int filter_llog_finish(struct obd_device *obd,
+ struct obd_llogs *llogs, int count)
{
int rc;
ENTRY;
- rc = obd_llog_cleanup(llog_get_context(obd, LLOG_UNLINK_REPL_CTXT));
+ rc = llog_cleanup(llog_get_context(llogs, LLOG_UNLINK_REPL_CTXT));
if (rc)
RETURN(rc);
- rc = obd_llog_cleanup(llog_get_context(obd, LLOG_SIZE_ORIG_CTXT));
+ rc = llog_cleanup(llog_get_context(llogs, LLOG_SIZE_ORIG_CTXT));
+ RETURN(rc);
+}
+
+static int filter_llog_connect(struct obd_device *obd,
+ struct llogd_conn_body *body)
+{
+ struct llog_ctxt *ctxt;
+ struct obd_llogs *llog;
+ int rc;
+ ENTRY;
+
+ CDEBUG(D_OTHER, "handle connect for %s: %u/%u/%u\n", obd->obd_name,
+ (unsigned) body->lgdc_logid.lgl_ogr,
+ (unsigned) body->lgdc_logid.lgl_oid,
+ (unsigned) body->lgdc_logid.lgl_ogen);
+ llog = filter_grab_llog_for_group(obd, body->lgdc_logid.lgl_ogr);
+ LASSERT(llog != NULL);
+ ctxt = llog_get_context(llog, body->lgdc_ctxt_idx);
+ rc = llog_connect(ctxt, 1, &body->lgdc_logid,
+ &body->lgdc_gen, NULL);
+ if (rc != 0)
+ CERROR("failed to connect\n");
+
RETURN(rc);
}
};
static struct obd_ops filter_obd_ops = {
- .o_owner = THIS_MODULE,
- .o_get_info = filter_get_info,
- .o_set_info = filter_set_info,
- .o_setup = filter_setup,
- .o_precleanup = filter_precleanup,
- .o_cleanup = filter_cleanup,
- .o_connect = filter_connect,
- .o_disconnect = filter_disconnect,
- .o_statfs = filter_statfs,
- .o_getattr = filter_getattr,
- .o_unpackmd = filter_unpackmd,
- .o_create = filter_create,
- .o_setattr = filter_setattr,
- .o_destroy = filter_destroy,
- .o_brw = filter_brw,
- .o_punch = filter_truncate,
- .o_sync = filter_sync,
- .o_preprw = filter_preprw,
- .o_commitrw = filter_commitrw,
- .o_destroy_export = filter_destroy_export,
- .o_llog_init = filter_llog_init,
- .o_llog_finish = filter_llog_finish,
- .o_iocontrol = filter_iocontrol,
+ o_owner: THIS_MODULE,
+ o_attach: filter_attach,
+ o_detach: filter_detach,
+ o_get_info: filter_get_info,
+ o_set_info: filter_set_info,
+ o_setup: filter_setup,
+ o_precleanup: filter_precleanup,
+ o_cleanup: filter_cleanup,
+ o_connect: filter_connect,
+ o_disconnect: filter_disconnect,
+ o_statfs: filter_statfs,
+ o_getattr: filter_getattr,
+ o_unpackmd: filter_unpackmd,
+ o_create: filter_create,
+ o_setattr: filter_setattr,
+ o_destroy: filter_destroy,
+ o_brw: filter_brw,
+ o_punch: filter_truncate,
+ o_sync: filter_sync,
+ o_preprw: filter_preprw,
+ o_commitrw: filter_commitrw,
+ o_destroy_export: filter_destroy_export,
+ o_llog_init: filter_llog_init,
+ o_llog_finish: filter_llog_finish,
+ o_llog_connect: filter_llog_connect,
+ o_iocontrol: filter_iocontrol,
};
static struct obd_ops filter_sanobd_ops = {
- .o_owner = THIS_MODULE,
- .o_get_info = filter_get_info,
- .o_set_info = filter_set_info,
- .o_setup = filter_san_setup,
- .o_precleanup = filter_precleanup,
- .o_cleanup = filter_cleanup,
- .o_connect = filter_connect,
- .o_disconnect = filter_disconnect,
- .o_statfs = filter_statfs,
- .o_getattr = filter_getattr,
- .o_unpackmd = filter_unpackmd,
- .o_create = filter_create,
- .o_setattr = filter_setattr,
- .o_destroy = filter_destroy,
- .o_brw = filter_brw,
- .o_punch = filter_truncate,
- .o_sync = filter_sync,
- .o_preprw = filter_preprw,
- .o_commitrw = filter_commitrw,
- .o_san_preprw = filter_san_preprw,
- .o_destroy_export = filter_destroy_export,
- .o_llog_init = filter_llog_init,
- .o_llog_finish = filter_llog_finish,
- .o_iocontrol = filter_iocontrol,
+ o_owner: THIS_MODULE,
+ o_attach: filter_attach,
+ o_detach: filter_detach,
+ o_get_info: filter_get_info,
+ o_set_info: filter_set_info,
+ o_setup: filter_san_setup,
+ o_precleanup: filter_precleanup,
+ o_cleanup: filter_cleanup,
+ o_connect: filter_connect,
+ o_disconnect: filter_disconnect,
+ o_statfs: filter_statfs,
+ o_getattr: filter_getattr,
+ o_unpackmd: filter_unpackmd,
+ o_create: filter_create,
+ o_setattr: filter_setattr,
+ o_destroy: filter_destroy,
+ o_brw: filter_brw,
+ o_punch: filter_truncate,
+ o_sync: filter_sync,
+ o_preprw: filter_preprw,
+ o_commitrw: filter_commitrw,
+ o_san_preprw: filter_san_preprw,
+ o_destroy_export: filter_destroy_export,
+ o_llog_init: filter_llog_init,
+ o_llog_finish: filter_llog_finish,
+ o_llog_connect: filter_llog_connect,
+ o_iocontrol: filter_iocontrol,
};
static int __init obdfilter_init(void)
lprocfs_init_vars(filter, &lvars);
- rc = class_register_type(&filter_obd_ops, lvars.module_vars,
+ rc = class_register_type(&filter_obd_ops, NULL, lvars.module_vars,
OBD_FILTER_DEVICENAME);
if (rc)
return rc;
- rc = class_register_type(&filter_sanobd_ops, lvars.module_vars,
+ rc = class_register_type(&filter_sanobd_ops, NULL, lvars.module_vars,
OBD_FILTER_SAN_DEVICENAME);
if (rc)
class_unregister_type(OBD_FILTER_DEVICENAME);