Whamcloud - gitweb
land v0.9.1 on HEAD, in preparation for a 1.0.x branch
[fs/lustre-release.git] / lustre / mdc / mdc_request.c
index 27f3717..d69f624 100644 (file)
@@ -19,7 +19,9 @@
  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  */
 
-#define EXPORT_SYMTAB
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
 #define DEBUG_SUBSYSTEM S_MDC
 
 #ifdef __KERNEL__
 # include <linux/init.h>
 #else
 # include <liblustre.h>
-# include <linux/obd_class.h>
 #endif
 
+#include <linux/obd_class.h>
 #include <linux/lustre_mds.h>
-#include <linux/lustre_lite.h>
 #include <linux/lustre_dlm.h>
 #include <linux/lprocfs_status.h>
 #include "mdc_internal.h"
 #define REQUEST_MINOR 244
 
 extern int mds_queue_req(struct ptlrpc_request *);
-struct mdc_rpc_lock mdc_rpc_lock;
-struct mdc_rpc_lock mdc_setattr_lock;
-EXPORT_SYMBOL(mdc_rpc_lock);
-
 /* Helper that implements most of mdc_getstatus and signal_completed_replay. */
 /* XXX this should become mdc_get_info("key"), sending MDS_GET_INFO RPC */
 static int send_getstatus(struct obd_import *imp, struct ll_fid *rootfid,
@@ -60,7 +57,7 @@ static int send_getstatus(struct obd_import *imp, struct ll_fid *rootfid,
                 GOTO(out, rc = -ENOMEM);
 
         body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
-        req->rq_level = level;
+        req->rq_send_state = level;
         req->rq_replen = lustre_msg_size(1, &size);
 
         mdc_pack_req_body(req);
@@ -90,84 +87,14 @@ static int send_getstatus(struct obd_import *imp, struct ll_fid *rootfid,
 }
 
 /* This should be mdc_get_info("rootfid") */
-int mdc_getstatus(struct lustre_handle *conn, struct ll_fid *rootfid)
+int mdc_getstatus(struct obd_export *exp, struct ll_fid *rootfid)
 {
-        return send_getstatus(class_conn2cliimp(conn), rootfid, LUSTRE_CONN_CON,
+        return send_getstatus(class_exp2cliimp(exp), rootfid, LUSTRE_IMP_FULL,
                               0);
 }
 
-/* should call mdc_get_info("lovdesc") and mdc_get_info("lovtgts") */
-int mdc_getlovinfo(struct obd_device *obd, struct lustre_handle *mdc_connh,
-                   struct ptlrpc_request **request)
-{
-        struct ptlrpc_request *req;
-        struct mds_status_req *streq;
-        struct lov_desc       *desc;
-        struct obd_uuid       *uuids;
-        int rc, size[2] = {sizeof(*streq)};
-        int i;
-        ENTRY;
-
-        req = ptlrpc_prep_req(class_conn2cliimp(mdc_connh), MDS_GETLOVINFO, 1,
-                              size, NULL);
-        if (!req)
-                RETURN (-ENOMEM);
-
-        *request = req;
-        streq = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*streq));
-        streq->flags = MDS_STATUS_LOV;
-        streq->repbuf = LOV_MAX_UUID_BUFFER_SIZE;
-
-        /* prepare for reply */
-        req->rq_level = LUSTRE_CONN_CON;
-        size[0] = sizeof (*desc);
-        size[1] = LOV_MAX_UUID_BUFFER_SIZE;
-        req->rq_replen = lustre_msg_size(2, size);
-
-        mdc_get_rpc_lock(&mdc_rpc_lock, NULL);
-        rc = ptlrpc_queue_wait(req);
-        mdc_put_rpc_lock(&mdc_rpc_lock, NULL);
-
-        if (rc != 0) {
-                CERROR ("rcp failed\n");
-                GOTO (failed, rc);
-        }
-
-        desc = lustre_swab_repbuf (req, 0, sizeof (*desc),
-                                   lustre_swab_lov_desc);
-        if (desc == NULL) {
-                CERROR ("Can't unpack lov_desc\n");
-                GOTO (failed, rc = -EPROTO);
-        }
-
-        LASSERT_REPSWAB (req, 1);
-        /* array of uuids byte-sex insensitive; just verify they are all
-         * there and terminated */
-        uuids = lustre_msg_buf (req->rq_repmsg, 1,
-                                desc->ld_tgt_count * sizeof (*uuids));
-        if (uuids == NULL) {
-                CERROR ("Can't unpack %d uuids\n", desc->ld_tgt_count);
-                GOTO (failed, rc = -EPROTO);
-        }
-
-        for (i = 0; i < desc->ld_tgt_count; i++) {
-                int uid_len = strnlen (uuids[i].uuid, sizeof (uuids[i].uuid));
-
-                if (uid_len == sizeof (uuids[i].uuid)) {
-                        CERROR ("Unterminated uuid %d:%*s\n",
-                                i, (int)sizeof (uuids[i].uuid), uuids[i].uuid);
-                        GOTO (failed, rc = -EPROTO);
-                }
-        }
-        RETURN(0);
-
- failed:
-        ptlrpc_req_finished (req);
-        RETURN (rc);
-}
-
-int mdc_getattr_common (struct lustre_handle *conn,
-                        unsigned int ea_size, struct ptlrpc_request *req)
+int mdc_getattr_common(struct obd_export *exp, unsigned int ea_size, 
+                       struct ptlrpc_request *req)
 {
         struct mds_body *body;
         void            *eadata;
@@ -185,9 +112,9 @@ int mdc_getattr_common (struct lustre_handle *conn,
         }
         req->rq_replen = lustre_msg_size(bufcount, size);
 
-        mdc_get_rpc_lock(&mdc_rpc_lock, NULL);
+        mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
         rc = ptlrpc_queue_wait(req);
-        mdc_put_rpc_lock(&mdc_rpc_lock, NULL);
+        mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
         if (rc != 0)
                 RETURN (rc);
 
@@ -213,7 +140,7 @@ int mdc_getattr_common (struct lustre_handle *conn,
         RETURN (0);
 }
 
-int mdc_getattr(struct lustre_handle *conn, struct ll_fid *fid,
+int mdc_getattr(struct obd_export *exp, struct ll_fid *fid,
                 unsigned long valid, unsigned int ea_size,
                 struct ptlrpc_request **request)
 {
@@ -226,7 +153,7 @@ int mdc_getattr(struct lustre_handle *conn, struct ll_fid *fid,
         /* XXX do we need to make another request here?  We just did a getattr
          *     to do the lookup in the first place.
          */
-        req = ptlrpc_prep_req(class_conn2cliimp(conn), MDS_GETATTR, 1, &size,
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_GETATTR, 1, &size,
                               NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
@@ -237,7 +164,7 @@ int mdc_getattr(struct lustre_handle *conn, struct ll_fid *fid,
         body->eadatasize = ea_size;
         mdc_pack_req_body(req);
 
-        rc = mdc_getattr_common (conn, ea_size, req);
+        rc = mdc_getattr_common(exp, ea_size, req);
         if (rc != 0) {
                 ptlrpc_req_finished (req);
                 req = NULL;
@@ -247,7 +174,7 @@ int mdc_getattr(struct lustre_handle *conn, struct ll_fid *fid,
         RETURN (rc);
 }
 
-int mdc_getattr_name(struct lustre_handle *conn, struct ll_fid *fid,
+int mdc_getattr_name(struct obd_export *exp, struct ll_fid *fid,
                      char *filename, int namelen, unsigned long valid,
                      unsigned int ea_size, struct ptlrpc_request **request)
 {
@@ -256,7 +183,7 @@ int mdc_getattr_name(struct lustre_handle *conn, struct ll_fid *fid,
         int rc, size[2] = {sizeof(*body), namelen};
         ENTRY;
 
-        req = ptlrpc_prep_req(class_conn2cliimp(conn), MDS_GETATTR_NAME, 2,
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_GETATTR_NAME, 2,
                               size, NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
@@ -270,14 +197,14 @@ int mdc_getattr_name(struct lustre_handle *conn, struct ll_fid *fid,
         LASSERT (strnlen (filename, namelen) == namelen - 1);
         memcpy(lustre_msg_buf(req->rq_reqmsg, 1, namelen), filename, namelen);
 
-        rc = mdc_getattr_common (conn, ea_size, req);
+        rc = mdc_getattr_common(exp, ea_size, req);
         if (rc != 0) {
                 ptlrpc_req_finished (req);
                 req = NULL;
         }
  out:
         *request = req;
-        return rc;
+        RETURN(rc);
 }
 
 /* This should be called with both the request and the reply still packed. */
@@ -293,12 +220,12 @@ void mdc_store_inode_generation(struct ptlrpc_request *req, int reqoff,
         LASSERT (body != NULL);
 
         memcpy(&rec->cr_replayfid, &body->fid1, sizeof rec->cr_replayfid);
-        DEBUG_REQ(D_HA, req, "storing generation %x for ino "LPD64,
+        DEBUG_REQ(D_HA, req, "storing generation %u for ino "LPU64,
                   rec->cr_replayfid.generation, rec->cr_replayfid.id);
 }
 
 int mdc_req2lustre_md(struct ptlrpc_request *req, int offset,
-                      struct lustre_handle *obd_import,
+                      struct obd_export *exp,
                       struct lustre_md *md)
 {
         int rc = 0;
@@ -326,7 +253,7 @@ int mdc_req2lustre_md(struct ptlrpc_request *req, int offset,
                 LASSERT (lmm != NULL);
                 LASSERT_REPSWABBED (req, offset + 1);
 
-                rc = obd_unpackmd(obd_import, &md->lsm, lmm, lmmsize);
+                rc = obd_unpackmd(exp, &md->lsm, lmm, lmmsize);
                 if (rc >= 0) {
                         LASSERT (rc >= sizeof (*md->lsm));
                         rc = 0;
@@ -335,263 +262,192 @@ int mdc_req2lustre_md(struct ptlrpc_request *req, int offset,
         RETURN(rc);
 }
 
-
-/* We always reserve enough space in the reply packet for a stripe MD, because
- * we don't know in advance the file type. */
-int mdc_enqueue(struct lustre_handle *conn,
-                int lock_type,
-                struct lookup_intent *it,
-                int lock_mode,
-                struct mdc_op_data *data,
-                struct lustre_handle *lockh,
-                char *tgt,
-                int tgtlen,
-                ldlm_completion_callback cb_completion,
-                ldlm_blocking_callback cb_blocking,
-                void *cb_data)
+static void mdc_commit_open(struct ptlrpc_request *req)
 {
-        struct ptlrpc_request *req;
-        struct obd_device *obddev = class_conn2obd(conn);
-        struct ldlm_res_id res_id =
-                { .name = {data->ino1, data->gen1} };
-        int size[6] = {sizeof(struct ldlm_request), sizeof(struct ldlm_intent)};
-        int rc, flags = LDLM_FL_HAS_INTENT;
-        int repsize[4] = {sizeof(struct ldlm_reply),
-                          sizeof(struct mds_body),
-                          obddev->u.cli.cl_max_mds_easize,
-                          obddev->u.cli.cl_max_mds_cookiesize};
-        struct ldlm_reply *dlm_rep;
-        struct ldlm_intent *lit;
-        struct ldlm_request *lockreq;
-        void *eadata;
-        unsigned long irqflags;
-        int   reply_buffers = 0;
-        ENTRY;
-
-//        LDLM_DEBUG_NOLOCK("mdsintent=%s,name=%s,dir=%lu",
-//                          ldlm_it2str(it->it_op), it_name, it_inode->i_ino);
-
-        if (it->it_op & IT_OPEN) {
-                it->it_mode |= S_IFREG;
-                it->it_mode &= ~current->fs->umask;
-
-                size[2] = sizeof(struct mds_rec_create);
-                size[3] = data->namelen + 1;
-                req = ptlrpc_prep_req(class_conn2cliimp(conn), LDLM_ENQUEUE, 4,
-                                      size, NULL);
-                if (!req)
-                        RETURN(-ENOMEM);
-
-                spin_lock_irqsave (&req->rq_lock, irqflags);
-                req->rq_replay = 1;
-                spin_unlock_irqrestore (&req->rq_lock, irqflags);
-
-                /* pack the intent */
-                lit = lustre_msg_buf(req->rq_reqmsg, 1, sizeof (*lit));
-                lit->opc = (__u64)it->it_op;
-
-                /* pack the intended request */
-                mdc_open_pack(req, 2, data, it->it_mode, 0, current->fsuid,
-                              current->fsgid, LTIME_S(CURRENT_TIME),
-                              it->it_flags, tgt, tgtlen);
-                /* get ready for the reply */
-                reply_buffers = 3;
-                req->rq_replen = lustre_msg_size(3, repsize);
-        } else if (it->it_op & IT_UNLINK) {
-                size[2] = sizeof(struct mds_rec_unlink);
-                size[3] = data->namelen + 1;
-                req = ptlrpc_prep_req(class_conn2cliimp(conn), LDLM_ENQUEUE, 4,
-                                      size, NULL);
-                if (!req)
-                        RETURN(-ENOMEM);
-
-                /* pack the intent */
-                lit = lustre_msg_buf(req->rq_reqmsg, 1, sizeof (*lit));
-                lit->opc = (__u64)it->it_op;
-
-                /* pack the intended request */
-                mdc_unlink_pack(req, 2, data);
-                /* get ready for the reply */
-                reply_buffers = 4;
-                req->rq_replen = lustre_msg_size(4, repsize);
-        } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
-                int valid = OBD_MD_FLNOTOBD | OBD_MD_FLEASIZE;
-                size[2] = sizeof(struct mds_body);
-                size[3] = data->namelen + 1;
-
-                req = ptlrpc_prep_req(class_conn2cliimp(conn), LDLM_ENQUEUE, 4,
-                                      size, NULL);
-                if (!req)
-                        RETURN(-ENOMEM);
-
-                /* pack the intent */
-                lit = lustre_msg_buf(req->rq_reqmsg, 1, sizeof (*lit));
-                lit->opc = (__u64)it->it_op;
-
-                /* pack the intended request */
-                mdc_getattr_pack(req, valid, 2, it->it_flags, data);
-                /* get ready for the reply */
-                reply_buffers = 3;
-                req->rq_replen = lustre_msg_size(3, repsize);
-        } else if (it->it_op == IT_READDIR) {
-                req = ptlrpc_prep_req(class_conn2cliimp(conn), LDLM_ENQUEUE, 1,
-                                      size, NULL);
-                if (!req)
-                        RETURN(-ENOMEM);
-
-                /* get ready for the reply */
-                reply_buffers = 1;
-                req->rq_replen = lustre_msg_size(1, repsize);
-        }  else {
-                LBUG();
-                RETURN(-EINVAL);
-        }
-
-        mdc_get_rpc_lock(&mdc_rpc_lock, it);
-        rc = ldlm_cli_enqueue(conn, req, obddev->obd_namespace, NULL, res_id,
-                              lock_type, NULL, 0, lock_mode, &flags,
-                              cb_completion, cb_blocking, cb_data, lockh);
-        mdc_put_rpc_lock(&mdc_rpc_lock, it);
-
-        /* Similarly, if we're going to replay this request, we don't want to
-         * actually get a lock, just perform the intent. */
-        if (req->rq_transno || req->rq_replay) {
-                lockreq = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*lockreq));
-                lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
-        }
+        struct mdc_open_data *mod = req->rq_cb_data;
+        if (mod == NULL)
+                return;
 
-        /* This can go when we're sure that this can never happen */
-        LASSERT(rc != -ENOENT);
-        if (rc == ELDLM_LOCK_ABORTED) {
-                lock_mode = 0;
-                memset(lockh, 0, sizeof(*lockh));
-        } else if (rc != 0) {
-                CERROR("ldlm_cli_enqueue: %d\n", rc);
-                LASSERT (rc < 0);
-                ptlrpc_req_finished(req);
-                RETURN(rc);
-        } else { /* rc = 0 */
-                struct ldlm_lock *lock = ldlm_handle2lock(lockh);
-                LASSERT(lock);
-
-                /* If the server gave us back a different lock mode, we should
-                 * fix up our variables. */
-                if (lock->l_req_mode != lock_mode) {
-                        ldlm_lock_addref(lockh, lock->l_req_mode);
-                        ldlm_lock_decref(lockh, lock_mode);
-                        lock_mode = lock->l_req_mode;
-                }
-
-                LDLM_LOCK_PUT(lock);
-        }
-
-        dlm_rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*dlm_rep));
-        LASSERT(dlm_rep != NULL);           /* checked by ldlm_cli_enqueue() */
-        LASSERT_REPSWABBED(req, 0);         /* swabbed by ldlm_cli_enqueue() */
-
-        it->it_disposition = (int) dlm_rep->lock_policy_res1;
-        it->it_status = (int) dlm_rep->lock_policy_res2;
-        it->it_lock_mode = lock_mode;
-        it->it_data = req;
-
-        /* We know what to expect, so we do any byte flipping required here */
-        LASSERT(reply_buffers == 4 || reply_buffers == 3 || reply_buffers == 1);
-        if (reply_buffers >= 3) {
-                struct mds_body *body;
-
-                body = lustre_swab_repbuf (req, 1, sizeof (*body),
-                                           lustre_swab_mds_body);
-                if (body == NULL) {
-                        CERROR ("Can't swab mds_body\n");
-                        RETURN (-EPROTO);
-                }
+        if (mod->mod_close_req != NULL)
+                mod->mod_close_req->rq_cb_data = NULL;
 
-                if ((body->valid & OBD_MD_FLEASIZE) != 0) {
-                        /* The eadata is opaque; just check that it is
-                         * there.  Eventually, obd_unpackmd() will check
-                         * the contents */
-                        eadata = lustre_swab_repbuf(req, 2, body->eadatasize,
-                                                    NULL);
-                        if (eadata == NULL) {
-                                CERROR ("Missing/short eadata\n");
-                                RETURN (-EPROTO);
-                        }
-                }
-        }
+        if (mod->mod_och != NULL)
+                mod->mod_och->och_mod = NULL;
 
-        RETURN(rc);
+        OBD_FREE(mod, sizeof(*mod));
+        req->rq_cb_data = NULL;
 }
 
 static void mdc_replay_open(struct ptlrpc_request *req)
 {
-        struct obd_client_handle *och = req->rq_replay_data;
-        struct lustre_handle old, *file_fh = &och->och_fh;
-        struct list_head *tmp;
+        struct mdc_open_data *mod = req->rq_cb_data;
+        struct obd_client_handle *och;
+        struct ptlrpc_request *close_req;
+        struct lustre_handle old; 
         struct mds_body *body;
+        ENTRY;
 
         body = lustre_swab_repbuf(req, 1, sizeof(*body), lustre_swab_mds_body);
         LASSERT (body != NULL);
 
-        memcpy(&old, file_fh, sizeof(old));
-        CDEBUG(D_HA, "updating handle from "LPD64" to "LPD64"\n",
-               file_fh->cookie, body->handle.cookie);
-        memcpy(file_fh, &body->handle, sizeof(body->handle));
-
-        /* A few frames up, ptlrpc_replay holds the lock, so this is safe. */
-        list_for_each(tmp, &req->rq_import->imp_sending_list) {
-                req = list_entry(tmp, struct ptlrpc_request, rq_list);
-                if (req->rq_reqmsg->opc != MDS_CLOSE)
-                        continue;
-                body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
-                if (memcmp(&body->handle, &old, sizeof(old)))
-                        continue;
+        if (mod == NULL) {
+                DEBUG_REQ(D_ERROR, req,
+                          "can't properly replay without open data");
+                EXIT;
+                return;
+        }
 
-                DEBUG_REQ(D_HA, req, "updating close body with new fh");
-                memcpy(&body->handle, file_fh, sizeof(*file_fh));
+        och = mod->mod_och;
+        if (och != NULL) {
+                struct lustre_handle *file_fh; 
+                LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC);
+                file_fh = &och->och_fh;
+                CDEBUG(D_HA, "updating handle from "LPX64" to "LPX64"\n",
+                       file_fh->cookie, body->handle.cookie);
+                memcpy(&old, file_fh, sizeof(old));
+                memcpy(file_fh, &body->handle, sizeof(*file_fh));
+        } 
+
+        close_req = mod->mod_close_req;
+        if (close_req != NULL) {
+                struct mds_body *close_body;
+                LASSERT(close_req->rq_reqmsg->opc == MDS_CLOSE);
+                close_body = lustre_msg_buf(close_req->rq_reqmsg, 0,
+                                            sizeof(*close_body));
+                if (och != NULL)
+                        LASSERT(!memcmp(&old, &close_body->handle, sizeof old));
+                DEBUG_REQ(D_HA, close_req, "updating close body with new fh");
+                memcpy(&close_body->handle, &body->handle,
+                       sizeof(close_body->handle));
         }
+
+        EXIT;
 }
 
-void mdc_set_open_replay_data(struct obd_client_handle *och)
+void mdc_set_open_replay_data(struct obd_client_handle *och,
+                              struct ptlrpc_request *open_req)
 {
-        struct ptlrpc_request *req = och->och_req;
+        struct mdc_open_data *mod;
         struct mds_rec_create *rec =
-                lustre_msg_buf(req->rq_reqmsg, 2, sizeof(*rec));
+                lustre_msg_buf(open_req->rq_reqmsg, 2, sizeof(*rec));
         struct mds_body *body =
-                lustre_msg_buf(req->rq_repmsg, 1, sizeof(*body));
+                lustre_msg_buf(open_req->rq_repmsg, 1, sizeof(*body));
 
         LASSERT(rec != NULL);
         /* outgoing messages always in my byte order */
         LASSERT(body != NULL);
         /* incoming message in my byte order (it's been swabbed) */
-        LASSERT_REPSWABBED(req, 1);
+        LASSERT_REPSWABBED(open_req, 1);
+
+        OBD_ALLOC(mod, sizeof(*mod));
+        if (mod == NULL) {
+                DEBUG_REQ(D_ERROR, open_req, "can't allocate mdc_open_data");
+                return;
+        }
+
+        och->och_mod = mod;
+        mod->mod_och = och;
+        mod->mod_open_req = open_req;
 
         memcpy(&rec->cr_replayfid, &body->fid1, sizeof rec->cr_replayfid);
-        req->rq_replay_cb = mdc_replay_open;
-        req->rq_replay_data = och;
+        open_req->rq_replay_cb = mdc_replay_open;
+        open_req->rq_commit_cb = mdc_commit_open;
+        open_req->rq_cb_data = mod;
+        DEBUG_REQ(D_HA, open_req, "set up replay data");
 }
 
-int mdc_close(struct lustre_handle *conn, obd_id ino, int type,
-              struct lustre_handle *fh, struct ptlrpc_request **request)
+void mdc_clear_open_replay_data(struct obd_client_handle *och)
+{
+        struct mdc_open_data *mod = och->och_mod;
+
+        /* Don't free the structure now (it happens in mdc_commit_open, after
+         * we're sure we won't need to fix up the close request in the future),
+         * but make sure that replay doesn't poke at the och, which is about to
+         * be freed. */
+        LASSERT(mod != (void *)0x5a5a5a5a);
+        if (mod != NULL)
+                mod->mod_och = NULL;
+        och->och_mod = NULL;
+}
+
+static void mdc_commit_close(struct ptlrpc_request *req)
+{
+        struct mdc_open_data *mod = req->rq_cb_data;
+        struct ptlrpc_request *open_req;
+        struct obd_import *imp = req->rq_import;
+
+        DEBUG_REQ(D_HA, req, "close req committed");
+        if (mod == NULL)
+                return;
+
+        mod->mod_close_req = NULL;
+        req->rq_cb_data = NULL;
+        req->rq_commit_cb = NULL;
+
+        open_req = mod->mod_open_req;
+        LASSERT(open_req != NULL);
+        LASSERT(open_req != (void *)0x5a5a5a5a);
+
+        DEBUG_REQ(D_HA, open_req, "open req balanced");
+        LASSERT(open_req->rq_transno != 0);
+        LASSERT(open_req->rq_import == imp);
+
+        /* We no longer want to preserve this for transno-unconditional
+         * replay. */
+        spin_lock(&open_req->rq_lock);
+        open_req->rq_replay = 0;
+        spin_unlock(&open_req->rq_lock);
+}
+
+int mdc_close(struct obd_export *exp, struct obdo *obdo,
+              struct obd_client_handle *och, struct ptlrpc_request **request)
 {
         struct mds_body *body;
-        int rc, size = sizeof(*body);
+        struct obd_device *obd = class_exp2obd(exp);
+        int reqsize = sizeof(*body);
+        int rc, repsize[3] = {sizeof(*body),
+                              obd->u.cli.cl_max_mds_easize,
+                              obd->u.cli.cl_max_mds_cookiesize};
         struct ptlrpc_request *req;
+        struct mdc_open_data *mod;
         ENTRY;
 
-        req = ptlrpc_prep_req(class_conn2cliimp(conn), MDS_CLOSE, 1, &size,
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_CLOSE, 1, &reqsize,
                               NULL);
-        if (!req)
+        if (req == NULL)
                 GOTO(out, rc = -ENOMEM);
 
-        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
-        ll_ino2fid(&body->fid1, ino, 0, type);
-        memcpy(&body->handle, fh, sizeof(body->handle));
-
-        req->rq_replen = lustre_msg_size(0, NULL);
+        /* Ensure that this close's handle is fixed up during replay. */
+        LASSERT(och != NULL);
+        mod = och->och_mod;
+        if (likely(mod != NULL)) {
+                mod->mod_close_req = req;
+                DEBUG_REQ(D_HA, mod->mod_open_req, "matched open req %p",
+                          mod->mod_open_req);
+        } else {
+                CDEBUG(D_HA, "couldn't find open req; expecting close error\n");
+        }
 
-        mdc_get_rpc_lock(&mdc_rpc_lock, NULL);
+        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
+        mdc_pack_fid(&body->fid1, obdo->o_id, 0, obdo->o_mode);
+        memcpy(&body->handle, &och->och_fh, sizeof(body->handle));
+        body->size = obdo->o_size;
+        body->blocks = obdo->o_blocks;
+        body->flags = obdo->o_flags;
+        body->valid = obdo->o_valid;
+
+        req->rq_replen = lustre_msg_size(3, repsize);
+        req->rq_commit_cb = mdc_commit_close;
+        LASSERT(req->rq_cb_data == NULL);
+        req->rq_cb_data = mod;
+
+        mdc_get_rpc_lock(obd->u.cli.cl_rpc_lock, NULL);
         rc = ptlrpc_queue_wait(req);
-        mdc_put_rpc_lock(&mdc_rpc_lock, NULL);
+        mdc_put_rpc_lock(obd->u.cli.cl_rpc_lock, NULL);
+
+        if (mod == NULL && rc == 0)
+                CERROR("Unexpected: can't find mdc_open_data, but the close "
+                       "succeeded.  Please tell CFS.\n");
 
         EXIT;
  out:
@@ -599,46 +455,72 @@ int mdc_close(struct lustre_handle *conn, obd_id ino, int type,
         return rc;
 }
 
-int mdc_readpage(struct lustre_handle *conn, obd_id ino, int type, __u64 offset,
+int mdc_done_writing(struct obd_export *exp, struct obdo *obdo)
+{
+        struct ptlrpc_request *req;
+        struct mds_body *body;
+        int rc, size = sizeof(*body);
+        ENTRY;
+
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_DONE_WRITING, 1,
+                              &size, NULL);
+        if (req == NULL)
+                RETURN(-ENOMEM);
+
+        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
+        mdc_pack_fid(&body->fid1, obdo->o_id, 0, obdo->o_mode);
+        body->size = obdo->o_size;
+        body->blocks = obdo->o_blocks;
+        body->flags = obdo->o_flags;
+        body->valid = obdo->o_valid;
+//        memcpy(&body->handle, &och->och_fh, sizeof(body->handle));
+
+        req->rq_replen = lustre_msg_size(1, &size);
+
+        rc = ptlrpc_queue_wait(req);
+        ptlrpc_req_finished(req);
+        RETURN(rc);
+}
+
+int mdc_readpage(struct obd_export *exp, struct ll_fid *mdc_fid, __u64 offset,
                  struct page *page, struct ptlrpc_request **request)
 {
-        struct obd_import *imp = class_conn2cliimp(conn);
+        struct obd_import *imp = class_exp2cliimp(exp);
         struct ptlrpc_request *req = NULL;
         struct ptlrpc_bulk_desc *desc = NULL;
         struct mds_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
 
-        CDEBUG(D_INODE, "inode: %ld\n", (long)ino);
+        CDEBUG(D_INODE, "inode: %ld\n", (long)mdc_fid->id);
 
         req = ptlrpc_prep_req(imp, MDS_READPAGE, 1, &size, NULL);
-        if (!req)
+        if (req == NULL)
                 GOTO(out, rc = -ENOMEM);
         /* XXX FIXME bug 249 */
         req->rq_request_portal = MDS_READPAGE_PORTAL;
 
-        desc = ptlrpc_prep_bulk_imp (req, BULK_PUT_SINK, MDS_BULK_PORTAL);
-        if (desc == NULL) {
+        desc = ptlrpc_prep_bulk_imp(req, BULK_PUT_SINK, MDS_BULK_PORTAL);
+        if (desc == NULL)
                 GOTO(out, rc = -ENOMEM);
-        }
         /* NB req now owns desc and will free it when it gets freed */
 
         rc = ptlrpc_prep_bulk_page(desc, page, 0, PAGE_CACHE_SIZE);
         if (rc != 0)
                 GOTO(out, rc);
 
-        mdc_readdir_pack(req, offset, PAGE_CACHE_SIZE, ino, type);
+        mdc_readdir_pack(req, offset, PAGE_CACHE_SIZE, mdc_fid);
 
         req->rq_replen = lustre_msg_size(1, &size);
         rc = ptlrpc_queue_wait(req);
 
         if (rc == 0) {
-                LASSERT (desc->bd_page_count == 1);
-                body = lustre_swab_repbuf (req, 0, sizeof (*body),
-                                           lustre_swab_mds_body);
+                LASSERT(desc->bd_page_count == 1);
+                body = lustre_swab_repbuf(req, 0, sizeof(*body),
+                                          lustre_swab_mds_body);
                 if (body == NULL) {
-                        CERROR ("Can't unpack mds_body\n");
-                        GOTO (out, rc = -EPROTO);
+                        CERROR("Can't unpack mds_body\n");
+                        GOTO(out, rc = -EPROTO);
                 }
         }
 
@@ -648,12 +530,13 @@ int mdc_readpage(struct lustre_handle *conn, obd_id ino, int type, __u64 offset,
         return rc;
 }
 
-static int mdc_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
+static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                          void *karg, void *uarg)
 {
-        struct obd_device *obddev = class_conn2obd(conn);
+        struct obd_device *obd = exp->exp_obd;
         struct obd_ioctl_data *data = karg;
-        struct obd_import *imp = obddev->u.cli.cl_import;
+        struct obd_import *imp = obd->u.cli.cl_import;
+        struct llog_ctxt *ctxt;
         int rc;
         ENTRY;
         
@@ -665,6 +548,20 @@ static int mdc_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
                 RETURN(0);
         case IOC_OSC_SET_ACTIVE:
                 RETURN(ptlrpc_set_import_active(imp, data->ioc_offset));
+        case OBD_IOC_PARSE: {
+                ctxt = llog_get_context(exp->exp_obd, LLOG_CONFIG_REPL_CTXT);
+                rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
+                RETURN(rc);
+        }
+#ifdef __KERNEL__
+        case OBD_IOC_LLOG_INFO:
+        case OBD_IOC_LLOG_PRINT: {
+                ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
+                rc = llog_ioctl(ctxt, cmd, data);
+                
+                RETURN(rc);
+        }
+#endif
         default:
                 CERROR("osc_ioctl(): unrecognised ioctl %#x\n", cmd);
                 RETURN(-ENOTTY);
@@ -691,9 +588,9 @@ static int mdc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
 
         req->rq_replen = lustre_msg_size(1, &size);
 
-        mdc_get_rpc_lock(&mdc_rpc_lock, NULL);
+        mdc_get_rpc_lock(obd->u.cli.cl_rpc_lock, NULL);
         rc = ptlrpc_queue_wait(req);
-        mdc_put_rpc_lock(&mdc_rpc_lock, NULL);
+        mdc_put_rpc_lock(obd->u.cli.cl_rpc_lock, NULL);
 
         if (rc)
                 GOTO(out, rc);
@@ -712,7 +609,7 @@ out:
         return rc;
 }
 
-static int mdc_pin(struct lustre_handle *conn, obd_id ino, __u32 gen, int type,
+static int mdc_pin(struct obd_export *exp, obd_id ino, __u32 gen, int type,
                    struct obd_client_handle *handle, int flag)
 {
         struct ptlrpc_request *req;
@@ -720,19 +617,19 @@ static int mdc_pin(struct lustre_handle *conn, obd_id ino, __u32 gen, int type,
         int rc, size = sizeof(*body);
         ENTRY;
 
-        req = ptlrpc_prep_req(class_conn2cliimp(conn), MDS_PIN, 1, &size, NULL);
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_PIN, 1, &size, NULL);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
         body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
-        ll_ino2fid(&body->fid1, ino, gen, type);
+        mdc_pack_fid(&body->fid1, ino, gen, type);
         body->flags = flag;
 
         req->rq_replen = lustre_msg_size(1, &size);
 
-        mdc_get_rpc_lock(&mdc_rpc_lock, NULL);
+        mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
         rc = ptlrpc_queue_wait(req);
-        mdc_put_rpc_lock(&mdc_rpc_lock, NULL);
+        mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
         if (rc) {
                 CERROR("pin failed: %d\n", rc);
                 ptlrpc_req_finished(req);
@@ -746,12 +643,19 @@ static int mdc_pin(struct lustre_handle *conn, obd_id ino, __u32 gen, int type,
         }
 
         memcpy(&handle->och_fh, &body->handle, sizeof(body->handle));
-        handle->och_req = req; /* will be dropped by unpin */
         handle->och_magic = OBD_CLIENT_HANDLE_MAGIC;
+
+        OBD_ALLOC(handle->och_mod, sizeof(*handle->och_mod));
+        if (handle->och_mod == NULL) {
+                DEBUG_REQ(D_ERROR, req, "can't allocate mdc_open_data");
+                RETURN(-ENOMEM);
+        }
+        handle->och_mod->mod_open_req = req; /* will be dropped by unpin */
+
         RETURN(rc);
 }
 
-static int mdc_unpin(struct lustre_handle *conn,
+static int mdc_unpin(struct obd_export *exp,
                      struct obd_client_handle *handle, int flag)
 {
         struct ptlrpc_request *req;
@@ -762,8 +666,7 @@ static int mdc_unpin(struct lustre_handle *conn,
         if (handle->och_magic != OBD_CLIENT_HANDLE_MAGIC)
                 RETURN(0);
 
-        req = ptlrpc_prep_req(class_conn2cliimp(conn), MDS_CLOSE, 1, &size,
-                              NULL);
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_CLOSE, 1, &size, NULL);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
@@ -772,15 +675,46 @@ static int mdc_unpin(struct lustre_handle *conn,
         body->flags = flag;
 
         req->rq_replen = lustre_msg_size(0, NULL);
-        mdc_get_rpc_lock(&mdc_rpc_lock, NULL);
+        mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
         rc = ptlrpc_queue_wait(req);
-        mdc_put_rpc_lock(&mdc_rpc_lock, NULL);
+        mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
 
         if (rc != 0)
                 CERROR("unpin failed: %d\n", rc);
 
         ptlrpc_req_finished(req);
-        ptlrpc_req_finished(handle->och_req);
+        ptlrpc_req_finished(handle->och_mod->mod_open_req);
+        OBD_FREE(handle->och_mod, sizeof(*handle->och_mod));
+        RETURN(rc);
+}
+
+int mdc_sync(struct obd_export *exp, struct ll_fid *fid,
+             struct ptlrpc_request **request)
+{
+        struct ptlrpc_request *req;
+        struct mds_body *body;
+        int size = sizeof(*body);
+        int rc;
+        ENTRY;
+
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_SYNC, 1,&size,NULL);
+        if (!req)
+                RETURN(rc = -ENOMEM);
+
+        if (fid) {
+                body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
+                memcpy(&body->fid1, fid, sizeof(*fid));
+                mdc_pack_req_body(req);
+        }
+
+        req->rq_replen = lustre_msg_size(1, &size);
+
+        rc = ptlrpc_queue_wait(req);
+        if (rc || request == NULL)
+                ptlrpc_req_finished(req);
+        else
+                *request = req;
+
         RETURN(rc);
 }
 
@@ -797,25 +731,149 @@ static int mdc_detach(struct obd_device *dev)
         return lprocfs_obd_detach(dev);
 }
 
+static int mdc_setup(struct obd_device *obd, obd_count len, void *buf)
+{
+        struct client_obd *cli = &obd->u.cli;
+        int rc;
+        ENTRY;
+
+        OBD_ALLOC(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
+        if (!cli->cl_rpc_lock)
+                RETURN(-ENOMEM);
+        mdc_init_rpc_lock(cli->cl_rpc_lock);
+        
+        OBD_ALLOC(cli->cl_setattr_lock, sizeof (*cli->cl_setattr_lock));
+        if (!cli->cl_setattr_lock)
+                GOTO(out_free_rpc, rc = -ENOMEM);
+        mdc_init_rpc_lock(cli->cl_setattr_lock);
+
+        rc = client_obd_setup(obd, len, buf);
+        if (rc) {
+                OBD_FREE(cli->cl_setattr_lock, sizeof (*cli->cl_setattr_lock));
+ out_free_rpc:
+                OBD_FREE(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
+        }
+
+        RETURN(rc);
+}
+
+
+int mdc_postsetup(struct obd_device *obd) 
+{
+        int rc;
+        rc = obd_llog_init(obd, obd, 0, NULL);
+        if (rc) {
+                CERROR("failed to setup llogging subsystems\n");
+        }
+        RETURN(rc);
+}
+
+int mdc_init_ea_size(struct obd_device *obd, char *lov_name)
+{
+        struct client_obd *cli = &obd->u.cli;
+        struct obd_device *lov_obd;
+        struct obd_export *exp;
+        struct lustre_handle conn;
+        struct lov_desc desc;
+        int valsize;
+        int rc;
+
+        lov_obd = class_name2obd(lov_name);
+        if (!lov_obd) {
+                CERROR("MDC cannot locate LOV %s!\n",
+                       lov_name);
+                RETURN(-ENOTCONN);
+        }
+
+        rc = obd_connect(&conn, lov_obd, &obd->obd_uuid);
+        if (rc) {
+                CERROR("MDS cannot connect to LOV %s (%d) - no logging!\n",
+                       lov_name, rc);
+                RETURN(rc);
+        }
+        exp = class_conn2export(&conn);
+
+        valsize = sizeof(desc);
+        rc = obd_get_info(exp, strlen("lovdesc") + 1, "lovdesc", 
+                          &valsize, &desc);
+        if (rc == 0) {
+                cli->cl_max_mds_easize = obd_size_diskmd(exp, NULL);
+                cli->cl_max_mds_cookiesize = desc.ld_tgt_count * 
+                        sizeof(struct llog_cookie);
+        }
+        obd_disconnect(exp, 0);
+        RETURN(rc);
+}
+
+static int mdc_precleanup(struct obd_device *obd, int flags)
+{
+        int rc = 0;
+        
+        rc = obd_llog_finish(obd, 0);
+        if (rc != 0)
+                CERROR("failed to cleanup llogging subsystems\n");
+
+        RETURN(rc);
+}
+
+static int mdc_cleanup(struct obd_device *obd, int flags)
+{
+        struct client_obd *cli = &obd->u.cli;
+        OBD_FREE(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
+        OBD_FREE(cli->cl_setattr_lock, sizeof (*cli->cl_setattr_lock));
+
+        return client_obd_cleanup(obd, flags);
+}
+
+
+static int mdc_llog_init(struct obd_device *obd, struct obd_device *tgt,
+                         int count, struct llog_logid *logid)
+{
+        struct llog_ctxt *ctxt;
+        int rc;
+        ENTRY;
+
+        rc = llog_setup(obd, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
+                        &llog_client_ops);
+        if (rc == 0) {
+                ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
+                ctxt->loc_imp = obd->u.cli.cl_import; 
+        }
+
+        RETURN(rc);
+}
+
+static int mdc_llog_finish(struct obd_device *obd, int count)
+{
+        int rc;
+        ENTRY;
+
+        rc = llog_cleanup(llog_get_context(obd, LLOG_CONFIG_REPL_CTXT));
+        RETURN(rc);
+}
+
 struct obd_ops mdc_obd_ops = {
         o_owner:       THIS_MODULE,
         o_attach:      mdc_attach,
         o_detach:      mdc_detach,
-        o_setup:       client_obd_setup,
-        o_cleanup:     client_obd_cleanup,
-        o_connect:     client_import_connect,
-        o_disconnect:  client_import_disconnect,
+        o_setup:       mdc_setup,
+        o_postsetup:   mdc_postsetup,
+        o_precleanup:  mdc_precleanup,
+        o_cleanup:     mdc_cleanup,
+        o_connect:     client_connect_import,
+        o_disconnect:  client_disconnect_export,
         o_iocontrol:   mdc_iocontrol,
         o_statfs:      mdc_statfs,
         o_pin:         mdc_pin,
         o_unpin:       mdc_unpin,
+        o_llog_init:   mdc_llog_init,
+        o_llog_finish: mdc_llog_finish,
 };
 
 int __init mdc_init(void)
 {
         struct lprocfs_static_vars lvars;
-        mdc_init_rpc_lock(&mdc_rpc_lock);
-        mdc_init_rpc_lock(&mdc_setattr_lock);
         lprocfs_init_vars(mdc, &lvars);
         return class_register_type(&mdc_obd_ops, lvars.module_vars,
                                    LUSTRE_MDC_NAME);
@@ -832,9 +890,8 @@ MODULE_DESCRIPTION("Lustre Metadata Client");
 MODULE_LICENSE("GPL");
 
 EXPORT_SYMBOL(mdc_req2lustre_md);
+EXPORT_SYMBOL(mdc_change_cbdata);
 EXPORT_SYMBOL(mdc_getstatus);
-EXPORT_SYMBOL(mdc_getlovinfo);
-EXPORT_SYMBOL(mdc_enqueue);
 EXPORT_SYMBOL(mdc_getattr);
 EXPORT_SYMBOL(mdc_getattr_name);
 EXPORT_SYMBOL(mdc_create);
@@ -844,9 +901,12 @@ EXPORT_SYMBOL(mdc_link);
 EXPORT_SYMBOL(mdc_readpage);
 EXPORT_SYMBOL(mdc_setattr);
 EXPORT_SYMBOL(mdc_close);
+EXPORT_SYMBOL(mdc_done_writing);
+EXPORT_SYMBOL(mdc_sync);
 EXPORT_SYMBOL(mdc_set_open_replay_data);
-
+EXPORT_SYMBOL(mdc_clear_open_replay_data);
 EXPORT_SYMBOL(mdc_store_inode_generation);
+EXPORT_SYMBOL(mdc_init_ea_size);
 
 module_init(mdc_init);
 module_exit(mdc_exit);