return rc;
}
-/* 'dir' is a inode for which a lock has already been taken */
+/*
+ * Look up a named entry in a directory, and get an LDLM lock on it.
+ * 'dir' is an inode for which an LDLM lock has already been taken.
+ *
+ * If we do not need an exclusive or write lock on this entry (e.g.
+ * a read lock for attribute lookup only) then we do not hold the
+ * directory on return. It is up to the caller to know what type
+ * of lock it is getting, and clean up appropriately.
+ */
struct dentry *mds_name2locked_dentry(struct obd_device *obd,
struct dentry *dir, struct vfsmount **mnt,
char *name, int namelen, int lock_mode,
RETURN(retval);
}
+/* Look up an entry by inode number. */
struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
struct vfsmount **mnt)
{
return result;
}
+/* Establish a connection to the MDS.
+ *
+ * This will set up an export structure for the client to hold state data
+ * about that client, like open files, the last operation number it did
+ * on the server, etc.
+ *
+ * If an export whose client data matches 'cluuid' already exists, 'conn'
+ * is pointed at that export instead of creating a new one.  Otherwise a
+ * new export is created, client data is allocated for it and the client
+ * is registered through mds_client_add().
+ *
+ * 'cluuid' may be NULL, in which case no existing export is searched for
+ * and the new client data gets an all-zero UUID.
+ *
+ * Returns 0 on success.  On failure returns the error from class_connect()
+ * or mds_client_add(), or -ENOMEM if the client data cannot be allocated;
+ * the new export (if any) is disconnected and the module use count taken
+ * on entry is dropped again.
+ */
static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
char *cluuid)
{
+ struct obd_export *exp;
+ struct mds_client_data *mcd;
int rc;
- struct list_head *p, *n;
MOD_INC_USE_COUNT;
if (cluuid) {
- list_for_each_safe(p, n, &obd->obd_exports) {
- struct obd_export *exp;
- struct mds_client_data *mcd;
-
+ struct list_head *p;
+ list_for_each(p, &obd->obd_exports) {
exp = list_entry(p, struct obd_export, exp_chain);
mcd = exp->exp_mds_data.med_mcd;
if (mcd && !memcmp(cluuid, mcd->mcd_uuid,
sizeof(mcd->mcd_uuid))) {
- CDEBUG(D_INFO,
+ CDEBUG(D_INFO,
"existing export for UUID '%s' at %p\n",
cluuid, exp);
- if (exp->exp_obd != obd)
- LBUG();
+ LASSERT(exp->exp_obd == obd);
exp->exp_rconnh.addr = conn->addr;
exp->exp_rconnh.cookie = conn->cookie;
conn->addr = (__u64) (unsigned long)exp;
conn->cookie = exp->exp_cookie;
- CDEBUG(D_IOCTL, "connect: addr %Lx cookie %Lx\n",
+ CDEBUG(D_IOCTL,"connect: addr %Lx cookie %Lx\n",
(long long)conn->addr,
(long long)conn->cookie);
- return 0;
+ /* NOTE(review): the use count taken on entry is kept on
+ * this reconnect path - confirm it is balanced by a
+ * matching disconnect. */
+ RETURN(0);
}
}
}
+#warning shaver: we might need a real cluuid here
rc = class_connect(conn, obd, NULL);
+ if (rc)
+ GOTO(out_dec, rc);
+ exp = class_conn2export(conn);
+ LASSERT(exp);
+ OBD_ALLOC(mcd, sizeof(*mcd));
+ if (!mcd) {
+ CERROR("mds: out of memory for client data\n");
+ GOTO(out_export, rc = -ENOMEM);
+ }
+ /* cluuid may be NULL (see the check above); never copy from it then. */
+ if (cluuid)
+ memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
+ else
+ memset(mcd->mcd_uuid, 0, sizeof(mcd->mcd_uuid));
+ exp->exp_mds_data.med_mcd = mcd;
+ rc = mds_client_add(&exp->exp_mds_data, -1);
if (rc)
- MOD_DEC_USE_COUNT;
+ GOTO(out_mdc, rc);
- return rc;
+ return 0;
+
+out_mdc:
+ /* Unhook the client data before freeing it, so the export (still on
+ * obd_exports until it is disconnected) never points at freed memory. */
+ exp->exp_mds_data.med_mcd = NULL;
+ OBD_FREE(mcd, sizeof(*mcd));
+out_export:
+ class_disconnect(conn);
+out_dec:
+ MOD_DEC_USE_COUNT;
+
+ RETURN(rc);
}
+/*
+ * Tear down the connection identified by 'conn'.
+ *
+ * Frees the MDS client data attached to the export, then disconnects the
+ * export itself.  Returns -EINVAL if 'conn' does not map to an export,
+ * otherwise the result of class_disconnect().
+ */
static int mds_disconnect(struct lustre_handle *conn)
{
+ struct obd_export *exp;
int rc;
+ exp = class_conn2export(conn);
+ if (!exp)
+ RETURN(-EINVAL);
+
+ /* A failure here is only logged; the export is disconnected anyway. */
+ rc = mds_client_free(&exp->exp_mds_data);
+ if (rc)
+ CERROR("error freeing client data: rc = %d\n", rc);
+
rc = class_disconnect(conn);
+ /* The module use count is only dropped once the disconnect succeeded. */
if (!rc)
MOD_DEC_USE_COUNT;
return rc;
}
-/* FIXME: the error cases need fixing to avoid leaks */
static int mds_getstatus(struct ptlrpc_request *req)
{
struct mds_obd *mds = mds_req2mds(req);
struct mds_body *body;
- struct mds_client_data *mcd;
struct mds_export_data *med = &req->rq_export->exp_mds_data;
int rc, size = sizeof(*body);
ENTRY;
body = lustre_msg_buf(req->rq_repmsg, 0);
memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1));
- mcd = med->med_mcd;
- if (!mcd) {
- int rc;
-
- CDEBUG(D_INFO, "allocating new client data for UUID '%s'\n",
- ptlrpc_req_to_uuid(req));
- OBD_ALLOC(mcd, sizeof(*mcd));
- if (!mcd) {
- CERROR("mds: out of memory for client data\n");
- req->rq_status = -ENOMEM;
- RETURN(0);
- }
- memcpy(mcd->mcd_uuid, ptlrpc_req_to_uuid(req),
- sizeof(mcd->mcd_uuid));
- rc = mds_client_add(mds, med, -1);
- if (rc) {
- req->rq_status = rc;
- OBD_FREE(mcd, sizeof(*mcd));
- RETURN(0);
- }
- med->med_mcd = mcd;
- } else {
- CDEBUG(D_INFO, "found existing data for UUID '%s' at #%d\n",
- mcd->mcd_uuid, med->med_off);
- }
+ LASSERT(med->med_mcd);
+
/* mcd_last_xid is is stored in little endian on the disk and
mds_pack_rep_body converts it to network order */
- body->last_xid = le32_to_cpu(mcd->mcd_last_xid);
+ body->last_xid = le32_to_cpu(med->med_mcd->mcd_last_xid);
mds_pack_rep_body(req);
RETURN(0);
}
{
struct mds_obd *mds = mds_req2mds(req);
struct mds_status_req *streq;
- struct lov_desc *desc;
+ struct lov_desc *desc;
int tgt_count;
int rc, size[2] = {sizeof(*desc)};
ENTRY;
__u64 res_id[3] = {0, 0, 0};
ENTRY;
- if (strcmp(req->rq_export->exp_obd->obd_type->typ_name, "mds") != 0)
- LBUG();
+ LASSERT(!strcmp(req->rq_export->exp_obd->obd_type->typ_name, "mds"));
if (req->rq_reqmsg->bufcount <= offset + 1) {
LBUG();
}
dir = de->d_inode;
- CDEBUG(D_INODE, "parent ino %ld\n", dir->i_ino);
+ CDEBUG(D_INODE, "parent ino %ld, name %*s\n", dir->i_ino,namelen,name);
lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW;
res_id[0] = dir->i_ino;
list_add(&mfd->mfd_list, &med->med_open_head);
body = lustre_msg_buf(req->rq_repmsg, 0);
+ /* FIXME: need to have cookies involved here */
body->extra = (__u64) (unsigned long)file;
RETURN(0);
}
RETURN(0);
}
+ /* FIXME: need to have cookies involved here */
file = (struct file *)(unsigned long)body->extra;
if (!file->f_dentry)
LBUG();
rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
&req->rq_repmsg);
if (rc) {
- rc = req->rq_status = -ENOMEM;
+ req->rq_status = rc;
break;
}
rc = mds_reint(0, req);
* also the last_rcvd value to disk. If we don't have a clean shutdown,
* then the server last_rcvd value may be less than that of the clients.
* This will alert us that we may need to do client recovery.
+ *
+ * Assumes we are already in the server filesystem context.
*/
static
int mds_update_server_data(struct mds_obd *mds)
{
- struct obd_run_ctxt saved;
struct mds_server_data *msd = mds->mds_server_data;
struct file *filp = mds->mds_rcvd_filp;
loff_t off = 0;
CDEBUG(D_SUPER, "MDS mount_count is %Lu, last_rcvd is %Lu\n",
(unsigned long long)mds->mds_mount_count,
(unsigned long long)mds->mds_last_rcvd);
- push_ctxt(&saved, &mds->mds_ctxt);
rc = lustre_fwrite(filp, (char *)msd, sizeof(*msd), &off);
if (rc != sizeof(*msd)) {
CERROR("error writing MDS server data: rc = %d\n", rc);
RETURN(rc);
}
rc = fsync_dev(filp->f_dentry->d_inode->i_rdev);
- pop_ctxt(&saved);
if (rc)
CERROR("error flushing MDS server data: rc = %d\n", rc);
+/*
+ * Finish recovery: bump the mount count and write the server data out.
+ * Returns the result of mds_update_server_data().
+ */
static int mds_recover(struct obd_device *obddev)
{
struct mds_obd *mds = &obddev->u.mds;
+ struct obd_run_ctxt saved;
int rc;
/* This happens at the end when recovery is complete */
++mds->mds_mount_count;
+ /* mds_update_server_data() expects to be called from within the
+ * server filesystem context, so enter it around the call. */
+ push_ctxt(&saved, &mds->mds_ctxt);
rc = mds_update_server_data(mds);
+ pop_ctxt(&saved);
return rc;
}
GOTO(err_fs, rc = -EINVAL);
}
- rc = -ENOENT;
obddev->obd_namespace =
ldlm_namespace_new("mds_server", LDLM_NAMESPACE_SERVER);
if (obddev->obd_namespace == NULL) {
- LBUG();
mds_cleanup(obddev);
- GOTO(err_svc, rc);
+ GOTO(err_svc, rc = -ENOMEM);
}
for (i = 0; i < MDS_NUM_THREADS; i++) {
rc = ptlrpc_start_thread(obddev, mds->mds_service, name);
if (rc) {
CERROR("cannot start MDS thread #%d: rc %d\n", i, rc);
- LBUG();
GOTO(err_thread, rc);
}
}
{
struct super_block *sb;
struct mds_obd *mds = &obddev->u.mds;
-
+ struct obd_run_ctxt saved;
ENTRY;
- if (!list_empty(&obddev->obd_exports)) {
- CERROR("still has exports; forcing cleanup\n");
- class_disconnect_all(obddev);
- if (!list_empty(&obddev->obd_exports)) {
- CERROR("still has exports after forced cleanup?\n");
- RETURN(-EBUSY);
- }
- }
-
ptlrpc_stop_all_threads(mds->mds_service);
ptlrpc_unregister_service(mds->mds_service);
if (!mds->mds_sb)
RETURN(0);
+ push_ctxt(&saved, &mds->mds_ctxt);
mds_update_server_data(mds);
if (mds->mds_rcvd_filp) {
if (rc)
CERROR("last_rcvd file won't close, rc=%d\n", rc);
}
+ pop_ctxt(&saved);
unlock_kernel();
mntput(mds->mds_vfsmnt);
* Otherwise, we have just read the data from the last_rcvd file and
* we know its offset.
*/
-int mds_client_add(struct mds_obd *mds, struct mds_export_data *med, int cl_off)
+int mds_client_add(struct mds_export_data *med, int cl_off)
{
- CDEBUG(D_INFO, "client at offset %d with UUID '%s' added\n",
- cl_off, med->med_mcd->mcd_uuid);
-
if (cl_off == -1) {
unsigned long *word;
int bit;
while(*word == ~0UL)
++word;
if (word - last_rcvd_slots >= MDS_MAX_CLIENT_WORDS) {
- CERROR("no room in client MDS bitmap - fix code\n");
+ CERROR("no room for clients - fix MDS_MAX_CLIENTS\n");
return -ENOMEM;
}
bit = ffz(*word);
if (test_and_set_bit(bit, word)) {
- CERROR("found bit %d set for word %d - fix code\n",
+ CERROR("found bit %d set for word %d - fix locking\n",
bit, word - last_rcvd_slots);
+ LBUG();
goto repeat;
}
cl_off = word - last_rcvd_slots + bit;
}
}
- med->med_off = cl_off;
- mds->mds_client_count++;
+ CDEBUG(D_INFO, "client at offset %d with UUID '%s' added\n",
+ cl_off, med->med_mcd->mcd_uuid);
+ med->med_off = cl_off;
return 0;
}
-static int mds_client_free_all(struct obd_device *obddev)
+/*
+ * Release the client slot and the client data held by one export.
+ *
+ * Clears the bit for med->med_off in last_rcvd_slots (LBUGs if it was
+ * already clear), frees med->med_mcd and resets the pointer so the export
+ * does not keep a reference to freed memory.  Always returns 0.
+ *
+ * NOTE(review): mds_client_add() computes a new offset as
+ * "word - last_rcvd_slots + bit", which does not scale the word index by
+ * the number of bits per word - confirm and fix it to match the split
+ * used here.
+ */
+int mds_client_free(struct mds_export_data *med)
{
- struct mds_obd *mds = &obddev->u.mds;
- struct list_head *p, *n;
+ const int bits_per_word = (int)(8 * sizeof(unsigned long));
+ unsigned long *word;
+ int bit;
- list_for_each_safe(p, n, &obddev->obd_exports) {
- struct obd_export *exp;
- struct mds_export_data *med;
- unsigned long *word;
- int bit;
-
- exp = list_entry(p, struct obd_export, exp_chain);
- med = &exp->exp_mds_data;
-
- word = last_rcvd_slots + med->med_off / sizeof(unsigned long);
- bit = med->med_off % sizeof(unsigned long);
+ CDEBUG(D_INFO, "freeing client at offset %d with UUID '%s'\n",
+ med->med_off, med->med_mcd->mcd_uuid);
- if (!test_and_clear_bit(bit, word)) {
- CERROR("bit %d already clear in word %d - bad bad\n",
- bit, word - last_rcvd_slots);
- LBUG();
- }
+ /* Slots are tracked one bit per client, so the word/bit split must
+ * use the number of bits in a long, not its size in bytes. */
+ word = last_rcvd_slots + med->med_off / bits_per_word;
+ bit = med->med_off % bits_per_word;
- OBD_FREE(med->med_mcd, sizeof(*med->med_mcd));
- --mds->mds_client_count;
+ if (!test_and_clear_bit(bit, word)) {
+ CERROR("bit %d already clear in word %d - bad bad\n",
+ bit, (int)(word - last_rcvd_slots));
+ LBUG();
}
- if (mds->mds_client_count) {
- CERROR("%d mds clients remaining after cleanup\n",
- mds->mds_client_count);
- /* LBUG()? */
- }
-
+ OBD_FREE(med->med_mcd, sizeof(*med->med_mcd));
+ med->med_mcd = NULL;
+
return 0;
}
int cl_off;
__u64 last_rcvd = 0;
__u64 last_mount;
+ int clients = 0;
int rc = 0;
OBD_ALLOC(msd, sizeof(*msd));
for (off = MDS_LR_CLIENT, cl_off = 0, rc = sizeof(*mcd);
off <= fsize - sizeof(*mcd) && rc == sizeof(*mcd);
off = MDS_LR_CLIENT + ++cl_off * MDS_LR_SIZE) {
- if (!mcd)
+ if (!mcd) {
OBD_ALLOC(mcd, sizeof(*mcd));
- if (!mcd)
- GOTO(err_msd, rc = -ENOMEM);
+ if (!mcd)
+ GOTO(err_msd, rc = -ENOMEM);
+ }
rc = lustre_fread(f, (char *)mcd, sizeof(*mcd), &off);
if (rc != sizeof(*mcd)) {
last_rcvd = le64_to_cpu(mcd->mcd_last_rcvd);
+ /* Do client recovery here (open files, etc) */
if (last_rcvd && (last_mount - le64_to_cpu(mcd->mcd_mount_count)
< MDS_MOUNT_RECOV)) {
struct obd_export *export = class_new_export(obddev);
break;
}
export->exp_mds_data.med_mcd = mcd;
+ mds_client_add(&export->exp_mds_data, cl_off);
mcd = NULL;
+ clients++;
} else {
CDEBUG(D_INFO,
- "client at offset %d with UUID '%s' ignored\n",
- cl_off, mcd->mcd_uuid);
+ "ignored client %d, UUID '%s', last_mount %Ld\n",
+ cl_off, mcd->mcd_uuid,
+ (long long)le64_to_cpu(mcd->mcd_mount_count));
}
if (last_rcvd > mds->mds_last_rcvd) {
mds->mds_last_rcvd = last_rcvd;
}
}
- CDEBUG(D_INODE, "got %Lu for highest last_rcvd value, %d clients\n",
- (unsigned long long)mds->mds_last_rcvd, mds->mds_client_count);
+ CDEBUG(D_INODE, "got %Lu for highest last_rcvd value, %d/%d clients\n",
+ (unsigned long long)mds->mds_last_rcvd, clients, cl_off);
if (mcd)
OBD_FREE(mcd, sizeof(*mcd));
CERROR("cannot create ROOT directory: rc = %d\n", rc);
GOTO(err_pop, rc);
}
- /* XXX probably want to hold on to this later... */
- dput(dentry);
- f = filp_open("ROOT", O_RDONLY, 0);
- if (IS_ERR(f)) {
- rc = PTR_ERR(f);
- CERROR("cannot open ROOT: rc = %d\n", rc);
- LBUG();
- GOTO(err_pop, rc);
- }
- mds->mds_rootfid.id = f->f_dentry->d_inode->i_ino;
- mds->mds_rootfid.generation = f->f_dentry->d_inode->i_generation;
+ mds->mds_rootfid.id = dentry->d_inode->i_ino;
+ mds->mds_rootfid.generation = dentry->d_inode->i_generation;
mds->mds_rootfid.f_type = S_IFDIR;
- rc = filp_close(f, 0);
- if (rc) {
- CERROR("cannot close ROOT: rc = %d\n", rc);
- LBUG();
- }
+ dput(dentry);
dentry = simple_mkdir(current->fs->pwd, "FH", 0700);
if (IS_ERR(dentry)) {
GOTO(err_client, rc);
}
mds->mds_rcvd_filp = f;
+err_pop:
pop_ctxt(&saved);
- RETURN(0);
+ return rc;
err_client:
- mds_client_free_all(obddev);
+ class_disconnect_all(obddev);
err_filp:
if (filp_close(f, 0))
CERROR("can't close %s after error\n", LAST_RCVD);
-err_pop:
- pop_ctxt(&saved);
-
- return rc;
+ goto err_pop;
}
static struct mds_fs_operations *mds_search_fs_type(const char *name)
void mds_fs_cleanup(struct obd_device *obddev)
{
struct mds_obd *mds = &obddev->u.mds;
- mds_client_free_all(obddev);
+
+ class_disconnect_all(obddev); /* this cleans up client info too */
mds_server_free_data(mds);
OBD_FREE(mds->mds_sop, sizeof(*mds->mds_sop));