if ((attr->la_valid & LA_ATIME &&
attr->la_atime < la->la_atime) ||
(attr->la_valid & LA_CTIME &&
- attr->la_atime < la->la_ctime) ||
+ attr->la_ctime < la->la_ctime) ||
(attr->la_valid & LA_MTIME &&
- attr->la_atime < la->la_mtime))
+ attr->la_mtime < la->la_mtime))
setattr_time = true;
if (!setattr_time)
struct lmv_mds_md_v1 *lmm1;
int stripe_count;
int lmm_size;
+ int type = LU_SEQ_RANGE_ANY;
int i;
int rc;
__u32 mdtidx;
lmm1->lmv_stripe_count = cpu_to_le32(stripe_count);
lmm1->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type);
rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu),
- &mdtidx, LU_SEQ_RANGE_MDT);
+ &mdtidx, &type);
if (rc != 0)
RETURN(rc);
for (i = 1; i < le32_to_cpu(lmv1->lmv_stripe_count); i++) {
struct lod_tgt_desc *tgt;
int idx;
+ int type = LU_SEQ_RANGE_ANY;
struct dt_object *dto;
fid_le_to_cpu(fid, &lmv1->lmv_stripe_fids[i]);
- rc = lod_fld_lookup(env, lod, fid,
- &idx, LU_SEQ_RANGE_MDT);
+ rc = lod_fld_lookup(env, lod, fid, &idx, &type);
if (rc != 0)
GOTO(out, rc);
/* destroy all underlying objects */
for (i = 0; i < lo->ldo_stripenr; i++) {
LASSERT(lo->ldo_stripe[i]);
- rc = dt_destroy(env, lo->ldo_stripe[i], th);
- if (rc)
- break;
+ /* for striped directory, next == ldo_stripe[0] */
+ if (next != lo->ldo_stripe[i]) {
+ rc = dt_destroy(env, lo->ldo_stripe[i], th);
+ if (rc)
+ break;
+ }
}
RETURN(rc);
return dt_object_sync(env, dt_object_child(dt));
}
+struct lod_slave_locks {
+ int lsl_lock_count;
+ struct lustre_handle lsl_handle[0];
+};
+
+static int lod_object_unlock_internal(const struct lu_env *env,
+ struct dt_object *dt,
+ struct ldlm_enqueue_info *einfo,
+ ldlm_policy_data_t *policy)
+{
+ struct lod_object *lo = lod_dt_obj(dt);
+ struct lod_slave_locks *slave_locks = einfo->ei_cbdata;
+ int rc = 0;
+ int i;
+ ENTRY;
+
+ if (slave_locks == NULL)
+ RETURN(0);
+
+ for (i = 0; i < slave_locks->lsl_lock_count; i++) {
+ if (lustre_handle_is_used(&slave_locks->lsl_handle[i])) {
+ int rc1;
+
+ einfo->ei_cbdata = &slave_locks->lsl_handle[i];
+ rc1 = dt_object_unlock(env, lo->ldo_stripe[i], einfo,
+ policy);
+ if (rc1 < 0)
+ rc = rc == 0 ? rc1 : rc;
+ }
+ }
+
+ RETURN(rc);
+}
+
+static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt,
+ struct ldlm_enqueue_info *einfo,
+ union ldlm_policy_data *policy)
+{
+ struct lod_object *lo = lod_dt_obj(dt);
+ struct lod_slave_locks *slave_locks = einfo->ei_cbdata;
+ int slave_locks_size;
+ int rc;
+ ENTRY;
+
+ if (slave_locks == NULL)
+ RETURN(0);
+
+ rc = lod_load_striping(env, lo);
+ if (rc != 0)
+ RETURN(rc);
+
+ /* Note: for remote lock for single stripe dir, MDT will cancel
+ * the lock by lockh directly */
+ if (lo->ldo_stripenr == 0 && dt_object_remote(dt_object_child(dt)))
+ RETURN(0);
+
+ if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
+ RETURN(-ENOTDIR);
+
+ /* Only cancel slave lock for striped dir */
+ rc = lod_object_unlock_internal(env, dt, einfo, policy);
+
+ slave_locks_size = sizeof(*slave_locks) + slave_locks->lsl_lock_count *
+ sizeof(slave_locks->lsl_handle[0]);
+ OBD_FREE(slave_locks, slave_locks_size);
+ einfo->ei_cbdata = NULL;
+
+ RETURN(rc);
+}
+
static int lod_object_lock(const struct lu_env *env,
- struct dt_object *dt, struct lustre_handle *lh,
+ struct dt_object *dt,
+ struct lustre_handle *lh,
struct ldlm_enqueue_info *einfo,
- void *policy)
+ union ldlm_policy_data *policy)
{
- struct dt_object *next = dt_object_child(dt);
- int rc;
+ struct lod_object *lo = lod_dt_obj(dt);
+ int rc = 0;
+ int i;
+ int slave_locks_size;
+ struct lod_slave_locks *slave_locks = NULL;
ENTRY;
- /*
- * declare setattr on the local object
- */
- rc = dt_object_lock(env, next, lh, einfo, policy);
+ /* remote object lock */
+ if (!einfo->ei_enq_slave) {
+ LASSERT(dt_object_remote(dt));
+ return dt_object_lock(env, dt_object_child(dt), lh, einfo,
+ policy);
+ }
+
+ if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
+ RETURN(-ENOTDIR);
+
+ rc = lod_load_striping(env, lo);
+ if (rc != 0)
+ RETURN(rc);
+
+ /* No stripes */
+ if (lo->ldo_stripenr == 0)
+ RETURN(0);
+
+ slave_locks_size = sizeof(*slave_locks) + lo->ldo_stripenr *
+ sizeof(slave_locks->lsl_handle[0]);
+ /* Freed in lod_object_unlock */
+ OBD_ALLOC(slave_locks, slave_locks_size);
+ if (slave_locks == NULL)
+ RETURN(-ENOMEM);
+ slave_locks->lsl_lock_count = lo->ldo_stripenr;
+
+ /* striped directory lock */
+ for (i = 0; i < lo->ldo_stripenr; i++) {
+ struct lustre_handle lockh;
+
+ LASSERT(lo->ldo_stripe[i]);
+ rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh, einfo,
+ policy);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ slave_locks->lsl_handle[i] = lockh;
+ }
+
+ einfo->ei_cbdata = slave_locks;
+
+out:
+ if (rc != 0 && slave_locks != NULL) {
+ einfo->ei_cbdata = slave_locks;
+ lod_object_unlock_internal(env, dt, einfo, policy);
+ OBD_FREE(slave_locks, slave_locks_size);
+ einfo->ei_cbdata = NULL;
+ }
RETURN(rc);
}
.do_capa_get = lod_capa_get,
.do_object_sync = lod_object_sync,
.do_object_lock = lod_object_lock,
+ .do_object_unlock = lod_object_unlock,
};
static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt,