Whamcloud - gitweb
Land first part of new dcache handling (bug 16654).
[fs/lustre-release.git] / lustre / lmv / lmv_obd.c
index 5417c67..643f5b3 100644 (file)
@@ -127,7 +127,7 @@ static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid,
                 GOTO(out_lmv_lock, rc);
         }
 
-        CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd, 
+        CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd,
                activate ? "" : "in");
         lmv_activate_target(lmv, tgt, activate);
         EXIT;
@@ -199,7 +199,7 @@ static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
         } else if (ev == OBD_NOTIFY_OCD) {
                 conn_data = &watched->u.cli.cl_import->imp_connect_data;
 
-                /* 
+                /*
                  * Set connect data to desired target, update exp_connect_flags.
                  */
                 rc = lmv_set_mdc_data(lmv, uuid, conn_data);
@@ -219,14 +219,14 @@ static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
         }
 #if 0
         else if (ev == OBD_NOTIFY_DISCON) {
-                /* 
-                 * For disconnect event, flush fld cache for failout MDS case. 
+                /*
+                 * For disconnect event, flush fld cache for failout MDS case.
                  */
                 fld_client_flush(&lmv->lmv_fld);
         }
 #endif
-        /* 
-         * Pass the notification up the chain. 
+        /*
+         * Pass the notification up the chain.
          */
         if (obd->obd_observer)
                 rc = obd_notify(obd->obd_observer, watched, ev, data);
@@ -236,10 +236,10 @@ static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
 
 /**
  * This is fake connect function. Its purpose is to initialize lmv and say
- * caller that everything is okay. Real connection will be performed later. 
+ * caller that everything is okay. Real connection will be performed later.
  */
 static int lmv_connect(const struct lu_env *env,
-                       struct lustre_handle *conn, struct obd_device *obd,
+                       struct obd_export **exp, struct obd_device *obd,
                        struct obd_uuid *cluuid, struct obd_connect_data *data,
                        void *localdata)
 {
@@ -247,29 +247,30 @@ static int lmv_connect(const struct lu_env *env,
         struct proc_dir_entry *lmv_proc_dir;
 #endif
         struct lmv_obd        *lmv = &obd->u.lmv;
-        struct obd_export     *exp;
+        struct lustre_handle  conn = { 0 };
         int                    rc = 0;
         ENTRY;
 
-        rc = class_connect(conn, obd, cluuid);
-        if (rc) {
-                CERROR("class_connection() returned %d\n", rc);
-                RETURN(rc);
-        }
-
-        exp = class_conn2export(conn);
-
-        /* 
+        /*
          * We don't want to actually do the underlying connections more than
-         * once, so keep track. 
+         * once, so keep track.
          */
         lmv->refcount++;
         if (lmv->refcount > 1) {
-                class_export_put(exp);
+                *exp = NULL;
                 RETURN(0);
         }
 
-        lmv->exp = exp;
+        rc = class_connect(&conn, obd, cluuid);
+        if (rc) {
+                CERROR("class_connection() returned %d\n", rc);
+                RETURN(rc);
+        }
+
+        *exp = class_conn2export(&conn);
+        class_export_get(*exp);
+
+        lmv->exp = *exp;
         lmv->connected = 0;
         lmv->cluuid = *cluuid;
 
@@ -286,11 +287,11 @@ static int lmv_connect(const struct lu_env *env,
         }
 #endif
 
-        /* 
+        /*
          * All real clients should perform actual connection right away, because
          * it is possible, that LMV will not have opportunity to connect targets
          * and MDC stuff will be called directly, for instance while reading
-         * ../mdc/../kbytesfree procfs file, etc. 
+         * ../mdc/../kbytesfree procfs file, etc.
          */
         if (data->ocd_connect_flags & OBD_CONNECT_REAL)
                 rc = lmv_check_connect(obd);
@@ -383,7 +384,6 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
         struct obd_uuid         *cluuid = &lmv->cluuid;
         struct obd_connect_data *mdc_data = NULL;
         struct obd_uuid          lmv_mdc_uuid = { "LMV_MDC_UUID" };
-        struct lustre_handle     conn = {0, };
         struct obd_device       *mdc_obd;
         struct obd_export       *mdc_exp;
         struct lu_fld_target     target;
@@ -407,16 +407,14 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
                 RETURN(-EINVAL);
         }
 
-        rc = obd_connect(NULL, &conn, mdc_obd, &lmv_mdc_uuid,
+        rc = obd_connect(NULL, &mdc_exp, mdc_obd, &lmv_mdc_uuid,
                          &lmv->conn_data, NULL);
         if (rc) {
                 CERROR("target %s connect error %d\n", tgt->ltd_uuid.uuid, rc);
                 RETURN(rc);
         }
 
-        mdc_exp = class_conn2export(&conn);
-
-        /* 
+        /*
          * Init fid sequence client for this mdc and add new fld target.
          */
         rc = obd_fid_init(mdc_exp);
@@ -440,7 +438,7 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
         }
 
         if (obd->obd_observer) {
-                /* 
+                /*
                  * Tell the observer about the new target.
                  */
                 rc = obd_notify(obd->obd_observer, mdc_exp->exp_obd,
@@ -455,7 +453,7 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
         tgt->ltd_exp = mdc_exp;
         lmv->desc.ld_active_tgt_count++;
 
-        /* 
+        /*
          * Copy connect data, it may be used later.
          */
         lmv->datas[tgt->ltd_idx] = *mdc_data;
@@ -684,8 +682,8 @@ static int lmv_disconnect(struct obd_export *exp)
         if (!lmv->tgts)
                 goto out_local;
 
-        /* 
-         * Only disconnect the underlying layers on the final disconnect. 
+        /*
+         * Only disconnect the underlying layers on the final disconnect.
          */
         lmv->refcount--;
         if (lmv->refcount != 0)
@@ -725,12 +723,13 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
 {
         struct obd_device    *obddev = class_exp2obd(exp);
         struct lmv_obd       *lmv = &obddev->u.lmv;
-        int                   i;
+        int                   i = 0;
         int                   rc = 0;
         int                   set = 0;
+        int                   count = lmv->desc.ld_tgt_count;
         ENTRY;
 
-        if (lmv->desc.ld_tgt_count == 0)
+        if (count == 0)
                 RETURN(-ENOTTY);
 
         switch (cmd) {
@@ -743,7 +742,7 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
                 memcpy(&index, data->ioc_inlbuf2, sizeof(__u32));
                 LASSERT(data->ioc_plen1 == sizeof(struct obd_statfs));
 
-                if ((index >= lmv->desc.ld_tgt_count))
+                if ((index >= count))
                         RETURN(-ENODEV);
 
                 if (!lmv->tgts[index].ltd_active)
@@ -758,13 +757,71 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
                 if (rc)
                         RETURN(rc);
                 if (copy_to_user(data->ioc_pbuf1, &stat_buf, data->ioc_plen1))
-                        RETURN(rc);
-                rc = copy_to_user(data->ioc_pbuf2, obd2cli_tgt(mdc_obd),
-                                  data->ioc_plen2);
+                        RETURN(-EFAULT);
+                if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(mdc_obd),
+                                  data->ioc_plen2))
+                        RETURN(-EFAULT);
+                break;
+        }
+        case OBD_IOC_QUOTACTL: {
+                struct if_quotactl *qctl = karg;
+                struct lmv_tgt_desc *tgt = NULL;
+                struct obd_quotactl *oqctl;
+
+                if (qctl->qc_valid == QC_MDTIDX) {
+                        if (qctl->qc_idx < 0 || count <= qctl->qc_idx)
+                                RETURN(-EINVAL);
+
+                        tgt = &lmv->tgts[qctl->qc_idx];
+                        if (!tgt->ltd_exp)
+                                RETURN(-EINVAL);
+                } else if (qctl->qc_valid == QC_UUID) {
+                        for (i = 0; i < count; i++) {
+                                tgt = &lmv->tgts[i];
+                                if (!obd_uuid_equals(&tgt->ltd_uuid,
+                                                     &qctl->obd_uuid))
+                                        continue;
+
+                                if (tgt->ltd_exp == NULL)
+                                        RETURN(-EINVAL);
+
+                                break;
+                        }
+                } else {
+                        RETURN(-EINVAL);
+                }
+
+                if (i >= count)
+                        RETURN(-EAGAIN);
+
+                LASSERT(tgt && tgt->ltd_exp);
+                OBD_ALLOC_PTR(oqctl);
+                if (!oqctl)
+                        RETURN(-ENOMEM);
+
+                QCTL_COPY(oqctl, qctl);
+                rc = obd_quotactl(tgt->ltd_exp, oqctl);
+                if (rc == 0) {
+                        QCTL_COPY(qctl, oqctl);
+                        qctl->qc_valid = QC_MDTIDX;
+                        qctl->obd_uuid = tgt->ltd_uuid;
+                }
+                OBD_FREE_PTR(oqctl);
+                break;
+        }
+        case OBD_IOC_CHANGELOG_CLEAR: {
+                struct ioc_changelog_clear *icc = karg;
+
+                if (icc->icc_mdtindex >= count)
+                        RETURN(-ENODEV);
+
+                rc = obd_iocontrol(cmd, lmv->tgts[icc->icc_mdtindex].ltd_exp,
+                                   sizeof(*icc), icc, NULL);
                 break;
         }
+
         default : {
-                for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
+                for (i = 0; i < count; i++) {
                         int err;
 
                         if (lmv->tgts[i].ltd_exp == NULL)
@@ -772,7 +829,9 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
 
                         err = obd_iocontrol(cmd, lmv->tgts[i].ltd_exp, len,
                                             karg, uarg);
-                        if (err) {
+                        if (err == -ENODATA && cmd == OBD_IOC_POLL_QUOTACHECK) {
+                                RETURN(err);
+                        } else if (err) {
                                 if (lmv->tgts[i].ltd_active) {
                                         CERROR("error: iocontrol MDC %s on MDT"
                                                "idx %d cmd %x: err = %d\n",
@@ -806,7 +865,7 @@ static int lmv_nid_policy(struct lmv_obd *lmv)
 {
         struct obd_import *imp;
         __u32              id;
-        
+
         /*
          * XXX: To get nid we assume that underlying obd device is mdc.
          */
@@ -835,7 +894,7 @@ static int lmv_choose_mds(struct lmv_obd *lmv, struct md_op_data *op_data,
 }
 
 /**
- * This is _inode_ placement policy function (not name). 
+ * This is _inode_ placement policy function (not name).
  */
 static int lmv_placement_policy(struct obd_device *obd,
                                 struct md_op_data *op_data,
@@ -912,8 +971,8 @@ int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid,
         ENTRY;
 
         tgt = lmv_get_target(lmv, mds);
-    
-        /* 
+
+        /*
          * New seq alloc and FLD setup should be atomic. Otherwise we may find
          * on server that seq in new allocated fid is not yet known.
          */
@@ -922,26 +981,13 @@ int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid,
         if (!tgt->ltd_active)
                 GOTO(out, rc = -ENODEV);
 
-        /* 
-         * Asking underlaying tgt layer to allocate new fid. 
+        /*
+         * Asking underlaying tgt layer to allocate new fid.
          */
         rc = obd_fid_alloc(tgt->ltd_exp, fid, NULL);
         if (rc > 0) {
                 LASSERT(fid_is_sane(fid));
-
-                /* 
-                 * Client switches to new sequence, setup FLD. 
-                 */
-                rc = fld_client_create(&lmv->lmv_fld, fid_seq(fid),
-                                       mds, NULL);
-                if (rc) {
-                        /* 
-                         * Delete just allocated fid sequence in case
-                         * of fail back.
-                         */
-                        CERROR("Can't create fld entry, rc %d\n", rc);
-                        obd_fid_delete(tgt->ltd_exp, NULL);
-                }
+                rc = 0;
         }
 
         EXIT;
@@ -1217,7 +1263,7 @@ static int lmv_setxattr(struct obd_export *exp, const struct lu_fid *fid,
         if (IS_ERR(tgt))
                 RETURN(PTR_ERR(tgt));
 
-        rc = md_setxattr(tgt->ltd_exp, fid, oc, valid, name, input, 
+        rc = md_setxattr(tgt->ltd_exp, fid, oc, valid, name, input,
                          input_size, output_size, flags, suppgid,
                          request);
 
@@ -1279,7 +1325,7 @@ static int lmv_getattr(struct obd_export *exp, const struct lu_fid *fid,
                                 continue;
                         }
 
-                        /* 
+                        /*
                          * Skip master object.
                          */
                         if (lu_fid_eq(&obj->lo_fid, &obj->lo_stripes[i].ls_fid))
@@ -1368,8 +1414,8 @@ int lmv_handle_split(struct obd_export *exp, const struct lu_fid *fid)
         if (IS_ERR(tgt))
                 RETURN(PTR_ERR(tgt));
 
-        /* 
-         * Time to update mea of parent fid. 
+        /*
+         * Time to update mea of parent fid.
          */
         rc = md_getattr(tgt->ltd_exp, fid, NULL, valid, mealen, &req);
         if (rc) {
@@ -1448,7 +1494,7 @@ repeat:
         else if (rc)
                 RETURN(rc);
 
-        CDEBUG(D_INODE, "CREATE '%*s' on "DFID" -> mds #"LPU64"\n", 
+        CDEBUG(D_INODE, "CREATE '%*s' on "DFID" -> mds #%x\n",
                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
                op_data->op_mds);
 
@@ -1559,8 +1605,8 @@ cleanup:
         OBD_FREE_PTR(op_data2);
 
         if (rc != 0) {
-                /* 
-                 * Drop all taken locks. 
+                /*
+                 * Drop all taken locks.
                  */
                 while (--i >= 0) {
                         if (lockh[i].cookie)
@@ -1598,8 +1644,8 @@ lmv_enqueue_remote(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
         CDEBUG(D_INODE, "REMOTE_ENQUEUE '%s' on "DFID" -> "DFID"\n",
                LL_IT2STR(it), PFID(&op_data->op_fid1), PFID(&body->fid1));
 
-        /* 
-         * We got LOOKUP lock, but we really need attrs. 
+        /*
+         * We got LOOKUP lock, but we really need attrs.
          */
         pmode = it->d.lustre.it_lock_mode;
         LASSERT(pmode != 0);
@@ -1677,7 +1723,7 @@ lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
 
         CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID" -> mds #%d\n",
                LL_IT2STR(it), PFID(&op_data->op_fid1), tgt->ltd_idx);
-        
+
         rc = md_enqueue(tgt->ltd_exp, einfo, it, op_data, lockh,
                         lmm, lmmsize, req, extra_lock_flags);
 
@@ -1802,7 +1848,7 @@ static int lmv_early_cancel_slaves(struct obd_export *exp,
         obj = lmv_object_find(obd, op_fid);
         if (obj == NULL)
                 RETURN(-EALREADY);
-                
+
         policy.l_inodebits.bits = bits;
         for (i = 0; i < obj->lo_objcount; i++) {
                 tgt = lmv_get_target(lmv, obj->lo_stripes[i].ls_mds);
@@ -1810,12 +1856,12 @@ static int lmv_early_cancel_slaves(struct obd_export *exp,
                 if (op_tgt != tgt->ltd_idx) {
                         CDEBUG(D_INODE, "EARLY_CANCEL slave "DFID" -> mds #%d\n",
                                PFID(st_fid), tgt->ltd_idx);
-                        rc = md_cancel_unused(tgt->ltd_exp, st_fid, &policy, 
+                        rc = md_cancel_unused(tgt->ltd_exp, st_fid, &policy,
                                               mode, LDLM_FL_ASYNC, NULL);
                         if (rc)
                                 GOTO(out_put_obj, rc);
                 } else {
-                        CDEBUG(D_INODE, 
+                        CDEBUG(D_INODE,
                                "EARLY_CANCEL skip operation target %d on "DFID"\n",
                                op_tgt, PFID(st_fid));
                         /*
@@ -1864,7 +1910,7 @@ static int lmv_early_cancel(struct obd_export *exp, struct md_op_data *op_data,
                         rc = md_cancel_unused(tgt->ltd_exp, fid, &policy,
                                               mode, LDLM_FL_ASYNC, NULL);
                 } else {
-                        CDEBUG(D_INODE, 
+                        CDEBUG(D_INODE,
                                "EARLY_CANCEL skip operation target %d on "DFID"\n",
                                op_tgt, PFID(fid));
                         op_data->op_flags |= flag;
@@ -1899,38 +1945,28 @@ static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
 repeat:
         ++loop;
         LASSERT(loop <= 2);
-        if (op_data->op_namelen != 0) {
-                obj = lmv_object_find(obd, &op_data->op_fid2);
-                if (obj) {
-                        sidx = raw_name2idx(obj->lo_hashtype,
-                                               obj->lo_objcount,
-                                               op_data->op_name,
-                                               op_data->op_namelen);
-                        op_data->op_fid2 = obj->lo_stripes[sidx].ls_fid;
-                        mds = obj->lo_stripes[sidx].ls_mds;
-                        lmv_object_put(obj);
-                } else {
-                        rc = lmv_fld_lookup(lmv, &op_data->op_fid2, &mds);
-                        if (rc)
-                                RETURN(rc);
-                }
+        LASSERT(op_data->op_namelen != 0);
 
-                CDEBUG(D_INODE, "LINK "DFID":%*s to "DFID"\n",
-                       PFID(&op_data->op_fid2), op_data->op_namelen,
-                       op_data->op_name, PFID(&op_data->op_fid1));
+        CDEBUG(D_INODE, "LINK "DFID":%*s to "DFID"\n",
+               PFID(&op_data->op_fid2), op_data->op_namelen,
+               op_data->op_name, PFID(&op_data->op_fid1));
+
+        obj = lmv_object_find(obd, &op_data->op_fid2);
+        if (obj) {
+                sidx = raw_name2idx(obj->lo_hashtype,
+                                    obj->lo_objcount,
+                                    op_data->op_name,
+                                    op_data->op_namelen);
+                op_data->op_fid2 = obj->lo_stripes[sidx].ls_fid;
+                mds = obj->lo_stripes[sidx].ls_mds;
+                lmv_object_put(obj);
         } else {
-                rc = lmv_fld_lookup(lmv, &op_data->op_fid1, &mds);
+                rc = lmv_fld_lookup(lmv, &op_data->op_fid2, &mds);
                 if (rc)
                         RETURN(rc);
-
-                /* 
-                 * Request from MDS to acquire i_links for inode by fid1. 
-                 */
-                CDEBUG(D_INODE, "Inc i_nlinks for "DFID"\n",
-                       PFID(&op_data->op_fid1));
         }
 
-        CDEBUG(D_INODE, "Forward to mds #"LPU64" ("DFID")\n",
+        CDEBUG(D_INODE, "Forward to mds #%x ("DFID")\n",
                mds, PFID(&op_data->op_fid1));
 
         op_data->op_fsuid = current->fsuid;
@@ -1938,14 +1974,12 @@ repeat:
         op_data->op_cap = cfs_curproc_cap_pack();
         tgt = lmv_get_target(lmv, mds);
 
-        if (op_data->op_namelen) {
-                /* 
-                 * Cancel UPDATE lock on child (fid1). 
-                 */
-                op_data->op_flags |= MF_MDC_CANCEL_FID2;
-                rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
-                                      MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
-        }
+        /*
+         * Cancel UPDATE lock on child (fid1).
+         */
+        op_data->op_flags |= MF_MDC_CANCEL_FID2;
+        rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
+                              MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
         if (rc == 0)
                 rc = md_link(tgt->ltd_exp, op_data, request);
         if (rc == -ERESTART) {
@@ -1983,6 +2017,8 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
         mdsno_t                  mds2;
         ENTRY;
 
+        LASSERT(oldlen != 0);
+
         CDEBUG(D_INODE, "RENAME %*s in "DFID" to %*s in "DFID"\n",
                oldlen, old, PFID(&op_data->op_fid1),
                newlen, new, PFID(&op_data->op_fid2));
@@ -1991,37 +2027,6 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
         if (rc)
                 RETURN(rc);
 
-        if (oldlen == 0) {
-                /*
-                 * MDS with old dir entry is asking another MDS to create name
-                 * there.
-                 */
-                CDEBUG(D_INODE,
-                       "Create %*s(%d/%d) in "DFID" pointing "
-                       "to "DFID"\n", newlen, new, oldlen, newlen,
-                       PFID(&op_data->op_fid2), PFID(&op_data->op_fid1));
-
-                rc = lmv_fld_lookup(lmv, &op_data->op_fid2, &mds1);
-                if (rc)
-                        RETURN(rc);
-
-                /*
-                 * Target directory can be split, sowe should forward request to
-                 * the right MDS.
-                 */
-                obj = lmv_object_find(obd, &op_data->op_fid2);
-                if (obj) {
-                        sidx = raw_name2idx(obj->lo_hashtype,
-                                            obj->lo_objcount,
-                                            (char *)new, newlen);
-                        op_data->op_fid2 = obj->lo_stripes[sidx].ls_fid;
-                        CDEBUG(D_INODE, "Parent obj "DFID"\n",
-                               PFID(&op_data->op_fid2));
-                        lmv_object_put(obj);
-                }
-                goto request;
-        }
-
 repeat:
         ++loop;
         LASSERT(loop <= 2);
@@ -2058,52 +2063,51 @@ repeat:
                         RETURN(rc);
         }
 
-request:
         op_data->op_fsuid = current->fsuid;
         op_data->op_fsgid = current->fsgid;
         op_data->op_cap = cfs_curproc_cap_pack();
 
         src_tgt = lmv_get_target(lmv, mds1);
         tgt_tgt = lmv_get_target(lmv, mds2);
-        if (oldlen) {
-                /* 
-                 * LOOKUP lock on src child (fid3) should also be cancelled for
-                 * src_tgt in mdc_rename. 
-                 */
-                op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
 
-                /* 
-                 * Cancel UPDATE locks on tgt parent (fid2), tgt_tgt is its
-                 * own target. 
-                 */
-                rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx, 
-                                      LCK_EX, MDS_INODELOCK_UPDATE, 
-                                      MF_MDC_CANCEL_FID2);
+        /*
+         * LOOKUP lock on src child (fid3) should also be cancelled for
+         * src_tgt in mdc_rename.
+         */
+        op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
 
-                /* 
-                 * Cancel LOOKUP locks on tgt child (fid4) for parent tgt_tgt.
-                 */
-                if (rc == 0) {
-                        rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx, 
-                                              LCK_EX, MDS_INODELOCK_LOOKUP,
-                                              MF_MDC_CANCEL_FID4);
-                }
+        /*
+         * Cancel UPDATE locks on tgt parent (fid2), tgt_tgt is its
+         * own target.
+         */
+        rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
+                              LCK_EX, MDS_INODELOCK_UPDATE,
+                              MF_MDC_CANCEL_FID2);
 
-                /* 
-                 * Cancel all the locks on tgt child (fid4). 
-                 */
-                if (rc == 0)
-                        rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx, 
-                                              LCK_EX, MDS_INODELOCK_FULL,
-                                              MF_MDC_CANCEL_FID4);
+        /*
+         * Cancel LOOKUP locks on tgt child (fid4) for parent tgt_tgt.
+         */
+        if (rc == 0) {
+                rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
+                                      LCK_EX, MDS_INODELOCK_LOOKUP,
+                                      MF_MDC_CANCEL_FID4);
         }
 
+        /*
+         * Cancel all the locks on tgt child (fid4).
+         */
+        if (rc == 0)
+                rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
+                                      LCK_EX, MDS_INODELOCK_FULL,
+                                      MF_MDC_CANCEL_FID4);
+
         if (rc == 0)
                 rc = md_rename(src_tgt->ltd_exp, op_data, old, oldlen,
                                new, newlen, request);
+
         if (rc == -ERESTART) {
                 LASSERT(*request != NULL);
-                DEBUG_REQ(D_WARNING|D_RPCTRACE, *request, 
+                DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
                           "Got -ERESTART during rename!\n");
                 ptlrpc_req_finished(*request);
                 *request = NULL;
@@ -2205,7 +2209,7 @@ static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid,
 
 /**
  * Main purpose of LMV blocking ast is to remove split directory LMV
- * presentation object (struct lmv_object) attached to the lock being revoked. 
+ * presentation object (struct lmv_object) attached to the lock being revoked.
  */
 int lmv_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                      void *data, int flag)
@@ -2225,7 +2229,7 @@ int lmv_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                 }
                 break;
         case LDLM_CB_CANCELING:
-                /* 
+                /*
                  * Time to drop cached attrs for split directory object
                  */
                 obj = lock->l_ast_data;
@@ -2353,7 +2357,7 @@ static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid,
                 hash_adj += rank * seg_size;
 
                 CDEBUG(D_INODE, "Readpage hash adjustment: %x "LPX64" "
-                       LPX64"/%x -> "LPX64"/%x\n", rank, hash_adj, 
+                       LPX64"/%x -> "LPX64"/%x\n", rank, hash_adj,
                        offset, tgt0_idx, offset + hash_adj, tgt_idx);
 
                 offset = (offset + hash_adj) & MAX_HASH_SIZE;
@@ -2412,6 +2416,7 @@ static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
         struct lmv_tgt_desc     *tgt = NULL;
         struct lmv_object       *obj;
         int                      rc;
+        int                      sidx;
         int                      loop = 0;
         ENTRY;
 
@@ -2422,28 +2427,24 @@ static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
 repeat:
         ++loop;
         LASSERT(loop <= 2);
-        if (op_data->op_namelen != 0) {
-                int sidx;
+        LASSERT(op_data->op_namelen != 0);
 
-                obj = lmv_object_find(obd, &op_data->op_fid1);
-                if (obj) {
-                        sidx = raw_name2idx(obj->lo_hashtype,
-                                            obj->lo_objcount,
-                                            op_data->op_name,
-                                            op_data->op_namelen);
-                        op_data->op_bias &= ~MDS_CHECK_SPLIT;
-                        op_data->op_fid1 = obj->lo_stripes[sidx].ls_fid;
-                        tgt = lmv_get_target(lmv,
-                                             obj->lo_stripes[sidx].ls_mds);
-                        lmv_object_put(obj);
-                        CDEBUG(D_INODE, "UNLINK '%*s' in "DFID" -> %u\n",
-                               op_data->op_namelen, op_data->op_name,
-                               PFID(&op_data->op_fid1), sidx);
-                }
-        } else {
-                CDEBUG(D_INODE, "Drop i_nlink on "DFID"\n",
-                       PFID(&op_data->op_fid1));
+        obj = lmv_object_find(obd, &op_data->op_fid1);
+        if (obj) {
+                sidx = raw_name2idx(obj->lo_hashtype,
+                                    obj->lo_objcount,
+                                    op_data->op_name,
+                                    op_data->op_namelen);
+                op_data->op_bias &= ~MDS_CHECK_SPLIT;
+                op_data->op_fid1 = obj->lo_stripes[sidx].ls_fid;
+                tgt = lmv_get_target(lmv,
+                                     obj->lo_stripes[sidx].ls_mds);
+                lmv_object_put(obj);
+                CDEBUG(D_INODE, "UNLINK '%*s' in "DFID" -> %u\n",
+                       op_data->op_namelen, op_data->op_name,
+                       PFID(&op_data->op_fid1), sidx);
         }
+
         if (tgt == NULL) {
                 tgt = lmv_find_target(lmv, &op_data->op_fid1);
                 if (IS_ERR(tgt))
@@ -2455,25 +2456,24 @@ repeat:
         op_data->op_fsgid = current->fsgid;
         op_data->op_cap = cfs_curproc_cap_pack();
 
-        /* 
+        /*
          * If child's fid is given, cancel unused locks for it if it is from
-         * another export than parent. 
+         * another export than parent.
+         *
+         * LOOKUP lock for child (fid3) should also be cancelled on parent
+         * tgt_tgt in mdc_unlink().
          */
-        if (op_data->op_namelen) {
-                /*
-                 * LOOKUP lock for child (fid3) should also be cancelled on 
-                 * parent tgt_tgt in mdc_unlink(). 
-                 */
-                op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
+        op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
+
+        /*
+         * Cancel FULL locks on child (fid3).
+         */
+        rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
+                              MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3);
 
-                /* 
-                 * Cancel FULL locks on child (fid3). 
-                 */
-                rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
-                                      MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3);
-        }
         if (rc == 0)
                 rc = md_unlink(tgt->ltd_exp, op_data, request);
+
         if (rc == -ERESTART) {
                 LASSERT(*request != NULL);
                 DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
@@ -2513,7 +2513,7 @@ static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
 }
 
 static int lmv_get_info(struct obd_export *exp, __u32 keylen,
-                        void *key, __u32 *vallen, void *val, 
+                        void *key, __u32 *vallen, void *val,
                         struct lov_stripe_md *lsm)
 {
         struct obd_device       *obd;
@@ -2541,8 +2541,8 @@ static int lmv_get_info(struct obd_export *exp, __u32 keylen,
                 for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count;
                      i++, tgts++) {
 
-                        /* 
-                         * All tgts should be connected when this gets called. 
+                        /*
+                         * All tgts should be connected when this gets called.
                          */
                         if (!tgts || !tgts->ltd_exp) {
                                 CERROR("target not setup?\n");
@@ -2559,9 +2559,9 @@ static int lmv_get_info(struct obd_export *exp, __u32 keylen,
                 if (rc)
                         RETURN(rc);
 
-                /* 
+                /*
                  * Forwarding this request to first MDS, it should know LOV
-                 * desc. 
+                 * desc.
                  */
                 rc = obd_get_info(lmv->tgts[0].ltd_exp, keylen, key,
                                   vallen, val, NULL);
@@ -2702,8 +2702,8 @@ int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
         {
                 magic = le32_to_cpu(mea->mea_magic);
         } else {
-                /* 
-                 * Old mea is not handled here. 
+                /*
+                 * Old mea is not handled here.
                  */
                 CERROR("Old not supportable EA is found\n");
                 LBUG();
@@ -2721,7 +2721,7 @@ int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
 }
 
 static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
-                             ldlm_policy_data_t *policy, ldlm_mode_t mode, 
+                             ldlm_policy_data_t *policy, ldlm_mode_t mode,
                              int flags, void *opaque)
 {
         struct obd_device       *obd = exp->exp_obd;
@@ -2745,13 +2745,15 @@ static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
         RETURN(rc);
 }
 
-int lmv_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data)
+int lmv_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
+                      __u32 *bits)
 {
         struct obd_device       *obd = exp->exp_obd;
         struct lmv_obd          *lmv = &obd->u.lmv;
         int                      rc;
         ENTRY;
-        rc =  md_set_lock_data(lmv->tgts[0].ltd_exp, lockh, data);
+
+        rc =  md_set_lock_data(lmv->tgts[0].ltd_exp, lockh, data, bits);
         RETURN(rc);
 }
 
@@ -2768,11 +2770,11 @@ ldlm_mode_t lmv_lock_match(struct obd_export *exp, int flags,
 
         CDEBUG(D_INODE, "Lock match for "DFID"\n", PFID(fid));
 
-        /* 
+        /*
          * With CMD every object can have two locks in different namespaces:
          * lookup lock in space of mds storing direntry and update/open lock in
          * space of mds storing inode. Thus we check all targets, not only that
-         * one fid was created in. 
+         * one fid was created in.
          */
         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
                 rc = md_lock_match(lmv->tgts[i].ltd_exp, flags, fid,
@@ -2882,6 +2884,18 @@ static int lmv_renew_capa(struct obd_export *exp, struct obd_capa *oc,
         RETURN(rc);
 }
 
+int lmv_unpack_capa(struct obd_export *exp, struct ptlrpc_request *req,
+                    const struct req_msg_field *field, struct obd_capa **oc)
+{
+        struct obd_device *obd = exp->exp_obd;
+        struct lmv_obd *lmv = &obd->u.lmv;
+        int rc;
+
+        ENTRY;
+        rc = md_unpack_capa(lmv->tgts[0].ltd_exp, req, field, oc);
+        RETURN(rc);
+}
+
 int lmv_intent_getattr_async(struct obd_export *exp,
                              struct md_enqueue_info *minfo,
                              struct ldlm_enqueue_info *einfo)
@@ -2907,10 +2921,10 @@ int lmv_intent_getattr_async(struct obd_export *exp,
                                             (char *)op_data->op_name,
                                             op_data->op_namelen);
                         op_data->op_fid1 = obj->lo_stripes[sidx].ls_fid;
-                        tgt = lmv_get_target(lmv, 
+                        tgt = lmv_get_target(lmv,
                                              obj->lo_stripes[sidx].ls_mds);
                         CDEBUG(D_INODE,
-                               "Choose slave dir ("DFID") -> mds #%d\n", 
+                               "Choose slave dir ("DFID") -> mds #%d\n",
                                PFID(&op_data->op_fid1), tgt->ltd_idx);
                 } else {
                         tgt = lmv_find_target(lmv, &op_data->op_fid1);
@@ -2928,7 +2942,7 @@ int lmv_intent_getattr_async(struct obd_export *exp,
                 if (minfo->mi_it.it_op & IT_LOOKUP)
                         minfo->mi_it.it_op = IT_GETATTR;
         }
-        
+
         if (IS_ERR(tgt))
                 RETURN(PTR_ERR(tgt));
 
@@ -3005,11 +3019,15 @@ struct md_ops lmv_md_ops = {
         .m_set_open_replay_data = lmv_set_open_replay_data,
         .m_clear_open_replay_data = lmv_clear_open_replay_data,
         .m_renew_capa           = lmv_renew_capa,
+        .m_unpack_capa          = lmv_unpack_capa,
         .m_get_remote_perm      = lmv_get_remote_perm,
         .m_intent_getattr_async = lmv_intent_getattr_async,
         .m_revalidate_lock      = lmv_revalidate_lock
 };
 
+static quota_interface_t *quota_interface;
+extern quota_interface_t lmv_quota_interface;
+
 int __init lmv_init(void)
 {
         struct lprocfs_static_vars lvars;
@@ -3024,10 +3042,18 @@ int __init lmv_init(void)
         }
 
         lprocfs_lmv_init_vars(&lvars);
+
+        request_module("lquota");
+        quota_interface = PORTAL_SYMBOL_GET(lmv_quota_interface);
+        init_obd_quota_ops(quota_interface, &lmv_obd_ops);
+
         rc = class_register_type(&lmv_obd_ops, &lmv_md_ops,
                                  lvars.module_vars, LUSTRE_LMV_NAME, NULL);
-        if (rc)
+        if (rc) {
+                if (quota_interface)
+                        PORTAL_SYMBOL_PUT(lmv_quota_interface);
                 cfs_mem_cache_destroy(lmv_object_cache);
+        }
 
         return rc;
 }
@@ -3035,6 +3061,9 @@ int __init lmv_init(void)
 #ifdef __KERNEL__
 static void lmv_exit(void)
 {
+        if (quota_interface)
+                PORTAL_SYMBOL_PUT(lmv_quota_interface);
+
         class_unregister_type(LUSTRE_LMV_NAME);
 
         LASSERTF(atomic_read(&lmv_object_count) == 0,