#include <linux/module.h>
#include <linux/kmod.h>
#include <linux/lustre_mds.h>
+#include <linux/obd_class.h>
+#include <linux/obd_support.h>
+#include <linux/lustre_lib.h>
LIST_HEAD(mds_fs_types);
char *mft_name;
};
-/* This will be a hash table at some point. */
-static int mds_init_client_data(struct mds_obd *mds)
-{
- INIT_LIST_HEAD(&mds->mds_client_info);
- return 0;
-}
-
#define MDS_MAX_CLIENTS 1024
#define MDS_MAX_CLIENT_WORDS (MDS_MAX_CLIENTS / sizeof(unsigned long))
static unsigned long last_rcvd_slots[MDS_MAX_CLIENT_WORDS];
-/* Add client data to the MDS. The in-memory storage will be a hash at some
- * point. We use a bitmap to locate a free space in the last_rcvd file if
- * cl_off is -1 (i.e. a new client). Otherwise, we have just read the data
- * from the last_rcvd file and we know its offset.
+/* Add client data to the MDS. We use a bitmap to locate a free space
+ * in the last_rcvd file if cl_off is -1 (i.e. a new client).
+ * Otherwise, we have just read the data from the last_rcvd file and
+ * we know its offset.
*/
-int mds_client_add(struct mds_obd *mds, struct mds_client_data *mcd, int cl_off)
+int mds_client_add(struct mds_export_data *med, int cl_off)
{
- struct mds_client_info *mci;
-
- OBD_ALLOC(mci, sizeof(*mci));
- if (!mci) {
- CERROR("no memory for MDS client info\n");
- RETURN(-ENOMEM);
- }
- INIT_LIST_HEAD(&mci->mci_open_head);
-
- CDEBUG(D_INFO, "client at offset %d with UUID '%s' added\n",
- cl_off, mcd->mcd_uuid);
-
+ /* the bitmap operations can handle cl_off > sizeof(long) * 8, so
+ * there's no need for extra complication here
+ */
if (cl_off == -1) {
- unsigned long *word;
- int bit;
-
+ cl_off = find_first_zero_bit(last_rcvd_slots, MDS_MAX_CLIENTS);
repeat:
- word = last_rcvd_slots;
- while(*word == ~0UL)
- ++word;
- if (word - last_rcvd_slots >= MDS_MAX_CLIENT_WORDS) {
- CERROR("no room in client MDS bitmap - fix code\n");
+ if (cl_off >= MDS_MAX_CLIENTS) {
+ CERROR("no room for clients - fix MDS_MAX_CLIENTS\n");
return -ENOMEM;
}
- bit = ffz(*word);
- if (test_and_set_bit(bit, word)) {
- CERROR("found bit %d set for word %d - fix code\n",
- bit, word - last_rcvd_slots);
+ if (test_and_set_bit(cl_off, last_rcvd_slots)) {
+ CERROR("MDS client %d: found bit is set in bitmap\n",
+ cl_off);
+ cl_off = find_next_zero_bit(last_rcvd_slots,
+ MDS_MAX_CLIENTS, cl_off);
goto repeat;
}
- cl_off = word - last_rcvd_slots + bit;
} else {
if (test_and_set_bit(cl_off, last_rcvd_slots)) {
- CERROR("bit %d already set in bitmap - bad bad\n",
+ CERROR("MDS client %d: bit already set in bitmap!!\n",
cl_off);
LBUG();
}
}
- mci->mci_mcd = mcd;
- mci->mci_off = cl_off;
-
- /* For now we just put the clients in a list, not a hashed list */
- list_add_tail(&mci->mci_list, &mds->mds_client_info);
-
- mds->mds_client_count++;
+ CDEBUG(D_INFO, "client at offset %d with UUID '%s' added\n",
+ cl_off, med->med_mcd->mcd_uuid);
+ med->med_off = cl_off;
return 0;
}
-void mds_client_del(struct mds_obd *mds, struct mds_client_info *mci)
+int mds_client_free(struct obd_export *exp)
{
- unsigned long *word;
- int bit;
+ struct mds_export_data *med = &exp->exp_mds_data;
+
+ if (!med->med_mcd)
+ RETURN(0);
- word = last_rcvd_slots + mci->mci_off / sizeof(unsigned long);
- bit = mci->mci_off % sizeof(unsigned long);
+ CDEBUG(D_INFO, "freeing client at offset %d with UUID '%s'\n",
+ med->med_off, med->med_mcd->mcd_uuid);
- if (!test_and_clear_bit(bit, word)) {
- CERROR("bit %d already clear in word %d - bad bad\n",
- bit, word - last_rcvd_slots);
+ if (!test_and_clear_bit(med->med_off, last_rcvd_slots)) {
+ CERROR("MDS client %d: bit already clear in bitmap!!\n",
+ med->med_off);
LBUG();
}
- --mds->mds_client_count;
- list_del(&mci->mci_list);
- OBD_FREE(mci->mci_mcd, sizeof(*mci->mci_mcd));
- OBD_FREE(mci, sizeof (*mci));
-}
-
-static int mds_client_free_all(struct mds_obd *mds)
-{
- struct list_head *p, *n;
-
- list_for_each_safe(p, n, &mds->mds_client_info) {
- struct mds_client_info *mci;
-
- mci = list_entry(p, struct mds_client_info, mci_list);
- mds_client_del(mds, mci);
- }
+ OBD_FREE(med->med_mcd, sizeof(*med->med_mcd));
return 0;
}
#define LAST_RCVD "last_rcvd"
-static int mds_read_last_rcvd(struct mds_obd *mds, struct file *f)
+static int mds_read_last_rcvd(struct obd_device *obddev, struct file *f)
{
+ struct mds_obd *mds = &obddev->u.mds;
struct mds_server_data *msd;
struct mds_client_data *mcd = NULL;
loff_t fsize = f->f_dentry->d_inode->i_size;
int cl_off;
__u64 last_rcvd = 0;
__u64 last_mount;
+ int clients = 0;
int rc = 0;
OBD_ALLOC(msd, sizeof(*msd));
if (rc == 0) {
CERROR("empty MDS %s, new MDS?\n", LAST_RCVD);
RETURN(0);
- } else if (rc != sizeof(*msd)) {
+ }
+
+ if (rc != sizeof(*msd)) {
CERROR("error reading MDS %s: rc = %d\n", LAST_RCVD, rc);
if (rc > 0) {
rc = -EIO;
for (off = MDS_LR_CLIENT, cl_off = 0, rc = sizeof(*mcd);
off <= fsize - sizeof(*mcd) && rc == sizeof(*mcd);
off = MDS_LR_CLIENT + ++cl_off * MDS_LR_SIZE) {
- if (!mcd)
+ if (!mcd) {
OBD_ALLOC(mcd, sizeof(*mcd));
- if (!mcd)
- GOTO(err_msd, rc = -ENOMEM);
+ if (!mcd)
+ GOTO(err_msd, rc = -ENOMEM);
+ }
rc = lustre_fread(f, (char *)mcd, sizeof(*mcd), &off);
if (rc != sizeof(*mcd)) {
}
last_rcvd = le64_to_cpu(mcd->mcd_last_rcvd);
- last_mount = le64_to_cpu(mcd->mcd_mount_count);
- if (last_rcvd &&
- last_mount - mcd->mcd_mount_count < MDS_MOUNT_RECOV) {
- rc = mds_client_add(mds, mcd, cl_off);
- if (rc) {
- rc = 0;
+ /* Do client recovery here (open files, etc) */
+ if (last_rcvd && (last_mount - le64_to_cpu(mcd->mcd_mount_count)
+ < MDS_MOUNT_RECOV)) {
+ struct obd_export *export = class_new_export(obddev);
+ if (!export) {
+ rc = -ENOMEM;
break;
}
+ export->exp_mds_data.med_mcd = mcd;
+ mds_client_add(&export->exp_mds_data, cl_off);
mcd = NULL;
+ clients++;
} else {
CDEBUG(D_INFO,
- "client at offset %d with UUID '%s' ignored\n",
- cl_off, mcd->mcd_uuid);
+ "ignored client %d, UUID '%s', last_mount %Ld\n",
+ cl_off, mcd->mcd_uuid,
+ (long long)le64_to_cpu(mcd->mcd_mount_count));
}
if (last_rcvd > mds->mds_last_rcvd) {
mds->mds_last_rcvd = last_rcvd;
}
}
- CDEBUG(D_INODE, "got %Lu for highest last_rcvd value, %d clients\n",
- (unsigned long long)mds->mds_last_rcvd, mds->mds_client_count);
+ CDEBUG(D_INODE, "got %Lu for highest last_rcvd value, %d/%d clients\n",
+ (unsigned long long)mds->mds_last_rcvd, clients, cl_off);
+
+ if (mcd)
+ OBD_FREE(mcd, sizeof(*mcd));
/* After recovery, there can be no local uncommitted transactions */
mds->mds_last_committed = mds->mds_last_rcvd;
return rc;
}
-static int mds_fs_prep(struct mds_obd *mds)
+static int mds_fs_prep(struct obd_device *obddev)
{
+ struct mds_obd *mds = &obddev->u.mds;
struct obd_run_ctxt saved;
struct dentry *dentry;
struct file *f;
CERROR("cannot create ROOT directory: rc = %d\n", rc);
GOTO(err_pop, rc);
}
- /* XXX probably want to hold on to this later... */
- dput(dentry);
- f = filp_open("ROOT", O_RDONLY, 0);
- if (IS_ERR(f)) {
- rc = PTR_ERR(f);
- CERROR("cannot open ROOT: rc = %d\n", rc);
- LBUG();
- GOTO(err_pop, rc);
- }
- mds->mds_rootfid.id = f->f_dentry->d_inode->i_ino;
- mds->mds_rootfid.generation = f->f_dentry->d_inode->i_generation;
+ mds->mds_rootfid.id = dentry->d_inode->i_ino;
+ mds->mds_rootfid.generation = dentry->d_inode->i_generation;
mds->mds_rootfid.f_type = S_IFDIR;
- rc = filp_close(f, 0);
- if (rc) {
- CERROR("cannot close ROOT: rc = %d\n", rc);
- LBUG();
- }
+ dput(dentry);
dentry = simple_mkdir(current->fs->pwd, "FH", 0700);
if (IS_ERR(dentry)) {
/* XXX probably want to hold on to this later... */
dput(dentry);
- rc = mds_init_client_data(mds);
- if (rc)
- GOTO(err_pop, rc);
-
f = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0644);
if (IS_ERR(f)) {
rc = PTR_ERR(f);
GOTO(err_filp, rc);
}
- rc = mds_read_last_rcvd(mds, f);
+ rc = mds_read_last_rcvd(obddev, f);
if (rc) {
CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
GOTO(err_client, rc);
}
mds->mds_rcvd_filp = f;
+err_pop:
pop_ctxt(&saved);
- RETURN(0);
+ return rc;
err_client:
- mds_client_free_all(mds);
+ class_disconnect_all(obddev);
err_filp:
if (filp_close(f, 0))
CERROR("can't close %s after error\n", LAST_RCVD);
-err_pop:
- pop_ctxt(&saved);
-
- return rc;
+ goto err_pop;
}
static struct mds_fs_operations *mds_search_fs_type(const char *name)
__MOD_DEC_USE_COUNT(fs_ops->fs_owner);
}
-int mds_fs_setup(struct mds_obd *mds, struct vfsmount *mnt)
+int mds_fs_setup(struct obd_device *obddev, struct vfsmount *mnt)
{
+ struct mds_obd *mds = &obddev->u.mds;
int rc;
mds->mds_fsops = mds_fs_get_ops(mds->mds_fstype);
mds->mds_sop->delete_inode = mds->mds_fsops->fs_delete_inode;
mds->mds_sb->s_op = mds->mds_sop;
- rc = mds_fs_prep(mds);
+ rc = mds_fs_prep(obddev);
if (rc)
GOTO(out_free, rc);
return rc;
}
-void mds_fs_cleanup(struct mds_obd *mds)
+void mds_fs_cleanup(struct obd_device *obddev)
{
- mds_client_free_all(mds);
+ struct mds_obd *mds = &obddev->u.mds;
+
+ class_disconnect_all(obddev); /* this cleans up client info too */
mds_server_free_data(mds);
OBD_FREE(mds->mds_sop, sizeof(*mds->mds_sop));