return result;
}
+#define MDS_MAX_CLIENTS 1024
+#define MDS_MAX_CLIENT_WORDS (MDS_MAX_CLIENTS / sizeof(unsigned long))
+
+static unsigned long last_rcvd_slots[MDS_MAX_CLIENT_WORDS];
+
+/* Add client data to the MDS. The in-memory storage will be a hash at some
+ * point. We use a bitmap to locate a free space in the last_rcvd file if
+ * cl_off is -1 (i.e. a new client). Otherwise, we have just read the data
+ * from the last_rcvd file and we know its offset.
+ */
+int mds_client_add(struct mds_obd *mds, struct mds_client_data *mcd, int cl_off)
+{
+ struct mds_client_info *mci;
+
+ OBD_ALLOC(mci, sizeof(*mci));
+ if (!mci) {
+ CERROR("no memory for MDS client info\n");
+ RETURN(-ENOMEM);
+ }
+
+ CDEBUG(D_INFO, "client at offset %d with UUID '%s' added\n",
+ cl_off, mcd->mcd_uuid);
+
+ if (cl_off == -1) {
+ unsigned long *word;
+ int bit;
+
+ repeat:
+ word = last_rcvd_slots;
+ while(*word == ~0UL)
+ ++word;
+ if (word - last_rcvd_slots >= MDS_MAX_CLIENT_WORDS) {
+ CERROR("no room in client MDS bitmap - fix code\n");
+ return -ENOMEM;
+ }
+ bit = ffz(*word);
+ if (test_and_set_bit(bit, word)) {
+ CERROR("found bit %d set for word %d - fix code\n",
+ bit, word - last_rcvd_slots);
+ goto repeat;
+ }
+ cl_off = word - last_rcvd_slots + bit;
+ } else {
+ unsigned long *word;
+ int bit;
+
+ word = last_rcvd_slots + cl_off / sizeof(unsigned long);
+ bit = cl_off % sizeof(unsigned long);
+
+ if (test_and_set_bit(bit, word)) {
+ CERROR("bit %d already set in word %d - bad bad\n",
+ bit, word - last_rcvd_slots);
+ LBUG();
+ }
+ }
+
+ mci->mci_mcd = mcd;
+ mci->mci_off = cl_off;
+
+ /* For now we just put the clients in a list, not a hashed list */
+ list_add_tail(&mci->mci_list, &mds->mds_client_info);
+
+ mds->mds_client_count++;
+
+ return 0;
+}
+
+void mds_client_del(struct mds_obd *mds, struct mds_client_info *mci)
+{
+ unsigned long *word;
+ int bit;
+
+ word = last_rcvd_slots + mci->mci_off / sizeof(unsigned long);
+ bit = mci->mci_off % sizeof(unsigned long);
+
+ if (!test_and_clear_bit(bit, word)) {
+ CERROR("bit %d already clear in word %d - bad bad\n",
+ bit, word - last_rcvd_slots);
+ LBUG();
+ }
+
+ --mds->mds_client_count;
+ list_del(&mci->mci_list);
+ OBD_FREE(mci->mci_mcd, sizeof(*mci->mci_mcd));
+ OBD_FREE(mci, sizeof (*mci));
+}
+
+int mds_client_free_all(struct mds_obd *mds)
+{
+ struct list_head *p, *n;
+
+ list_for_each_safe(p, n, &mds->mds_client_info) {
+ struct mds_client_info *mci;
+
+ mci = list_entry(p, struct mds_client_info, mci_list);
+ mds_client_del(mds, mci);
+ }
+
+ return 0;
+}
+
+int mds_server_free_data(struct mds_obd *mds)
+{
+ OBD_FREE(mds->mds_server_data, sizeof(*mds->mds_server_data));
+ mds->mds_server_data = NULL;
+
+ return 0;
+}
+
int mds_connect(struct ptlrpc_request *req)
{
struct mds_body *body;
struct mds_obd *mds = &req->rq_obd->u.mds;
+ struct mds_client_info *mci;
+ struct mds_client_data *mcd;
int rc, size = sizeof(*body);
ENTRY;
+ CDEBUG(D_INFO, "MDS connect from UUID '%s'\n", ptlrpc_req_to_uuid(req));
rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CONNECT_PACK)) {
- CERROR("mds: out of memory\n");
+ CERROR("mds: out of memory for message: size=%d\n", size);
req->rq_status = -ENOMEM;
RETURN(0);
}
body = lustre_msg_buf(req->rq_reqmsg, 0);
- mds_unpack_req_body(req);
+ mds_unpack_req_body(req);
/* Anything we need to do here with the client's trans no or so? */
body = lustre_msg_buf(req->rq_repmsg, 0);
- memcpy(&body->fid1, &mds->mds_rootfid , sizeof(body->fid1));
+ memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1));
+
+ mci = mds_uuid_to_mci(mds, ptlrpc_req_to_uuid(req));
+ if (!mci) {
+ /* We don't have any old connection data for this client */
+ int rc;
+
+ CDEBUG(D_INFO, "allocating new client data for UUID '%s'",
+ ptlrpc_req_to_uuid(req));
+
+ OBD_ALLOC(mcd, sizeof(*mcd));
+ if (!mcd) {
+ CERROR("mds: out of memory for client data\n");
+ req->rq_status = -ENOMEM;
+ RETURN(0);
+ }
+ rc = mds_client_add(mds, mcd, -1);
+ if (rc) {
+ req->rq_status = rc;
+ RETURN(0);
+ }
+ } else {
+ /* We have old connection data for this client... */
+ mcd = mci->mci_mcd;
+ CDEBUG(D_INFO, "found existing data for UUID '%s' at #%d\n",
+ mcd->mcd_uuid, mci->mci_off);
+ }
+ /* Still not 100% sure whether we should reply with the server
+ * last_rcvd or that of this client. I'm not sure it even makes
+ * a difference on a per-client basis, because last_rcvd is global
+ * and we are not supposed to allow transactions while in recovery.
+ */
+ body->last_xid = le32_to_cpu(mcd->mcd_last_xid);
+ body->last_rcvd = le64_to_cpu(mcd->mcd_last_rcvd);
+ //body->last_rcvd = mds->mds_last_rcvd;
+ body->last_committed = mds->mds_last_committed;
+ CDEBUG(D_INFO, "last_rcvd %ld, last_committed %ld, last_xid %d\n",
+ (unsigned long)body->last_rcvd,
+ (unsigned long)body->last_committed, body->last_xid);
mds_pack_rep_body(req);
RETURN(0);
}
body->mode = inode->i_mode;
body->nlink = inode->i_nlink;
body->valid = ~0;
+ body->last_committed = mds->mds_last_committed;
mds_fs_get_objid(mds, inode, &body->objid);
l_dput(de);
RETURN(0);
int mds_open(struct ptlrpc_request *req)
{
+ struct mds_obd *mds = &req->rq_obd->u.mds;
struct dentry *de;
struct mds_body *body;
struct file *file;
}
body = lustre_msg_buf(req->rq_reqmsg, 0);
- de = mds_fid2dentry(&req->rq_obd->u.mds, &body->fid1, &mnt);
+ de = mds_fid2dentry(mds, &body->fid1, &mnt);
if (IS_ERR(de)) {
req->rq_status = -ENOENT;
RETURN(0);
body = lustre_msg_buf(req->rq_repmsg, 0);
body->objid = (__u64) (unsigned long)file;
+ body->last_committed = mds->mds_last_committed;
RETURN(0);
}
/* This will be a hash table at some point. */
int mds_init_client_data(struct mds_obd *mds)
{
- if (mds->mds_client_info)
- LBUG();
-
- OBD_ALLOC(mds->mds_client_info,
- MDS_CLIENT_SLOTS * sizeof(struct mds_client_info));
-
- if (!mds->mds_client_info)
- return -ENOMEM;
-
- return 0;
-}
-
-/* Add client data to the MDS. This will be a hash at some point. */
-int mds_add_client(struct mds_obd *mds, struct mds_client_data *mcd, loff_t off)
-{
- int num = mds->mds_client_count;
-
- if (num >= MDS_CLIENT_SLOTS) {
- CERROR("too many clients for current MDS config - fix code\n");
- return -ENOMEM;
- }
-
- /* For now we cop-out and just put the clients in a list */
- mds->mds_client_info[num].mci_mcd = mcd;
- mds->mds_client_info[num].mci_off = off; /* in last_rcvd on disk */
-
- mds->mds_client_count++;
-
- return 0;
-}
-
-int mds_free_client_data(struct mds_obd *mds)
-{
- struct mds_client_info *mci = mds->mds_client_info;
- int i;
-
- if (!mci)
- RETURN(0);
-
- for (i = 0; i < mds->mds_client_count; i++, mci++) {
- OBD_FREE(mci->mci_mcd, sizeof(*mci->mci_mcd));
- mci->mci_mcd = NULL;
- }
-
- OBD_FREE(mds->mds_client_info, MDS_CLIENT_SLOTS * sizeof(*mci));
- mds->mds_client_info = NULL;
-
- return 0;
-}
-
-int mds_free_server_data(struct mds_obd *mds)
-{
- OBD_FREE(mds->mds_server_data, sizeof(*mds->mds_server_data));
- mds->mds_server_data = NULL;
-
+ INIT_LIST_HEAD(&mds->mds_client_info);
return 0;
}
struct mds_server_data *msd;
struct mds_client_data *mcd = NULL;
loff_t fsize = f->f_dentry->d_inode->i_size;
- loff_t off = 0, cl_off;
+ loff_t off = 0;
+ int cl_off;
__u64 last_rcvd = 0;
__u64 last_mount;
int rc = 0;
GOTO(err_msd, rc);
}
+ /*
+ * When we do a clean MDS shutdown, we save the last_rcvd into
+ * the header. If we find clients with higher last_rcvd values
+ * then those clients may need recovery done.
+ */
last_rcvd = le64_to_cpu(msd->msd_last_rcvd);
mds->mds_last_rcvd = last_rcvd;
CDEBUG(D_INODE, "got %Ld for server last_rcvd value\n",
last_mount = le64_to_cpu(msd->msd_mount_count);
mds->mds_mount_count = last_mount;
CDEBUG(D_INODE, "got %Ld for server last_mount value\n",
- (unsigned long long)last_rcvd);
+ (unsigned long long)last_mount);
- for (off = cl_off = MDS_LR_CLIENT, rc = sizeof(*mcd);
+ for (off = MDS_LR_CLIENT, cl_off = 0, rc = sizeof(*mcd);
off <= fsize - sizeof(*mcd) && rc == sizeof(*mcd);
- off = cl_off + MDS_LR_SIZE) {
+ off = MDS_LR_CLIENT + ++cl_off * MDS_LR_SIZE) {
if (!mcd)
OBD_ALLOC(mcd, sizeof(*mcd));
if (!mcd)
rc = lustre_fread(f, (char *)mcd, sizeof(*mcd), &off);
if (rc != sizeof(*mcd)) {
- CERROR("error reading MDS %s offset %Ld: rc = %d\n",
- LAST_RCVD, (unsigned long long)cl_off, rc);
+ CERROR("error reading MDS %s offset %d: rc = %d\n",
+ LAST_RCVD, cl_off, rc);
if (rc > 0)
rc = -EIO;
break;
if (last_rcvd &&
last_mount - mcd->mcd_mount_count < MDS_MOUNT_RECOV) {
- rc = mds_add_client(mds, mcd, cl_off);
+ rc = mds_client_add(mds, mcd, cl_off);
if (rc) {
rc = 0;
break;
}
mcd = NULL;
+ } else {
+ CDEBUG(D_INFO,
+ "client at offset %d with UUID '%s' ignored\n",
+ cl_off, mcd->mcd_uuid);
}
if (last_rcvd > mds->mds_last_rcvd) {
CDEBUG(D_OTHER,
- "client at offset %Ld has last_rcvd = %Ld\n",
- (unsigned long long)cl_off,
- (unsigned long long)last_rcvd);
+ "client at offset %d has last_rcvd = %Ld\n",
+ cl_off, (unsigned long long)last_rcvd);
mds->mds_last_rcvd = last_rcvd;
}
}
CDEBUG(D_INODE, "got %Ld for highest last_rcvd value, %d clients\n",
(unsigned long long)mds->mds_last_rcvd, mds->mds_client_count);
+ /* After recovery, there can be no local uncommitted transactions */
+ mds->mds_last_committed = mds->mds_last_rcvd;
+
return 0;
err_msd:
- mds_free_server_data(mds);
+ mds_server_free_data(mds);
return rc;
}
GOTO(err_pop, rc = -ENOENT);
}
- rc = mds_fs_journal_data(mds, f->f_dentry->d_inode, f);
+ rc = mds_fs_journal_data(mds, f);
if (rc) {
CERROR("cannot journal data on %s: rc = %d\n", LAST_RCVD, rc);
GOTO(err_filp, rc);
rpc_unregister_service(mds->mds_service);
OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
err_client:
- mds_free_client_data(mds);
+ mds_client_free_all(mds);
err_filp:
if (filp_close(f, 0))
CERROR("can't close %s after error\n", LAST_RCVD);
return rc;
}
-/* Update the server data on disk. */
+/* Update the server data on disk. This stores the new mount_count and
+ * also the last_rcvd value to disk. If we don't have a clean shutdown,
+ * then the server last_rcvd value may be less than that of the clients.
+ * This will alert us that we may need to do client recovery.
+ */
int mds_update_server_data(struct mds_obd *mds)
{
struct obd_run_ctxt saved;
struct mds_server_data *msd = mds->mds_server_data;
+ struct file *filp = mds->mds_rcvd_filp;
loff_t off = 0;
int rc;
msd->msd_last_rcvd = cpu_to_le64(mds->mds_last_rcvd);
msd->msd_mount_count = cpu_to_le64(mds->mds_mount_count);
- CDEBUG(D_SUPER, "MDS mount_count is %Ld\n",
- (unsigned long long)mds->mds_mount_count);
+ CDEBUG(D_SUPER, "MDS mount_count is %Ld, last_rcvd is %Ld\n",
+ (unsigned long long)mds->mds_mount_count,
+ (unsigned long long)mds->mds_last_rcvd);
push_ctxt(&saved, &mds->mds_ctxt);
- rc = lustre_fwrite(mds->mds_rcvd_filp, (char *)msd, sizeof(*msd), &off);
- pop_ctxt(&saved);
+ rc = lustre_fwrite(filp, (char *)msd, sizeof(*msd), &off);
if (rc != sizeof(*msd)) {
CERROR("error writing MDS server data: rc = %d\n", rc);
if (rc > 0)
RETURN(-EIO);
RETURN(rc);
}
+ rc = fsync_dev(filp->f_dentry->d_inode->i_rdev);
+ pop_ctxt(&saved);
+ if (rc)
+ CERROR("error flushing MDS server data: rc = %d\n", rc);
return 0;
}
if (!mds->mds_sb)
RETURN(0);
- mds_free_client_data(mds);
+ mds_client_free_all(mds);
mds_update_server_data(mds);
- mds_free_server_data(mds);
+ mds_server_free_data(mds);
if (mds->mds_rcvd_filp) {
int rc = filp_close(mds->mds_rcvd_filp, 0);
#include <linux/lustre_mds.h>
#include <linux/obd_class.h>
+struct mds_client_info *mds_uuid_to_mci(struct mds_obd *mds, __u8 *uuid)
+{
+ struct list_head *p;
+
+ if (!uuid)
+ return NULL;
+
+ list_for_each(p, &mds->mds_client_info) {
+ struct mds_client_info *mci;
+
+ mci = list_entry(p, struct mds_client_info, mci_list);
+ CDEBUG(D_INFO, "checking client UUID '%s'\n",
+ mci->mci_mcd->mcd_uuid);
+ if (!strncmp(mci->mci_mcd->mcd_uuid, uuid,
+ sizeof(mci->mci_mcd->mcd_uuid)))
+ return mci;
+ }
+ CDEBUG(D_INFO, "no client UUID found for '%s'\n", uuid);
+ return NULL;
+}
+
int mds_update_last_rcvd(struct mds_obd *mds, void *handle,
struct ptlrpc_request *req)
{
/* get from req->rq_connection-> or req->rq_client */
- struct mds_client_info mci_data, *mci = &mci_data;
- struct mds_client_data mcd_data, *mcd = &mcd_data;
+ struct mds_client_info *mci;
loff_t off;
int rc;
- /* Just a placeholder until more gets committed */
- memset(mcd, 0, sizeof(*mcd));
- mci->mci_mcd = mcd;
- mci->mci_off = off = MDS_LR_CLIENT;
+ mci = mds_uuid_to_mci(mds, req->rq_connection->c_remote_uuid);
+ if (!mci) {
+ CERROR("unable to locate MDS client data for UUID '%s'\n",
+ ptlrpc_req_to_uuid(req));
+ /* This will be a real error once everything is working */
+ //LBUG();
+ RETURN(0);
+ }
+
+ off = MDS_LR_CLIENT + mci->mci_off * MDS_LR_SIZE;
++mds->mds_last_rcvd; /* lock this, or make it an LDLM function? */
mci->mci_mcd->mcd_last_rcvd = cpu_to_le64(mds->mds_last_rcvd);
mci->mci_mcd->mcd_mount_count = cpu_to_le64(mds->mds_mount_count);
- mci->mci_mcd->mcd_xid = cpu_to_le32(req->rq_connection->c_xid_in);
+ mci->mci_mcd->mcd_last_xid = cpu_to_le32(req->rq_reqmsg->xid);
mds_fs_set_last_rcvd(mds, handle);
rc = lustre_fwrite(mds->mds_rcvd_filp, (char *)mci->mci_mcd,
sizeof(*mci->mci_mcd), &off);
- CDEBUG(D_INODE, "wrote trans #%Ld for client '%s' at %Ld: rc = %d\n",
+ CDEBUG(D_INODE, "wrote trans #%Ld for client '%s' at #%d: rc = %d\n",
mds->mds_last_rcvd, mci->mci_mcd->mcd_uuid, mci->mci_off, rc);
// store new value and last committed value in req struct
if (!rc)
rc = mds_update_last_rcvd(mds, handle, req);
+ /* FIXME: need to return last_rcvd, last_committed */
EXIT;
GOTO(out_create_de, rc = -ESTALE);
}
dir = de->d_inode;
- CDEBUG(D_INODE, "ino %ld\n", dir->i_ino);
+ CDEBUG(D_INODE, "parent ino %ld\n", dir->i_ino);
down(&dir->i_sem);
dchild = lookup_one_len(rec->ur_name, de, rec->ur_namelen - 1);
}
if (dchild->d_inode) {
- CERROR("child exists (dir %ld, name %s)\n",
- dir->i_ino, rec->ur_name);
+ CERROR("child exists (dir %ld, name %s, ino %ld)\n",
+ dir->i_ino, rec->ur_name, dchild->d_inode->i_ino);
LBUG();
GOTO(out_create_dchild, rc = -EEXIST);
}
struct inode *inode = dchild->d_inode;
struct mds_body *body;
+ CDEBUG(D_INODE, "created ino %ld\n", dchild->d_inode->i_ino);
if (type == S_IFREG) {
rc = mds_fs_set_objid(mds, inode, handle, rec->ur_id);
if (rc)
rc = mds_fs_setattr(mds, dchild, handle, &iattr);
/* XXX should we abort here in case of error? */
+ //if (!rc)
+ rc = mds_update_last_rcvd(mds, handle, req);
+
body = lustre_msg_buf(req->rq_repmsg, 0);
body->ino = inode->i_ino;
body->generation = inode->i_generation;
+ body->last_rcvd = mds->mds_last_rcvd;
+ body->last_committed = mds->mds_last_committed;
}
- if (!rc)
- rc = mds_update_last_rcvd(mds, handle, req);
-
out_create_commit:
/* FIXME: keep rc intact */
rc = mds_fs_commit(mds, dir, handle);
GOTO(out_unlink, rc = -ESTALE);
}
dir = de->d_inode;
- CDEBUG(D_INODE, "ino %ld\n", dir->i_ino);
+ CDEBUG(D_INODE, "parent ino %ld\n", dir->i_ino);
down(&dir->i_sem);
dchild = lookup_one_len(rec->ur_name, de, rec->ur_namelen - 1);
if (!rc)
rc = mds_update_last_rcvd(mds, handle, req);
+ /* FIXME: need to return last_rcvd, last_committed */
/* FIXME: keep rc intact */
rc = mds_fs_commit(mds, dir, handle);
if (!rc)
rc = mds_update_last_rcvd(mds, handle, req);
+ /* FIXME: need to return last_rcvd, last_committed */
/* FIXME: keep rc intact */
rc = mds_fs_commit(mds, de_tgt_dir->d_inode, handle);
EXIT;
if (!rc)
rc = mds_update_last_rcvd(mds, handle, req);
+ /* FIXME: need to return last_rcvd, last_committed */
/* FIXME: keep rc intact */
rc = mds_fs_commit(mds, de_tgtdir->d_inode, handle);
EXIT;