* Copyright 2009 Sun Microsystems, Inc. All rights reserved
* Use is subject to license terms.
*
- * Copyright (c) 2012, 2014, Intel Corporation.
+ * Copyright (c) 2012, 2015, Intel Corporation.
*/
/*
* lustre/lod/lod_object.c
* local OSD object interface to the MDD layer, and abstracts the
* addressing of local (OSD) and remote (OSP) objects. The API is
* described in the file lustre/include/dt_object.h and in
- * lustre/doc/osd-api.txt.
+ * Documentation/osd-api.txt.
*
* Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
*/
#include <obd.h>
#include <obd_class.h>
-#include <lustre_ver.h>
#include <obd_support.h>
-#include <lprocfs_status.h>
#include <lustre_fid.h>
-#include <lustre_param.h>
-#include <lustre_fid.h>
+#include <lustre_linkea.h>
#include <lustre_lmv.h>
+#include <lustre_param.h>
+#include <lustre_swab.h>
+#include <lustre_ver.h>
+#include <lprocfs_status.h>
#include <md_object.h>
-#include <lustre_linkea.h>
#include "lod_internal.h"
stripe_count = le32_to_cpu(lum->lum_stripe_count);
- /* shrink the stripe_count to the avaible MDT count */
- if (stripe_count > lod->lod_remote_mdt_count + 1 &&
- !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))
- stripe_count = lod->lod_remote_mdt_count + 1;
-
OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_count);
if (stripe == NULL)
RETURN(-ENOMEM);
RETURN(rc);
}
-
/**
* Implementation of dt_object_operations::do_declare_xattr_set.
*
rc = lod_verify_md_striping(d, lum1);
if (rc == 0 &&
le32_to_cpu(lum1->lum_stripe_count) > 1) {
- /* Directory will be striped only if
- * stripe_count > 1 */
lc->ldo_stripenr =
le32_to_cpu(lum1->lum_stripe_count);
lc->ldo_dir_stripe_offset =
lc->ldo_dir_stripe_offset = -1;
}
+ /* shrink the stripe_count to the avaible MDT count */
+ if (lc->ldo_stripenr > d->lod_remote_mdt_count + 1 &&
+ !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))
+ lc->ldo_stripenr = d->lod_remote_mdt_count + 1;
+
+ /* Directory will be striped only if stripe_count > 1, if
+ * stripe_count == 1, let's reset stripenr = 0 to avoid
+ * create single master stripe and also help to unify the
+ * stripe handling of directories and files */
+ if (lc->ldo_stripenr == 1)
+ lc->ldo_stripenr = 0;
+
CDEBUG(D_INFO, "final striping count:%hu, offset:%d\n",
lc->ldo_stripenr, (int)lc->ldo_dir_stripe_offset);
LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
LASSERT(lo->ldo_stripe_size > 0);
+ if (lo->ldo_stripenr == 0)
+ RETURN(0);
+
rc = dt_attr_get(env, next, attr);
LASSERT(attr->la_valid & LA_SIZE);
if (rc)
GOTO(out, rc = -EREMOTE);
} else if (lo->ldo_dir_stripe_offset !=
ss->ss_node_id) {
- GOTO(out, rc = -EREMOTE);
+ struct lod_device *lod;
+ struct lod_tgt_descs *ltd;
+ struct lod_tgt_desc *tgt = NULL;
+ bool found_mdt = false;
+ int i;
+
+ lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
+ ltd = &lod->lod_mdt_descs;
+ cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) {
+ tgt = LTD_TGT(ltd, i);
+ if (tgt->ltd_index ==
+ lo->ldo_dir_stripe_offset) {
+ found_mdt = true;
+ break;
+ }
+ }
+
+ /* If the MDT indicated by stripe_offset can be
+ * found, then tell client to resend the create
+ * request to the correct MDT, otherwise return
+ * error to client */
+ if (found_mdt)
+ GOTO(out, rc = -EREMOTE);
+ else
+ GOTO(out, rc = -EINVAL);
}
}
return dt_object_sync(env, dt_object_child(dt), start, end);
}
-struct lod_slave_locks {
- int lsl_lock_count;
- struct lustre_handle lsl_handle[0];
-};
-
/**
* Release LDLM locks on the stripes of a striped directory.
*
static int lod_object_unlock_internal(const struct lu_env *env,
struct dt_object *dt,
struct ldlm_enqueue_info *einfo,
- ldlm_policy_data_t *policy)
+ union ldlm_policy_data *policy)
{
- struct lod_slave_locks *slave_locks = einfo->ei_cbdata;
+ struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
int rc = 0;
int i;
ENTRY;
if (slave_locks == NULL)
RETURN(0);
- for (i = 1; i < slave_locks->lsl_lock_count; i++) {
- if (lustre_handle_is_used(&slave_locks->lsl_handle[i]))
- ldlm_lock_decref(&slave_locks->lsl_handle[i],
+ for (i = 1; i < slave_locks->count; i++) {
+ if (lustre_handle_is_used(&slave_locks->handles[i]))
+ ldlm_lock_decref(&slave_locks->handles[i],
einfo->ei_mode);
}
struct ldlm_enqueue_info *einfo,
union ldlm_policy_data *policy)
{
- struct lod_object *lo = lod_dt_obj(dt);
- struct lod_slave_locks *slave_locks = einfo->ei_cbdata;
- int slave_locks_size;
- int rc;
+ struct lod_object *lo = lod_dt_obj(dt);
+ struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
+ int slave_locks_size;
+ int i;
ENTRY;
if (slave_locks == NULL)
RETURN(0);
- if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
- RETURN(-ENOTDIR);
-
+ LASSERT(S_ISDIR(dt->do_lu.lo_header->loh_attr));
+ LASSERT(lo->ldo_stripenr > 1);
/* Note: for remote lock for single stripe dir, MDT will cancel
* the lock by lockh directly */
- if (lo->ldo_stripenr <= 1 && dt_object_remote(dt_object_child(dt)))
- RETURN(0);
+ LASSERT(!dt_object_remote(dt_object_child(dt)));
- /* Only cancel slave lock for striped dir */
- rc = lod_object_unlock_internal(env, dt, einfo, policy);
+ /* locks were unlocked in MDT layer */
+ for (i = 1; i < slave_locks->count; i++)
+ LASSERT(!lustre_handle_is_used(&slave_locks->handles[i]));
- slave_locks_size = sizeof(*slave_locks) + slave_locks->lsl_lock_count *
- sizeof(slave_locks->lsl_handle[0]);
+ slave_locks_size = sizeof(*slave_locks) + slave_locks->count *
+ sizeof(slave_locks->handles[0]);
OBD_FREE(slave_locks, slave_locks_size);
einfo->ei_cbdata = NULL;
- RETURN(rc);
+ RETURN(0);
}
/**
int rc = 0;
int i;
int slave_locks_size;
- struct lod_slave_locks *slave_locks = NULL;
+ struct lustre_handle_array *slave_locks = NULL;
ENTRY;
/* remote object lock */
RETURN(0);
slave_locks_size = sizeof(*slave_locks) + lo->ldo_stripenr *
- sizeof(slave_locks->lsl_handle[0]);
+ sizeof(slave_locks->handles[0]);
/* Freed in lod_object_unlock */
OBD_ALLOC(slave_locks, slave_locks_size);
if (slave_locks == NULL)
RETURN(-ENOMEM);
- slave_locks->lsl_lock_count = lo->ldo_stripenr;
+ slave_locks->count = lo->ldo_stripenr;
/* striped directory lock */
for (i = 1; i < lo->ldo_stripenr; i++) {
ldlm_completion_callback completion = einfo->ei_cb_cp;
__u64 dlmflags = LDLM_FL_ATOMIC_CB;
+ if (einfo->ei_mode == LCK_PW ||
+ einfo->ei_mode == LCK_EX)
+ dlmflags |= LDLM_FL_COS_INCOMPAT;
+
/* This only happens if there are mulitple stripes
* on the master MDT, i.e. except stripe0, there are
* other stripes on the Master MDT as well, Only
}
if (rc != 0)
GOTO(out, rc);
- slave_locks->lsl_handle[i] = lockh;
+ slave_locks->handles[i] = lockh;
}
einfo->ei_cbdata = slave_locks;