*/
#define DEBUG_SUBSYSTEM S_CLASS
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
#include <libcfs/libcfs.h>
struct lu_site *site;
struct lu_object *orig;
cfs_hash_bd_t bd;
+ const struct lu_fid *fid;
top = o->lo_header;
site = o->lo_dev->ld_site;
orig = o;
+ /*
+ * till we have full fids-on-OST implemented anonymous objects
+ * are possible in OSP. such an object isn't listed in the site
+ * so we should not remove it from the site.
+ */
+ fid = lu_object_fid(o);
+ if (fid_is_zero(fid)) {
+ LASSERT(top->loh_hash.next == NULL
+ && top->loh_hash.pprev == NULL);
+ LASSERT(cfs_list_empty(&top->loh_lru));
+ if (!cfs_atomic_dec_and_test(&top->loh_ref))
+ return;
+ cfs_list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
+ if (o->lo_ops->loo_object_release != NULL)
+ o->lo_ops->loo_object_release(env, o);
+ }
+ lu_object_free(env, orig);
+ return;
+ }
+
cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
EXPORT_SYMBOL(lu_object_put);
/**
+ * Put object and don't keep in cache. This is temporary solution for
+ * multi-site objects when its layering is not constant.
+ */
+void lu_object_put_nocache(const struct lu_env *env, struct lu_object *o)
+{
+ cfs_set_bit(LU_OBJECT_HEARD_BANSHEE,
+ &o->lo_header->loh_flags);
+ return lu_object_put(env, o);
+}
+EXPORT_SYMBOL(lu_object_put_nocache);
+
+/**
* Allocate new object.
*
* This follows object creation protocol, described in the comment within
.hs_put_locked = lu_obj_hop_put_locked,
};
+void lu_dev_add_linkage(struct lu_site *s, struct lu_device *d)
+{
+ cfs_spin_lock(&s->ls_ld_lock);
+ if (cfs_list_empty(&d->ld_linkage))
+ cfs_list_add(&d->ld_linkage, &s->ls_ld_linkage);
+ cfs_spin_unlock(&s->ls_ld_lock);
+}
+EXPORT_SYMBOL(lu_dev_add_linkage);
+
+void lu_dev_del_linkage(struct lu_site *s, struct lu_device *d)
+{
+ cfs_spin_lock(&s->ls_ld_lock);
+ cfs_list_del_init(&d->ld_linkage);
+ cfs_spin_unlock(&s->ls_ld_lock);
+}
+EXPORT_SYMBOL(lu_dev_del_linkage);
+
/**
* Initialize site \a s, with \a d as the top level device.
*/
CFS_INIT_LIST_HEAD(&s->ls_ld_linkage);
cfs_spin_lock_init(&s->ls_ld_lock);
- cfs_spin_lock(&s->ls_ld_lock);
- cfs_list_add(&top->ld_linkage, &s->ls_ld_linkage);
- cfs_spin_unlock(&s->ls_ld_lock);
+ lu_dev_add_linkage(s, top);
RETURN(0);
}
/* purge again. */
lu_site_purge(env, site, ~0);
- if (!cfs_hash_is_empty(site->ls_obj_hash)) {
- /*
- * Uh-oh, objects still exist.
- */
- LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_ERROR, NULL);
-
- lu_site_print(env, site, &msgdata, lu_cdebug_printer);
- }
-
for (scan = top; scan != NULL; scan = next) {
const struct lu_device_type *ldt = scan->ld_type;
struct obd_type *type;
/**
* Maximal number of tld slots.
*/
- LU_CONTEXT_KEY_NR = 32
+ LU_CONTEXT_KEY_NR = 40
};
static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
-static cfs_spinlock_t lu_keys_guard = CFS_SPIN_LOCK_UNLOCKED;
+static DEFINE_SPINLOCK(lu_keys_guard);
/**
* Global counter incremented whenever key is registered, unregistered,
key->lct_fini(ctx, key, ctx->lc_value[index]);
lu_ref_del(&key->lct_reference, "ctx", ctx);
cfs_atomic_dec(&key->lct_used);
- LASSERT(key->lct_owner != NULL);
- if (!(ctx->lc_tags & LCT_NOREF)) {
- LASSERT(cfs_module_refcount(key->lct_owner) > 0);
- cfs_module_put(key->lct_owner);
- }
- ctx->lc_value[index] = NULL;
- }
+
+ LASSERT(key->lct_owner != NULL);
+ if ((ctx->lc_tags & LCT_NOREF) == 0) {
+ LINVRNT(cfs_module_refcount(key->lct_owner) > 0);
+ cfs_module_put(key->lct_owner);
+ }
+ ctx->lc_value[index] = NULL;
+ }
}
/**
static void keys_fini(struct lu_context *ctx)
{
- int i;
+ int i;
- cfs_spin_lock(&lu_keys_guard);
- if (ctx->lc_value != NULL) {
- for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
- key_fini(ctx, i);
- OBD_FREE(ctx->lc_value,
- ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
- ctx->lc_value = NULL;
- }
- cfs_spin_unlock(&lu_keys_guard);
+ if (ctx->lc_value == NULL)
+ return;
+
+ for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
+ key_fini(ctx, i);
+
+ OBD_FREE(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
+ ctx->lc_value = NULL;
}
static int keys_fill(struct lu_context *ctx)
static int keys_init(struct lu_context *ctx)
{
- int result;
+ OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
+ if (likely(ctx->lc_value != NULL))
+ return keys_fill(ctx);
- OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
- if (likely(ctx->lc_value != NULL))
- result = keys_fill(ctx);
- else
- result = -ENOMEM;
-
- if (result != 0)
- keys_fini(ctx);
- return result;
+ return -ENOMEM;
}
/**
*/
int lu_context_init(struct lu_context *ctx, __u32 tags)
{
+ int rc;
+
memset(ctx, 0, sizeof *ctx);
ctx->lc_state = LCS_INITIALIZED;
ctx->lc_tags = tags;
cfs_spin_lock(&lu_keys_guard);
cfs_list_add(&ctx->lc_remember, &lu_context_remembered);
cfs_spin_unlock(&lu_keys_guard);
- } else
- CFS_INIT_LIST_HEAD(&ctx->lc_remember);
- return keys_init(ctx);
+ } else {
+ CFS_INIT_LIST_HEAD(&ctx->lc_remember);
+ }
+
+ rc = keys_init(ctx);
+ if (rc != 0)
+ lu_context_fini(ctx);
+
+ return rc;
}
EXPORT_SYMBOL(lu_context_init);
*/
void lu_context_fini(struct lu_context *ctx)
{
- LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
- ctx->lc_state = LCS_FINALIZED;
- keys_fini(ctx);
- cfs_spin_lock(&lu_keys_guard);
- cfs_list_del_init(&ctx->lc_remember);
- cfs_spin_unlock(&lu_keys_guard);
+ LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
+ ctx->lc_state = LCS_FINALIZED;
+
+ if ((ctx->lc_tags & LCT_REMEMBER) == 0) {
+ LASSERT(cfs_list_empty(&ctx->lc_remember));
+ keys_fini(ctx);
+
+ } else { /* could race with key degister */
+ cfs_spin_lock(&lu_keys_guard);
+ keys_fini(ctx);
+ cfs_list_del_init(&ctx->lc_remember);
+ cfs_spin_unlock(&lu_keys_guard);
+ }
}
EXPORT_SYMBOL(lu_context_fini);
#ifdef __KERNEL__
+/*
+ * There exists a potential lock inversion deadlock scenario when using
+ * Lustre on top of ZFS. This occurs between one of ZFS's
+ * buf_hash_table.ht_lock's, and Lustre's lu_sites_guard lock. Essentially,
+ * thread A will take the lu_sites_guard lock and sleep on the ht_lock,
+ * while thread B will take the ht_lock and sleep on the lu_sites_guard
+ * lock. Obviously neither thread will wake and drop their respective hold
+ * on their lock.
+ *
+ * To prevent this from happening we must ensure the lu_sites_guard lock is
+ * not taken while down this code path. ZFS reliably does not set the
+ * __GFP_FS bit in its code paths, so this can be used to determine if it
+ * is safe to take the lu_sites_guard lock.
+ *
+ * Ideally we should accurately return the remaining number of cached
+ * objects without taking the lu_sites_guard lock, but this is not
+ * possible in the current implementation.
+ */
static int lu_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
{
lu_site_stats_t stats;
int remain = shrink_param(sc, nr_to_scan);
CFS_LIST_HEAD(splice);
- if (remain != 0) {
- if (!(shrink_param(sc, gfp_mask) & __GFP_FS))
+ if (!(shrink_param(sc, gfp_mask) & __GFP_FS)) {
+ if (remain != 0)
return -1;
- CDEBUG(D_INODE, "Shrink %d objects\n", remain);
+ else
+ /* We must not take the lu_sites_guard lock when
+ * __GFP_FS is *not* set because of the deadlock
+ * possibility detailed above. Additionally,
+ * since we cannot determine the number of
+ * objects in the cache without taking this
+ * lock, we're in a particularly tough spot. As
+ * a result, we'll just lie and say our cache is
+ * empty. This _should_ be ok, as we can't
+ * reclaim objects when __GFP_FS is *not* set
+ * anyways.
+ */
+ return 0;
}
+ CDEBUG(D_INODE, "Shrink %d objects\n", remain);
+
cfs_mutex_lock(&lu_sites_guard);
cfs_list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
if (shrink_param(sc, nr_to_scan) != 0) {
}
}
EXPORT_SYMBOL(lu_kmem_fini);
+
+/**
+ * Temporary solution to be able to assign fid in ->do_create()
+ * till we have fully-functional OST fids
+ */
+void lu_object_assign_fid(const struct lu_env *env, struct lu_object *o,
+ const struct lu_fid *fid)
+{
+ struct lu_site *s = o->lo_dev->ld_site;
+ struct lu_fid *old = &o->lo_header->loh_fid;
+ struct lu_site_bkt_data *bkt;
+ struct lu_object *shadow;
+ cfs_waitlink_t waiter;
+ cfs_hash_t *hs;
+ cfs_hash_bd_t bd;
+ __u64 version = 0;
+
+ LASSERT(fid_is_zero(old));
+
+ hs = s->ls_obj_hash;
+ cfs_hash_bd_get_and_lock(hs, (void *)fid, &bd, 1);
+ shadow = htable_lookup(s, &bd, fid, &waiter, &version);
+ /* supposed to be unique */
+ LASSERT(shadow == NULL);
+ *old = *fid;
+ bkt = cfs_hash_bd_extra_get(hs, &bd);
+ cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
+ bkt->lsb_busy++;
+ cfs_hash_bd_unlock(hs, &bd, 1);
+}
+EXPORT_SYMBOL(lu_object_assign_fid);
+
+/**
+ * allocates object with 0 (non-assiged) fid
+ * XXX: temporary solution to be able to assign fid in ->do_create()
+ * till we have fully-functional OST fids
+ */
+struct lu_object *lu_object_anon(const struct lu_env *env,
+ struct lu_device *dev,
+ const struct lu_object_conf *conf)
+{
+ struct lu_fid fid;
+ struct lu_object *o;
+
+ fid_zero(&fid);
+ o = lu_object_alloc(env, dev, &fid, conf);
+
+ return o;
+}
+EXPORT_SYMBOL(lu_object_anon);