Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / lmv / lmv_obd.c
index 333104d..b48e161 100644 (file)
@@ -1379,6 +1379,7 @@ repeat:
         CDEBUG(D_OTHER, "CREATE '%*s' on "DFID"\n", op_data->op_namelen,
                op_data->op_name, PFID(&op_data->op_fid1));
 
+        op_data->op_flags |= MF_MDC_CANCEL_FID1;
         rc = md_create(tgt_exp, op_data, data, datalen, mode, uid, gid,
                        cap_effective, rdev, request);
         if (rc == 0) {
@@ -1709,6 +1710,97 @@ repeat:
         RETURN(rc);
 }
 
+#define md_op_data_fid(op_data, fl)                     \
+        (fl == MF_MDC_CANCEL_FID1 ? &op_data->op_fid1 : \
+         fl == MF_MDC_CANCEL_FID2 ? &op_data->op_fid2 : \
+         fl == MF_MDC_CANCEL_FID3 ? &op_data->op_fid3 : \
+         fl == MF_MDC_CANCEL_FID4 ? &op_data->op_fid4 : \
+         NULL)
+
+/* @tgt_exp is the export the metadata request is sent.
+ * @fid_exp is the export the cancel should be sent for the current fid.
+ * if @fid_exp is NULL, the export is found for the current fid.
+ * @op_data keeps the current fid, which is pointed through @flag.
+ * @mode, @bits -- lock match parameters. */
+static int lmv_early_cancel(struct lmv_obd *lmv, struct obd_export *tgt_exp,
+                            struct obd_export *fid_exp,
+                            struct md_op_data *op_data,
+                            ldlm_mode_t mode, int bits, int flag)
+{
+        struct lu_fid *fid = md_op_data_fid(op_data, flag);
+        ldlm_policy_data_t policy = {{0}};
+        int rc = 0;
+        ENTRY;
+
+        if (!fid_is_sane(fid))
+                RETURN(0);
+        
+        if (fid_exp == NULL)
+                fid_exp = lmv_find_export(lmv, fid);
+
+        if (tgt_exp == fid_exp) {
+                /* The export is the same as on the target server, cancel 
+                 * will be sent along with the main metadata operation. */
+                op_data->op_flags |= flag;
+                RETURN(0);
+        }
+
+        policy.l_inodebits.bits = bits;
+        rc = md_cancel_unused(fid_exp, fid, &policy, mode, LDLM_FL_ASYNC, NULL);
+        RETURN(rc);
+}
+
+#ifdef EARLY_CANCEL_FOR_STRIPED_DIR_IS_READY
+/* Check if the fid in @op_data pointed to by flag is of the same export(s)
+ * as @tgt_exp. Early cancels will be sent later by mdc code, otherwise, call
+ * md_cancel_unused for child export(s). */
+static int lmv_early_cancel_stripes(struct obd_export *exp,
+                                    struct obd_export *tgt_exp,
+                                    struct md_op_data *op_data,
+                                    ldlm_mode_t mode, int bits, int flag)
+{
+        struct lu_fid *fid = md_op_data_fid(op_data, flag);
+        struct obd_device *obd = exp->exp_obd;
+        struct lmv_obd *lmv = &obd->u.lmv;
+        struct obd_export *st_exp;
+        struct lmv_obj *obj;
+        int rc = 0;
+        ENTRY;
+
+        if (!fid_is_sane(fid))
+                RETURN(0);
+
+        obj = lmv_obj_grab(obd, fid);
+        if (obj) {
+                ldlm_policy_data_t policy = {{0}};
+                struct lu_fid *st_fid;
+                int i;
+                
+                policy.l_inodebits.bits = bits;
+                for (i = 0; i < obj->lo_objcount; i++) {
+                        st_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds);
+                        st_fid = &obj->lo_inodes[i].li_fid;
+                        if (tgt_exp != st_exp) {
+                                rc = md_cancel_unused(st_exp, st_fid, &policy,
+                                                      mode, 0, NULL);
+                                if (rc)
+                                        break;
+                        } else {
+                                /* Some export matches to @tgt_exp, do cancel
+                                 * for its fid in mdc */
+                                *fid = *st_fid;
+                                op_data->op_flags |= flag;
+                        }
+                }
+                lmv_obj_put(obj);
+        } else {
+                rc = lmv_early_cancel(lmv, tgt_exp, NULL, op_data,
+                                      mode, bits, flag);
+        }
+        RETURN(rc);
+}
+#endif
+
 /*
  * llite passes fid of an target inode in op_data->op_fid1 and id of directory in
  * op_data->op_fid2
@@ -1718,6 +1810,7 @@ static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
 {
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
+        struct obd_export *tgt_exp;
         struct lmv_obj *obj;
         int rc, loop = 0;
         mdsno_t mds;
@@ -1769,7 +1862,15 @@ repeat:
         op_data->op_fsgid = current->fsgid;
         op_data->op_cap   = current->cap_effective;
 
-        rc = md_link(lmv->tgts[mds].ltd_exp, op_data, request);
+        tgt_exp = lmv->tgts[mds].ltd_exp;
+        if (op_data->op_namelen) {
+                op_data->op_flags |= MF_MDC_CANCEL_FID2;
+                /* Cancel UPDATE lock on child (fid1). */
+                rc = lmv_early_cancel(lmv, tgt_exp, NULL, op_data, LCK_EX,
+                                      MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
+        }
+        if (rc == 0)
+                rc = md_link(tgt_exp, op_data, request);
         if (rc == -ERESTART) {
                 LASSERT(*request != NULL);
                 DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
@@ -1793,11 +1894,12 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
                       const char *old, int oldlen, const char *new, int newlen,
                       struct ptlrpc_request **request)
 {
+        struct obd_export *tgt_exp = NULL, *src_exp;
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
         int rc, mea_idx, loop = 0;
         struct lmv_obj *obj;
-        mdsno_t mds;
+        mdsno_t mds1, mds2;
         ENTRY;
 
         CDEBUG(D_OTHER, "rename %*s in "DFID" to %*s in "DFID"\n",
@@ -1818,7 +1920,7 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
                        "to "DFID"\n", newlen, new, oldlen, newlen,
                        PFID(&op_data->op_fid2), PFID(&op_data->op_fid1));
 
-                rc = lmv_fld_lookup(lmv, &op_data->op_fid2, &mds);
+                rc = lmv_fld_lookup(lmv, &op_data->op_fid2, &mds1);
                 if (rc)
                         RETURN(rc);
 
@@ -1851,11 +1953,11 @@ repeat:
                 mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
                                        (char *)old, oldlen);
                 op_data->op_fid1 = obj->lo_inodes[mea_idx].li_fid;
-                mds = obj->lo_inodes[mea_idx].li_mds;
+                mds1 = obj->lo_inodes[mea_idx].li_mds;
                 CDEBUG(D_OTHER, "Parent obj "DFID"\n", PFID(&op_data->op_fid1));
                 lmv_obj_put(obj);
         } else {
-                rc = lmv_fld_lookup(lmv, &op_data->op_fid1, &mds);
+                rc = lmv_fld_lookup(lmv, &op_data->op_fid1, &mds1);
                 if (rc)
                         RETURN(rc);
         }
@@ -1869,9 +1971,14 @@ repeat:
                 mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
                                        (char *)new, newlen);
 
+                mds2 = obj->lo_inodes[mea_idx].li_mds;
                 op_data->op_fid2 = obj->lo_inodes[mea_idx].li_fid;
                 CDEBUG(D_OTHER, "Parent obj "DFID"\n", PFID(&op_data->op_fid2));
                 lmv_obj_put(obj);
+        } else {
+                rc = lmv_fld_lookup(lmv, &op_data->op_fid2, &mds2);
+                if (rc)
+                        RETURN(rc);
         }
 
 request:
@@ -1879,8 +1986,36 @@ request:
         op_data->op_fsgid = current->fsgid;
         op_data->op_cap   = current->cap_effective;
 
-        rc = md_rename(lmv->tgts[mds].ltd_exp, op_data, old, oldlen,
-                       new, newlen, request);
+        src_exp = lmv_get_export(lmv, mds1);
+        tgt_exp = lmv_get_export(lmv, mds2);
+        if (oldlen) {
+                /* LOOKUP lock on src child (fid3) should also be cancelled for
+                 * src_exp in mdc_rename. */
+                op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
+
+                /* Cancel UPDATE locks on tgt parent (fid2), tgt_exp is its
+                 * own export. */
+                rc = lmv_early_cancel(lmv, src_exp, tgt_exp, op_data, LCK_EX,
+                                      MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID2);
+
+                /* Cancel LOOKUP locks on tgt child (fid4) for parent tgt_exp.*/
+                if (rc == 0)
+                        rc = lmv_early_cancel(lmv, src_exp, tgt_exp, op_data,
+                                              LCK_EX, MDS_INODELOCK_LOOKUP,
+                                              MF_MDC_CANCEL_FID4);
+
+                /* XXX: the case when child is a striped dir is not supported.
+                 * Only the master stripe has all locks cancelled early. */
+                /* Cancel all the locks on tgt child (fid4). */
+                if (rc == 0)
+                        rc = lmv_early_cancel(lmv, src_exp, NULL, op_data,
+                                              LCK_EX, MDS_INODELOCK_FULL,
+                                              MF_MDC_CANCEL_FID4);
+        }
+
+        if (rc == 0)
+                rc = md_rename(src_exp, op_data, old, oldlen,
+                               new, newlen, request);
         if (rc == -ERESTART) {
                 LASSERT(*request != NULL);
                 DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
@@ -1921,6 +2056,7 @@ static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data,
                PFID(&op_data->op_fid1), op_data->op_attr.ia_valid,
                obj ? ", split" : "");
 
+        op_data->op_flags |= MF_MDC_CANCEL_FID1;
         if (obj) {
                 for (i = 0; i < obj->lo_objcount; i++) {
                         op_data->op_fid1 = obj->lo_inodes[i].li_fid;
@@ -2224,6 +2360,7 @@ static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
         struct obd_export *tgt_exp = NULL;
+        struct lmv_obj *obj;
         int rc, loop = 0;
         ENTRY;
 
@@ -2241,7 +2378,6 @@ repeat:
         ++loop;
         LASSERT(loop <= 2);
         if (op_data->op_namelen != 0) {
-                struct lmv_obj *obj;
                 int mea_idx;
 
                 obj = lmv_obj_grab(obd, &op_data->op_fid1);
@@ -2274,7 +2410,21 @@ repeat:
         op_data->op_fsgid = current->fsgid;
         op_data->op_cap   = current->cap_effective;
 
-        rc = md_unlink(tgt_exp, op_data, request);
+        /* If child's fid is given, cancel unused locks for it if it is from
+         * another export than parent. */
+        if (op_data->op_namelen) {
+                /* LOOKUP lock for child (fid3) should also be cancelled on 
+                 * parent tgt_exp in mdc_unlink(). */
+                op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
+
+                /* XXX: the case when child is a striped dir is not supported.
+                 * Only the master stripe has all locks cancelled early. */
+                /* Cancel FULL locks on child (fid3). */
+                rc = lmv_early_cancel(lmv, tgt_exp, NULL, op_data, LCK_EX,
+                                      MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3);
+        }
+        if (rc == 0)
+                rc = md_unlink(tgt_exp, op_data, request);
         if (rc == -ERESTART) {
                 LASSERT(*request != NULL);
                 DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
@@ -2539,7 +2689,8 @@ int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
 
 static int lmv_cancel_unused(struct obd_export *exp,
                              const struct lu_fid *fid,
-                             int flags, void *opaque)
+                             ldlm_policy_data_t *policy,
+                             ldlm_mode_t mode, int flags, void *opaque)
 {
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
@@ -2552,8 +2703,8 @@ static int lmv_cancel_unused(struct obd_export *exp,
                 if (!lmv->tgts[i].ltd_exp || !lmv->tgts[i].ltd_active)
                         continue;
 
-                err = md_cancel_unused(lmv->tgts[i].ltd_exp,
-                                       fid, flags, opaque);
+                err = md_cancel_unused(lmv->tgts[i].ltd_exp, fid,
+                                       policy, mode, flags, opaque);
                 if (!rc)
                         rc = err;
         }