Whamcloud - gitweb
LU-3531 mdt: delete striped directory
[fs/lustre-release.git] / lustre / lod / lod_object.c
index 5f69061..90f2da6 100644 (file)
@@ -409,9 +409,9 @@ static int lod_attr_set(const struct lu_env *env,
                if ((attr->la_valid & LA_ATIME &&
                     attr->la_atime < la->la_atime) ||
                    (attr->la_valid & LA_CTIME &&
-                    attr->la_atime < la->la_ctime) ||
+                    attr->la_ctime < la->la_ctime) ||
                    (attr->la_valid & LA_MTIME &&
-                    attr->la_atime < la->la_mtime))
+                    attr->la_mtime < la->la_mtime))
                        setattr_time = true;
 
                if (!setattr_time)
@@ -517,6 +517,7 @@ int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt,
        struct lmv_mds_md_v1    *lmm1;
        int                     stripe_count;
        int                     lmm_size;
+       int                     type = LU_SEQ_RANGE_ANY;
        int                     i;
        int                     rc;
        __u32                   mdtidx;
@@ -537,7 +538,7 @@ int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt,
        lmm1->lmv_stripe_count = cpu_to_le32(stripe_count);
        lmm1->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type);
        rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu),
-                           &mdtidx, LU_SEQ_RANGE_MDT);
+                           &mdtidx, &type);
        if (rc != 0)
                RETURN(rc);
 
@@ -596,11 +597,11 @@ int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo,
        for (i = 1; i < le32_to_cpu(lmv1->lmv_stripe_count); i++) {
                struct lod_tgt_desc     *tgt;
                int                     idx;
+               int                     type = LU_SEQ_RANGE_ANY;
                struct dt_object        *dto;
 
                fid_le_to_cpu(fid, &lmv1->lmv_stripe_fids[i]);
-               rc = lod_fld_lookup(env, lod, fid,
-                                   &idx, LU_SEQ_RANGE_MDT);
+               rc = lod_fld_lookup(env, lod, fid, &idx, &type);
                if (rc != 0)
                        GOTO(out, rc);
 
@@ -1942,9 +1943,12 @@ static int lod_object_destroy(const struct lu_env *env,
        /* destroy all underlying objects */
        for (i = 0; i < lo->ldo_stripenr; i++) {
                LASSERT(lo->ldo_stripe[i]);
-               rc = dt_destroy(env, lo->ldo_stripe[i], th);
-               if (rc)
-                       break;
+               /* for striped directory, next == ldo_stripe[0] */
+               if (next != lo->ldo_stripe[i]) {
+                       rc = dt_destroy(env, lo->ldo_stripe[i], th);
+                       if (rc)
+                               break;
+               }
        }
 
        RETURN(rc);
@@ -2003,19 +2007,137 @@ static int lod_object_sync(const struct lu_env *env, struct dt_object *dt)
        return dt_object_sync(env, dt_object_child(dt));
 }
 
+struct lod_slave_locks {
+       int                     lsl_lock_count;
+       struct lustre_handle    lsl_handle[0];
+};
+
+static int lod_object_unlock_internal(const struct lu_env *env,
+                                     struct dt_object *dt,
+                                     struct ldlm_enqueue_info *einfo,
+                                     ldlm_policy_data_t *policy)
+{
+       struct lod_object       *lo = lod_dt_obj(dt);
+       struct lod_slave_locks  *slave_locks = einfo->ei_cbdata;
+       int                     rc = 0;
+       int                     i;
+       ENTRY;
+
+       if (slave_locks == NULL)
+               RETURN(0);
+
+       for (i = 0; i < slave_locks->lsl_lock_count; i++) {
+               if (lustre_handle_is_used(&slave_locks->lsl_handle[i])) {
+                       int     rc1;
+
+                       einfo->ei_cbdata = &slave_locks->lsl_handle[i];
+                       rc1 = dt_object_unlock(env, lo->ldo_stripe[i], einfo,
+                                              policy);
+                       if (rc1 < 0)
+                               rc = rc == 0 ? rc1 : rc;
+               }
+       }
+
+       RETURN(rc);
+}
+
+static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt,
+                            struct ldlm_enqueue_info *einfo,
+                            union ldlm_policy_data *policy)
+{
+       struct lod_object       *lo = lod_dt_obj(dt);
+       struct lod_slave_locks  *slave_locks = einfo->ei_cbdata;
+       int                     slave_locks_size;
+       int                     rc;
+       ENTRY;
+
+       if (slave_locks == NULL)
+               RETURN(0);
+
+       rc = lod_load_striping(env, lo);
+       if (rc != 0)
+               RETURN(rc);
+
+       /* Note: for remote lock for single stripe dir, MDT will cancel
+        * the lock by lockh directly */
+       if (lo->ldo_stripenr == 0 && dt_object_remote(dt_object_child(dt)))
+               RETURN(0);
+
+       if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
+               RETURN(-ENOTDIR);
+
+       /* Only cancel slave lock for striped dir */
+       rc = lod_object_unlock_internal(env, dt, einfo, policy);
+
+       slave_locks_size = sizeof(*slave_locks) + slave_locks->lsl_lock_count *
+                          sizeof(slave_locks->lsl_handle[0]);
+       OBD_FREE(slave_locks, slave_locks_size);
+       einfo->ei_cbdata = NULL;
+
+       RETURN(rc);
+}
+
 static int lod_object_lock(const struct lu_env *env,
-                          struct dt_object *dt, struct lustre_handle *lh,
+                          struct dt_object *dt,
+                          struct lustre_handle *lh,
                           struct ldlm_enqueue_info *einfo,
-                          void *policy)
+                          union ldlm_policy_data *policy)
 {
-       struct dt_object   *next = dt_object_child(dt);
-       int              rc;
+       struct lod_object       *lo = lod_dt_obj(dt);
+       int                     rc = 0;
+       int                     i;
+       int                     slave_locks_size;
+       struct lod_slave_locks  *slave_locks = NULL;
        ENTRY;
 
-       /*
-        * declare setattr on the local object
-        */
-       rc = dt_object_lock(env, next, lh, einfo, policy);
+       /* remote object lock */
+       if (!einfo->ei_enq_slave) {
+               LASSERT(dt_object_remote(dt));
+               return dt_object_lock(env, dt_object_child(dt), lh, einfo,
+                                     policy);
+       }
+
+       if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
+               RETURN(-ENOTDIR);
+
+       rc = lod_load_striping(env, lo);
+       if (rc != 0)
+               RETURN(rc);
+
+       /* No stripes */
+       if (lo->ldo_stripenr == 0)
+               RETURN(0);
+
+       slave_locks_size = sizeof(*slave_locks) + lo->ldo_stripenr *
+                          sizeof(slave_locks->lsl_handle[0]);
+       /* Freed in lod_object_unlock */
+       OBD_ALLOC(slave_locks, slave_locks_size);
+       if (slave_locks == NULL)
+               RETURN(-ENOMEM);
+       slave_locks->lsl_lock_count = lo->ldo_stripenr;
+
+       /* striped directory lock */
+       for (i = 0; i < lo->ldo_stripenr; i++) {
+               struct lustre_handle    lockh;
+
+               LASSERT(lo->ldo_stripe[i]);
+               rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh, einfo,
+                                   policy);
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               slave_locks->lsl_handle[i] = lockh;
+       }
+
+       einfo->ei_cbdata = slave_locks;
+
+out:
+       if (rc != 0 && slave_locks != NULL) {
+               einfo->ei_cbdata = slave_locks;
+               lod_object_unlock_internal(env, dt, einfo, policy);
+               OBD_FREE(slave_locks, slave_locks_size);
+               einfo->ei_cbdata = NULL;
+       }
 
        RETURN(rc);
 }
@@ -2048,6 +2170,7 @@ struct dt_object_operations lod_obj_ops = {
        .do_capa_get            = lod_capa_get,
        .do_object_sync         = lod_object_sync,
        .do_object_lock         = lod_object_lock,
+       .do_object_unlock       = lod_object_unlock,
 };
 
 static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt,