struct lu_site *site;
struct lu_object *orig;
cfs_hash_bd_t bd;
+ const struct lu_fid *fid;
top = o->lo_header;
site = o->lo_dev->ld_site;
orig = o;
+ /*
+ * Till we have fids-on-OST fully implemented, anonymous objects
+ * are possible in OSP. Such an object is not listed in the site,
+ * so we should not remove it from the site.
+ */
+ fid = lu_object_fid(o);
+ if (fid_is_zero(fid)) {
+ LASSERT(top->loh_hash.next == NULL
+ && top->loh_hash.pprev == NULL);
+ LASSERT(cfs_list_empty(&top->loh_lru));
+ if (!cfs_atomic_dec_and_test(&top->loh_ref))
+ return;
+ cfs_list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
+ if (o->lo_ops->loo_object_release != NULL)
+ o->lo_ops->loo_object_release(env, o);
+ }
+ lu_object_free(env, orig);
+ return;
+ }
+
cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
EXPORT_SYMBOL(lu_object_put);
/**
+ * Put object and don't keep it in cache. This is a temporary solution
+ * for multi-site objects whose layering is not constant.
+ */
+void lu_object_put_nocache(const struct lu_env *env, struct lu_object *o)
+{
+ set_bit(LU_OBJECT_HEARD_BANSHEE, &o->lo_header->loh_flags);
+ lu_object_put(env, o);
+}
+EXPORT_SYMBOL(lu_object_put_nocache);
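+/*
+ * A usage sketch (hypothetical caller; lu_object_find() is the existing
+ * lookup API): drop the last reference and have the object freed
+ * immediately instead of lingering in the site cache:
+ *
+ *	struct lu_object *o;
+ *
+ *	o = lu_object_find(env, dev, fid, conf);
+ *	if (!IS_ERR(o)) {
+ *		... use the object ...
+ *		lu_object_put_nocache(env, o);
+ *	}
+ */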
+
+/**
* Allocate new object.
*
* This follows object creation protocol, described in the comment within
* Global list of all sites on this node
*/
static CFS_LIST_HEAD(lu_sites);
-static CFS_DEFINE_MUTEX(lu_sites_guard);
+static DEFINE_MUTEX(lu_sites_guard);
/**
* Global environment used by site shrinker.
void lu_dev_add_linkage(struct lu_site *s, struct lu_device *d)
{
- cfs_spin_lock(&s->ls_ld_lock);
+ spin_lock(&s->ls_ld_lock);
if (cfs_list_empty(&d->ld_linkage))
cfs_list_add(&d->ld_linkage, &s->ls_ld_linkage);
- cfs_spin_unlock(&s->ls_ld_lock);
+ spin_unlock(&s->ls_ld_lock);
}
EXPORT_SYMBOL(lu_dev_add_linkage);
void lu_dev_del_linkage(struct lu_site *s, struct lu_device *d)
{
- cfs_spin_lock(&s->ls_ld_lock);
+ spin_lock(&s->ls_ld_lock);
cfs_list_del_init(&d->ld_linkage);
- cfs_spin_unlock(&s->ls_ld_lock);
+ spin_unlock(&s->ls_ld_lock);
}
EXPORT_SYMBOL(lu_dev_del_linkage);
lu_ref_add(&top->ld_reference, "site-top", s);
CFS_INIT_LIST_HEAD(&s->ls_ld_linkage);
- cfs_spin_lock_init(&s->ls_ld_lock);
+ spin_lock_init(&s->ls_ld_lock);
lu_dev_add_linkage(s, top);
- RETURN(0);
+ RETURN(0);
}
EXPORT_SYMBOL(lu_site_init);
*/
void lu_site_fini(struct lu_site *s)
{
- cfs_mutex_lock(&lu_sites_guard);
+ mutex_lock(&lu_sites_guard);
cfs_list_del_init(&s->ls_linkage);
- cfs_mutex_unlock(&lu_sites_guard);
+ mutex_unlock(&lu_sites_guard);
if (s->ls_obj_hash != NULL) {
cfs_hash_putref(s->ls_obj_hash);
int lu_site_init_finish(struct lu_site *s)
{
int result;
- cfs_mutex_lock(&lu_sites_guard);
+ mutex_lock(&lu_sites_guard);
result = lu_context_refill(&lu_shrink_env.le_ctx);
if (result == 0)
cfs_list_add(&s->ls_linkage, &lu_sites);
- cfs_mutex_unlock(&lu_sites_guard);
+ mutex_unlock(&lu_sites_guard);
return result;
}
EXPORT_SYMBOL(lu_site_init_finish);
/* purge again. */
lu_site_purge(env, site, ~0);
- if (!cfs_hash_is_empty(site->ls_obj_hash)) {
- /*
- * Uh-oh, objects still exist.
- */
- LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_ERROR, NULL);
-
- lu_site_print(env, site, &msgdata, lu_cdebug_printer);
- }
-
for (scan = top; scan != NULL; scan = next) {
const struct lu_device_type *ldt = scan->ld_type;
struct obd_type *type;
/**
* Maximal number of tld slots.
*/
- LU_CONTEXT_KEY_NR = 32
+ LU_CONTEXT_KEY_NR = 40
};
static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
-static cfs_spinlock_t lu_keys_guard = CFS_SPIN_LOCK_UNLOCKED;
+static DEFINE_SPINLOCK(lu_keys_guard);
/**
* Global counter incremented whenever key is registered, unregistered,
LASSERT(key->lct_owner != NULL);
result = -ENFILE;
- cfs_spin_lock(&lu_keys_guard);
+ spin_lock(&lu_keys_guard);
for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
if (lu_keys[i] == NULL) {
key->lct_index = i;
break;
}
}
- cfs_spin_unlock(&lu_keys_guard);
- return result;
+ spin_unlock(&lu_keys_guard);
+ return result;
}
EXPORT_SYMBOL(lu_context_key_register);
*/
void lu_context_key_degister(struct lu_context_key *key)
{
- LASSERT(cfs_atomic_read(&key->lct_used) >= 1);
- LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
+ LASSERT(cfs_atomic_read(&key->lct_used) >= 1);
+ LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
- lu_context_key_quiesce(key);
+ lu_context_key_quiesce(key);
- ++key_set_version;
- cfs_spin_lock(&lu_keys_guard);
- key_fini(&lu_shrink_env.le_ctx, key->lct_index);
- if (lu_keys[key->lct_index]) {
- lu_keys[key->lct_index] = NULL;
- lu_ref_fini(&key->lct_reference);
- }
- cfs_spin_unlock(&lu_keys_guard);
+ ++key_set_version;
+ spin_lock(&lu_keys_guard);
+ key_fini(&lu_shrink_env.le_ctx, key->lct_index);
+ if (lu_keys[key->lct_index]) {
+ lu_keys[key->lct_index] = NULL;
+ lu_ref_fini(&key->lct_reference);
+ }
+ spin_unlock(&lu_keys_guard);
- LASSERTF(cfs_atomic_read(&key->lct_used) == 1,
- "key has instances: %d\n",
- cfs_atomic_read(&key->lct_used));
+ LASSERTF(cfs_atomic_read(&key->lct_used) == 1,
+ "key has instances: %d\n",
+ cfs_atomic_read(&key->lct_used));
}
EXPORT_SYMBOL(lu_context_key_degister);
/*
* XXX memory barrier has to go here.
*/
- cfs_spin_lock(&lu_keys_guard);
- cfs_list_for_each_entry(ctx, &lu_context_remembered,
- lc_remember)
- key_fini(ctx, key->lct_index);
- cfs_spin_unlock(&lu_keys_guard);
- ++key_set_version;
- }
+ spin_lock(&lu_keys_guard);
+ cfs_list_for_each_entry(ctx, &lu_context_remembered,
+ lc_remember)
+ key_fini(ctx, key->lct_index);
+ spin_unlock(&lu_keys_guard);
+ ++key_set_version;
+ }
}
EXPORT_SYMBOL(lu_context_key_quiesce);
{
int rc;
- memset(ctx, 0, sizeof *ctx);
- ctx->lc_state = LCS_INITIALIZED;
- ctx->lc_tags = tags;
- if (tags & LCT_REMEMBER) {
- cfs_spin_lock(&lu_keys_guard);
- cfs_list_add(&ctx->lc_remember, &lu_context_remembered);
- cfs_spin_unlock(&lu_keys_guard);
+ memset(ctx, 0, sizeof *ctx);
+ ctx->lc_state = LCS_INITIALIZED;
+ ctx->lc_tags = tags;
+ if (tags & LCT_REMEMBER) {
+ spin_lock(&lu_keys_guard);
+ cfs_list_add(&ctx->lc_remember, &lu_context_remembered);
+ spin_unlock(&lu_keys_guard);
} else {
CFS_INIT_LIST_HEAD(&ctx->lc_remember);
}
keys_fini(ctx);
} else { /* could race with key degister */
- cfs_spin_lock(&lu_keys_guard);
+ spin_lock(&lu_keys_guard);
keys_fini(ctx);
cfs_list_del_init(&ctx->lc_remember);
- cfs_spin_unlock(&lu_keys_guard);
+ spin_unlock(&lu_keys_guard);
}
}
EXPORT_SYMBOL(lu_context_fini);
void lu_context_tags_update(__u32 tags)
{
- cfs_spin_lock(&lu_keys_guard);
- lu_context_tags_default |= tags;
- key_set_version ++;
- cfs_spin_unlock(&lu_keys_guard);
+ spin_lock(&lu_keys_guard);
+ lu_context_tags_default |= tags;
+ key_set_version++;
+ spin_unlock(&lu_keys_guard);
}
EXPORT_SYMBOL(lu_context_tags_update);
void lu_context_tags_clear(__u32 tags)
{
- cfs_spin_lock(&lu_keys_guard);
- lu_context_tags_default &= ~tags;
- key_set_version ++;
- cfs_spin_unlock(&lu_keys_guard);
+ spin_lock(&lu_keys_guard);
+ lu_context_tags_default &= ~tags;
+ key_set_version++;
+ spin_unlock(&lu_keys_guard);
}
EXPORT_SYMBOL(lu_context_tags_clear);
void lu_session_tags_update(__u32 tags)
{
- cfs_spin_lock(&lu_keys_guard);
- lu_session_tags_default |= tags;
- key_set_version ++;
- cfs_spin_unlock(&lu_keys_guard);
+ spin_lock(&lu_keys_guard);
+ lu_session_tags_default |= tags;
+ key_set_version++;
+ spin_unlock(&lu_keys_guard);
}
EXPORT_SYMBOL(lu_session_tags_update);
void lu_session_tags_clear(__u32 tags)
{
- cfs_spin_lock(&lu_keys_guard);
- lu_session_tags_default &= ~tags;
- key_set_version ++;
- cfs_spin_unlock(&lu_keys_guard);
+ spin_lock(&lu_keys_guard);
+ lu_session_tags_default &= ~tags;
+ key_set_version++;
+ spin_unlock(&lu_keys_guard);
}
EXPORT_SYMBOL(lu_session_tags_clear);
#ifdef __KERNEL__
+/*
+ * There exists a potential lock inversion deadlock scenario when using
+ * Lustre on top of ZFS. This occurs between one of ZFS's
+ * buf_hash_table.ht_lock's, and Lustre's lu_sites_guard lock. Essentially,
+ * thread A will take the lu_sites_guard lock and sleep on the ht_lock,
+ * while thread B will take the ht_lock and sleep on the lu_sites_guard
+ * lock. Neither thread can then wake and release the lock it holds,
+ * so both block forever.
+ *
+ * To prevent this from happening we must ensure the lu_sites_guard lock is
+ * not taken while executing this code path. ZFS reliably does not set the
+ * __GFP_FS bit in its code paths, so this can be used to determine if it
+ * is safe to take the lu_sites_guard lock.
+ *
+ * Ideally we should accurately return the remaining number of cached
+ * objects without taking the lu_sites_guard lock, but this is not
+ * possible in the current implementation.
+ */
static int lu_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
{
lu_site_stats_t stats;
int remain = shrink_param(sc, nr_to_scan);
CFS_LIST_HEAD(splice);
- if (remain != 0) {
- if (!(shrink_param(sc, gfp_mask) & __GFP_FS))
+ if (!(shrink_param(sc, gfp_mask) & __GFP_FS)) {
+ if (remain != 0)
return -1;
- CDEBUG(D_INODE, "Shrink %d objects\n", remain);
+ else
+ /* We must not take the lu_sites_guard lock when
+ * __GFP_FS is *not* set because of the deadlock
+ * possibility detailed above. Additionally,
+ * since we cannot determine the number of
+ * objects in the cache without taking this
+ * lock, we're in a particularly tough spot. As
+ * a result, we'll just lie and say our cache is
+ * empty. This _should_ be ok, as we can't
+ * reclaim objects when __GFP_FS is *not* set
+ * anyways.
+ */
+ return 0;
}
- cfs_mutex_lock(&lu_sites_guard);
+ CDEBUG(D_INODE, "Shrink %d objects\n", remain);
+
+ mutex_lock(&lu_sites_guard);
cfs_list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
if (shrink_param(sc, nr_to_scan) != 0) {
remain = lu_site_purge(&lu_shrink_env, s, remain);
break;
}
cfs_list_splice(&splice, lu_sites.prev);
- cfs_mutex_unlock(&lu_sites_guard);
+ mutex_unlock(&lu_sites_guard);
cached = (cached / 100) * sysctl_vfs_cache_pressure;
if (shrink_param(sc, nr_to_scan) == 0)
int llo_global_init(void);
void llo_global_fini(void);
+/* context key constructor/destructor: lu_ucred_key_init, lu_ucred_key_fini */
+LU_KEY_INIT_FINI(lu_ucred, struct lu_ucred);
+
+static struct lu_context_key lu_ucred_key = {
+ .lct_tags = LCT_SESSION,
+ .lct_init = lu_ucred_key_init,
+ .lct_fini = lu_ucred_key_fini
+};
+
+/**
+ * Get the ucred key if the session exists and the key is allocated
+ * in it; return NULL otherwise.
+ */
+struct lu_ucred *lu_ucred(const struct lu_env *env)
+{
+ if (!env->le_ses)
+ return NULL;
+ return lu_context_key_get(env->le_ses, &lu_ucred_key);
+}
+EXPORT_SYMBOL(lu_ucred);
+
+/**
+ * Get ucred key and check if it is properly initialized.
+ * Return NULL otherwise.
+ */
+struct lu_ucred *lu_ucred_check(const struct lu_env *env)
+{
+ struct lu_ucred *uc = lu_ucred(env);
+ if (uc && uc->uc_valid != UCRED_OLD && uc->uc_valid != UCRED_NEW)
+ return NULL;
+ return uc;
+}
+EXPORT_SYMBOL(lu_ucred_check);
+
+/**
+ * Get ucred key, which must exist and must be properly initialized.
+ * Assert otherwise.
+ */
+struct lu_ucred *lu_ucred_assert(const struct lu_env *env)
+{
+ struct lu_ucred *uc = lu_ucred_check(env);
+ LASSERT(uc != NULL);
+ return uc;
+}
+EXPORT_SYMBOL(lu_ucred_assert);
+
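+/*
+ * A usage sketch (hypothetical request handler): the three accessors
+ * differ only in how strictly they treat a missing or uninitialized
+ * credential:
+ *
+ *	struct lu_ucred *uc;
+ *
+ *	uc = lu_ucred(env);         may be NULL if there is no session key
+ *	uc = lu_ucred_check(env);   NULL unless uc_valid is UCRED_OLD/NEW
+ *	uc = lu_ucred_assert(env);  caller guarantees it exists, else LBUG
+ */
+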
/**
* Initialization of global lu_* data.
*/
result = lu_context_key_register(&lu_global_key);
if (result != 0)
return result;
+
+ LU_CONTEXT_KEY_INIT(&lu_ucred_key);
+ result = lu_context_key_register(&lu_ucred_key);
+ if (result != 0)
+ return result;
+
/*
* At this level, we don't know what tags are needed, so allocate them
* conservatively. This should not be too bad, because this
* environment is global.
*/
- cfs_mutex_lock(&lu_sites_guard);
+ mutex_lock(&lu_sites_guard);
result = lu_env_init(&lu_shrink_env, LCT_SHRINKER);
- cfs_mutex_unlock(&lu_sites_guard);
+ mutex_unlock(&lu_sites_guard);
if (result != 0)
return result;
}
lu_context_key_degister(&lu_global_key);
+ lu_context_key_degister(&lu_ucred_key);
/*
* Tear shrinker environment down _after_ de-registering
* lu_global_key, because the latter has a value in the former.
*/
- cfs_mutex_lock(&lu_sites_guard);
+ mutex_lock(&lu_sites_guard);
lu_env_fini(&lu_shrink_env);
- cfs_mutex_unlock(&lu_sites_guard);
+ mutex_unlock(&lu_sites_guard);
lu_ref_global_fini();
}
}
}
EXPORT_SYMBOL(lu_kmem_fini);
+
+/**
+ * Temporary solution allowing the fid to be assigned in ->do_create(),
+ * until fully functional OST fids are implemented.
+ */
+void lu_object_assign_fid(const struct lu_env *env, struct lu_object *o,
+ const struct lu_fid *fid)
+{
+ struct lu_site *s = o->lo_dev->ld_site;
+ struct lu_fid *old = &o->lo_header->loh_fid;
+ struct lu_site_bkt_data *bkt;
+ struct lu_object *shadow;
+ cfs_waitlink_t waiter;
+ cfs_hash_t *hs;
+ cfs_hash_bd_t bd;
+ __u64 version = 0;
+
+ LASSERT(fid_is_zero(old));
+
+ hs = s->ls_obj_hash;
+ cfs_hash_bd_get_and_lock(hs, (void *)fid, &bd, 1);
+ shadow = htable_lookup(s, &bd, fid, &waiter, &version);
+ /* supposed to be unique */
+ LASSERT(shadow == NULL);
+ *old = *fid;
+ bkt = cfs_hash_bd_extra_get(hs, &bd);
+ cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
+ bkt->lsb_busy++;
+ cfs_hash_bd_unlock(hs, &bd, 1);
+}
+EXPORT_SYMBOL(lu_object_assign_fid);
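+
+/*
+ * A usage sketch (hypothetical ->do_create() implementation): the object
+ * must still be anonymous, i.e. carry a zero fid, when the real fid
+ * becomes known:
+ *
+ *	LASSERT(fid_is_zero(lu_object_fid(o)));
+ *	... allocate the real fid, e.g. from a sequence controller ...
+ *	lu_object_assign_fid(env, o, &fid);
+ */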
+
+/**
+ * Allocates an object with a zero (not yet assigned) fid.
+ * XXX: temporary solution allowing the fid to be assigned in
+ * ->do_create(), until fully functional OST fids are implemented.
+ */
+struct lu_object *lu_object_anon(const struct lu_env *env,
+ struct lu_device *dev,
+ const struct lu_object_conf *conf)
+{
+ struct lu_fid fid;
+ struct lu_object *o;
+
+ fid_zero(&fid);
+ o = lu_object_alloc(env, dev, &fid, conf);
+
+ return o;
+}
+EXPORT_SYMBOL(lu_object_anon);
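+
+/*
+ * A lifecycle sketch tying the two helpers above together (hypothetical
+ * creation path; the fid allocation step is an assumption):
+ *
+ *	struct lu_object *o;
+ *	struct lu_fid fid;
+ *
+ *	o = lu_object_anon(env, dev, conf);
+ *	if (IS_ERR(o))
+ *		return PTR_ERR(o);
+ *	... obtain the real fid ...
+ *	lu_object_assign_fid(env, o, &fid);
+ *	lu_object_put(env, o);
+ *
+ * If the fid is never assigned, lu_object_put() takes the anonymous
+ * (zero fid) path added at the top of this change and frees the object
+ * without touching the site hash.
+ */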