* and resends for avoid deadlocks */
#define OBD_STATFS_FROM_CACHE 0x0002 /* the statfs callback should not update
* obd_osfs_age */
-#define OBD_STATFS_PTLRPCD 0x0004 /* requests will be sent via ptlrpcd
- * instead of a specific set. This
- * means that we cannot rely on the set
- * interpret routine to be called.
- * lov_statfs_fini() must thus be called
- * by the request interpret routine */
-#define OBD_STATFS_FOR_MDT0 0x0008 /* The statfs is only for retrieving
+#define OBD_STATFS_FOR_MDT0 0x0004 /* The statfs is only for retrieving
* information from MDT0. */
/* OBD Device Declarations */
lov_pool.o \
lov_request.o \
lovsub_dev.o \
- lovsub_io.o \
lovsub_lock.o \
lovsub_object.o \
lovsub_page.o \
* Upper half.
*/
-/**
- * Resources that are used in memory-cleaning path, and whose allocation
- * cannot fail even when memory is tight. They are preallocated in sufficient
- * quantities in lov_device::ld_emerg[], and access to them is serialized
- * lov_device::ld_mutex.
- */
-struct lov_device_emerg {
- /**
- * Page list used to submit IO when memory is in pressure.
- */
- struct cl_page_list emrg_page_list;
- /**
- * sub-io's shared by all threads accessing this device when memory is
- * too low to allocate sub-io's dynamically.
- */
- struct cl_io emrg_subio;
- /**
- * Environments used by sub-io's in
- * lov_device_emerg::emrg_subio.
- */
- struct lu_env *emrg_env;
- /**
- * Refchecks for lov_device_emerg::emrg_env.
- *
- * \see cl_env_get()
- */
- __u16 emrg_refcheck;
-};
-
struct lov_device {
/*
* XXX Locking of lov-private data is missing.
__u32 ld_target_nr;
struct lovsub_device **ld_target;
__u32 ld_flags;
-
- /** Emergency resources used in memory-cleansing paths. */
- struct lov_device_emerg **ld_emrg;
- /**
- * Serializes access to lov_device::ld_emrg in low-memory
- * conditions.
- */
- struct mutex ld_mutex;
};
/**
struct lovsub_device {
struct cl_device acid_cl;
- struct lov_device *acid_super;
- int acid_idx;
struct cl_device *acid_next;
};
};
/**
- * A link between a top-lock and a sub-lock. Separate data-structure is
- * necessary, because top-locks and sub-locks are in M:N relationship.
- *
- * \todo This can be optimized for a (by far) most frequent case of a single
- * top-lock per sub-lock.
- */
-struct lov_lock_link {
- struct lov_lock *lll_super;
- /** An index within parent lock. */
- int lll_idx;
- /**
- * A linkage into per sub-lock list of all corresponding top-locks,
- * hanging off lovsub_lock::lss_parents.
- */
- struct list_head lll_list;
-};
-
-/**
* Lock state at lovsub layer.
*/
struct lovsub_lock {
struct cl_lock_slice lss_cl;
- /**
- * List of top-locks that have given sub-lock as their part. Protected
- * by cl_lock::cll_guard mutex.
- */
- struct list_head lss_parents;
- /**
- * Top-lock that initiated current operation on this sub-lock. This is
- * only set during top-to-bottom lock operations like enqueue, and is
- * used to optimize state change notification. Protected by
- * cl_lock::cll_guard mutex.
- *
- * \see lovsub_lock_state_one().
- */
- struct cl_lock *lss_active;
};
/**
struct lov_sublock_env {
const struct lu_env *lse_env;
struct cl_io *lse_io;
- struct lov_io_sub *lse_sub;
};
struct lovsub_page {
struct lov_thread_info {
struct cl_object_conf lti_stripe_conf;
struct lu_fid lti_fid;
- struct cl_lock_descr lti_ldescr;
struct ost_lvb lti_lvb;
struct cl_2queue lti_cl2q;
struct cl_page_list lti_plist;
wait_queue_t lti_waiter;
- struct cl_attr lti_attr;
};
/**
* \see cl_env_get()
*/
__u16 sub_refcheck;
- __u16 sub_reenter;
/**
* true, iff cl_io_init() was successfully executed against
* lov_io_sub::sub_io.
*/
loff_t lis_endpos;
- int lis_mem_frozen;
int lis_stripe_count;
int lis_active_subios;
extern struct kmem_cache *lovsub_lock_kmem;
extern struct kmem_cache *lovsub_object_kmem;
-extern struct kmem_cache *lov_lock_link_kmem;
-
int lov_object_init (const struct lu_env *env, struct lu_object *obj,
const struct lu_object_conf *conf);
int lovsub_object_init (const struct lu_env *env, struct lu_object *obj,
struct cl_io *io);
int lov_io_init_released(const struct lu_env *env, struct cl_object *obj,
struct cl_io *io);
-void lov_lock_unlink (const struct lu_env *env, struct lov_lock_link *link,
- struct lovsub_lock *sub);
struct lov_io_sub *lov_sub_get(const struct lu_env *env, struct lov_io *lio,
int stripe);
-void lov_sub_put (struct lov_io_sub *sub);
-int lov_sublock_modify (const struct lu_env *env, struct lov_lock *lov,
- struct lovsub_lock *sublock,
- const struct cl_lock_descr *d, int idx);
-
int lov_page_init (const struct lu_env *env, struct cl_object *ob,
struct cl_page *page, pgoff_t index);
const struct lu_object_header *hdr,
struct lu_device *dev);
-struct lov_lock_link *lov_lock_link_find(const struct lu_env *env,
- struct lov_lock *lck,
- struct lovsub_lock *sub);
-struct lov_io_sub *lov_page_subio (const struct lu_env *env,
- struct lov_io *lio,
- const struct cl_page_slice *slice);
-
struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov);
int lov_page_stripe(const struct cl_page *page);
struct kmem_cache *lovsub_lock_kmem;
struct kmem_cache *lovsub_object_kmem;
-struct kmem_cache *lov_lock_link_kmem;
-
-/** Lock class of lov_device::ld_mutex. */
-static struct lock_class_key cl_lov_device_mutex_class;
-
struct lu_kmem_descr lov_caches[] = {
{
.ckd_cache = &lov_lock_kmem,
.ckd_size = sizeof (struct lovsub_object)
},
{
- .ckd_cache = &lov_lock_link_kmem,
- .ckd_name = "lov_lock_link_kmem",
- .ckd_size = sizeof (struct lov_lock_link)
- },
- {
.ckd_cache = NULL
}
};
break;
}
lsd = cl2lovsub_dev(cl);
- lsd->acid_idx = i;
- lsd->acid_super = ld;
ld->ld_target[i] = lsd;
}
RETURN(rc);
}
-static void lov_emerg_free(struct lov_device_emerg **emrg, int nr)
-{
- int i;
-
- for (i = 0; i < nr; ++i) {
- struct lov_device_emerg *em;
-
- em = emrg[i];
- if (em != NULL) {
- LASSERT(em->emrg_page_list.pl_nr == 0);
- if (em->emrg_env != NULL)
- cl_env_put(em->emrg_env, &em->emrg_refcheck);
- OBD_FREE_PTR(em);
- }
- }
- OBD_FREE(emrg, nr * sizeof emrg[0]);
-}
-
+/* Free the lov specific data created for the back end lu_device. */
static struct lu_device *lov_device_free(const struct lu_env *env,
struct lu_device *d)
{
- struct lov_device *ld = lu2lov_dev(d);
- const int nr = ld->ld_target_nr;
-
- cl_device_fini(lu2cl_dev(d));
- if (ld->ld_target != NULL)
- OBD_FREE(ld->ld_target, nr * sizeof ld->ld_target[0]);
- if (ld->ld_emrg != NULL)
- lov_emerg_free(ld->ld_emrg, nr);
- OBD_FREE_PTR(ld);
- return NULL;
+ struct lov_device *ld = lu2lov_dev(d);
+ const int nr = ld->ld_target_nr;
+
+ cl_device_fini(lu2cl_dev(d));
+ if (ld->ld_target != NULL)
+ OBD_FREE(ld->ld_target, nr * sizeof ld->ld_target[0]);
+
+ OBD_FREE_PTR(ld);
+ return NULL;
}
static void lov_cl_del_target(const struct lu_env *env, struct lu_device *dev,
EXIT;
}
-static struct lov_device_emerg **lov_emerg_alloc(int nr)
-{
- struct lov_device_emerg **emerg;
- int i;
- int result;
-
- OBD_ALLOC(emerg, nr * sizeof emerg[0]);
- if (emerg == NULL)
- return ERR_PTR(-ENOMEM);
- for (result = i = 0; i < nr && result == 0; i++) {
- struct lov_device_emerg *em;
-
- OBD_ALLOC_PTR(em);
- if (em != NULL) {
- emerg[i] = em;
- cl_page_list_init(&em->emrg_page_list);
- em->emrg_env = cl_env_alloc(&em->emrg_refcheck,
- LCT_REMEMBER|LCT_NOREF);
- if (!IS_ERR(em->emrg_env))
- em->emrg_env->le_ctx.lc_cookie = 0x2;
- else {
- result = PTR_ERR(em->emrg_env);
- em->emrg_env = NULL;
- }
- } else
- result = -ENOMEM;
- }
- if (result != 0) {
- lov_emerg_free(emerg, nr);
- emerg = ERR_PTR(result);
- }
- return emerg;
-}
-
static int lov_expand_targets(const struct lu_env *env, struct lov_device *dev)
{
- int result;
- __u32 tgt_size;
- __u32 sub_size;
-
- ENTRY;
- result = 0;
- tgt_size = dev->ld_lov->lov_tgt_size;
- sub_size = dev->ld_target_nr;
- if (sub_size < tgt_size) {
- struct lovsub_device **newd;
- struct lov_device_emerg **emerg;
- const size_t sz = sizeof newd[0];
-
- emerg = lov_emerg_alloc(tgt_size);
- if (IS_ERR(emerg))
- RETURN(PTR_ERR(emerg));
-
- OBD_ALLOC(newd, tgt_size * sz);
- if (newd != NULL) {
- mutex_lock(&dev->ld_mutex);
- if (sub_size > 0) {
- memcpy(newd, dev->ld_target, sub_size * sz);
- OBD_FREE(dev->ld_target, sub_size * sz);
- }
- dev->ld_target = newd;
- dev->ld_target_nr = tgt_size;
-
- if (dev->ld_emrg != NULL)
- lov_emerg_free(dev->ld_emrg, sub_size);
- dev->ld_emrg = emerg;
- mutex_unlock(&dev->ld_mutex);
- } else {
- lov_emerg_free(emerg, tgt_size);
- result = -ENOMEM;
- }
- }
- RETURN(result);
+ int result;
+ __u32 tgt_size;
+ __u32 sub_size;
+
+ ENTRY;
+ result = 0;
+ tgt_size = dev->ld_lov->lov_tgt_size;
+ sub_size = dev->ld_target_nr;
+ if (sub_size < tgt_size) {
+ struct lovsub_device **newd;
+ const size_t sz = sizeof(newd[0]);
+
+ OBD_ALLOC(newd, tgt_size * sz);
+ if (newd != NULL) {
+ if (sub_size > 0) {
+ memcpy(newd, dev->ld_target, sub_size * sz);
+ OBD_FREE(dev->ld_target, sub_size * sz);
+ }
+
+ dev->ld_target = newd;
+ dev->ld_target_nr = tgt_size;
+ } else {
+ result = -ENOMEM;
+ }
+ }
+
+ RETURN(result);
}
static int lov_cl_add_target(const struct lu_env *env, struct lu_device *dev,
if (rc == 0 && ld->ld_flags & LOV_DEV_INITIALIZED) {
LASSERT(dev->ld_site != NULL);
- cl = cl_type_setup(env, dev->ld_site, &lovsub_device_type,
- tgt->ltd_obd->obd_lu_dev);
- if (!IS_ERR(cl)) {
- lsd = cl2lovsub_dev(cl);
- lsd->acid_idx = index;
- lsd->acid_super = ld;
- ld->ld_target[index] = lsd;
- } else {
- CERROR("add failed (%d), deleting %s\n", rc,
- obd_uuid2str(&tgt->ltd_uuid));
- lov_cl_del_target(env, dev, index);
- rc = PTR_ERR(cl);
- }
+ cl = cl_type_setup(env, dev->ld_site, &lovsub_device_type,
+ tgt->ltd_obd->obd_lu_dev);
+ if (!IS_ERR(cl)) {
+ lsd = cl2lovsub_dev(cl);
+ ld->ld_target[index] = lsd;
+ } else {
+ CERROR("add failed (%d), deleting %s\n", rc,
+ obd_uuid2str(&tgt->ltd_uuid));
+ lov_cl_del_target(env, dev, index);
+ rc = PTR_ERR(cl);
+ }
}
obd_putref(obd);
RETURN(rc);
if (ld == NULL)
RETURN(ERR_PTR(-ENOMEM));
- cl_device_init(&ld->ld_cl, t);
- d = lov2lu_dev(ld);
- d->ld_ops = &lov_lu_ops;
-
- mutex_init(&ld->ld_mutex);
- lockdep_set_class(&ld->ld_mutex, &cl_lov_device_mutex_class);
+ cl_device_init(&ld->ld_cl, t);
+ d = lov2lu_dev(ld);
+ d->ld_ops = &lov_lu_ops;
/* setup the LOV OBD */
obd = class_name2obd(lustre_cfg_string(cfg, 0));
struct lov_request_set *rq_rqset;
struct list_head rq_link;
int rq_idx; /* index in lov->tgts array */
- int rq_stripe; /* stripe number */
- int rq_complete;
- int rq_rc;
};
struct lov_request_set {
struct obd_info *set_oi;
- atomic_t set_refcount;
- struct obd_export *set_exp;
- /* XXX: There is @set_exp already, however obd_statfs gets
- obd_device only. */
struct obd_device *set_obd;
int set_count;
atomic_t set_completes;
atomic_t set_success;
- atomic_t set_finish_checked;
struct list_head set_list;
- wait_queue_head_t set_waitq;
};
extern struct kmem_cache *lov_oinfo_slab;
extern struct lu_kmem_descr lov_caches[];
-void lov_finish_set(struct lov_request_set *set);
-
-static inline void lov_put_reqset(struct lov_request_set *set)
-{
- if (atomic_dec_and_test(&set->set_refcount))
- lov_finish_set(set);
-}
-
#define lov_uuid2str(lv, index) \
(char *)((lv)->lov_tgts[index]->ltd_uuid.uuid)
int stripe);
/* lov_request.c */
-void lov_set_add_req(struct lov_request *req, struct lov_request_set *set);
-int lov_set_finished(struct lov_request_set *set, int idempotent);
-void lov_update_set(struct lov_request_set *set,
- struct lov_request *req, int rc);
-int lov_check_and_wait_active(struct lov_obd *lov, int ost_idx);
int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
struct lov_request_set **reqset);
-void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
- int success);
-int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,
- int success);
int lov_fini_statfs_set(struct lov_request_set *set);
-int lov_statfs_interpret(struct ptlrpc_request_set *rqset, void *data, int rc);
/* lov_obd.c */
void lov_stripe_lock(struct lov_stripe_md *md);
* @{
*/
-static inline void lov_sub_enter(struct lov_io_sub *sub)
-{
- sub->sub_reenter++;
-}
-static inline void lov_sub_exit(struct lov_io_sub *sub)
-{
- sub->sub_reenter--;
-}
-
static void lov_io_sub_fini(const struct lu_env *env, struct lov_io *lio,
struct lov_io_sub *sub)
{
ENTRY;
if (sub->sub_io != NULL) {
- if (sub->sub_io_initialized) {
- lov_sub_enter(sub);
- cl_io_fini(sub->sub_env, sub->sub_io);
- lov_sub_exit(sub);
- sub->sub_io_initialized = 0;
- lio->lis_active_subios--;
- }
+ if (sub->sub_io_initialized) {
+ cl_io_fini(sub->sub_env, sub->sub_io);
+ sub->sub_io_initialized = 0;
+ lio->lis_active_subios--;
+ }
if (sub->sub_stripe == lio->lis_single_subio_index)
lio->lis_single_subio_index = -1;
else if (!sub->sub_borrowed)
static int lov_io_sub_init(const struct lu_env *env, struct lov_io *lio,
struct lov_io_sub *sub)
{
- struct lov_object *lov = lio->lis_object;
- struct lov_device *ld = lu2lov_dev(lov2cl(lov)->co_lu.lo_dev);
- struct cl_io *sub_io;
- struct cl_object *sub_obj;
- struct cl_io *io = lio->lis_cl.cis_io;
-
- int stripe = sub->sub_stripe;
- int result;
+ struct lov_object *lov = lio->lis_object;
+ struct cl_io *sub_io;
+ struct cl_object *sub_obj;
+ struct cl_io *io = lio->lis_cl.cis_io;
+ int stripe = sub->sub_stripe;
+ int rc;
LASSERT(sub->sub_io == NULL);
LASSERT(sub->sub_env == NULL);
if (unlikely(lov_r0(lov)->lo_sub[stripe] == NULL))
RETURN(-EIO);
- result = 0;
sub->sub_io_initialized = 0;
sub->sub_borrowed = 0;
- if (lio->lis_mem_frozen) {
- LASSERT(mutex_is_locked(&ld->ld_mutex));
- sub->sub_io = &ld->ld_emrg[stripe]->emrg_subio;
- sub->sub_env = ld->ld_emrg[stripe]->emrg_env;
- sub->sub_borrowed = 1;
- } else {
- sub->sub_env = cl_env_get(&sub->sub_refcheck);
- if (IS_ERR(sub->sub_env))
- result = PTR_ERR(sub->sub_env);
-
- if (result == 0) {
- /*
- * First sub-io. Use ->lis_single_subio to
- * avoid dynamic allocation.
- */
- if (lio->lis_active_subios == 0) {
- sub->sub_io = &lio->lis_single_subio;
- lio->lis_single_subio_index = stripe;
- } else {
- OBD_ALLOC_PTR(sub->sub_io);
- if (sub->sub_io == NULL)
- result = -ENOMEM;
- }
- }
- }
+ /* obtain new environment */
+ sub->sub_env = cl_env_get(&sub->sub_refcheck);
+ if (IS_ERR(sub->sub_env))
+ GOTO(fini_lov_io, rc = PTR_ERR(sub->sub_env));
- if (result == 0) {
- sub_obj = lovsub2cl(lov_r0(lov)->lo_sub[stripe]);
- sub_io = sub->sub_io;
-
- sub_io->ci_obj = sub_obj;
- sub_io->ci_result = 0;
-
- sub_io->ci_parent = io;
- sub_io->ci_lockreq = io->ci_lockreq;
- sub_io->ci_type = io->ci_type;
- sub_io->ci_no_srvlock = io->ci_no_srvlock;
- sub_io->ci_noatime = io->ci_noatime;
-
- lov_sub_enter(sub);
- result = cl_io_sub_init(sub->sub_env, sub_io,
- io->ci_type, sub_obj);
- lov_sub_exit(sub);
- if (result >= 0) {
- lio->lis_active_subios++;
- sub->sub_io_initialized = 1;
- result = 0;
- }
- }
- if (result != 0)
- lov_io_sub_fini(env, lio, sub);
- RETURN(result);
+ /*
+ * First sub-io. Use ->lis_single_subio to
+ * avoid dynamic allocation.
+ */
+ if (lio->lis_active_subios == 0) {
+ sub->sub_io = &lio->lis_single_subio;
+ lio->lis_single_subio_index = stripe;
+ } else {
+ OBD_ALLOC_PTR(sub->sub_io);
+ if (sub->sub_io == NULL)
+ GOTO(fini_lov_io, rc = -ENOMEM);
+ }
+
+ sub_obj = lovsub2cl(lov_r0(lov)->lo_sub[stripe]);
+ sub_io = sub->sub_io;
+
+ sub_io->ci_obj = sub_obj;
+ sub_io->ci_result = 0;
+ sub_io->ci_parent = io;
+ sub_io->ci_lockreq = io->ci_lockreq;
+ sub_io->ci_type = io->ci_type;
+ sub_io->ci_no_srvlock = io->ci_no_srvlock;
+ sub_io->ci_noatime = io->ci_noatime;
+
+ rc = cl_io_sub_init(sub->sub_env, sub_io, io->ci_type, sub_obj);
+ if (rc >= 0) {
+ lio->lis_active_subios++;
+ sub->sub_io_initialized = 1;
+ rc = 0;
+ }
+fini_lov_io:
+ if (rc != 0)
+ lov_io_sub_fini(env, lio, sub);
+ RETURN(rc);
}
struct lov_io_sub *lov_sub_get(const struct lu_env *env,
rc = lov_io_sub_init(env, lio, sub);
} else
rc = 0;
- if (rc == 0)
- lov_sub_enter(sub);
- else
- sub = ERR_PTR(rc);
- RETURN(sub);
-}
-void lov_sub_put(struct lov_io_sub *sub)
-{
- lov_sub_exit(sub);
+ if (rc < 0)
+ sub = ERR_PTR(rc);
+
+ RETURN(sub);
}
/*****************************************************************************
RETURN(cl2lov_page(slice)->lps_stripe);
}
-struct lov_io_sub *lov_page_subio(const struct lu_env *env, struct lov_io *lio,
- const struct cl_page_slice *slice)
-{
- struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
- struct cl_page *page = slice->cpl_page;
- int stripe;
-
- LASSERT(lio->lis_cl.cis_io != NULL);
- LASSERT(cl2lov(slice->cpl_obj) == lio->lis_object);
- LASSERT(lsm != NULL);
- LASSERT(lio->lis_nr_subios > 0);
- ENTRY;
-
- stripe = lov_page_stripe(page);
- RETURN(lov_sub_get(env, lio, stripe));
-}
-
-
static int lov_io_subio_init(const struct lu_env *env, struct lov_io *lio,
struct cl_io *io)
{
rc = cl_io_iter_init(sub->sub_env, sub->sub_io);
if (rc != 0)
cl_io_iter_fini(sub->sub_env, sub->sub_io);
- lov_sub_put(sub);
if (rc != 0)
break;
struct lov_io_sub *sub;
int rc = 0;
- ENTRY;
+ ENTRY;
list_for_each_entry(sub, &lio->lis_active, sub_linkage) {
- lov_sub_enter(sub);
- rc = iofunc(sub->sub_env, sub->sub_io);
- lov_sub_exit(sub);
- if (rc)
- break;
+ rc = iofunc(sub->sub_env, sub->sub_io);
+ if (rc)
+ break;
if (parent->ci_result == 0)
parent->ci_result = sub->sub_io->ci_result;
rc = cl_io_read_ahead(sub->sub_env, sub->sub_io,
cl_index(lovsub2cl(r0->lo_sub[stripe]), suboff),
ra);
- lov_sub_put(sub);
CDEBUG(D_READA, DFID " cra_end = %lu, stripes = %d, rc = %d\n",
PFID(lu_object_fid(lov2lu(loo))), ra->cra_end, r0->lo_nr, rc);
if (lio->lis_active_subios == 1) {
int idx = lio->lis_single_subio_index;
- LASSERT(idx < lio->lis_nr_subios);
- sub = lov_sub_get(env, lio, idx);
- LASSERT(!IS_ERR(sub));
- LASSERT(sub->sub_io == &lio->lis_single_subio);
- rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
+ LASSERT(idx < lio->lis_nr_subios);
+ sub = lov_sub_get(env, lio, idx);
+ LASSERT(!IS_ERR(sub));
+ LASSERT(sub->sub_io == &lio->lis_single_subio);
+ rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
crt, queue);
- lov_sub_put(sub);
- RETURN(rc);
- }
+ RETURN(rc);
+ }
LASSERT(lio->lis_subs != NULL);
if (!IS_ERR(sub)) {
rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
crt, cl2q);
- lov_sub_put(sub);
} else {
rc = PTR_ERR(sub);
}
LASSERT(sub->sub_io == &lio->lis_single_subio);
rc = cl_io_commit_async(sub->sub_env, sub->sub_io, queue,
from, to, cb);
- lov_sub_put(sub);
RETURN(rc);
}
if (!IS_ERR(sub)) {
rc = cl_io_commit_async(sub->sub_env, sub->sub_io,
plist, from, stripe_to, cb);
- lov_sub_put(sub);
} else {
rc = PTR_ERR(sub);
break;
struct lov_io *lio;
struct lov_io_sub *sub;
- ENTRY;
- fio = &ios->cis_io->u.ci_fault;
- lio = cl2lov_io(env, ios);
- sub = lov_sub_get(env, lio, lov_page_stripe(fio->ft_page));
- sub->sub_io->u.ci_fault.ft_nob = fio->ft_nob;
- lov_sub_put(sub);
- RETURN(lov_io_start(env, ios));
+ ENTRY;
+ fio = &ios->cis_io->u.ci_fault;
+ lio = cl2lov_io(env, ios);
+ sub = lov_sub_get(env, lio, lov_page_stripe(fio->ft_page));
+ sub->sub_io->u.ci_fault.ft_nob = fio->ft_nob;
+ RETURN(lov_io_start(env, ios));
}
static void lov_io_fsync_end(const struct lu_env *env,
list_for_each_entry(sub, &lio->lis_active, sub_linkage) {
struct cl_io *subio = sub->sub_io;
- lov_sub_enter(sub);
lov_io_end_wrapper(sub->sub_env, subio);
- lov_sub_exit(sub);
if (subio->ci_result == 0)
*written += subio->u.ci_fsync.fi_nr_written;
* they are not initialized at all. As a temp fix, in this case,
* we still borrow the parent's env to call sublock operations.
*/
- if (!io || !cl_object_same(io->ci_obj, parent->cll_descr.cld_obj)) {
- subenv->lse_env = env;
- subenv->lse_io = io;
- subenv->lse_sub = NULL;
- } else {
- sub = lov_sub_get(env, lio, lls->sub_stripe);
- if (!IS_ERR(sub)) {
- subenv->lse_env = sub->sub_env;
- subenv->lse_io = sub->sub_io;
- subenv->lse_sub = sub;
- } else {
- subenv = (void*)sub;
- }
- }
- return subenv;
-}
-
-static void lov_sublock_env_put(struct lov_sublock_env *subenv)
-{
- if (subenv && subenv->lse_sub)
- lov_sub_put(subenv->lse_sub);
+ if (!io || !cl_object_same(io->ci_obj, parent->cll_descr.cld_obj)) {
+ subenv->lse_env = env;
+ subenv->lse_io = io;
+ } else {
+ sub = lov_sub_get(env, lio, lls->sub_stripe);
+ if (!IS_ERR(sub)) {
+ subenv->lse_env = sub->sub_env;
+ subenv->lse_io = sub->sub_io;
+ } else {
+ subenv = (void*)sub;
+ }
+ }
+ return subenv;
}
static int lov_sublock_init(const struct lu_env *env,
if (!IS_ERR(subenv)) {
result = cl_lock_init(subenv->lse_env, &lls->sub_lock,
subenv->lse_io);
- lov_sublock_env_put(subenv);
} else {
/* error occurs. */
result = PTR_ERR(subenv);
rc = cl_lock_enqueue(subenv->lse_env, subenv->lse_io,
&lls->sub_lock, anchor);
- lov_sublock_env_put(subenv);
if (rc != 0)
break;
subenv = lov_sublock_env_get(env, lock, lls);
if (!IS_ERR(subenv)) {
cl_lock_cancel(subenv->lse_env, sublock);
- lov_sublock_env_put(subenv);
} else {
CL_LOCK_DEBUG(D_ERROR, env, slice->cls_lock,
"lov_lock_cancel fails with %ld.\n",
RETURN(rc);
}
-int lov_statfs_interpret(struct ptlrpc_request_set *rqset, void *data, int rc)
+static int
+lov_statfs_interpret(struct ptlrpc_request_set *rqset, void *data, int rc)
{
struct lov_request_set *lovset = (struct lov_request_set *)data;
int err;
break;
}
}
- lov_sub_put(sub);
+
RETURN(rc);
}
set->set_count = 0;
atomic_set(&set->set_completes, 0);
atomic_set(&set->set_success, 0);
- atomic_set(&set->set_finish_checked, 0);
INIT_LIST_HEAD(&set->set_list);
- atomic_set(&set->set_refcount, 1);
- init_waitqueue_head(&set->set_waitq);
}
-void lov_finish_set(struct lov_request_set *set)
+static void lov_finish_set(struct lov_request_set *set)
{
struct list_head *pos, *n;
struct lov_request *req;
EXIT;
}
-int lov_set_finished(struct lov_request_set *set, int idempotent)
+static void
+lov_update_set(struct lov_request_set *set, struct lov_request *req, int rc)
{
- int completes = atomic_read(&set->set_completes);
-
- CDEBUG(D_INFO, "check set %d/%d\n", completes, set->set_count);
-
- if (completes == set->set_count) {
- if (idempotent)
- return 1;
- if (atomic_inc_return(&set->set_finish_checked) == 1)
- return 1;
- }
- return 0;
-}
-
-void lov_update_set(struct lov_request_set *set,
- struct lov_request *req, int rc)
-{
- req->rq_complete = 1;
- req->rq_rc = rc;
-
atomic_inc(&set->set_completes);
if (rc == 0)
atomic_inc(&set->set_success);
-
- wake_up(&set->set_waitq);
}
-void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
+static void
+lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
{
list_add_tail(&req->rq_link, &set->set_list);
set->set_count++;
* If the OSC has not yet had a chance to connect to the OST the first time,
* wait once for it to connect instead of returning an error.
*/
-int lov_check_and_wait_active(struct lov_obd *lov, int ost_idx)
+static int lov_check_and_wait_active(struct lov_obd *lov, int ost_idx)
{
wait_queue_head_t waitq;
struct l_wait_info lwi;
(tot) += (add); \
} while(0)
-int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
+static int
+lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs, int success)
{
ENTRY;
rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
atomic_read(&set->set_success));
}
- lov_put_reqset(set);
+
+ lov_finish_set(set);
+
RETURN(rc);
}
-void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
- int success)
+static void
+lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
+ int success)
{
int shift = 0, quit = 0;
__u64 tmp;
obd_putref(lovobd);
out:
- if (set->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
- lov_set_finished(set, 0)) {
- lov_statfs_interpret(NULL, set, set->set_count !=
- atomic_read(&set->set_success));
- }
-
RETURN(0);
}
struct lu_device *next;
struct lovsub_device *lsd;
- ENTRY;
- lsd = lu2lovsub_dev(d);
- next = cl2lu_dev(lsd->acid_next);
- lsd->acid_super = NULL;
- lsd->acid_next = NULL;
- RETURN(next);
+ ENTRY;
+ lsd = lu2lovsub_dev(d);
+ next = cl2lu_dev(lsd->acid_next);
+ lsd->acid_next = NULL;
+ RETURN(next);
}
static struct lu_device *lovsub_device_free(const struct lu_env *env,
+++ /dev/null
-/*
- * GPL HEADER START
- *
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 only,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * General Public License version 2 for more details (a copy is included
- * in the LICENSE file that accompanied this code).
- *
- * You should have received a copy of the GNU General Public License
- * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
- *
- * GPL HEADER END
- */
-/*
- * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
- * Use is subject to license terms.
- */
-/*
- * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
- *
- * Implementation of cl_io for LOVSUB layer.
- *
- * Author: Nikita Danilov <nikita.danilov@sun.com>
- */
-
-#define DEBUG_SUBSYSTEM S_LOV
-
-#include "lov_cl_internal.h"
-
-/** \addtogroup lov
- * @{
- */
-
-/*****************************************************************************
- *
- * Lovsub io operations.
- *
- */
-
-/* All trivial */
-
-/** @} lov */
{
struct lovsub_lock *lsl;
- ENTRY;
- lsl = cl2lovsub_lock(slice);
- LASSERT(list_empty(&lsl->lss_parents));
- OBD_SLAB_FREE_PTR(lsl, lovsub_lock_kmem);
- EXIT;
+ ENTRY;
+ lsl = cl2lovsub_lock(slice);
+ OBD_SLAB_FREE_PTR(lsl, lovsub_lock_kmem);
+ EXIT;
}
static const struct cl_lock_operations lovsub_lock_ops = {
ENTRY;
OBD_SLAB_ALLOC_PTR_GFP(lsk, lovsub_lock_kmem, GFP_NOFS);
if (lsk != NULL) {
- INIT_LIST_HEAD(&lsk->lss_parents);
cl_lock_slice_add(lock, &lsk->lss_cl, obj, &lovsub_lock_ops);
result = 0;
} else