#define MDS_MAX_CLIENTS (PAGE_SIZE * 8)
#define MDS_MAX_CLIENT_WORDS (MDS_MAX_CLIENTS / sizeof(unsigned long))
-static unsigned long last_rcvd_slots[MDS_MAX_CLIENT_WORDS];
-
#define LAST_RCVD "last_rcvd"
/* Add client data to the MDS. We use a bitmap to locate a free space
* Otherwise, we have just read the data from the last_rcvd file and
* we know its offset.
*/
-int mds_client_add(struct mds_obd *mds, struct mds_export_data *med, int cl_off)
+int mds_client_add(struct obd_device *obd, struct mds_obd *mds,
+ struct mds_export_data *med, int cl_off)
{
+ unsigned long *bitmap = mds->mds_client_bitmap;
int new_client = (cl_off == -1);
+ LASSERT(bitmap != NULL);
+
+ /* XXX if mcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
+ if (!strcmp(med->med_mcd->mcd_uuid, "OBD_CLASS_UUID"))
+ RETURN(0);
+
/* the bitmap operations can handle cl_off > sizeof(long) * 8, so
* there's no need for extra complication here
*/
if (new_client) {
- cl_off = find_first_zero_bit(last_rcvd_slots, MDS_MAX_CLIENTS);
+ cl_off = find_first_zero_bit(bitmap, MDS_MAX_CLIENTS);
repeat:
if (cl_off >= MDS_MAX_CLIENTS) {
CERROR("no room for clients - fix MDS_MAX_CLIENTS\n");
return -ENOMEM;
}
- if (test_and_set_bit(cl_off, last_rcvd_slots)) {
+ if (test_and_set_bit(cl_off, bitmap)) {
CERROR("MDS client %d: found bit is set in bitmap\n",
cl_off);
- cl_off = find_next_zero_bit(last_rcvd_slots,
- MDS_MAX_CLIENTS, cl_off);
+ cl_off = find_next_zero_bit(bitmap, MDS_MAX_CLIENTS,
+ cl_off);
goto repeat;
}
} else {
- if (test_and_set_bit(cl_off, last_rcvd_slots)) {
+ if (test_and_set_bit(cl_off, bitmap)) {
CERROR("MDS client %d: bit already set in bitmap!!\n",
cl_off);
LBUG();
struct obd_run_ctxt saved;
loff_t off = MDS_LR_CLIENT + (cl_off * MDS_LR_SIZE);
ssize_t written;
+ void *handle;
push_ctxt(&saved, &mds->mds_ctxt, NULL);
- written = lustre_fwrite(mds->mds_rcvd_filp,
- (char *)med->med_mcd,
- sizeof(*med->med_mcd), &off);
+ /* We need to start a transaction here first, to avoid a
+ * possible ordering deadlock on last_rcvd->i_sem and the
+ * journal lock. In most places we start the journal handle
+ * first (because we do compound transactions), and then
+ * later do the write into last_rcvd, which gets i_sem.
+ *
+ * Without this transaction, clients connecting at the same
+ * time other MDS operations are ongoing get last_rcvd->i_sem
+ * first (in generic_file_write()) and start the journal
+ * transaction afterwards, and can deadlock with other ops.
+ *
+ * We use FSFILT_OP_SETATTR because it is smallest, but all
+ * ops include enough space for the last_rcvd update so we
+ * could use any of them, or maybe an FSFILT_OP_NONE is best?
+ */
+ handle = fsfilt_start(obd,mds->mds_rcvd_filp->f_dentry->d_inode,
+ FSFILT_OP_SETATTR);
+ if (IS_ERR(handle)) {
+ written = PTR_ERR(handle);
+ CERROR("unable to start transaction: rc %d\n",
+ (int)written);
+ } else {
+ written = lustre_fwrite(mds->mds_rcvd_filp,med->med_mcd,
+ sizeof(*med->med_mcd), &off);
+ fsfilt_commit(obd,mds->mds_rcvd_filp->f_dentry->d_inode,
+ handle, 0);
+ }
pop_ctxt(&saved, &mds->mds_ctxt, NULL);
if (written != sizeof(*med->med_mcd)) {
struct mds_client_data zero_mcd;
struct obd_run_ctxt saved;
int written;
+ unsigned long *bitmap = mds->mds_client_bitmap;
loff_t off;
+ LASSERT(bitmap);
if (!med->med_mcd)
RETURN(0);
+ /* XXX if mcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
+ if (!strcmp(med->med_mcd->mcd_uuid, "OBD_CLASS_UUID"))
+ GOTO(free_and_out, 0);
+
off = MDS_LR_CLIENT + (med->med_off * MDS_LR_SIZE);
CDEBUG(D_INFO, "freeing client at offset %u (%lld)with UUID '%s'\n",
med->med_off, off, med->med_mcd->mcd_uuid);
- if (!test_and_clear_bit(med->med_off, last_rcvd_slots)) {
+ if (!test_and_clear_bit(med->med_off, bitmap)) {
CERROR("MDS client %u: bit already clear in bitmap!!\n",
med->med_off);
LBUG();
med->med_mcd->mcd_uuid, med->med_off);
}
+ free_and_out:
OBD_FREE(med->med_mcd, sizeof(*med->med_mcd));
return 0;
static int mds_server_free_data(struct mds_obd *mds)
{
+ OBD_FREE(mds->mds_client_bitmap,
+ MDS_MAX_CLIENT_WORDS * sizeof(unsigned long));
OBD_FREE(mds->mds_server_data, sizeof(*mds->mds_server_data));
mds->mds_server_data = NULL;
__u64 last_transno = 0;
__u64 last_mount;
int rc = 0;
-
+
LASSERT(sizeof(struct mds_client_data) == MDS_LR_SIZE);
LASSERT(sizeof(struct mds_server_data) <= MDS_LR_CLIENT);
OBD_ALLOC(msd, sizeof(*msd));
if (!msd)
RETURN(-ENOMEM);
+
+ OBD_ALLOC(mds->mds_client_bitmap,
+ MDS_MAX_CLIENT_WORDS * sizeof(unsigned long));
+ if (!mds->mds_client_bitmap) {
+ OBD_FREE(msd, sizeof(*msd));
+ RETURN(-ENOMEM);
+ }
+
rc = lustre_fread(f, (char *)msd, sizeof(*msd), &off);
mds->mds_server_data = msd;
if (rc == 0) {
- CERROR("empty MDS %s, new MDS?\n", LAST_RCVD);
+ CERROR("%s: empty MDS %s, new MDS?\n", obddev->obd_name,
+ LAST_RCVD);
RETURN(0);
}
sizeof exp->exp_client_uuid.uuid);
med = &exp->exp_mds_data;
med->med_mcd = mcd;
- mds_client_add(mds, med, cl_off);
+ mds_client_add(obddev, mds, med, cl_off);
/* create helper if export init gets more complex */
INIT_LIST_HEAD(&med->med_open_head);
spin_lock_init(&med->med_open_lock);
mcd = NULL;
obddev->obd_recoverable_clients++;
+ class_export_put(exp);
} else {
- CDEBUG(D_INFO,
- "discarded client %d, UUID '%s', count %Ld\n",
- cl_off, mcd->mcd_uuid,
- (long long)le64_to_cpu(mcd->mcd_mount_count));
+ CDEBUG(D_INFO, "discarded client %d, UUID '%s', count "
+ LPU64"\n", cl_off, mcd->mcd_uuid,
+ le64_to_cpu(mcd->mcd_mount_count));
}
- CDEBUG(D_OTHER, "client at offset %d has last_rcvd = %Lu\n",
+ CDEBUG(D_OTHER, "client at offset %d has last_transno = %Lu\n",
cl_off, (unsigned long long)last_transno);
if (last_transno > mds->mds_last_transno)
obddev->obd_recoverable_clients, mds->mds_last_transno);
obddev->obd_next_recovery_transno = obddev->obd_last_committed
+ 1;
- obddev->obd_flags |= OBD_RECOVERING;
+ obddev->obd_recovering = 1;
}
if (mcd)
dput(dentry);
- dentry = simple_mkdir(current->fs->pwd, "FH", 0700);
- if (IS_ERR(dentry)) {
- rc = PTR_ERR(dentry);
- CERROR("cannot create FH directory: rc = %d\n", rc);
+ dentry = lookup_one_len("__iopen__", current->fs->pwd,
+ strlen("__iopen__"));
+ if (IS_ERR(dentry) || !dentry->d_inode) {
+ rc = (IS_ERR(dentry)) ? PTR_ERR(dentry): -ENOENT;
+ CERROR("cannot open iopen FH directory: rc = %d\n", rc);
GOTO(err_pop, rc);
}
- /* XXX probably want to hold on to this later... */
- dput(dentry);
+ mds->mds_fid_de = dentry;
f = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0644);
if (IS_ERR(f)) {
return rc;
err_client:
- class_disconnect_all(obddev);
+ class_disconnect_exports(obddev, 0);
err_filp:
if (filp_close(f, 0))
CERROR("can't close %s after error\n", LAST_RCVD);
mds->mds_ctxt.pwdmnt = mnt;
mds->mds_ctxt.pwd = mnt->mnt_root;
mds->mds_ctxt.fs = get_ds();
-
RETURN(mds_fs_prep(obddev));
}
-int mds_fs_cleanup(struct obd_device *obddev)
+int mds_fs_cleanup(struct obd_device *obddev, int failover)
{
struct mds_obd *mds = &obddev->u.mds;
struct obd_run_ctxt saved;
int rc = 0;
- class_disconnect_all(obddev); /* this cleans up client info too */
+ if (failover)
+ CERROR("%s: shutting down for failover; client state will"
+ " be preserved.\n", obddev->obd_name);
+
+ class_disconnect_exports(obddev, failover); /* this cleans up client
+ info too */
mds_server_free_data(mds);
push_ctxt(&saved, &mds->mds_ctxt, NULL);
if (mds->mds_rcvd_filp) {
rc = filp_close(mds->mds_rcvd_filp, 0);
mds->mds_rcvd_filp = NULL;
-
if (rc)
CERROR("last_rcvd file won't close, rc=%d\n", rc);
}
pop_ctxt(&saved, &mds->mds_ctxt, NULL);
+ shrink_dcache_parent(mds->mds_fid_de);
+ dput(mds->mds_fid_de);
return rc;
}