Whamcloud - gitweb
smash the HEAD with the contents of b_cmd. HEAD_PRE_CMD_SMASH and
[fs/lustre-release.git] / lustre / mdc / mdc_request.c
index 342dabd..08f853e 100644 (file)
@@ -227,7 +227,8 @@ void mdc_store_inode_generation(struct ptlrpc_request *req, int reqoff,
 }
 
 int mdc_req2lustre_md(struct ptlrpc_request *req, int offset,
-                      struct obd_export *exp,
+                      struct obd_export *exp_osc,
+                      struct obd_export *exp_mdc,
                       struct lustre_md *md)
 {
         int rc = 0;
@@ -240,12 +241,14 @@ int mdc_req2lustre_md(struct ptlrpc_request *req, int offset,
         LASSERT (md->body != NULL);
         LASSERT_REPSWABBED (req, offset);
 
-        if (md->body->valid & OBD_MD_FLEASIZE) {
+        if (!(md->body->valid & OBD_MD_FLEASIZE))
+                RETURN(0);
+
+        /* ea is presented in reply, parse it */
+        if (S_ISREG(md->body->mode)) {
                 int lmmsize;
                 struct lov_mds_md *lmm;
 
-                LASSERT(S_ISREG(md->body->mode));
-
                 if (md->body->eadatasize == 0) {
                         CERROR ("OBD_MD_FLEASIZE set, but eadatasize 0\n");
                         RETURN(-EPROTO);
@@ -255,11 +258,31 @@ int mdc_req2lustre_md(struct ptlrpc_request *req, int offset,
                 LASSERT (lmm != NULL);
                 LASSERT_REPSWABBED (req, offset + 1);
 
-                rc = obd_unpackmd(exp, &md->lsm, lmm, lmmsize);
+                rc = obd_unpackmd(exp_osc, &md->lsm, lmm, lmmsize);
                 if (rc >= 0) {
                         LASSERT (rc >= sizeof (*md->lsm));
                         rc = 0;
                 }
+        } else if (S_ISDIR(md->body->mode)) {
+                struct mea *mea;
+                int mdsize;
+                LASSERT(exp_mdc != NULL);
+                /* dir can be non-splitted */
+                if (md->body->eadatasize == 0)
+                        RETURN(0);
+
+                mdsize = md->body->eadatasize;
+                mea = lustre_msg_buf(req->rq_repmsg, offset + 1, mdsize);
+                LASSERT(mea != NULL);
+
+                rc = obd_unpackmd(exp_mdc, (void *) &md->mea,
+                                  (void *) mea, mdsize);
+                if (rc >= 0) {
+                        LASSERT (rc >= sizeof (*md->mea));
+                        rc = 0;
+                }
+        } else {
+                LASSERT(0);
         }
         RETURN(rc);
 }
@@ -366,7 +389,7 @@ void mdc_clear_open_replay_data(struct obd_client_handle *och)
          * we're sure we won't need to fix up the close request in the future),
          * but make sure that replay doesn't poke at the och, which is about to
          * be freed. */
-        LASSERT(mod != LP_POISON);
+        LASSERT(mod != (void *)0x5a5a5a5a);
         if (mod != NULL)
                 mod->mod_och = NULL;
         och->och_mod = NULL;
@@ -388,8 +411,7 @@ static void mdc_commit_close(struct ptlrpc_request *req)
 
         open_req = mod->mod_open_req;
         LASSERT(open_req != NULL);
-        LASSERT(open_req != LP_POISON);
-        LASSERT(open_req->rq_type != LI_POISON);
+        LASSERT(open_req != (void *)0x5a5a5a5a);
 
         DEBUG_REQ(D_HA, open_req, "open req balanced");
         LASSERT(open_req->rq_transno != 0);
@@ -405,21 +427,18 @@ static void mdc_commit_close(struct ptlrpc_request *req)
 static int mdc_close_interpret(struct ptlrpc_request *req, void *data, int rc)
 {
         union ptlrpc_async_args *aa = data;
-        struct mdc_rpc_lock *rpc_lock;
+        struct mdc_rpc_lock *rpc_lock = aa->pointer_arg[0];
         struct obd_device *obd = aa->pointer_arg[1];
-        unsigned long flags;
-
-        spin_lock_irqsave(&req->rq_lock, flags);
-        rpc_lock = aa->pointer_arg[0];
-        aa->pointer_arg[0] = NULL;
-        spin_unlock_irqrestore (&req->rq_lock, flags);
 
         if (rpc_lock == NULL) {
                 CERROR("called with NULL rpc_lock\n");
         } else {
                 mdc_put_rpc_lock(rpc_lock, NULL);
-                LASSERTF(rpc_lock == obd->u.cli.cl_rpc_lock, "%p != %p\n",
-                         rpc_lock, obd->u.cli.cl_rpc_lock);
+                LASSERTF(req->rq_async_args.pointer_arg[0] ==
+                         obd->u.cli.cl_rpc_lock, "%p != %p\n",
+                         req->rq_async_args.pointer_arg[0],
+                         obd->u.cli.cl_rpc_lock);
+                aa->pointer_arg[0] = NULL;
         }
         wake_up(&req->rq_reply_waitq);
         RETURN(rc);
@@ -433,8 +452,9 @@ static int mdc_close_check_reply(struct ptlrpc_request *req)
         unsigned long flags;
 
         spin_lock_irqsave(&req->rq_lock, flags);
-        if (req->rq_async_args.pointer_arg[0] == NULL)
+        if (PTLRPC_REQUEST_COMPLETE(req)) {
                 rc = 1;
+        }
         spin_unlock_irqrestore (&req->rq_lock, flags);
         return rc;
 }
@@ -444,12 +464,13 @@ static int go_back_to_sleep(void *unused)
         return 0;
 }
 
-int mdc_close(struct obd_export *exp, struct obdo *oa,
+int mdc_close(struct obd_export *exp, struct obdo *obdo,
               struct obd_client_handle *och, struct ptlrpc_request **request)
 {
+        struct mds_body *body;
         struct obd_device *obd = class_exp2obd(exp);
-        int reqsize = sizeof(struct mds_body);
-        int rc, repsize[3] = {sizeof(struct mds_body),
+        int reqsize = sizeof(*body);
+        int rc, repsize[3] = {sizeof(*body),
                               obd->u.cli.cl_max_mds_easize,
                               obd->u.cli.cl_max_mds_cookiesize};
         struct ptlrpc_request *req;
@@ -467,14 +488,19 @@ int mdc_close(struct obd_export *exp, struct obdo *oa,
         mod = och->och_mod;
         if (likely(mod != NULL)) {
                 mod->mod_close_req = req;
-                LASSERT(mod->mod_open_req->rq_type != LI_POISON);
                 DEBUG_REQ(D_HA, mod->mod_open_req, "matched open req %p",
                           mod->mod_open_req);
         } else {
                 CDEBUG(D_HA, "couldn't find open req; expecting close error\n");
         }
 
-        mdc_close_pack(req, 0, oa, oa->o_valid, och);
+        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
+        mdc_pack_fid(&body->fid1, obdo->o_id, 0, obdo->o_mode);
+        memcpy(&body->handle, &och->och_fh, sizeof(body->handle));
+        body->size = obdo->o_size;
+        body->blocks = obdo->o_blocks;
+        body->flags = obdo->o_flags;
+        body->valid = obdo->o_valid;
 
         req->rq_replen = lustre_msg_size(3, repsize);
         req->rq_commit_cb = mdc_commit_close;
@@ -493,12 +519,7 @@ int mdc_close(struct obd_export *exp, struct obdo *oa,
                                NULL, NULL);
         rc = l_wait_event(req->rq_reply_waitq, mdc_close_check_reply(req),
                           &lwi);
-        if (req->rq_repmsg == NULL) {
-                CDEBUG(D_HA, "request failed to send: %p, %d\n", req,
-                       req->rq_status);
-                if (rc == 0)
-                        rc = req->rq_status ? req->rq_status : -EIO;
-        } else if (rc == 0) {
+        if (rc == 0) {
                 rc = req->rq_repmsg->status;
                 if (req->rq_repmsg->type == PTL_RPC_MSG_ERR) {
                         DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, err "
@@ -554,8 +575,9 @@ int mdc_done_writing(struct obd_export *exp, struct obdo *obdo)
         RETURN(rc);
 }
 
-int mdc_readpage(struct obd_export *exp, struct ll_fid *mdc_fid, __u64 offset,
-                 struct page *page, struct ptlrpc_request **request)
+int mdc_readpage(struct obd_export *exp, struct ll_fid *mdc_fid,
+                 __u64 offset, struct page *page,
+                 struct ptlrpc_request **request)
 {
         struct obd_import *imp = class_exp2cliimp(exp);
         struct ptlrpc_request *req = NULL;
@@ -616,15 +638,9 @@ static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
         struct llog_ctxt *ctxt;
         int rc;
         ENTRY;
-
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
+        
         MOD_INC_USE_COUNT;
-#else
-       if (!try_module_get(THIS_MODULE)) {
-               CERROR("Can't get module. Is it alive?");
-               return -EINVAL;
-       }
-#endif
+
         switch (cmd) {
         case OBD_IOC_CLIENT_RECOVER:
                 rc = ptlrpc_recover_import(imp, data->ioc_inlbuf1);
@@ -635,30 +651,26 @@ static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 rc = ptlrpc_set_import_active(imp, data->ioc_offset);
                 GOTO(out, rc);
         case OBD_IOC_PARSE: {
-                ctxt = llog_get_context(exp->exp_obd, LLOG_CONFIG_REPL_CTXT);
+                ctxt = llog_get_context(&exp->exp_obd->obd_llogs,
+                                        LLOG_CONFIG_REPL_CTXT);
                 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
                 GOTO(out, rc);
         }
 #ifdef __KERNEL__
         case OBD_IOC_LLOG_INFO:
         case OBD_IOC_LLOG_PRINT: {
-                ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
+                ctxt = llog_get_context(&obd->obd_llogs, LLOG_CONFIG_REPL_CTXT);
                 rc = llog_ioctl(ctxt, cmd, data);
                 
                 GOTO(out, rc);
         }
 #endif
         default:
-                CERROR("mdc_ioctl(): unrecognised ioctl %#x\n", cmd);
+                CERROR("osc_ioctl(): unrecognised ioctl %#x\n", cmd);
                 GOTO(out, rc = -ENOTTY);
         }
 out:
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
         MOD_DEC_USE_COUNT;
-#else
-       module_put(THIS_MODULE);
-#endif
-
         return rc;
 }
 
@@ -677,6 +689,19 @@ int mdc_set_info(struct obd_export *exp, obd_count keylen,
                        exp->exp_obd->obd_name,
                        imp->imp_initial_recov);
                 RETURN(0);
+        } else if (keylen >= strlen("client") && strcmp(key, "client") == 0) {
+                struct ptlrpc_request *req;
+                char *bufs[1] = {key};
+                int rc;
+                req = ptlrpc_prep_req(class_exp2cliimp(exp), OST_SET_INFO, 1,
+                                      &keylen, bufs);
+                if (req == NULL)
+                        RETURN(-ENOMEM);
+
+                req->rq_replen = lustre_msg_size(0, NULL);
+                rc = ptlrpc_queue_wait(req);
+                ptlrpc_req_finished(req);
+                RETURN(rc);
         }
         
         RETURN(rc);
@@ -832,6 +857,19 @@ int mdc_sync(struct obd_export *exp, struct ll_fid *fid,
         RETURN(rc);
 }
 
+static int mdc_attach(struct obd_device *dev, obd_count len, void *data)
+{
+        struct lprocfs_static_vars lvars;
+
+        lprocfs_init_vars(mdc, &lvars);
+        return lprocfs_obd_attach(dev, lvars.obd_vars);
+}
+
+static int mdc_detach(struct obd_device *dev)
+{
+        return lprocfs_obd_detach(dev);
+}
+
 static int mdc_import_event(struct obd_device *obd,
                             struct obd_import *imp, 
                             enum obd_import_event event)
@@ -851,7 +889,7 @@ static int mdc_import_event(struct obd_device *obd,
         }
         case IMP_EVENT_INVALIDATE: {
                 struct ldlm_namespace *ns = obd->obd_namespace;
-
+                
                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
 
                 break;
@@ -871,7 +909,6 @@ static int mdc_import_event(struct obd_device *obd,
 static int mdc_setup(struct obd_device *obd, obd_count len, void *buf)
 {
         struct client_obd *cli = &obd->u.cli;
-        struct lprocfs_static_vars lvars;
         int rc;
         ENTRY;
 
@@ -884,71 +921,43 @@ static int mdc_setup(struct obd_device *obd, obd_count len, void *buf)
 
         OBD_ALLOC(cli->cl_setattr_lock, sizeof (*cli->cl_setattr_lock));
         if (!cli->cl_setattr_lock)
-                GOTO(err_rpc_lock, rc = -ENOMEM);
+                GOTO(out_free_rpc, rc = -ENOMEM);
         mdc_init_rpc_lock(cli->cl_setattr_lock);
 
         rc = client_obd_setup(obd, len, buf);
-        if (rc)
-                GOTO(err_setattr_lock, rc);
-        lprocfs_init_vars(mdc, &lvars);
-        lprocfs_obd_setup(obd, lvars.obd_vars);
+        if (rc) {
+                OBD_FREE(cli->cl_setattr_lock, sizeof (*cli->cl_setattr_lock));
+ out_free_rpc:
+                OBD_FREE(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
+        }
 
-        rc = obd_llog_init(obd, obd, 0, NULL);
+        rc = obd_llog_init(obd, &obd->obd_llogs, obd, 0, NULL);
         if (rc) {
                 mdc_cleanup(obd, 0);
                 CERROR("failed to setup llogging subsystems\n");
         }
 
         RETURN(rc);
-
-err_setattr_lock:
-        OBD_FREE(cli->cl_setattr_lock, sizeof (*cli->cl_setattr_lock));
-err_rpc_lock:
-        OBD_FREE(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
-        ptlrpcd_decref();
-        RETURN(rc);
 }
 
-int mdc_init_ea_size(struct obd_device *obd, char *lov_name)
+int mdc_init_ea_size(struct obd_export *exp, int easize, int cookiesize)
 {
+        struct obd_device *obd = exp->exp_obd;
         struct client_obd *cli = &obd->u.cli;
-        struct obd_device *lov_obd;
-        struct obd_export *exp;
-        struct lustre_handle conn;
-        struct lov_desc desc;
-        int valsize;
-        int rc;
-
-        lov_obd = class_name2obd(lov_name);
-        if (!lov_obd) {
-                CERROR("MDC cannot locate LOV %s!\n", lov_name);
-                RETURN(-ENOTCONN);
-        }
-
-        rc = obd_connect(&conn, lov_obd, &obd->obd_uuid);
-        if (rc) {
-                CERROR("MDC failed connect to LOV %s (%d)\n", lov_name, rc);
-                RETURN(rc);
-        }
-        exp = class_conn2export(&conn);
+        ENTRY;
 
-        valsize = sizeof(desc);
-        rc = obd_get_info(exp, strlen("lovdesc") + 1, "lovdesc",
-                          &valsize, &desc);
-        if (rc == 0) {
-                cli->cl_max_mds_easize = obd_size_diskmd(exp, NULL);
-                cli->cl_max_mds_cookiesize = desc.ld_tgt_count *
-                        sizeof(struct llog_cookie);
-        }
-        obd_disconnect(exp, 0);
-        RETURN(rc);
+        if (cli->cl_max_mds_easize < easize)
+                cli->cl_max_mds_easize = easize;
+        if (cli->cl_max_mds_cookiesize < cookiesize)
+                cli->cl_max_mds_cookiesize = cookiesize;
+        RETURN(0);
 }
 
 static int mdc_precleanup(struct obd_device *obd, int flags)
 {
         int rc = 0;
-
-        rc = obd_llog_finish(obd, 0);
+        
+        rc = obd_llog_finish(obd, &obd->obd_llogs, 0);
         if (rc != 0)
                 CERROR("failed to cleanup llogging subsystems\n");
 
@@ -958,74 +967,272 @@ static int mdc_precleanup(struct obd_device *obd, int flags)
 static int mdc_cleanup(struct obd_device *obd, int flags)
 {
         struct client_obd *cli = &obd->u.cli;
-
         OBD_FREE(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
         OBD_FREE(cli->cl_setattr_lock, sizeof (*cli->cl_setattr_lock));
 
-        lprocfs_obd_cleanup(obd);
         ptlrpcd_decref();
 
         return client_obd_cleanup(obd, flags);
 }
 
 
-static int mdc_llog_init(struct obd_device *obd, struct obd_device *tgt,
-                         int count, struct llog_catid *logid)
+static int mdc_llog_init(struct obd_device *obd, struct obd_llogs *llogs, 
+                         struct obd_device *tgt, int count,
+                         struct llog_catid *logid)
 {
         struct llog_ctxt *ctxt;
         int rc;
         ENTRY;
 
-        rc = obd_llog_setup(obd, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
-                            &llog_client_ops);
+        rc = llog_setup(obd, llogs, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
+                        &llog_client_ops);
         if (rc == 0) {
-                ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
+                ctxt = llog_get_context(llogs, LLOG_CONFIG_REPL_CTXT);
                 ctxt->loc_imp = obd->u.cli.cl_import;
         }
 
         RETURN(rc);
 }
 
-static int mdc_llog_finish(struct obd_device *obd, int count)
+static int mdc_llog_finish(struct obd_device *obd,
+                           struct obd_llogs *llogs, int count)
 {
         int rc;
         ENTRY;
 
-        rc = obd_llog_cleanup(llog_get_context(obd, LLOG_CONFIG_REPL_CTXT));
+        rc = llog_cleanup(llog_get_context(llogs, LLOG_CONFIG_REPL_CTXT));
         RETURN(rc);
 }
 
+int mdc_obj_create(struct obd_export *exp, struct obdo *oa,
+                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
+{
+        struct ptlrpc_request *request;
+        struct ost_body *body;
+        int rc, size = sizeof(*body);
+        ENTRY;
+
+        LASSERT(oa);
+
+        request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_CREATE,
+                                  1, &size, NULL);
+        if (!request)
+                GOTO(out_req, rc = -ENOMEM);
+
+        body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
+        memcpy(&body->oa, oa, sizeof(body->oa));
+
+        request->rq_replen = lustre_msg_size(1, &size);
+        rc = ptlrpc_queue_wait(request);
+        if (rc)
+                GOTO(out_req, rc);
+
+        body = lustre_swab_repbuf(request, 0, sizeof(*body),
+                                  lustre_swab_ost_body);
+        if (body == NULL) {
+                CERROR ("can't unpack ost_body\n");
+                GOTO (out_req, rc = -EPROTO);
+        }
+
+        memcpy(oa, &body->oa, sizeof(*oa));
+
+        CDEBUG(D_HA, "transno: "LPD64"\n", request->rq_repmsg->transno);
+        EXIT;
+out_req:
+        ptlrpc_req_finished(request);
+        return rc;
+}
+
+static int mdc_get_info(struct obd_export *exp, obd_count keylen,
+                        void *key, __u32 *vallen, void *val)
+{
+        ENTRY;
+        if (!vallen || !val)
+                RETURN(-EFAULT);
+
+        if (keylen >= strlen("mdsize") && strcmp(key, "mdsize") == 0) {
+                struct ptlrpc_request *req;
+                __u32 *reply;
+                char *bufs[1] = {key};
+                int rc;
+                req = ptlrpc_prep_req(class_exp2cliimp(exp), OST_GET_INFO, 1,
+                                      &keylen, bufs);
+                if (req == NULL)
+                        RETURN(-ENOMEM);
+
+                req->rq_replen = lustre_msg_size(1, vallen);
+                rc = ptlrpc_queue_wait(req);
+                if (rc)
+                        GOTO(out, rc);
+
+                /* FIXME: right swap routine here! */
+                reply = lustre_swab_repbuf(req, 0, sizeof(*reply), NULL);
+                if (reply == NULL) {
+                        CERROR("Can't unpack mdsize\n");
+                        GOTO(out, rc = -EPROTO);
+                }
+                *((__u32 *)val) = *reply;
+        out:
+                ptlrpc_req_finished(req);
+                RETURN(rc);
+        }
+        RETURN(-EINVAL);
+}
+
+int mdc_brw(int rw, struct obd_export *exp, struct obdo *oa,
+                struct lov_stripe_md *ea, obd_count oa_bufs,
+                struct brw_page *pgarr, struct obd_trans_info *oti)
+{
+        struct ptlrpc_bulk_desc *desc;
+        struct niobuf_remote *niobuf;
+        struct ptlrpc_request *req;
+        struct obd_ioobj *ioobj;
+        struct ost_body *body;
+        int err, opc, i;
+        int size[3];
+
+        opc = ((rw & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
+        
+        size[0] = sizeof(*body);
+        size[1] = sizeof(*ioobj);
+        size[2] = oa_bufs * sizeof(*niobuf);
+
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), opc, 3, size, NULL);
+        LASSERT(req != NULL);
+
+        if (opc == OST_WRITE)
+                desc = ptlrpc_prep_bulk_imp(req, oa_bufs, BULK_GET_SOURCE,
+                                            OST_BULK_PORTAL);
+        else
+                desc = ptlrpc_prep_bulk_imp(req, oa_bufs, BULK_PUT_SINK,
+                                            OST_BULK_PORTAL);
+        LASSERT(desc != NULL);
+
+        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
+        ioobj = lustre_msg_buf(req->rq_reqmsg, 1, sizeof(*ioobj));
+        niobuf = lustre_msg_buf(req->rq_reqmsg, 2, oa_bufs * sizeof(*niobuf));
+
+        memcpy(&body->oa, oa, sizeof(*oa));
+        obdo_to_ioobj(oa, ioobj);
+        ioobj->ioo_bufcnt = oa_bufs;
+
+        for (i = 0; i < oa_bufs; i++, niobuf++) {
+                struct brw_page *pg = &pgarr[i];
+
+                LASSERT(pg->count > 0);
+                LASSERT((pg->off & ~PAGE_MASK) + pg->count <= PAGE_SIZE);
+
+                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~PAGE_MASK,
+                                      pg->count);
+
+                niobuf->offset = pg->off;
+                niobuf->len = pg->count;
+                niobuf->flags = pg->flag;
+        }
+
+        /* size[0] still sizeof (*body) */
+        if (opc == OST_WRITE) {
+                /* 1 RC per niobuf */
+                size[1] = sizeof(__u32) * oa_bufs;
+                req->rq_replen = lustre_msg_size(2, size);
+        } else {
+                /* 1 RC for the whole I/O */
+                req->rq_replen = lustre_msg_size(1, size);
+        }
+        err = ptlrpc_queue_wait(req);
+        LASSERT(err == 0);
+
+        ptlrpc_req_finished(req);
+        return 0;
+}
+
+static int mdc_valid_attrs(struct obd_export *exp, struct ll_fid *fid)
+{
+        struct ldlm_res_id res_id = { .name = {0} };
+        struct obd_device *obd = exp->exp_obd;
+        struct lustre_handle lockh;
+        ldlm_policy_data_t policy;
+        int flags;
+        ENTRY;
+
+        res_id.name[0] = fid->id;
+        res_id.name[1] = fid->generation;
+        policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
+
+        CDEBUG(D_INFO, "trying to match res "LPU64"\n", res_id.name[0]);
+
+        /* FIXME use LDLM_FL_TEST_LOCK instead */
+        flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
+        if (ldlm_lock_match(obd->obd_namespace, flags, &res_id,
+                            LDLM_IBITS, &policy, LCK_PR, &lockh)) {
+                ldlm_lock_decref(&lockh, LCK_PR);
+                RETURN(1);
+        }
+
+        if (ldlm_lock_match(obd->obd_namespace, flags, &res_id,
+                            LDLM_IBITS, &policy, LCK_PW, &lockh)) {
+                ldlm_lock_decref(&lockh, LCK_PW);
+                RETURN(1);
+        }
+        RETURN(0);
+}
+
 struct obd_ops mdc_obd_ops = {
-        .o_owner        = THIS_MODULE,
-        .o_setup        = mdc_setup,
-        .o_precleanup   = mdc_precleanup,
-        .o_cleanup      = mdc_cleanup,
-        .o_connect      = client_connect_import,
-        .o_disconnect   = client_disconnect_export,
-        .o_iocontrol    = mdc_iocontrol,
-        .o_set_info     = mdc_set_info,
-        .o_statfs       = mdc_statfs,
-        .o_pin          = mdc_pin,
-        .o_unpin        = mdc_unpin,
-        .o_import_event = mdc_import_event,
-        .o_llog_init    = mdc_llog_init,
-        .o_llog_finish  = mdc_llog_finish,
+        o_owner:        THIS_MODULE,
+        o_attach:       mdc_attach,
+        o_detach:       mdc_detach,
+        o_setup:        mdc_setup,
+        o_precleanup:   mdc_precleanup,
+        o_cleanup:      mdc_cleanup,
+        o_connect:      client_connect_import,
+        o_disconnect:   client_disconnect_export,
+        o_iocontrol:    mdc_iocontrol,
+        o_set_info:     mdc_set_info,
+        o_statfs:       mdc_statfs,
+        o_pin:          mdc_pin,
+        o_unpin:        mdc_unpin,
+        o_llog_init:    mdc_llog_init,
+        o_llog_finish:  mdc_llog_finish,
+        o_create:       mdc_obj_create,
+        o_get_info:     mdc_get_info,
+        o_brw:          mdc_brw,
+        o_init_ea_size: mdc_init_ea_size,
+};
+
+struct md_ops mdc_md_ops = {
+        m_getstatus:     mdc_getstatus,
+        m_getattr:       mdc_getattr,
+        m_change_cbdata: mdc_change_cbdata,
+        m_close:         mdc_close,
+        m_create:        mdc_create,
+        m_done_writing:  mdc_done_writing,
+        m_enqueue:       mdc_enqueue,
+        m_getattr_name:  mdc_getattr_name,
+        m_intent_lock:   mdc_intent_lock,
+        m_link:          mdc_link,
+        m_rename:        mdc_rename,
+        m_setattr:       mdc_setattr,
+        m_sync:          mdc_sync,
+        m_readpage:      mdc_readpage,
+        m_unlink:        mdc_unlink,
+        m_valid_attrs:   mdc_valid_attrs,
 };
 
 int __init mdc_init(void)
 {
         struct lprocfs_static_vars lvars;
         lprocfs_init_vars(mdc, &lvars);
-        return class_register_type(&mdc_obd_ops, lvars.module_vars,
+        return class_register_type(&mdc_obd_ops, &mdc_md_ops, lvars.module_vars,
                                    LUSTRE_MDC_NAME);
 }
 
-#ifdef __KERNEL__
 static void /*__exit*/ mdc_exit(void)
 {
         class_unregister_type(LUSTRE_MDC_NAME);
 }
 
+#ifdef __KERNEL__
 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
 MODULE_DESCRIPTION("Lustre Metadata Client");
 MODULE_LICENSE("GPL");
@@ -1047,7 +1254,6 @@ EXPORT_SYMBOL(mdc_sync);
 EXPORT_SYMBOL(mdc_set_open_replay_data);
 EXPORT_SYMBOL(mdc_clear_open_replay_data);
 EXPORT_SYMBOL(mdc_store_inode_generation);
-EXPORT_SYMBOL(mdc_init_ea_size);
 
 module_init(mdc_init);
 module_exit(mdc_exit);