#include "lov_internal.h"
-static int lov_stripe_offset(struct lov_stripe_md *lsm, obd_off lov_off,
- int stripeno, obd_off *obd_off);
-
-struct lov_lock_handles {
- struct portals_handle llh_handle;
- atomic_t llh_refcount;
- int llh_stripe_count;
- struct lustre_handle llh_handles[0];
-};
-
-static void lov_llh_addref(void *llhp)
-{
- struct lov_lock_handles *llh = llhp;
-
- atomic_inc(&llh->llh_refcount);
- CDEBUG(D_INFO, "GETting llh %p : new refcount %d\n", llh,
- atomic_read(&llh->llh_refcount));
-}
-
-static struct lov_lock_handles *lov_llh_new(struct lov_stripe_md *lsm)
-{
- struct lov_lock_handles *llh;
-
- OBD_ALLOC(llh, sizeof *llh +
- sizeof(*llh->llh_handles) * lsm->lsm_stripe_count);
- if (llh == NULL) {
- CERROR("out of memory\n");
- return NULL;
- }
- atomic_set(&llh->llh_refcount, 2);
- llh->llh_stripe_count = lsm->lsm_stripe_count;
- INIT_LIST_HEAD(&llh->llh_handle.h_link);
- class_handle_hash(&llh->llh_handle, lov_llh_addref);
- return llh;
-}
-
-static struct lov_lock_handles *lov_handle2llh(struct lustre_handle *handle)
-{
- ENTRY;
- LASSERT(handle != NULL);
- RETURN(class_handle2object(handle->cookie));
-}
-
-static void lov_llh_put(struct lov_lock_handles *llh)
-{
- CDEBUG(D_INFO, "PUTting llh %p : new refcount %d\n", llh,
- atomic_read(&llh->llh_refcount) - 1);
- LASSERT(atomic_read(&llh->llh_refcount) > 0 &&
- atomic_read(&llh->llh_refcount) < 0x5a5a);
- if (atomic_dec_and_test(&llh->llh_refcount)) {
- LASSERT(list_empty(&llh->llh_handle.h_link));
- OBD_FREE(llh, sizeof *llh +
- sizeof(*llh->llh_handles) * llh->llh_stripe_count);
- }
-}
-
-static void lov_llh_destroy(struct lov_lock_handles *llh)
-{
- class_handle_unhash(&llh->llh_handle);
- lov_llh_put(llh);
-}
-
/* obd methods */
static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
struct obd_uuid *cluuid)
RETURN(-EINVAL);
}
+ if (desc->ld_default_stripe_size < PTLRPC_MAX_BRW_SIZE) {
+ CWARN("Increasing default_stripe_size "LPU64" to %u\n",
+ desc->ld_default_stripe_size, PTLRPC_MAX_BRW_SIZE);
+ CWARN("Please update config and run --write-conf on MDS\n");
+
+ desc->ld_default_stripe_size = PTLRPC_MAX_BRW_SIZE;
+ }
+
/* Because of 64-bit divide/mod operations only work with a 32-bit
* divisor in a 32-bit kernel, we cannot support a stripe width
* of 4GB or larger on 32-bit CPUs.
RETURN(0);
}
-
-/* compute object size given "stripeno" and the ost size */
-static obd_size lov_stripe_size(struct lov_stripe_md *lsm, obd_size ost_size,
- int stripeno)
-{
- unsigned long ssize = lsm->lsm_stripe_size;
- unsigned long swidth = ssize * lsm->lsm_stripe_count;
- unsigned long stripe_size;
- obd_size lov_size;
-
- if (ost_size == 0)
- return 0;
-
- /* do_div(a, b) returns a % b, and a = a / b */
- stripe_size = do_div(ost_size, ssize);
- if (stripe_size)
- lov_size = ost_size * swidth + stripeno * ssize + stripe_size;
- else
- lov_size = (ost_size - 1) * swidth + (stripeno + 1) * ssize;
-
- return lov_size;
-}
-
-static void lov_merge_attrs(struct obdo *tgt, struct obdo *src, obd_flag valid,
- struct lov_stripe_md *lsm, int stripeno, int *set)
-{
- valid &= src->o_valid;
-
- if (*set) {
- if (valid & OBD_MD_FLSIZE) {
- /* this handles sparse files properly */
- obd_size lov_size;
-
- lov_size = lov_stripe_size(lsm, src->o_size, stripeno);
- if (lov_size > tgt->o_size)
- tgt->o_size = lov_size;
- }
- if (valid & OBD_MD_FLBLOCKS)
- tgt->o_blocks += src->o_blocks;
- if (valid & OBD_MD_FLBLKSZ)
- tgt->o_blksize += src->o_blksize;
- if (valid & OBD_MD_FLCTIME && tgt->o_ctime < src->o_ctime)
- tgt->o_ctime = src->o_ctime;
- if (valid & OBD_MD_FLMTIME && tgt->o_mtime < src->o_mtime)
- tgt->o_mtime = src->o_mtime;
- } else {
- memcpy(tgt, src, sizeof(*tgt));
- tgt->o_id = lsm->lsm_object_id;
- if (valid & OBD_MD_FLSIZE)
- tgt->o_size = lov_stripe_size(lsm,src->o_size,stripeno);
- *set = 1;
- }
-}
-
#ifndef log2
#define log2(n) ffz(~(n))
#endif
RETURN(rc);
}
-#define LOV_CREATE_RESEED_INTERVAL 1000
+static int lov_recreate(struct obd_export *exp, struct obdo *src_oa,
+ struct lov_stripe_md **ea, struct obd_trans_info *oti)
+{
+ struct lov_stripe_md *obj_mdp, *lsm;
+ struct lov_obd *lov = &exp->exp_obd->u.lov;
+ unsigned ost_idx;
+ int rc, i;
+ ENTRY;
+
+ LASSERT(src_oa->o_valid & OBD_MD_FLFLAGS &&
+ src_oa->o_flags & OBD_FL_RECREATE_OBJS);
+
+ OBD_ALLOC(obj_mdp, sizeof(*obj_mdp));
+ if (obj_mdp == NULL)
+ RETURN(-ENOMEM);
+
+ ost_idx = src_oa->o_nlink;
+ lsm = *ea;
+ if (lsm == NULL)
+ GOTO(out, rc = -EINVAL);
+ if (ost_idx >= lov->desc.ld_tgt_count)
+ GOTO(out, rc = -EINVAL);
+
+ for (i = 0; i < lsm->lsm_stripe_count; i++) {
+ if (lsm->lsm_oinfo[i].loi_ost_idx == ost_idx) {
+ if (lsm->lsm_oinfo[i].loi_id != src_oa->o_id)
+ GOTO(out, rc = -EINVAL);
+ break;
+ }
+ }
+ if (i == lsm->lsm_stripe_count)
+ GOTO(out, rc = -EINVAL);
+
+ rc = obd_create(lov->tgts[ost_idx].ltd_exp, src_oa, &obj_mdp, oti);
+out:
+ OBD_FREE(obj_mdp, sizeof(*obj_mdp));
+ RETURN(rc);
+}
/* the LOV expects oa->o_id to be set to the LOV object id */
static int lov_create(struct obd_export *exp, struct obdo *src_oa,
struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
- static int ost_start_idx, ost_start_count;
struct lov_obd *lov;
- struct lov_stripe_md *lsm;
- struct lov_oinfo *loi = NULL;
- struct obdo *tmp_oa, *ret_oa;
- struct llog_cookie *cookies = NULL;
- unsigned ost_count, ost_idx;
- int set = 0, obj_alloc = 0, cookie_sent = 0, rc = 0, i;
+ struct lov_request_set *set = NULL;
+ struct list_head *pos;
+ int rc = 0;
ENTRY;
LASSERT(ea != NULL);
+ if (exp == NULL)
+ RETURN(-EINVAL);
if ((src_oa->o_valid & OBD_MD_FLFLAGS) &&
src_oa->o_flags == OBD_FL_DELORPHAN) {
RETURN(rc);
}
- if (exp == NULL)
- RETURN(-EINVAL);
-
lov = &exp->exp_obd->u.lov;
-
if (!lov->desc.ld_active_tgt_count)
RETURN(-EIO);
/* Recreate a specific object id at the given OST index */
- if (src_oa->o_valid & OBD_MD_FLFLAGS && src_oa->o_flags &
- OBD_FL_RECREATE_OBJS) {
- struct lov_stripe_md obj_md;
- struct lov_stripe_md *obj_mdp = &obj_md;
-
- ost_idx = src_oa->o_nlink;
- lsm = *ea;
- if (lsm == NULL)
- RETURN(-EINVAL);
- if (ost_idx >= lov->desc.ld_tgt_count)
- RETURN(-EINVAL);
- for (i = 0; i < lsm->lsm_stripe_count; i++) {
- if (lsm->lsm_oinfo[i].loi_ost_idx == ost_idx) {
- if (lsm->lsm_oinfo[i].loi_id != src_oa->o_id)
- RETURN(-EINVAL);
- break;
- }
- }
- if (i == lsm->lsm_stripe_count)
- RETURN(-EINVAL);
-
- rc = obd_create(lov->tgts[ost_idx].ltd_exp, src_oa,
- &obj_mdp, oti);
+ if ((src_oa->o_valid & OBD_MD_FLFLAGS) &&
+ (src_oa->o_flags & OBD_FL_RECREATE_OBJS)) {
+ rc = lov_recreate(exp, src_oa, ea, oti);
RETURN(rc);
}
- ret_oa = obdo_alloc();
- if (!ret_oa)
- RETURN(-ENOMEM);
-
- tmp_oa = obdo_alloc();
- if (!tmp_oa)
- GOTO(out_oa, rc = -ENOMEM);
-
- lsm = *ea;
- if (lsm == NULL) {
- int stripes;
- ost_count = lov_get_stripecnt(lov, 0);
-
- /* If the MDS file was truncated up to some size, stripe over
- * enough OSTs to allow the file to be created at that size. */
- if (src_oa->o_valid & OBD_MD_FLSIZE) {
- stripes=((src_oa->o_size+LUSTRE_STRIPE_MAXBYTES)>>12)-1;
- do_div(stripes, (__u32)(LUSTRE_STRIPE_MAXBYTES >> 12));
-
- if (stripes > lov->desc.ld_active_tgt_count)
- RETURN(-EFBIG);
- if (stripes < ost_count)
- stripes = ost_count;
- } else {
- stripes = ost_count;
- }
-
- rc = lov_alloc_memmd(&lsm, stripes, lov->desc.ld_pattern ?
- lov->desc.ld_pattern : LOV_PATTERN_RAID0);
- if (rc < 0)
- GOTO(out_tmp, rc);
-
- rc = 0;
- }
-
- ost_count = lov->desc.ld_tgt_count;
-
- LASSERT(src_oa->o_valid & OBD_MD_FLID);
- lsm->lsm_object_id = src_oa->o_id;
- if (!lsm->lsm_stripe_size)
- lsm->lsm_stripe_size = lov->desc.ld_default_stripe_size;
- if (!lsm->lsm_pattern) {
- lsm->lsm_pattern = lov->desc.ld_pattern ?
- lov->desc.ld_pattern : LOV_PATTERN_RAID0;
- }
-
- if (*ea == NULL || lsm->lsm_oinfo[0].loi_ost_idx >= ost_count) {
- if (--ost_start_count <= 0) {
- ost_start_idx = ll_insecure_random_int();
- ost_start_count = LOV_CREATE_RESEED_INTERVAL;
- } else if (lsm->lsm_stripe_count >=
- lov->desc.ld_active_tgt_count) {
- /* If we allocate from all of the stripes, make the
- * next file start on the next OST. */
- ++ost_start_idx;
- }
- ost_idx = ost_start_idx % ost_count;
- } else {
- ost_idx = lsm->lsm_oinfo[0].loi_ost_idx;
- }
-
- CDEBUG(D_INODE, "allocating %d subobjs for objid "LPX64" at idx %d\n",
- lsm->lsm_stripe_count, lsm->lsm_object_id, ost_idx);
-
- /* XXX LOV STACKING: need to figure out how many real OSCs */
- if (oti && (src_oa->o_valid & OBD_MD_FLCOOKIE)) {
- oti_alloc_cookies(oti, lsm->lsm_stripe_count);
- if (!oti->oti_logcookies)
- GOTO(out_cleanup, rc = -ENOMEM);
- cookies = oti->oti_logcookies;
- }
-
- loi = lsm->lsm_oinfo;
- for (i = 0; i < ost_count; i++, ost_idx = (ost_idx + 1) % ost_count) {
- struct lov_stripe_md obj_md;
- struct lov_stripe_md *obj_mdp = &obj_md;
- int err;
-
- ++ost_start_idx;
- if (lov->tgts[ost_idx].active == 0) {
- CDEBUG(D_HA, "lov idx %d inactive\n", ost_idx);
- continue;
- }
-
- /* create data objects with "parent" OA */
- memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
-
- /* XXX When we start creating objects on demand, we need to
- * make sure that we always create the object on the
- * stripe which holds the existing file size.
- */
- if (src_oa->o_valid & OBD_MD_FLSIZE) {
- if (lov_stripe_offset(lsm, src_oa->o_size, i,
- &tmp_oa->o_size) < 0 &&
- tmp_oa->o_size)
- tmp_oa->o_size--;
-
- CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
- i, tmp_oa->o_size, src_oa->o_size);
- }
+ rc = lov_prep_create_set(exp, ea, src_oa, oti, &set);
+ if (rc)
+ RETURN(rc);
+ list_for_each (pos, &set->set_list) {
+ struct lov_request *req =
+ list_entry(pos, struct lov_request, rq_link);
/* XXX: LOV STACKING: use real "obj_mdp" sub-data */
- err = obd_create(lov->tgts[ost_idx].ltd_exp, tmp_oa, &obj_mdp,
- oti);
- if (err) {
- if (lov->tgts[ost_idx].active) {
- CERROR("error creating objid "LPX64" sub-object"
- " on OST idx %d/%d: rc = %d\n",
- src_oa->o_id, ost_idx,
- lsm->lsm_stripe_count, err);
- if (err > 0) {
- CERROR("obd_create returned invalid "
- "err %d\n", err);
- err = -EIO;
- }
- }
- if (!rc)
- rc = err;
- continue;
- }
- if (oti->oti_objid)
- oti->oti_objid[ost_idx] = tmp_oa->o_id;
- loi->loi_id = tmp_oa->o_id;
- loi->loi_ost_idx = ost_idx;
- CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64" at idx %d\n",
- lsm->lsm_object_id, loi->loi_id, ost_idx);
-
- lov_merge_attrs(ret_oa, tmp_oa, tmp_oa->o_valid, lsm,
- obj_alloc, &set);
- loi_init(loi);
-
- if (cookies)
- ++oti->oti_logcookies;
- if (tmp_oa->o_valid & OBD_MD_FLCOOKIE)
- ++cookie_sent;
- ++obj_alloc;
- ++loi;
-
- /* If we have allocated enough objects, we are OK */
- if (obj_alloc == lsm->lsm_stripe_count)
- GOTO(out_done, rc = 0);
- }
-
- if (obj_alloc == 0) {
- if (rc == 0)
- rc = -EIO;
- GOTO(out_cleanup, rc);
- }
-
- /* If we were passed specific striping params, then a failure to
- * meet those requirements is an error, since we can't reallocate
- * that memory (it might be part of a larger array or something).
- *
- * We can only get here if lsm_stripe_count was originally > 1.
- */
- if (*ea != NULL) {
- CERROR("can't lstripe objid "LPX64": have %u want %u, rc %d\n",
- lsm->lsm_object_id, obj_alloc, lsm->lsm_stripe_count,rc);
- if (rc == 0)
- rc = -EFBIG;
- GOTO(out_cleanup, rc);
- } else {
- struct lov_stripe_md *lsm_new;
- /* XXX LOV STACKING call into osc for sizes */
- unsigned oldsize, newsize;
-
- if (oti && cookies && cookie_sent) {
- oldsize = lsm->lsm_stripe_count * sizeof(*cookies);
- newsize = obj_alloc * sizeof(*cookies);
-
- oti_alloc_cookies(oti, obj_alloc);
- if (oti->oti_logcookies) {
- memcpy(oti->oti_logcookies, cookies, newsize);
- OBD_FREE(cookies, oldsize);
- cookies = oti->oti_logcookies;
- } else {
- CWARN("'leaking' %d bytes\n", oldsize-newsize);
- }
- }
-
- CWARN("using fewer stripes for object "LPX64": old %u new %u\n",
- lsm->lsm_object_id, lsm->lsm_stripe_count, obj_alloc);
- oldsize = lov_stripe_md_size(lsm->lsm_stripe_count);
- newsize = lov_stripe_md_size(obj_alloc);
- OBD_ALLOC(lsm_new, newsize);
- if (lsm_new != NULL) {
- memcpy(lsm_new, lsm, newsize);
- lsm_new->lsm_stripe_count = obj_alloc;
- OBD_FREE(lsm, oldsize);
- lsm = lsm_new;
- } else {
- CWARN("'leaking' %d bytes\n", oldsize - newsize);
- }
- rc = 0;
- }
- EXIT;
- out_done:
- *ea = lsm;
- if (src_oa->o_valid & OBD_MD_FLSIZE &&
- ret_oa->o_size != src_oa->o_size) {
- CERROR("original size "LPU64" isn't new object size "LPU64"\n",
- src_oa->o_size, ret_oa->o_size);
- LBUG();
- }
- ret_oa->o_id = src_oa->o_id;
- memcpy(src_oa, ret_oa, sizeof(*src_oa));
-
- out_tmp:
- obdo_free(tmp_oa);
- out_oa:
- obdo_free(ret_oa);
- if (oti && cookies) {
- oti->oti_logcookies = cookies;
- if (!cookie_sent) {
- oti_free_cookies(oti);
- src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
- } else {
- src_oa->o_valid |= OBD_MD_FLCOOKIE;
- }
+ rc = obd_create(lov->tgts[req->rq_idx].ltd_exp,
+ req->rq_oa, &req->rq_md, oti);
+ lov_update_create_set(set, req, rc);
}
+ rc = lov_fini_create_set(set, ea);
RETURN(rc);
-
- out_cleanup:
- while (obj_alloc-- > 0) {
- struct obd_export *sub_exp;
- int err;
-
- --loi;
- sub_exp = lov->tgts[loi->loi_ost_idx].ltd_exp;
- /* destroy already created objects here */
- memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
- tmp_oa->o_id = loi->loi_id;
-
- err = obd_destroy(sub_exp, tmp_oa, NULL, oti);
- if (err)
- CERROR("Failed to uncreate objid "LPX64" subobj "LPX64
- " on OST idx %d: rc = %d\n", src_oa->o_id,
- loi->loi_id, loi->loi_ost_idx, err);
- }
- if (*ea == NULL)
- obd_free_memmd(exp, &lsm);
- goto out_tmp;
}
-#define lsm_bad_magic(LSMP) \
-({ \
- struct lov_stripe_md *_lsm__ = (LSMP); \
- int _ret__ = 0; \
- if (!_lsm__) { \
- CERROR("LOV requires striping ea\n"); \
- _ret__ = 1; \
- } else if (_lsm__->lsm_magic != LOV_MAGIC) { \
- CERROR("LOV striping magic bad %#x != %#x\n", \
- _lsm__->lsm_magic, LOV_MAGIC); \
- _ret__ = 1; \
- } \
- _ret__; \
-})
+#define ASSERT_LSM_MAGIC(lsmp) \
+do { \
+ LASSERT((lsmp) != NULL); \
+ LASSERTF((lsmp)->lsm_magic == LOV_MAGIC, "%p, %x", \
+ (lsmp), (lsmp)->lsm_magic); \
+} while (0)
static int lov_destroy(struct obd_export *exp, struct obdo *oa,
struct lov_stripe_md *lsm, struct obd_trans_info *oti)
{
- struct obdo tmp;
+ struct lov_request_set *set;
+ struct lov_request *req;
+ struct list_head *pos;
struct lov_obd *lov;
- struct lov_oinfo *loi;
- int rc = 0, i;
+ int rc = 0;
ENTRY;
- if (lsm_bad_magic(lsm))
- RETURN(-EINVAL);
+ ASSERT_LSM_MAGIC(lsm);
if (!exp || !exp->exp_obd)
RETURN(-ENODEV);
lov = &exp->exp_obd->u.lov;
- for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
+ rc = lov_prep_destroy_set(exp, oa, lsm, oti, &set);
+ if (rc)
+ RETURN(rc);
+
+ list_for_each (pos, &set->set_list) {
int err;
- if (lov->tgts[loi->loi_ost_idx].active == 0) {
- CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
- /* Orphan clean up will (someday) fix this up. */
- if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
- oti->oti_logcookies++;
- continue;
- }
+ req = list_entry(pos, struct lov_request, rq_link);
- memcpy(&tmp, oa, sizeof(tmp));
- tmp.o_id = loi->loi_id;
- err = obd_destroy(lov->tgts[loi->loi_ost_idx].ltd_exp, &tmp,
- NULL, oti);
- if (err && lov->tgts[loi->loi_ost_idx].active) {
+ /* XXX update the cookie position */
+ oti->oti_logcookies = set->set_cookies + req->rq_stripe;
+ rc = obd_destroy(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa,
+ NULL, oti);
+ err = lov_update_common_set(set, req, rc);
+ if (rc) {
CERROR("error: destroying objid "LPX64" subobj "
- LPX64" on OST idx %d: rc = %d\n",
- oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
+ LPX64" on OST idx %d: rc = %d\n",
+ set->set_oa->o_id, req->rq_oa->o_id,
+ req->rq_idx, rc);
if (!rc)
rc = err;
}
}
+ lov_fini_destroy_set(set);
RETURN(rc);
}
static int lov_getattr(struct obd_export *exp, struct obdo *oa,
struct lov_stripe_md *lsm)
{
- struct obdo tmp;
+ struct lov_request_set *set;
+ struct lov_request *req;
+ struct list_head *pos;
struct lov_obd *lov;
- struct lov_oinfo *loi;
- int i, rc = 0, set = 0;
+ int err = 0, rc = 0;
ENTRY;
- if (lsm_bad_magic(lsm))
- RETURN(-EINVAL);
+ ASSERT_LSM_MAGIC(lsm);
if (!exp || !exp->exp_obd)
RETURN(-ENODEV);
lov = &exp->exp_obd->u.lov;
+
+ rc = lov_prep_getattr_set(exp, oa, lsm, &set);
+ if (rc)
+ RETURN(rc);
- CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
- lsm->lsm_object_id, lsm->lsm_stripe_count, lsm->lsm_stripe_size);
- for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
- int err;
-
- if (lov->tgts[loi->loi_ost_idx].active == 0) {
- CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
- continue;
- }
-
+ list_for_each (pos, &set->set_list) {
+ req = list_entry(pos, struct lov_request, rq_link);
+
CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
- "%u\n", oa->o_id, i, loi->loi_id, loi->loi_ost_idx);
- /* create data objects with "parent" OA */
- memcpy(&tmp, oa, sizeof(tmp));
- tmp.o_id = loi->loi_id;
+ "%u\n", oa->o_id, req->rq_stripe, req->rq_oa->o_id,
+ req->rq_idx);
- err = obd_getattr(lov->tgts[loi->loi_ost_idx].ltd_exp, &tmp,
- NULL);
+ rc = obd_getattr(lov->tgts[req->rq_idx].ltd_exp,
+ req->rq_oa, NULL);
+ err = lov_update_common_set(set, req, rc);
if (err) {
- if (lov->tgts[loi->loi_ost_idx].active) {
- CERROR("error: getattr objid "LPX64" subobj "
- LPX64" on OST idx %d: rc = %d\n",
- oa->o_id, loi->loi_id, loi->loi_ost_idx,
- err);
- RETURN(err);
- }
- } else {
- lov_merge_attrs(oa, &tmp, tmp.o_valid, lsm, i, &set);
+ CERROR("error: getattr objid "LPX64" subobj "
+ LPX64" on OST idx %d: rc = %d\n",
+ set->set_oa->o_id, req->rq_oa->o_id,
+ req->rq_idx, err);
+ break;
}
}
- if (!set)
- rc = -EIO;
+
+ rc = lov_fini_getattr_set(set);
+ if (err)
+ rc = err;
RETURN(rc);
}
static int lov_getattr_interpret(struct ptlrpc_request_set *rqset, void *data,
int rc)
{
- struct lov_getattr_async_args *aa = data;
- struct lov_stripe_md *lsm = aa->aa_lsm;
- struct obdo *oa = aa->aa_oa;
- struct obdo *obdos = aa->aa_obdos;
- struct lov_oinfo *loi;
- int i;
- int set = 0;
+ struct lov_request_set *lovset = (struct lov_request_set *)data;
ENTRY;
- if (rc == 0) {
- /* NB all stripe requests succeeded to get here */
-
- for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
- i++, loi++) {
- if (obdos[i].o_valid == 0) /* inactive stripe */
- continue;
-
- lov_merge_attrs(oa, &obdos[i], obdos[i].o_valid, lsm,
- i, &set);
- }
-
- if (!set) {
- CERROR ("No stripes had valid attrs\n");
- rc = -EIO;
- }
+ /* don't do attribute merge if this aysnc op failed */
+ if (rc) {
+ lovset->set_completes = 0;
+ lov_fini_getattr_set(lovset);
+ } else {
+ rc = lov_fini_getattr_set(lovset);
}
-
- OBD_FREE (obdos, lsm->lsm_stripe_count * sizeof (*obdos));
RETURN (rc);
}
struct lov_stripe_md *lsm,
struct ptlrpc_request_set *rqset)
{
- struct obdo *obdos;
+ struct lov_request_set *lovset;
struct lov_obd *lov;
- struct lov_oinfo *loi;
- struct lov_getattr_async_args *aa;
- int i, rc = 0, set = 0;
+ struct list_head *pos;
+ struct lov_request *req;
+ int rc = 0;
ENTRY;
- if (lsm_bad_magic(lsm))
- RETURN(-EINVAL);
+ ASSERT_LSM_MAGIC(lsm);
if (!exp || !exp->exp_obd)
RETURN(-ENODEV);
lov = &exp->exp_obd->u.lov;
- OBD_ALLOC (obdos, lsm->lsm_stripe_count * sizeof (*obdos));
- if (obdos == NULL)
- RETURN(-ENOMEM);
+ rc = lov_prep_getattr_set(exp, oa, lsm, &lovset);
+ if (rc)
+ RETURN(rc);
CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
lsm->lsm_object_id, lsm->lsm_stripe_count, lsm->lsm_stripe_size);
- for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
- int err;
-
- if (lov->tgts[loi->loi_ost_idx].active == 0) {
- CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
- /* leaves obdos[i].obd_valid unset */
- continue;
- }
+ list_for_each (pos, &lovset->set_list) {
+ req = list_entry(pos, struct lov_request, rq_link);
+
CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
- "%u\n", oa->o_id, i, loi->loi_id, loi->loi_ost_idx);
- /* create data objects with "parent" OA */
- memcpy(&obdos[i], oa, sizeof(obdos[i]));
- obdos[i].o_id = loi->loi_id;
-
- err = obd_getattr_async(lov->tgts[loi->loi_ost_idx].ltd_exp,
- &obdos[i], NULL, rqset);
- if (err) {
+ "%u\n", oa->o_id, req->rq_stripe, req->rq_oa->o_id,
+ req->rq_idx);
+ rc = obd_getattr_async(lov->tgts[req->rq_idx].ltd_exp,
+ req->rq_oa, NULL, rqset);
+ if (rc) {
CERROR("error: getattr objid "LPX64" subobj "
LPX64" on OST idx %d: rc = %d\n",
- oa->o_id, loi->loi_id, loi->loi_ost_idx,
- err);
- GOTO(out_obdos, rc = err);
+ lovset->set_oa->o_id, req->rq_oa->o_id,
+ req->rq_idx, rc);
+ GOTO(out, rc);
}
- set = 1;
+ lov_update_common_set(lovset, req, rc);
}
- if (!set)
- GOTO (out_obdos, rc = -EIO);
-
+
+ LASSERT(rc == 0);
LASSERT (rqset->set_interpret == NULL);
rqset->set_interpret = lov_getattr_interpret;
- LASSERT (sizeof (rqset->set_args) >= sizeof (*aa));
- aa = (struct lov_getattr_async_args *)&rqset->set_args;
- aa->aa_lsm = lsm;
- aa->aa_oa = oa;
- aa->aa_obdos = obdos;
- aa->aa_lov = lov;
- GOTO(out, rc = 0);
-
-out_obdos:
- OBD_FREE (obdos, lsm->lsm_stripe_count * sizeof (*obdos));
+ rqset->set_arg = (void *)lovset;
+ RETURN(rc);
out:
+ LASSERT(rc);
+ lov_fini_getattr_set(lovset);
RETURN(rc);
}
-
static int lov_setattr(struct obd_export *exp, struct obdo *src_oa,
struct lov_stripe_md *lsm, struct obd_trans_info *oti)
{
- struct obdo *tmp_oa, *ret_oa;
+ struct lov_request_set *set;
struct lov_obd *lov;
- struct lov_oinfo *loi;
- int rc = 0, i, set = 0;
+ struct list_head *pos;
+ struct lov_request *req;
+ int err = 0, rc = 0;
ENTRY;
- if (lsm_bad_magic(lsm))
- RETURN(-EINVAL);
+ ASSERT_LSM_MAGIC(lsm);
if (!exp || !exp->exp_obd)
RETURN(-ENODEV);
OBD_MD_FLATIME | OBD_MD_FLMTIME |
OBD_MD_FLCTIME | OBD_MD_FLFLAGS |
OBD_MD_FLSIZE)));
- ret_oa = obdo_alloc();
- if (!ret_oa)
- RETURN(-ENOMEM);
-
- tmp_oa = obdo_alloc();
- if (!tmp_oa)
- GOTO(out_oa, rc = -ENOMEM);
-
lov = &exp->exp_obd->u.lov;
- for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
- int err;
-
- if (lov->tgts[loi->loi_ost_idx].active == 0) {
- CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
- continue;
- }
-
- memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
- tmp_oa->o_id = loi->loi_id;
-
- if (src_oa->o_valid & OBD_MD_FLSIZE) {
- if (lov_stripe_offset(lsm, src_oa->o_size, i,
- &tmp_oa->o_size) < 0 &&
- tmp_oa->o_size)
- tmp_oa->o_size--;
-
- CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
- i, tmp_oa->o_size, src_oa->o_size);
- }
- if (src_oa->o_valid & OBD_MD_FLMTIME)
- loi->loi_mtime = src_oa->o_mtime;
+ rc = lov_prep_setattr_set(exp, src_oa, lsm, NULL, &set);
+ if (rc)
+ RETURN(rc);
- err = obd_setattr(lov->tgts[loi->loi_ost_idx].ltd_exp, tmp_oa,
- NULL, NULL);
+ list_for_each (pos, &set->set_list) {
+ req = list_entry(pos, struct lov_request, rq_link);
+
+ rc = obd_setattr(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa,
+ NULL, NULL);
+ err = lov_update_common_set(set, req, rc);
if (err) {
- if (lov->tgts[loi->loi_ost_idx].active) {
- CERROR("error: setattr objid "LPX64" subobj "
- LPX64" on OST idx %d: rc = %d\n",
- src_oa->o_id, loi->loi_id,
- loi->loi_ost_idx, err);
- if (!rc)
- rc = err;
- }
- continue;
- }
- lov_merge_attrs(ret_oa, tmp_oa, tmp_oa->o_valid, lsm, i, &set);
- }
- if (!set && !rc)
- rc = -EIO;
-
- ret_oa->o_id = src_oa->o_id;
- memcpy(src_oa, ret_oa, sizeof(*src_oa));
- GOTO(out_tmp, rc);
-out_tmp:
- obdo_free(tmp_oa);
-out_oa:
- obdo_free(ret_oa);
- return rc;
-}
-
-/* we have an offset in file backed by an lov and want to find out where
- * that offset lands in our given stripe of the file. for the easy
- * case where the offset is within the stripe, we just have to scale the
- * offset down to make it relative to the stripe instead of the lov.
- *
- * the harder case is what to do when the offset doesn't intersect the
- * stripe. callers will want start offsets clamped ahead to the start
- * of the nearest stripe in the file. end offsets similarly clamped to the
- * nearest ending byte of a stripe in the file:
- *
- * all this function does is move offsets to the nearest region of the
- * stripe, and it does its work "mod" the full length of all the stripes.
- * consider a file with 3 stripes:
- *
- * S E
- * ---------------------------------------------------------------------
- * | 0 | 1 | 2 | 0 | 1 | 2 |
- * ---------------------------------------------------------------------
- *
- * to find stripe 1's offsets for S and E, it divides by the full stripe
- * width and does its math in the context of a single set of stripes:
- *
- * S E
- * -----------------------------------
- * | 0 | 1 | 2 |
- * -----------------------------------
- *
- * it'll notice that E is outside stripe 1 and clamp it to the end of the
- * stripe, then multiply it back out by lov_off to give the real offsets in
- * the stripe:
- *
- * S E
- * ---------------------------------------------------------------------
- * | 1 | 1 | 1 | 1 | 1 | 1 |
- * ---------------------------------------------------------------------
- *
- * it would have done similarly and pulled S forward to the start of a 1
- * stripe if, say, S had landed in a 0 stripe.
- *
- * this rounding isn't always correct. consider an E lov offset that lands
- * on a 0 stripe, the "mod stripe width" math will pull it forward to the
- * start of a 1 stripe, when in fact it wanted to be rounded back to the end
- * of a previous 1 stripe. this logic is handled by callers and this is why:
- *
- * this function returns < 0 when the offset was "before" the stripe and
- * was moved forward to the start of the stripe in question; 0 when it
- * falls in the stripe and no shifting was done; > 0 when the offset
- * was outside the stripe and was pulled back to its final byte. */
-static int lov_stripe_offset(struct lov_stripe_md *lsm, obd_off lov_off,
- int stripeno, obd_off *obd_off)
-{
- unsigned long ssize = lsm->lsm_stripe_size;
- unsigned long swidth = ssize * lsm->lsm_stripe_count;
- unsigned long stripe_off, this_stripe;
- int ret = 0;
-
- if (lov_off == OBD_OBJECT_EOF) {
- *obd_off = OBD_OBJECT_EOF;
- return 0;
- }
-
- /* do_div(a, b) returns a % b, and a = a / b */
- stripe_off = do_div(lov_off, swidth);
-
- this_stripe = stripeno * ssize;
- if (stripe_off < this_stripe) {
- stripe_off = 0;
- ret = -1;
- } else {
- stripe_off -= this_stripe;
-
- if (stripe_off >= ssize) {
- stripe_off = ssize;
- ret = 1;
- }
- }
-
- *obd_off = lov_off * ssize + stripe_off;
- return ret;
-}
-
-/* Given a whole-file size and a stripe number, give the file size which
- * corresponds to the individual object of that stripe.
- *
- * This behaves basically in the same was as lov_stripe_offset, except that
- * file sizes falling before the beginning of a stripe are clamped to the end
- * of the previous stripe, not the beginning of the next:
- *
- * S
- * ---------------------------------------------------------------------
- * | 0 | 1 | 2 | 0 | 1 | 2 |
- * ---------------------------------------------------------------------
- *
- * if clamped to stripe 2 becomes:
- *
- * S
- * ---------------------------------------------------------------------
- * | 0 | 1 | 2 | 0 | 1 | 2 |
- * ---------------------------------------------------------------------
- */
-static obd_off lov_size_to_stripe(struct lov_stripe_md *lsm, obd_off file_size,
- int stripeno)
-{
- unsigned long ssize = lsm->lsm_stripe_size;
- unsigned long swidth = ssize * lsm->lsm_stripe_count;
- unsigned long stripe_off, this_stripe;
-
- if (file_size == OBD_OBJECT_EOF)
- return OBD_OBJECT_EOF;
-
- /* do_div(a, b) returns a % b, and a = a / b */
- stripe_off = do_div(file_size, swidth);
-
- this_stripe = stripeno * ssize;
- if (stripe_off < this_stripe) {
- /* Move to end of previous stripe, or zero */
- if (file_size > 0) {
- file_size--;
- stripe_off = ssize;
- } else {
- stripe_off = 0;
- }
- } else {
- stripe_off -= this_stripe;
-
- if (stripe_off >= ssize) {
- /* Clamp to end of this stripe */
- stripe_off = ssize;
+ CERROR("error: setattr objid "LPX64" subobj "
+ LPX64" on OST idx %d: rc = %d\n",
+ set->set_oa->o_id, req->rq_oa->o_id,
+ req->rq_idx, err);
+ if (!rc)
+ rc = err;
}
}
-
- return (file_size * ssize + stripe_off);
-}
-
-/* given an extent in an lov and a stripe, calculate the extent of the stripe
- * that is contained within the lov extent. this returns true if the given
- * stripe does intersect with the lov extent. */
-static int lov_stripe_intersects(struct lov_stripe_md *lsm, int stripeno,
- obd_off start, obd_off end,
- obd_off *obd_start, obd_off *obd_end)
-{
- int start_side, end_side;
-
- start_side = lov_stripe_offset(lsm, start, stripeno, obd_start);
- end_side = lov_stripe_offset(lsm, end, stripeno, obd_end);
-
- CDEBUG(D_INODE, "["LPU64"->"LPU64"] -> [(%d) "LPU64"->"LPU64" (%d)]\n",
- start, end, start_side, *obd_start, *obd_end, end_side);
-
- /* this stripe doesn't intersect the file extent when neither
- * start or the end intersected the stripe and obd_start and
- * obd_end got rounded up to the save value. */
- if (start_side != 0 && end_side != 0 && *obd_start == *obd_end)
- return 0;
-
- /* as mentioned in the lov_stripe_offset commentary, end
- * might have been shifted in the wrong direction. This
- * happens when an end offset is before the stripe when viewed
- * through the "mod stripe size" math. we detect it being shifted
- * in the wrong direction and touch it up.
- * interestingly, this can't underflow since end must be > start
- * if we passed through the previous check.
- * (should we assert for that somewhere?) */
- if (end_side != 0)
- (*obd_end)--;
-
- return 1;
-}
-
-/* compute which stripe number "lov_off" will be written into */
-static int lov_stripe_number(struct lov_stripe_md *lsm, obd_off lov_off)
-{
- unsigned long ssize = lsm->lsm_stripe_size;
- unsigned long swidth = ssize * lsm->lsm_stripe_count;
- unsigned long stripe_off;
-
- stripe_off = do_div(lov_off, swidth);
-
- return stripe_off / ssize;
+ err = lov_fini_setattr_set(set);
+ if (!rc)
+ rc = err;
+ RETURN(rc);
}
/* FIXME: maybe we'll just make one node the authoritative attribute node, then
struct lov_stripe_md *lsm,
obd_off start, obd_off end, struct obd_trans_info *oti)
{
- struct obdo tmp;
+ struct lov_request_set *set;
struct lov_obd *lov;
- struct lov_oinfo *loi;
- int rc = 0, i;
+ struct list_head *pos;
+ struct lov_request *req;
+ int err = 0, rc = 0;
ENTRY;
- if (lsm_bad_magic(lsm))
- RETURN(-EINVAL);
+ ASSERT_LSM_MAGIC(lsm);
if (!exp || !exp->exp_obd)
RETURN(-ENODEV);
lov = &exp->exp_obd->u.lov;
- for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
- obd_off starti, endi;
- int err;
-
- if (lov->tgts[loi->loi_ost_idx].active == 0) {
- CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
- continue;
- }
-
- if (!lov_stripe_intersects(lsm, i, start, end, &starti, &endi))
- continue;
+ rc = lov_prep_punch_set(exp, oa, lsm, start, end, oti, &set);
+ if (rc)
+ RETURN(rc);
- /* create data objects with "parent" OA */
- memcpy(&tmp, oa, sizeof(tmp));
- tmp.o_id = loi->loi_id;
+ list_for_each (pos, &set->set_list) {
+ req = list_entry(pos, struct lov_request, rq_link);
- err = obd_punch(lov->tgts[loi->loi_ost_idx].ltd_exp, &tmp, NULL,
- starti, endi, NULL);
+ rc = obd_punch(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa,
+ NULL, req->rq_extent.start,
+ req->rq_extent.end, NULL);
+ err = lov_update_punch_set(set, req, rc);
if (err) {
- if (lov->tgts[loi->loi_ost_idx].active) {
- CERROR("error: punch objid "LPX64" subobj "LPX64
- " on OST idx %d: rc = %d\n", oa->o_id,
- loi->loi_id, loi->loi_ost_idx, err);
- }
+ CERROR("error: punch objid "LPX64" subobj "LPX64
+ " on OST idx %d: rc = %d\n", set->set_oa->o_id,
+ req->rq_oa->o_id, req->rq_idx, rc);
if (!rc)
rc = err;
- } else {
- loi->loi_kms = loi->loi_rss = starti;
}
}
+ err = lov_fini_punch_set(set);
+ if (!rc)
+ rc = err;
RETURN(rc);
}
static int lov_sync(struct obd_export *exp, struct obdo *oa,
struct lov_stripe_md *lsm, obd_off start, obd_off end)
{
- struct obdo *tmp;
+ struct lov_request_set *set;
struct lov_obd *lov;
- struct lov_oinfo *loi;
- int rc = 0, i;
+ struct list_head *pos;
+ struct lov_request *req;
+ int err = 0, rc = 0;
ENTRY;
- if (lsm_bad_magic(lsm))
- RETURN(-EINVAL);
+ ASSERT_LSM_MAGIC(lsm);
if (!exp->exp_obd)
RETURN(-ENODEV);
- tmp = obdo_alloc();
- if (!tmp)
- RETURN(-ENOMEM);
-
- lov = &exp->exp_obd->u.lov;
- for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
- obd_off starti, endi;
- int err;
-
- if (lov->tgts[loi->loi_ost_idx].active == 0) {
- CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
- continue;
- }
-
- if (!lov_stripe_intersects(lsm, i, start, end, &starti, &endi))
- continue;
+ lov = &exp->exp_obd->u.lov;
+ rc = lov_prep_sync_set(exp, oa, lsm, start, end, &set);
+ if (rc)
+ RETURN(rc);
- memcpy(tmp, oa, sizeof(*tmp));
- tmp->o_id = loi->loi_id;
+ list_for_each (pos, &set->set_list) {
+ req = list_entry(pos, struct lov_request, rq_link);
- err = obd_sync(lov->tgts[loi->loi_ost_idx].ltd_exp, tmp, NULL,
- starti, endi);
+ rc = obd_sync(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa,
+ NULL, req->rq_extent.start, req->rq_extent.end);
+ err = lov_update_common_set(set, req, rc);
if (err) {
- if (lov->tgts[loi->loi_ost_idx].active) {
- CERROR("error: fsync objid "LPX64" subobj "LPX64
- " on OST idx %d: rc = %d\n", oa->o_id,
- loi->loi_id, loi->loi_ost_idx, err);
- }
+ CERROR("error: fsync objid "LPX64" subobj "LPX64
+ " on OST idx %d: rc = %d\n", set->set_oa->o_id,
+ req->rq_oa->o_id, req->rq_idx, rc);
if (!rc)
rc = err;
}
}
-
- obdo_free(tmp);
+ err = lov_fini_sync_set(set);
+ if (!rc)
+ rc = err;
RETURN(rc);
}
for (i = 0; i < oa_bufs; i++) {
int stripe = lov_stripe_number(lsm, pga[i].off);
int ost = lsm->lsm_oinfo[stripe].loi_ost_idx;
- struct ldlm_extent ext, subext;
- ext.start = pga[i].off;
- ext.end = pga[i].off + pga[i].count;
+ obd_off start, end;
- if (!lov_stripe_intersects(lsm, i, ext.start, ext.end,
- &subext.start, &subext.end))
+ if (!lov_stripe_intersects(lsm, i, pga[i].off,
+ pga[i].off + pga[i].count,
+ &start, &end))
continue;
if (lov->tgts[ost].active == 0) {
struct lov_stripe_md *lsm, obd_count oa_bufs,
struct brw_page *pga, struct obd_trans_info *oti)
{
- struct {
- int bufct;
- int index;
- int subcount;
- struct lov_stripe_md lsm;
- int ost_idx;
- } *stripeinfo, *si, *si_last;
- struct obdo *ret_oa = NULL, *tmp_oa = NULL;
- struct lov_obd *lov;
- struct brw_page *ioarr;
- struct lov_oinfo *loi;
- int rc = 0, i, *where, stripe_count = lsm->lsm_stripe_count, set = 0;
+ struct lov_request_set *set;
+ struct lov_request *req;
+ struct list_head *pos;
+ struct lov_obd *lov = &exp->exp_obd->u.lov;
+ int err, rc = 0;
ENTRY;
- if (lsm_bad_magic(lsm))
- RETURN(-EINVAL);
-
- lov = &exp->exp_obd->u.lov;
+ ASSERT_LSM_MAGIC(lsm);
if (cmd == OBD_BRW_CHECK) {
rc = lov_brw_check(lov, src_oa, lsm, oa_bufs, pga);
RETURN(rc);
}
- OBD_ALLOC(stripeinfo, stripe_count * sizeof(*stripeinfo));
- if (!stripeinfo)
- RETURN(-ENOMEM);
-
- OBD_ALLOC(where, sizeof(*where) * oa_bufs);
- if (!where)
- GOTO(out_sinfo, rc = -ENOMEM);
-
- OBD_ALLOC(ioarr, sizeof(*ioarr) * oa_bufs);
- if (!ioarr)
- GOTO(out_where, rc = -ENOMEM);
-
- if (src_oa) {
- ret_oa = obdo_alloc();
- if (!ret_oa)
- GOTO(out_ioarr, rc = -ENOMEM);
-
- tmp_oa = obdo_alloc();
- if (!tmp_oa)
- GOTO(out_oa, rc = -ENOMEM);
- }
-
- for (i = 0; i < oa_bufs; i++) {
- where[i] = lov_stripe_number(lsm, pga[i].off);
- stripeinfo[where[i]].bufct++;
- }
-
- for (i = 0, loi = lsm->lsm_oinfo, si_last = si = stripeinfo;
- i < stripe_count; i++, loi++, si_last = si, si++) {
- if (i > 0)
- si->index = si_last->index + si_last->bufct;
- si->lsm.lsm_object_id = loi->loi_id;
- si->ost_idx = loi->loi_ost_idx;
- }
-
- for (i = 0; i < oa_bufs; i++) {
- int which = where[i];
- int shift;
-
- shift = stripeinfo[which].index + stripeinfo[which].subcount;
- LASSERT(shift < oa_bufs);
- ioarr[shift] = pga[i];
- lov_stripe_offset(lsm, pga[i].off, which, &ioarr[shift].off);
- stripeinfo[which].subcount++;
- }
-
- for (i = 0, si = stripeinfo; i < stripe_count; i++, si++) {
- int shift = si->index;
-
- if (lov->tgts[si->ost_idx].active == 0) {
- CDEBUG(D_HA, "lov idx %d inactive\n", si->ost_idx);
- GOTO(out_oa, rc = -EIO);
- }
-
- if (si->bufct) {
- LASSERT(shift < oa_bufs);
- if (src_oa)
- memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
-
- tmp_oa->o_id = si->lsm.lsm_object_id;
- rc = obd_brw(cmd, lov->tgts[si->ost_idx].ltd_exp,
- tmp_oa, &si->lsm, si->bufct,
- &ioarr[shift], oti);
- if (rc)
- GOTO(out_oa, rc);
+ rc = lov_prep_brw_set(exp, src_oa, lsm, oa_bufs, pga, oti, &set);
+ if (rc)
+ RETURN(rc);
- lov_merge_attrs(ret_oa, tmp_oa, tmp_oa->o_valid, lsm,
- i, &set);
- }
+ list_for_each (pos, &set->set_list) {
+ struct obd_export *sub_exp;
+ struct brw_page *sub_pga;
+ req = list_entry(pos, struct lov_request, rq_link);
+
+ sub_exp = lov->tgts[req->rq_idx].ltd_exp;
+ sub_pga = set->set_pga + req->rq_pgaidx;
+ rc = obd_brw(cmd, sub_exp, req->rq_oa, req->rq_md,
+ req->rq_oabufs, sub_pga, oti);
+ if (rc)
+ break;
+ lov_update_common_set(set, req, rc);
}
- ret_oa->o_id = src_oa->o_id;
- memcpy(src_oa, ret_oa, sizeof(*src_oa));
-
- GOTO(out_oa, rc);
- out_oa:
- if (tmp_oa)
- obdo_free(tmp_oa);
- if (ret_oa)
- obdo_free(ret_oa);
- out_ioarr:
- OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
- out_where:
- OBD_FREE(where, sizeof(*where) * oa_bufs);
- out_sinfo:
- OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo));
- return rc;
+ err = lov_fini_brw_set(set);
+ if (!rc)
+ rc = err;
+ RETURN(rc);
}
-static int lov_brw_interpret(struct ptlrpc_request_set *set, void *data, int rc)
+static int lov_brw_interpret(struct ptlrpc_request_set *reqset, void *data,
+ int rc)
{
- struct lov_brw_async_args *aa = data;
- struct lov_stripe_md *lsm = aa->aa_lsm;
- struct obdo *obdos = aa->aa_obdos;
- struct lov_oinfo *loi;
- int i = 0;
+ struct lov_request_set *lovset = (struct lov_request_set *)data;
ENTRY;
-
- for (loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++) {
- if (obdos[i].o_valid & OBD_MD_FLBLOCKS)
- loi->loi_blocks = obdos[i].o_blocks;
- if (obdos[i].o_valid & OBD_MD_FLMTIME)
- loi->loi_mtime = obdos[i].o_mtime;
+
+ if (rc) {
+ lovset->set_completes = 0;
+ lov_fini_brw_set(lovset);
+ } else {
+ rc = lov_fini_brw_set(lovset);
}
-
- OBD_FREE(obdos, lsm->lsm_stripe_count * sizeof(*obdos));
- OBD_FREE(aa->aa_ioarr, sizeof(*aa->aa_ioarr) * aa->aa_oa_bufs);
+
RETURN(rc);
}
struct brw_page *pga, struct ptlrpc_request_set *set,
struct obd_trans_info *oti)
{
- struct {
- int bufct;
- int index;
- int subcount;
- struct lov_stripe_md lsm;
- int ost_idx;
- } *stripeinfo, *si, *si_last;
- struct lov_obd *lov;
- struct brw_page *ioarr;
- struct obdo *obdos = NULL;
- struct lov_oinfo *loi;
- struct lov_brw_async_args *aa;
- int rc = 0, i, *where, stripe_count = lsm->lsm_stripe_count;
+ struct lov_request_set *lovset;
+ struct lov_request *req;
+ struct list_head *pos;
+ struct lov_obd *lov = &exp->exp_obd->u.lov;
+ int rc = 0;
ENTRY;
- if (lsm_bad_magic(lsm))
- RETURN(-EINVAL);
-
- lov = &exp->exp_obd->u.lov;
+ ASSERT_LSM_MAGIC(lsm);
if (cmd == OBD_BRW_CHECK) {
rc = lov_brw_check(lov, oa, lsm, oa_bufs, pga);
RETURN(rc);
}
- OBD_ALLOC(stripeinfo, stripe_count * sizeof(*stripeinfo));
- if (!stripeinfo)
- RETURN(-ENOMEM);
-
- OBD_ALLOC(where, sizeof(*where) * oa_bufs);
- if (!where)
- GOTO(out_sinfo, rc = -ENOMEM);
-
- if (oa) {
- OBD_ALLOC(obdos, sizeof(*obdos) * stripe_count);
- if (!obdos)
- GOTO(out_where, rc = -ENOMEM);
- }
-
- OBD_ALLOC(ioarr, sizeof(*ioarr) * oa_bufs);
- if (!ioarr)
- GOTO(out_obdos, rc = -ENOMEM);
-
- for (i = 0; i < oa_bufs; i++) {
- where[i] = lov_stripe_number(lsm, pga[i].off);
- stripeinfo[where[i]].bufct++;
- }
-
- for (i = 0, loi = lsm->lsm_oinfo, si_last = si = stripeinfo;
- i < stripe_count; i++, loi++, si_last = si, si++) {
- if (i > 0)
- si->index = si_last->index + si_last->bufct;
- si->lsm.lsm_object_id = loi->loi_id;
- si->ost_idx = loi->loi_ost_idx;
-
- if (oa) {
- memcpy(&obdos[i], oa, sizeof(*obdos));
- obdos[i].o_id = si->lsm.lsm_object_id;
- }
- }
-
- for (i = 0; i < oa_bufs; i++) {
- int which = where[i];
- int shift;
-
- shift = stripeinfo[which].index + stripeinfo[which].subcount;
- LASSERT(shift < oa_bufs);
- ioarr[shift] = pga[i];
- lov_stripe_offset(lsm, pga[i].off, which, &ioarr[shift].off);
- stripeinfo[which].subcount++;
- }
-
- for (i = 0, si = stripeinfo; i < stripe_count; i++, si++) {
- int shift = si->index;
-
- if (si->bufct == 0)
- continue;
-
- if (lov->tgts[si->ost_idx].active == 0) {
- CDEBUG(D_HA, "lov idx %d inactive\n", si->ost_idx);
- GOTO(out_ioarr, rc = -EIO);
- }
-
- LASSERT(shift < oa_bufs);
+ rc = lov_prep_brw_set(exp, oa, lsm, oa_bufs, pga, oti, &lovset);
+ if (rc)
+ RETURN(rc);
- rc = obd_brw_async(cmd, lov->tgts[si->ost_idx].ltd_exp,
- &obdos[i], &si->lsm, si->bufct,
- &ioarr[shift], set, oti);
+ list_for_each (pos, &lovset->set_list) {
+ struct obd_export *sub_exp;
+ struct brw_page *sub_pga;
+ req = list_entry(pos, struct lov_request, rq_link);
+
+ sub_exp = lov->tgts[req->rq_idx].ltd_exp;
+ sub_pga = lovset->set_pga + req->rq_pgaidx;
+ rc = obd_brw_async(cmd, sub_exp, req->rq_oa, req->rq_md,
+ req->rq_oabufs, sub_pga, set, oti);
if (rc)
- GOTO(out_ioarr, rc);
+ GOTO(out, rc);
+ lov_update_common_set(lovset, req, rc);
}
LASSERT(rc == 0);
LASSERT(set->set_interpret == NULL);
set->set_interpret = (set_interpreter_func)lov_brw_interpret;
- LASSERT(sizeof(set->set_args) >= sizeof(struct lov_brw_async_args));
- aa = (struct lov_brw_async_args *)&set->set_args;
- aa->aa_lsm = lsm;
- aa->aa_obdos = obdos;
- aa->aa_oa = oa;
- aa->aa_ioarr = ioarr;
- aa->aa_oa_bufs = oa_bufs;
-
- /* Don't free ioarr or obdos - that's done in lov_brw_interpret */
- GOTO(out_where, rc);
-
- out_ioarr:
- OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
- out_obdos:
- OBD_FREE(obdos, stripe_count * sizeof(*obdos));
- out_where:
- OBD_FREE(where, sizeof(*where) * oa_bufs);
- out_sinfo:
- OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo));
- return rc;
+ set->set_arg = (void *)lovset;
+
+ RETURN(rc);
+out:
+ lov_fini_brw_set(lovset);
+ RETURN(rc);
}
-struct lov_async_page *lap_from_cookie(void *cookie)
-{
- struct lov_async_page *lap = cookie;
- if (lap->lap_magic != LAP_MAGIC)
- return ERR_PTR(-EINVAL);
- return lap;
-};
-
static int lov_ap_make_ready(void *data, int cmd)
{
- struct lov_async_page *lap = lap_from_cookie(data);
- /* XXX should these assert? */
- if (IS_ERR(lap))
- return -EINVAL;
+ struct lov_async_page *lap = LAP_FROM_COOKIE(data);
return lap->lap_caller_ops->ap_make_ready(lap->lap_caller_data, cmd);
}
static int lov_ap_refresh_count(void *data, int cmd)
{
- struct lov_async_page *lap = lap_from_cookie(data);
- if (IS_ERR(lap))
- return -EINVAL;
+ struct lov_async_page *lap = LAP_FROM_COOKIE(data);
return lap->lap_caller_ops->ap_refresh_count(lap->lap_caller_data,
cmd);
}
static void lov_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
{
- struct lov_async_page *lap = lap_from_cookie(data);
- /* XXX should these assert? */
- if (IS_ERR(lap))
- return;
+ struct lov_async_page *lap = LAP_FROM_COOKIE(data);
lap->lap_caller_ops->ap_fill_obdo(lap->lap_caller_data, cmd, oa);
/* XXX woah, shouldn't we be altering more here? size? */
static void lov_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
{
- struct lov_async_page *lap = lap_from_cookie(data);
- if (IS_ERR(lap))
- return;
+ struct lov_async_page *lap = LAP_FROM_COOKIE(data);
/* in a raid1 regime this would down a count of many ios
* in flight, onl calling the caller_ops completion when all
int rc;
ENTRY;
- if (lsm_bad_magic(lsm))
- RETURN(-EINVAL);
+ ASSERT_LSM_MAGIC(lsm);
LASSERT(loi == NULL);
OBD_ALLOC(lap, sizeof(*lap));
LASSERT(loi == NULL);
- if (lsm_bad_magic(lsm))
- RETURN(-EINVAL);
+ ASSERT_LSM_MAGIC(lsm);
- lap = lap_from_cookie(cookie);
- if (IS_ERR(lap))
- RETURN(PTR_ERR(lap));
+ lap = LAP_FROM_COOKIE(cookie);
loi = &lsm->lsm_oinfo[lap->lap_stripe];
rc = obd_queue_async_io(lov->tgts[loi->loi_ost_idx].ltd_exp, lsm,
LASSERT(loi == NULL);
- if (lsm_bad_magic(lsm))
- RETURN(-EINVAL);
+ ASSERT_LSM_MAGIC(lsm);
- lap = lap_from_cookie(cookie);
- if (IS_ERR(lap))
- RETURN(PTR_ERR(lap));
+ lap = LAP_FROM_COOKIE(cookie);
loi = &lsm->lsm_oinfo[lap->lap_stripe];
rc = obd_set_async_flags(lov->tgts[loi->loi_ost_idx].ltd_exp,
LASSERT(loi == NULL);
- if (lsm_bad_magic(lsm))
- RETURN(-EINVAL);
+ ASSERT_LSM_MAGIC(lsm);
- lap = lap_from_cookie(cookie);
- if (IS_ERR(lap))
- RETURN(PTR_ERR(lap));
+ lap = LAP_FROM_COOKIE(cookie);
loi = &lsm->lsm_oinfo[lap->lap_stripe];
rc = obd_queue_group_io(lov->tgts[loi->loi_ost_idx].ltd_exp, lsm, loi,
LASSERT(loi == NULL);
- if (lsm_bad_magic(lsm))
- RETURN(-EINVAL);
+ ASSERT_LSM_MAGIC(lsm);
for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
i++, loi++) {
LASSERT(loi == NULL);
- if (lsm_bad_magic(lsm))
- RETURN(-EINVAL);
+ ASSERT_LSM_MAGIC(lsm);
- lap = lap_from_cookie(cookie);
- if (IS_ERR(lap))
- RETURN(PTR_ERR(lap));
+ lap = LAP_FROM_COOKIE(cookie);
loi = &lsm->lsm_oinfo[lap->lap_stripe];
rc = obd_teardown_async_page(lov->tgts[loi->loi_ost_idx].ltd_exp,
void *data,__u32 lvb_len, void *lvb_swabber,
struct lustre_handle *lockh)
{
- struct lov_lock_handles *lov_lockh = NULL;
+ struct lov_request_set *set;
+ struct lov_request *req;
+ struct list_head *pos;
struct lustre_handle *lov_lockhp;
struct lov_obd *lov;
- struct lov_oinfo *loi;
- char submd_buf[sizeof(struct lov_stripe_md) + sizeof(struct lov_oinfo)];
- struct lov_stripe_md *submd = (void *)submd_buf;
ldlm_error_t rc;
- int i, save_flags = *flags, all_skipped = 1;
+ int save_flags = *flags;
ENTRY;
- if (lsm_bad_magic(lsm))
- RETURN(-EINVAL);
+ ASSERT_LSM_MAGIC(lsm);
/* we should never be asked to replay a lock this way. */
LASSERT((*flags & LDLM_FL_REPLAY) == 0);
if (!exp || !exp->exp_obd)
RETURN(-ENODEV);
- if (lsm->lsm_stripe_count > 1) {
- lov_lockh = lov_llh_new(lsm);
- if (lov_lockh == NULL)
- RETURN(-ENOMEM);
-
- lockh->cookie = lov_lockh->llh_handle.h_cookie;
- lov_lockhp = lov_lockh->llh_handles;
- } else {
- lov_lockhp = lockh;
- }
-
lov = &exp->exp_obd->u.lov;
- for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
- i++, loi++, lov_lockhp++) {
- ldlm_policy_data_t sub_ext;
-
- if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
- policy->l_extent.end,
- &sub_ext.l_extent.start,
- &sub_ext.l_extent.end))
- continue;
+ rc = lov_prep_enqueue_set(exp, lsm, policy, mode, lockh, &set);
+ if (rc)
+ RETURN(rc);
- if (lov->tgts[loi->loi_ost_idx].active == 0) {
- CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
- continue;
- }
+ list_for_each (pos, &set->set_list) {
+ ldlm_policy_data_t sub_policy;
+ req = list_entry(pos, struct lov_request, rq_link);
+ lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
+ LASSERT(lov_lockhp);
- all_skipped = 0;
-
- /* XXX LOV STACKING: submd should be from the subobj */
- submd->lsm_object_id = loi->loi_id;
- submd->lsm_stripe_count = 0;
- submd->lsm_oinfo->loi_kms_valid = loi->loi_kms_valid;
- submd->lsm_oinfo->loi_rss = loi->loi_rss;
- submd->lsm_oinfo->loi_kms = loi->loi_kms;
- submd->lsm_oinfo->loi_blocks = loi->loi_blocks;
- submd->lsm_oinfo->loi_mtime = loi->loi_mtime;
- /* XXX submd is not fully initialized here */
*flags = save_flags;
- rc = obd_enqueue(lov->tgts[loi->loi_ost_idx].ltd_exp, submd,
- type, &sub_ext, mode, flags, bl_cb, cp_cb,
- gl_cb, data, lvb_len, lvb_swabber, lov_lockhp);
-
- /* XXX FIXME: This unpleasantness doesn't belong here at *all*.
- * It belongs in the OSC, except that the OSC doesn't have
- * access to the real LOI -- it gets a copy, that we created
- * above, and that copy can be arbitrarily out of date.
- *
- * The LOV API is due for a serious rewriting anyways, and this
- * can be addressed then. */
- if (rc == ELDLM_OK) {
- struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
- __u64 tmp = submd->lsm_oinfo->loi_rss;
-
- LASSERT(lock != NULL);
- loi->loi_rss = tmp;
- loi->loi_mtime = submd->lsm_oinfo->loi_mtime;
- loi->loi_blocks = submd->lsm_oinfo->loi_blocks;
- /* Extend KMS up to the end of this lock and no further
- * A lock on [x,y] means a KMS of up to y + 1 bytes! */
- if (tmp > lock->l_policy_data.l_extent.end)
- tmp = lock->l_policy_data.l_extent.end + 1;
- if (tmp >= loi->loi_kms) {
- LDLM_DEBUG(lock, "acquired set stripe %d rss="
- LPU64", kms="LPU64"\n", i,
- loi->loi_rss, tmp);
- loi->loi_kms = tmp;
- loi->loi_kms_valid = 1;
- } else {
- LDLM_DEBUG(lock, "acquired, set stripe %d rss="
- LPU64"; leaving kms="LPU64", end="
- LPU64"\n", i, loi->loi_rss,
- loi->loi_kms,
- lock->l_policy_data.l_extent.end);
- }
- ldlm_lock_allow_match(lock);
- LDLM_LOCK_PUT(lock);
- } else if (rc == ELDLM_LOCK_ABORTED &&
- save_flags & LDLM_FL_HAS_INTENT) {
- memset(lov_lockhp, 0, sizeof(*lov_lockhp));
- loi->loi_rss = submd->lsm_oinfo->loi_rss;
- loi->loi_mtime = submd->lsm_oinfo->loi_mtime;
- loi->loi_blocks = submd->lsm_oinfo->loi_blocks;
- CDEBUG(D_DLMTRACE, "glimpsed, set stripe %d rss="LPU64
- "; leaving kms="LPU64"\n", i, loi->loi_rss,
- loi->loi_kms);
- } else {
- memset(lov_lockhp, 0, sizeof(*lov_lockhp));
- if (lov->tgts[loi->loi_ost_idx].active) {
- CERROR("error: enqueue objid "LPX64" subobj "
- LPX64" stripe %d idx %d: rc = %d\n",
- lsm->lsm_object_id, loi->loi_id,
- i, loi->loi_ost_idx, rc);
- GOTO(out_locks, rc);
- }
- }
- }
- if (all_skipped)
- GOTO(out_lockh, rc = -EIO);
-
- if (lsm->lsm_stripe_count > 1)
- lov_llh_put(lov_lockh);
- RETURN(ELDLM_OK);
-
- out_locks:
- while (loi--, lov_lockhp--, i-- > 0) {
- struct lov_stripe_md submd;
- int err;
-
- if (lov_lockhp->cookie == 0)
- continue;
-
- /* XXX LOV STACKING: submd should be from the subobj */
- submd.lsm_object_id = loi->loi_id;
- submd.lsm_stripe_count = 0;
- err = obd_cancel(lov->tgts[loi->loi_ost_idx].ltd_exp, &submd,
- mode, lov_lockhp);
- if (err && lov->tgts[loi->loi_ost_idx].active) {
- CERROR("error: cancelling objid "LPX64" on OST "
- "idx %d after enqueue error: rc = %d\n",
- loi->loi_id, loi->loi_ost_idx, err);
- }
+ sub_policy.l_extent.start = req->rq_extent.start;
+ sub_policy.l_extent.end = req->rq_extent.end;
+
+ rc = obd_enqueue(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
+ type, &sub_policy, mode, flags, bl_cb,
+ cp_cb, gl_cb, data, lvb_len, lvb_swabber,
+ lov_lockhp);
+ rc = lov_update_enqueue_set(set, req, rc, save_flags);
+ if (rc != ELDLM_OK)
+ break;
}
-out_lockh:
- if (lsm->lsm_stripe_count > 1) {
- lov_llh_destroy(lov_lockh);
- lov_llh_put(lov_lockh);
- }
- return rc;
+ lov_fini_enqueue_set(set, mode);
+ RETURN(rc);
}
static int lov_match(struct obd_export *exp, struct lov_stripe_md *lsm,
__u32 type, ldlm_policy_data_t *policy, __u32 mode,
int *flags, void *data, struct lustre_handle *lockh)
{
- struct lov_lock_handles *lov_lockh = NULL;
+ struct lov_request_set *set;
+ struct lov_request *req;
+ struct list_head *pos;
+ struct lov_obd *lov = &exp->exp_obd->u.lov;
struct lustre_handle *lov_lockhp;
- struct lov_obd *lov;
- struct lov_oinfo *loi;
- struct lov_stripe_md submd;
- ldlm_error_t rc = 0;
- int i;
+ int lov_flags, rc = 0;
ENTRY;
- if (lsm_bad_magic(lsm))
- RETURN(-EINVAL);
+ ASSERT_LSM_MAGIC(lsm);
if (!exp || !exp->exp_obd)
RETURN(-ENODEV);
- if (lsm->lsm_stripe_count > 1) {
- lov_lockh = lov_llh_new(lsm);
- if (lov_lockh == NULL)
- RETURN(-ENOMEM);
-
- lockh->cookie = lov_lockh->llh_handle.h_cookie;
- lov_lockhp = lov_lockh->llh_handles;
- } else {
- lov_lockhp = lockh;
- }
-
lov = &exp->exp_obd->u.lov;
- for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
- i++, loi++, lov_lockhp++) {
- ldlm_policy_data_t sub_ext;
- int lov_flags;
-
- if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
- policy->l_extent.end,
- &sub_ext.l_extent.start,
- &sub_ext.l_extent.end))
- continue;
+ rc = lov_prep_match_set(exp, lsm, policy, mode, lockh, &set);
+ if (rc)
+ RETURN(rc);
- if (lov->tgts[loi->loi_ost_idx].active == 0) {
- CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
- rc = -EIO;
- break;
- }
+ list_for_each (pos, &set->set_list) {
+ ldlm_policy_data_t sub_policy;
+ req = list_entry(pos, struct lov_request, rq_link);
+ lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
+ LASSERT(lov_lockhp);
- /* XXX LOV STACKING: submd should be from the subobj */
- submd.lsm_object_id = loi->loi_id;
- submd.lsm_stripe_count = 0;
+ sub_policy.l_extent.start = req->rq_extent.start;
+ sub_policy.l_extent.end = req->rq_extent.end;
lov_flags = *flags;
- /* XXX submd is not fully initialized here */
- rc = obd_match(lov->tgts[loi->loi_ost_idx].ltd_exp, &submd,
- type, &sub_ext, mode, &lov_flags, data,
+
+ rc = obd_match(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
+ type, &sub_policy, mode, &lov_flags, data,
lov_lockhp);
+ rc = lov_update_match_set(set, req, rc);
if (rc != 1)
break;
}
- if (rc == 1) {
- if (lsm->lsm_stripe_count > 1) {
- if (*flags & LDLM_FL_TEST_LOCK)
- lov_llh_destroy(lov_lockh);
- lov_llh_put(lov_lockh);
- }
- RETURN(1);
- }
-
- while (loi--, lov_lockhp--, i-- > 0) {
- struct lov_stripe_md submd;
- int err;
-
- if (lov_lockhp->cookie == 0)
- continue;
-
- /* XXX LOV STACKING: submd should be from the subobj */
- submd.lsm_object_id = loi->loi_id;
- submd.lsm_stripe_count = 0;
- err = obd_cancel(lov->tgts[loi->loi_ost_idx].ltd_exp, &submd,
- mode, lov_lockhp);
- if (err && lov->tgts[loi->loi_ost_idx].active) {
- CERROR("error: cancelling objid "LPX64" on OST "
- "idx %d after match failure: rc = %d\n",
- loi->loi_id, loi->loi_ost_idx, err);
- }
- }
-
- if (lsm->lsm_stripe_count > 1) {
- lov_llh_destroy(lov_lockh);
- lov_llh_put(lov_lockh);
- }
+ lov_fini_match_set(set, mode, *flags);
RETURN(rc);
}
int rc = 0, i;
ENTRY;
- if (lsm_bad_magic(lsm))
- RETURN(-EINVAL);
+ ASSERT_LSM_MAGIC(lsm);
if (!exp || !exp->exp_obd)
RETURN(-ENODEV);
static int lov_cancel(struct obd_export *exp, struct lov_stripe_md *lsm,
__u32 mode, struct lustre_handle *lockh)
{
- struct lov_lock_handles *lov_lockh = NULL;
+ struct lov_request_set *set;
+ struct lov_request *req;
+ struct list_head *pos;
+ struct lov_obd *lov = &exp->exp_obd->u.lov;
struct lustre_handle *lov_lockhp;
- struct lov_obd *lov;
- struct lov_oinfo *loi;
- int rc = 0, i;
+ int err = 0, rc = 0;
ENTRY;
- if (lsm_bad_magic(lsm))
- RETURN(-EINVAL);
+ ASSERT_LSM_MAGIC(lsm);
if (!exp || !exp->exp_obd)
RETURN(-ENODEV);
LASSERT(lockh);
- if (lsm->lsm_stripe_count > 1) {
- lov_lockh = lov_handle2llh(lockh);
- if (!lov_lockh) {
- CERROR("LOV: invalid lov lock handle %p\n", lockh);
- RETURN(-EINVAL);
- }
-
- lov_lockhp = lov_lockh->llh_handles;
- } else {
- lov_lockhp = lockh;
- }
-
lov = &exp->exp_obd->u.lov;
- for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
- i++, loi++, lov_lockhp++) {
- struct lov_stripe_md submd;
- int err;
+ rc = lov_prep_cancel_set(exp, lsm, mode, lockh, &set);
+ if (rc)
+ RETURN(rc);
- if (lov_lockhp->cookie == 0) {
- CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
- loi->loi_ost_idx, loi->loi_id);
- continue;
- }
+ list_for_each (pos, &set->set_list) {
+ req = list_entry(pos, struct lov_request, rq_link);
+ lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
- /* XXX LOV STACKING: submd should be from the subobj */
- submd.lsm_object_id = loi->loi_id;
- submd.lsm_stripe_count = 0;
- err = obd_cancel(lov->tgts[loi->loi_ost_idx].ltd_exp, &submd,
- mode, lov_lockhp);
- if (err) {
- if (lov->tgts[loi->loi_ost_idx].active) {
- CERROR("error: cancel objid "LPX64" subobj "
- LPX64" on OST idx %d: rc = %d\n",
- lsm->lsm_object_id,
- loi->loi_id, loi->loi_ost_idx, err);
- if (!rc)
- rc = err;
- }
+ rc = obd_cancel(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
+ mode, lov_lockhp);
+ rc = lov_update_common_set(set, req, rc);
+ if (rc) {
+ CERROR("error: cancel objid "LPX64" subobj "
+ LPX64" on OST idx %d: rc = %d\n",
+ lsm->lsm_object_id,
+ req->rq_md->lsm_object_id, req->rq_idx, rc);
+ err = rc;
}
+
}
-
- if (lsm->lsm_stripe_count > 1)
- lov_llh_destroy(lov_lockh);
- if (lov_lockh != NULL)
- lov_llh_put(lov_lockh);
- RETURN(rc);
+ lov_fini_cancel_set(set);
+ RETURN(err);
}
static int lov_cancel_unused(struct obd_export *exp,
int rc = 0, i;
ENTRY;
- if (lsm_bad_magic(lsm))
- RETURN(-EINVAL);
+ ASSERT_LSM_MAGIC(lsm);
if (!exp || !exp->exp_obd)
RETURN(-ENODEV);
for (i = 0; i < lov->desc.ld_tgt_count; i++) {
if (!lov->tgts[i].active)
continue;
- rc = obd_get_info(lov->tgts[i].ltd_exp, keylen, key,
- &size, &(ids[i]));
+ rc = obd_get_info(lov->tgts[i].ltd_exp,
+ keylen, key, &size, &(ids[i]));
if (rc != 0)
RETURN(rc);
}
{
struct obd_device *obddev = class_exp2obd(exp);
struct lov_obd *lov = &obddev->u.lov;
- int i, rc = 0;
+ int i, rc = 0, err;
ENTRY;
#define KEY_IS(str) \
if (vallen != lov->desc.ld_tgt_count)
RETURN(-EINVAL);
for (i = 0; i < lov->desc.ld_tgt_count; i++) {
- int er;
-
/* initialize all OSCs, even inactive ones */
- er = obd_set_info(lov->tgts[i].ltd_exp, keylen, key,
- sizeof(obd_id), ((obd_id*)val) + i);
+ err = obd_set_info(lov->tgts[i].ltd_exp,
+ keylen, key, sizeof(obd_id),
+ ((obd_id*)val) + i);
if (!rc)
- rc = er;
+ rc = err;
}
RETURN(rc);
}
- if (KEY_IS("growth_count")) {
- if (vallen != sizeof(int))
- RETURN(-EINVAL);
- } else if (KEY_IS("mds_conn") || KEY_IS("unlinked")) {
+ if (KEY_IS("mds_conn") || KEY_IS("unlinked")) {
if (vallen != 0)
RETURN(-EINVAL);
} else {
}
for (i = 0; i < lov->desc.ld_tgt_count; i++) {
- int er;
-
if (val && !obd_uuid_equals(val, &lov->tgts[i].uuid))
continue;
if (!val && !lov->tgts[i].active)
continue;
- er = obd_set_info(lov->tgts[i].ltd_exp, keylen, key, vallen,
- val);
+ err = obd_set_info(lov->tgts[i].ltd_exp,
+ keylen, key, vallen, val);
if (!rc)
- rc = er;
+ rc = err;
}
RETURN(rc);
#undef KEY_IS
-
-}
-
-/* Merge rss if @kms_only == 0
- *
- * Even when merging RSS, we will take the KMS value if it's larger.
- * This prevents getattr from stomping on dirty cached pages which
- * extend the file size. */
-__u64 lov_merge_size(struct lov_stripe_md *lsm, int kms_only)
-{
- struct lov_oinfo *loi;
- __u64 size = 0;
- int i;
-
- for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
- i++, loi++) {
- obd_size lov_size, tmpsize;
-
- tmpsize = loi->loi_kms;
- if (kms_only == 0 && loi->loi_rss > tmpsize)
- tmpsize = loi->loi_rss;
-
- lov_size = lov_stripe_size(lsm, tmpsize, i);
- if (lov_size > size)
- size = lov_size;
- }
-
- return size;
-}
-EXPORT_SYMBOL(lov_merge_size);
-
-/* Merge blocks */
-__u64 lov_merge_blocks(struct lov_stripe_md *lsm)
-{
- struct lov_oinfo *loi;
- __u64 blocks = 0;
- int i;
-
- for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++){
- blocks += loi->loi_blocks;
- }
- return blocks;
-}
-EXPORT_SYMBOL(lov_merge_blocks);
-
-__u64 lov_merge_mtime(struct lov_stripe_md *lsm, __u64 current_time)
-{
- struct lov_oinfo *loi;
- int i;
-
- for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++){
- if (loi->loi_mtime > current_time)
- current_time = loi->loi_mtime;
- }
- return current_time;
}
-EXPORT_SYMBOL(lov_merge_mtime);
#if 0
struct lov_multi_wait {
int rc = 0, i;
ENTRY;
- if (lsm_bad_magic(lsm))
- RETURN(-EINVAL);
+ ASSERT_LSM_MAGIC(lsm);
if (!exp || !exp->exp_obd)
RETURN(-ENODEV);
#endif
int lov_increase_kms(struct obd_export *exp, struct lov_stripe_md *lsm,
- obd_off size)
-{
- struct lov_oinfo *loi;
- int stripe = 0;
- __u64 kms;
- ENTRY;
-
- if (size > 0)
- stripe = lov_stripe_number(lsm, size - 1);
- kms = lov_size_to_stripe(lsm, size, stripe);
- loi = &(lsm->lsm_oinfo[stripe]);
-
- CDEBUG(D_INODE, "stripe %d KMS %sincreasing "LPU64"->"LPU64"\n",
- stripe, kms > loi->loi_kms ? "" : "not ", loi->loi_kms, kms);
- if (kms > loi->loi_kms)
- loi->loi_kms = kms;
- RETURN(0);
-}
-EXPORT_SYMBOL(lov_increase_kms);
+ obd_off size);
struct obd_ops lov_obd_ops = {
.o_owner = THIS_MODULE,
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_LOV
+
+#ifdef __KERNEL__
+#include <asm/div64.h>
+#else
+#include <liblustre.h>
+#endif
+
+#include <linux/obd_class.h>
+#include <linux/obd_lov.h>
+#include <linux/lustre_idl.h>
+
+#include "lov_internal.h"
+
+static void lov_init_set(struct lov_request_set *set)
+{
+ set->set_count = 0;
+ set->set_completes = 0;
+ set->set_success = 0;
+ INIT_LIST_HEAD(&set->set_list);
+ atomic_set(&set->set_refcount, 1);
+}
+
+static void lov_finish_set(struct lov_request_set *set)
+{
+ struct list_head *pos, *n;
+ ENTRY;
+
+ LASSERT(set);
+ list_for_each_safe(pos, n, &set->set_list) {
+ struct lov_request *req = list_entry(pos, struct lov_request,
+ rq_link);
+ list_del_init(&req->rq_link);
+
+ if (req->rq_oa)
+ obdo_free(req->rq_oa);
+ if (req->rq_md)
+ OBD_FREE(req->rq_md, req->rq_buflen);
+ OBD_FREE(req, sizeof(*req));
+ }
+
+ if (set->set_pga) {
+ int len = set->set_oabufs * sizeof(*set->set_pga);
+ OBD_FREE(set->set_pga, len);
+ }
+ if (set->set_lockh)
+ lov_llh_put(set->set_lockh);
+
+ OBD_FREE(set, sizeof(*set));
+ EXIT;
+}
+
+static void lov_update_set(struct lov_request_set *set,
+ struct lov_request *req, int rc)
+{
+ req->rq_complete = 1;
+ req->rq_rc = rc;
+
+ set->set_completes++;
+ if (rc == 0)
+ set->set_success++;
+}
+
+int lov_update_common_set(struct lov_request_set *set,
+ struct lov_request *req, int rc)
+{
+ struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
+ ENTRY;
+
+ lov_update_set(set, req, rc);
+
+ /* grace error on inactive ost */
+ if (rc && !lov->tgts[req->rq_idx].active)
+ rc = 0;
+
+ /* FIXME in raid1 regime, should return 0 */
+ RETURN(rc);
+}
+
+void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
+{
+ list_add_tail(&req->rq_link, &set->set_list);
+ set->set_count++;
+}
+
+int lov_update_enqueue_set(struct lov_request_set *set,
+ struct lov_request *req, int rc, int flags)
+{
+ struct lustre_handle *lov_lockhp;
+ struct lov_oinfo *loi;
+ ENTRY;
+
+ lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
+ loi = &set->set_md->lsm_oinfo[req->rq_stripe];
+
+ /* XXX FIXME: This unpleasantness doesn't belong here at *all*.
+ * It belongs in the OSC, except that the OSC doesn't have
+ * access to the real LOI -- it gets a copy, that we created
+ * above, and that copy can be arbitrarily out of date.
+ *
+ * The LOV API is due for a serious rewriting anyways, and this
+ * can be addressed then. */
+ if (rc == ELDLM_OK) {
+ struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
+ __u64 tmp = req->rq_md->lsm_oinfo->loi_rss;
+
+ LASSERT(lock != NULL);
+ loi->loi_rss = tmp;
+ loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks;
+ /* Extend KMS up to the end of this lock and no further
+ * A lock on [x,y] means a KMS of up to y + 1 bytes! */
+ if (tmp > lock->l_policy_data.l_extent.end)
+ tmp = lock->l_policy_data.l_extent.end + 1;
+ if (tmp >= loi->loi_kms) {
+ CDEBUG(D_INODE, "lock acquired, setting rss="
+ LPU64", kms="LPU64"\n", loi->loi_rss, tmp);
+ loi->loi_kms = tmp;
+ loi->loi_kms_valid = 1;
+ } else {
+ CDEBUG(D_INODE, "lock acquired, setting rss="
+ LPU64"; leaving kms="LPU64", end="LPU64
+ "\n", loi->loi_rss, loi->loi_kms,
+ lock->l_policy_data.l_extent.end);
+ }
+ ldlm_lock_allow_match(lock);
+ LDLM_LOCK_PUT(lock);
+ } else if (rc == ELDLM_LOCK_ABORTED && flags & LDLM_FL_HAS_INTENT) {
+ memset(lov_lockhp, 0, sizeof(*lov_lockhp));
+ loi->loi_rss = req->rq_md->lsm_oinfo->loi_rss;
+ loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks;
+ CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
+ " kms="LPU64"\n", loi->loi_rss, loi->loi_kms);
+ rc = ELDLM_OK;
+ } else {
+ struct obd_export *exp = set->set_exp;
+ struct lov_obd *lov = &exp->exp_obd->u.lov;
+
+ memset(lov_lockhp, 0, sizeof(*lov_lockhp));
+ if (lov->tgts[req->rq_idx].active) {
+ CERROR("error: enqueue objid "LPX64" subobj "
+ LPX64" on OST idx %d: rc = %d\n",
+ set->set_md->lsm_object_id, loi->loi_id,
+ loi->loi_ost_idx, rc);
+ } else {
+ rc = ELDLM_OK;
+ }
+ }
+ lov_update_set(set, req, rc);
+ RETURN(rc);
+}
+
+static int enqueue_done(struct lov_request_set *set, __u32 mode)
+{
+ struct list_head *pos;
+ struct lov_request *req;
+ struct lustre_handle *lov_lockhp = NULL;
+ struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
+ int rc = 0;
+ ENTRY;
+
+ LASSERT(set->set_completes);
+ /* enqueue/match success, just return */
+ if (set->set_completes == set->set_success)
+ RETURN(0);
+
+ /* cancel enqueued/matched locks */
+ list_for_each (pos, &set->set_list) {
+ req = list_entry(pos, struct lov_request, rq_link);
+
+ if (!req->rq_complete || req->rq_rc)
+ continue;
+
+ lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
+ LASSERT(lov_lockhp);
+ if (lov_lockhp->cookie == 0)
+ continue;
+
+ rc = obd_cancel(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
+ mode, lov_lockhp);
+ if (rc && lov->tgts[req->rq_idx].active)
+ CERROR("cancelling obdjid "LPX64" on OST "
+ "idx %d error: rc = %d\n",
+ req->rq_md->lsm_object_id, req->rq_idx, rc);
+ }
+ lov_llh_put(set->set_lockh);
+ RETURN(rc);
+}
+
+int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode)
+{
+ int rc = 0;
+ ENTRY;
+
+ LASSERT(set->set_exp);
+ if (set == NULL)
+ RETURN(0);
+ if (set->set_completes)
+ rc = enqueue_done(set, mode);
+ else
+ lov_llh_put(set->set_lockh);
+
+ if (atomic_dec_and_test(&set->set_refcount))
+ lov_finish_set(set);
+
+ RETURN(rc);
+}
+
+int lov_prep_enqueue_set(struct obd_export *exp, struct lov_stripe_md *lsm,
+ ldlm_policy_data_t *policy, __u32 mode,
+ struct lustre_handle *lockh,
+ struct lov_request_set **reqset)
+{
+ struct lov_obd *lov = &exp->exp_obd->u.lov;
+ struct lov_request_set *set;
+ int i, rc = 0;
+ struct lov_oinfo *loi;
+ ENTRY;
+
+ OBD_ALLOC(set, sizeof(*set));
+ if (set == NULL)
+ RETURN(-ENOMEM);
+ lov_init_set(set);
+
+ set->set_exp = exp;
+ set->set_md = lsm;
+ set->set_lockh = lov_llh_new(lsm);
+ if (set->set_lockh == NULL)
+ GOTO(out_set, rc = -ENOMEM);
+ lockh->cookie = set->set_lockh->llh_handle.h_cookie;
+
+ loi = lsm->lsm_oinfo;
+ for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+ struct lov_request *req;
+ obd_off start, end;
+
+ if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
+ policy->l_extent.end, &start, &end))
+ continue;
+
+ if (lov->tgts[loi->loi_ost_idx].active == 0) {
+ CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+ continue;
+ }
+
+ OBD_ALLOC(req, sizeof(*req));
+ if (req == NULL)
+ GOTO(out_set, rc = -ENOMEM);
+
+ req->rq_buflen = sizeof(*req->rq_md) +
+ sizeof(struct lov_oinfo);
+ OBD_ALLOC(req->rq_md, req->rq_buflen);
+ if (req->rq_md == NULL)
+ GOTO(out_set, rc = -ENOMEM);
+
+ req->rq_extent.start = start;
+ req->rq_extent.end = end;
+
+ req->rq_idx = loi->loi_ost_idx;
+ req->rq_stripe = i;
+
+ /* XXX LOV STACKING: submd should be from the subobj */
+ req->rq_md->lsm_object_id = loi->loi_id;
+ req->rq_md->lsm_stripe_count = 0;
+ req->rq_md->lsm_oinfo->loi_kms_valid = loi->loi_kms_valid;
+ req->rq_md->lsm_oinfo->loi_rss = loi->loi_rss;
+ req->rq_md->lsm_oinfo->loi_kms = loi->loi_kms;
+ req->rq_md->lsm_oinfo->loi_blocks = loi->loi_blocks;
+
+ lov_set_add_req(req, set);
+ }
+ if (!set->set_count)
+ GOTO(out_set, rc = -EIO);
+ *reqset = set;
+ RETURN(0);
+out_set:
+ lov_fini_enqueue_set(set, mode);
+ RETURN(rc);
+}
+
+int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
+ int rc)
+{
+ int ret = rc;
+ ENTRY;
+
+ if (rc == 1)
+ ret = 0;
+ lov_update_set(set, req, ret);
+ RETURN(rc);
+}
+
+int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
+{
+ int rc = 0;
+ ENTRY;
+
+ LASSERT(set->set_exp);
+ if (set == NULL)
+ RETURN(0);
+ if (set->set_completes) {
+ if (set->set_count == set->set_success &&
+ flags & LDLM_FL_TEST_LOCK)
+ lov_llh_put(set->set_lockh);
+ rc = enqueue_done(set, mode);
+ } else {
+ lov_llh_put(set->set_lockh);
+ }
+
+ if (atomic_dec_and_test(&set->set_refcount))
+ lov_finish_set(set);
+
+ RETURN(rc);
+}
+
+int lov_prep_match_set(struct obd_export *exp, struct lov_stripe_md *lsm,
+ ldlm_policy_data_t *policy, __u32 mode,
+ struct lustre_handle *lockh,
+ struct lov_request_set **reqset)
+{
+ struct lov_obd *lov = &exp->exp_obd->u.lov;
+ struct lov_request_set *set;
+ int i, rc = 0;
+ struct lov_oinfo *loi;
+ ENTRY;
+
+ OBD_ALLOC(set, sizeof(*set));
+ if (set == NULL)
+ RETURN(-ENOMEM);
+ lov_init_set(set);
+
+ set->set_exp = exp;
+ set->set_md = lsm;
+ set->set_lockh = lov_llh_new(lsm);
+ if (set->set_lockh == NULL)
+ GOTO(out_set, rc = -ENOMEM);
+ lockh->cookie = set->set_lockh->llh_handle.h_cookie;
+
+ loi = lsm->lsm_oinfo;
+ for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+ struct lov_request *req;
+ obd_off start, end;
+
+ if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
+ policy->l_extent.end, &start, &end))
+ continue;
+
+ /* FIXME raid1 should grace this error */
+ if (lov->tgts[loi->loi_ost_idx].active == 0) {
+ CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+ GOTO(out_set, rc = -EIO);
+ }
+
+ OBD_ALLOC(req, sizeof(*req));
+ if (req == NULL)
+ GOTO(out_set, rc = -ENOMEM);
+
+ req->rq_buflen = sizeof(*req->rq_md);
+ OBD_ALLOC(req->rq_md, req->rq_buflen);
+ if (req->rq_md == NULL)
+ GOTO(out_set, rc = -ENOMEM);
+
+ req->rq_extent.start = start;
+ req->rq_extent.end = end;
+
+ req->rq_idx = loi->loi_ost_idx;
+ req->rq_stripe = i;
+
+ /* XXX LOV STACKING: submd should be from the subobj */
+ req->rq_md->lsm_object_id = loi->loi_id;
+ req->rq_md->lsm_stripe_count = 0;
+
+ lov_set_add_req(req, set);
+ }
+ if (!set->set_count)
+ GOTO(out_set, rc = -EIO);
+ *reqset = set;
+ RETURN(rc);
+out_set:
+ lov_fini_match_set(set, mode, 0);
+ RETURN(rc);
+}
+
+int lov_fini_cancel_set(struct lov_request_set *set)
+{
+ int rc = 0;
+ ENTRY;
+
+ LASSERT(set->set_exp);
+ if (set == NULL)
+ RETURN(0);
+
+ if (set->set_lockh)
+ lov_llh_put(set->set_lockh);
+
+ if (atomic_dec_and_test(&set->set_refcount))
+ lov_finish_set(set);
+
+ RETURN(rc);
+}
+
+int lov_prep_cancel_set(struct obd_export *exp, struct lov_stripe_md *lsm,
+ __u32 mode, struct lustre_handle *lockh,
+ struct lov_request_set **reqset)
+{
+ struct lov_request_set *set;
+ int i, rc = 0;
+ struct lov_oinfo *loi;
+ ENTRY;
+
+ OBD_ALLOC(set, sizeof(*set));
+ if (set == NULL)
+ RETURN(-ENOMEM);
+ lov_init_set(set);
+
+ set->set_exp = exp;
+ set->set_md = lsm;
+ set->set_lockh = lov_handle2llh(lockh);
+ if (set->set_lockh == NULL) {
+ CERROR("LOV: invalid lov lock handle %p\n", lockh);
+ GOTO(out_set, rc = -EINVAL);
+ }
+ lockh->cookie = set->set_lockh->llh_handle.h_cookie;
+
+ loi = lsm->lsm_oinfo;
+ for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+ struct lov_request *req;
+ struct lustre_handle *lov_lockhp;
+
+ lov_lockhp = set->set_lockh->llh_handles + i;
+ if (lov_lockhp->cookie == 0) {
+ CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
+ loi->loi_ost_idx, loi->loi_id);
+ continue;
+ }
+
+ OBD_ALLOC(req, sizeof(*req));
+ if (req == NULL)
+ GOTO(out_set, rc = -ENOMEM);
+
+ req->rq_buflen = sizeof(*req->rq_md);
+ OBD_ALLOC(req->rq_md, req->rq_buflen);
+ if (req->rq_md == NULL)
+ GOTO(out_set, rc = -ENOMEM);
+
+ req->rq_idx = loi->loi_ost_idx;
+ req->rq_stripe = i;
+
+ /* XXX LOV STACKING: submd should be from the subobj */
+ req->rq_md->lsm_object_id = loi->loi_id;
+ req->rq_md->lsm_stripe_count = 0;
+
+ lov_set_add_req(req, set);
+ }
+ if (!set->set_count)
+ GOTO(out_set, rc = -EIO);
+ *reqset = set;
+ RETURN(rc);
+out_set:
+ lov_fini_cancel_set(set);
+ RETURN(rc);
+}
+
+static int create_done(struct obd_export *exp, struct lov_request_set *set,
+ struct lov_stripe_md **ea)
+{
+ struct lov_obd *lov = &exp->exp_obd->u.lov;
+ struct obd_trans_info *oti = set->set_oti;
+ struct obdo *src_oa = set->set_oa;
+ struct list_head *pos;
+ struct lov_request *req;
+ struct obdo *ret_oa = NULL;
+ int attrset = 0, rc = 0;
+ ENTRY;
+
+ LASSERT(set->set_completes);
+
+ if (!set->set_success)
+ GOTO(cleanup, rc = -EIO);
+ if (*ea == NULL && set->set_count != set->set_success) {
+ set->set_count = set->set_success;
+ qos_shrink_lsm(set);
+ }
+
+ ret_oa = obdo_alloc();
+ if (ret_oa == NULL)
+ GOTO(cleanup, rc = -ENOMEM);
+
+ list_for_each (pos, &set->set_list) {
+ req = list_entry(pos, struct lov_request, rq_link);
+ if (!req->rq_complete || req->rq_rc)
+ continue;
+ lov_merge_attrs(ret_oa, req->rq_oa, req->rq_oa->o_valid,
+ set->set_md, req->rq_stripe, &attrset);
+ }
+ if (src_oa->o_valid & OBD_MD_FLSIZE &&
+ ret_oa->o_size != src_oa->o_size) {
+ CERROR("original size "LPU64" isn't new object size "LPU64"\n",
+ src_oa->o_size, ret_oa->o_size);
+ LBUG();
+ }
+ ret_oa->o_id = src_oa->o_id;
+ memcpy(src_oa, ret_oa, sizeof(*src_oa));
+ obdo_free(ret_oa);
+
+ *ea = set->set_md;
+ GOTO(done, rc = 0);
+
+cleanup:
+ list_for_each (pos, &set->set_list) {
+ struct obd_export *sub_exp;
+ int err = 0;
+ req = list_entry(pos, struct lov_request, rq_link);
+
+ if (!req->rq_complete || req->rq_rc)
+ continue;
+
+ sub_exp = lov->tgts[req->rq_idx].ltd_exp,
+ err = obd_destroy(sub_exp, req->rq_oa, NULL, oti);
+ if (err)
+ CERROR("Failed to uncreate objid "LPX64" subobj "
+ LPX64" on OST idx %d: rc = %d\n",
+ set->set_oa->o_id, req->rq_oa->o_id,
+ req->rq_idx, rc);
+ }
+ if (*ea == NULL)
+ obd_free_memmd(exp, &set->set_md);
+done:
+ if (oti && set->set_cookies) {
+ oti->oti_logcookies = set->set_cookies;
+ if (!set->set_cookie_sent) {
+ oti_free_cookies(oti);
+ src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
+ } else {
+ src_oa->o_valid |= OBD_MD_FLCOOKIE;
+ }
+ }
+ RETURN(rc);
+}
+
+int lov_fini_create_set(struct lov_request_set *set, struct lov_stripe_md **ea)
+{
+ int rc = 0;
+ ENTRY;
+
+ LASSERT(set->set_exp);
+ if (set == NULL)
+ RETURN(0);
+ if (set->set_completes) {
+ rc = create_done(set->set_exp, set, ea);
+ /* FIXME update qos data here */
+ }
+
+ if (atomic_dec_and_test(&set->set_refcount))
+ lov_finish_set(set);
+
+ RETURN(rc);
+}
+
+int lov_update_create_set(struct lov_request_set *set,
+ struct lov_request *req, int rc)
+{
+ struct obd_trans_info *oti = set->set_oti;
+ struct lov_stripe_md *lsm = set->set_md;
+ struct lov_oinfo *loi;
+ struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
+ ENTRY;
+
+ req->rq_stripe = set->set_success;
+ loi = &lsm->lsm_oinfo[req->rq_stripe];
+
+ if (rc && lov->tgts[req->rq_idx].active) {
+ CERROR("error creating objid "LPX64" sub-object"
+ " on OST idx %d/%d: rc = %d\n",
+ set->set_oa->o_id, req->rq_idx,
+ lsm->lsm_stripe_count, rc);
+ if (rc > 0) {
+ CERROR("obd_create returned invalid err %d\n", rc);
+ rc = -EIO;
+ }
+ }
+ lov_update_set(set, req, rc);
+ if (rc)
+ RETURN(rc);
+
+ if (oti && oti->oti_objid)
+ oti->oti_objid[req->rq_idx] = req->rq_oa->o_id;
+
+ loi->loi_id = req->rq_oa->o_id;
+ loi->loi_ost_idx = req->rq_idx;
+ CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPX64" at idx %d\n",
+ lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
+ loi_init(loi);
+
+ if (set->set_cookies)
+ ++oti->oti_logcookies;
+ if (req->rq_oa->o_valid & OBD_MD_FLCOOKIE)
+ set->set_cookie_sent++;
+
+ RETURN(0);
+}
+
+int lov_prep_create_set(struct obd_export *exp, struct lov_stripe_md **ea,
+ struct obdo *src_oa, struct obd_trans_info *oti,
+ struct lov_request_set **reqset)
+{
+ struct lov_obd *lov = &exp->exp_obd->u.lov;
+ struct lov_request_set *set;
+ int rc = 0, newea = 0;
+ ENTRY;
+
+ OBD_ALLOC(set, sizeof(*set));
+ if (set == NULL)
+ RETURN(-ENOMEM);
+ lov_init_set(set);
+
+ set->set_exp = exp;
+ set->set_md = *ea;
+ set->set_oa = src_oa;
+ set->set_oti = oti;
+
+ if (set->set_md == NULL) {
+ int stripes, stripe_cnt;
+ stripe_cnt = lov_get_stripecnt(lov, 0);
+
+ /* If the MDS file was truncated up to some size, stripe over
+ * enough OSTs to allow the file to be created at that size. */
+ if (src_oa->o_valid & OBD_MD_FLSIZE) {
+ stripes=((src_oa->o_size+LUSTRE_STRIPE_MAXBYTES)>>12)-1;
+ do_div(stripes, (__u32)(LUSTRE_STRIPE_MAXBYTES >> 12));
+
+ if (stripes > lov->desc.ld_active_tgt_count)
+ GOTO(out_set, rc = -EFBIG);
+ if (stripes < stripe_cnt)
+ stripes = stripe_cnt;
+ } else {
+ stripes = stripe_cnt;
+ }
+
+ rc = lov_alloc_memmd(&set->set_md, stripes,
+ lov->desc.ld_pattern ?
+ lov->desc.ld_pattern : LOV_PATTERN_RAID0);
+ if (rc < 0)
+ goto out_set;
+ newea = 1;
+ }
+
+ rc = qos_prep_create(lov, set, newea);
+ if (rc)
+ goto out_lsm;
+
+ if (oti && (src_oa->o_valid & OBD_MD_FLCOOKIE)) {
+ oti_alloc_cookies(oti, set->set_count);
+ if (!oti->oti_logcookies)
+ goto out_lsm;
+ set->set_cookies = oti->oti_logcookies;
+ }
+ *reqset = set;
+ RETURN(rc);
+
+out_lsm:
+ if (*ea == NULL)
+ obd_free_memmd(exp, &set->set_md);
+out_set:
+ lov_fini_create_set(set, ea);
+ RETURN(rc);
+}
+
+static int common_attr_done(struct lov_request_set *set)
+{
+ struct list_head *pos;
+ struct lov_request *req;
+ struct obdo *tmp_oa;
+ int rc = 0, attrset = 0;
+ ENTRY;
+
+ if (set->set_oa == NULL)
+ RETURN(0);
+
+ if (!set->set_success)
+ RETURN(-EIO);
+
+ tmp_oa = obdo_alloc();
+ if (tmp_oa == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ list_for_each (pos, &set->set_list) {
+ req = list_entry(pos, struct lov_request, rq_link);
+
+ if (!req->rq_complete || req->rq_rc)
+ continue;
+ if (req->rq_oa->o_valid == 0) /* inactive stripe */
+ continue;
+ lov_merge_attrs(tmp_oa, req->rq_oa, req->rq_oa->o_valid,
+ set->set_md, req->rq_stripe, &attrset);
+ }
+ if (!attrset) {
+ CERROR("No stripes had valid attrs\n");
+ rc = -EIO;
+ }
+ tmp_oa->o_id = set->set_oa->o_id;
+ memcpy(set->set_oa, tmp_oa, sizeof(*set->set_oa));
+out:
+ if (tmp_oa)
+ obdo_free(tmp_oa);
+ RETURN(rc);
+
+}
+
+static int brw_done(struct lov_request_set *set)
+{
+ struct lov_stripe_md *lsm = set->set_md;
+ struct lov_oinfo *loi = NULL;
+ struct list_head *pos;
+ struct lov_request *req;
+ ENTRY;
+
+ list_for_each (pos, &set->set_list) {
+ req = list_entry(pos, struct lov_request, rq_link);
+
+ if (!req->rq_complete || req->rq_rc)
+ continue;
+
+ loi = &lsm->lsm_oinfo[req->rq_stripe];
+
+ if (req->rq_oa->o_valid & OBD_MD_FLBLOCKS)
+ loi->loi_blocks = req->rq_oa->o_blocks;
+ }
+
+ RETURN(0);
+}
+
+int lov_fini_brw_set(struct lov_request_set *set)
+{
+ int rc = 0;
+ ENTRY;
+
+ LASSERT(set->set_exp);
+ if (set == NULL)
+ RETURN(0);
+ if (set->set_completes) {
+ rc = brw_done(set);
+ /* FIXME update qos data here */
+ }
+ if (atomic_dec_and_test(&set->set_refcount))
+ lov_finish_set(set);
+
+ RETURN(rc);
+}
+
+int lov_prep_brw_set(struct obd_export *exp, struct obdo *src_oa,
+ struct lov_stripe_md *lsm, obd_count oa_bufs,
+ struct brw_page *pga, struct obd_trans_info *oti,
+ struct lov_request_set **reqset)
+{
+ struct {
+ obd_count index;
+ obd_count count;
+ obd_count off;
+ } *info = NULL;
+ struct lov_request_set *set;
+ struct lov_oinfo *loi = NULL;
+ struct lov_obd *lov = &exp->exp_obd->u.lov;
+ int rc = 0, i, shift;
+ ENTRY;
+
+ OBD_ALLOC(set, sizeof(*set));
+ if (set == NULL)
+ RETURN(-ENOMEM);
+ lov_init_set(set);
+
+ set->set_exp = exp;
+ set->set_md = lsm;
+ set->set_oa = src_oa;
+ set->set_oti = oti;
+ set->set_oabufs = oa_bufs;
+ OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
+ if (!set->set_pga)
+ GOTO(out, rc = -ENOMEM);
+
+ OBD_ALLOC(info, sizeof(*info) * lsm->lsm_stripe_count);
+ if (!info)
+ GOTO(out, rc = -ENOMEM);
+
+ /* calculate the page count for each stripe */
+ for (i = 0; i < oa_bufs; i++) {
+ int stripe = lov_stripe_number(lsm, pga[i].off);
+ info[stripe].count++;
+ }
+
+ /* alloc and initialize lov request */
+ loi = lsm->lsm_oinfo;
+ shift = 0;
+ for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+ struct lov_request *req;
+
+ if (info[i].count == 0)
+ continue;
+
+ if (lov->tgts[loi->loi_ost_idx].active == 0) {
+ CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+ GOTO(out, rc = -EIO);
+ }
+
+ OBD_ALLOC(req, sizeof(*req));
+ if (req == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ req->rq_oa = obdo_alloc();
+ if (req->rq_oa == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ if (src_oa)
+ memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
+ req->rq_oa->o_id = loi->loi_id;
+
+ req->rq_buflen = sizeof(*req->rq_md);
+ OBD_ALLOC(req->rq_md, req->rq_buflen);
+ if (req->rq_md == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ req->rq_idx = loi->loi_ost_idx;
+ req->rq_stripe = i;
+
+ /* XXX LOV STACKING */
+ req->rq_md->lsm_object_id = loi->loi_id;
+ req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
+ req->rq_oabufs = info[i].count;
+ req->rq_pgaidx = shift;
+ shift += req->rq_oabufs;
+
+ /* remember the index for sort brw_page array */
+ info[i].index = req->rq_pgaidx;
+
+ lov_set_add_req(req, set);
+ }
+ if (!set->set_count)
+ GOTO(out, rc = -EIO);
+
+ /* rotate & sort the brw_page array */
+ for (i = 0; i < oa_bufs; i++) {
+ int stripe = lov_stripe_number(lsm, pga[i].off);
+
+ shift = info[stripe].index + info[stripe].off;
+ LASSERT(shift < oa_bufs);
+ set->set_pga[shift] = pga[i];
+ lov_stripe_offset(lsm, pga[i].off, stripe,
+ &set->set_pga[shift].off);
+ info[stripe].off++;
+ }
+out:
+ if (info)
+ OBD_FREE(info, sizeof(*info) * lsm->lsm_stripe_count);
+
+ if (rc == 0)
+ *reqset = set;
+ else
+ lov_fini_brw_set(set);
+
+ RETURN(rc);
+}
+
+static int getattr_done(struct lov_request_set *set)
+{
+ return common_attr_done(set);
+}
+
+int lov_fini_getattr_set(struct lov_request_set *set)
+{
+ int rc = 0;
+ ENTRY;
+
+ LASSERT(set->set_exp);
+ if (set == NULL)
+ RETURN(0);
+ if (set->set_completes)
+ rc = getattr_done(set);
+
+ if (atomic_dec_and_test(&set->set_refcount))
+ lov_finish_set(set);
+
+ RETURN(rc);
+}
+
+int lov_prep_getattr_set(struct obd_export *exp, struct obdo *src_oa,
+ struct lov_stripe_md *lsm,
+ struct lov_request_set **reqset)
+{
+ struct lov_request_set *set;
+ struct lov_oinfo *loi = NULL;
+ struct lov_obd *lov = &exp->exp_obd->u.lov;
+ int rc = 0, i;
+ ENTRY;
+
+ OBD_ALLOC(set, sizeof(*set));
+ if (set == NULL)
+ RETURN(-ENOMEM);
+ lov_init_set(set);
+
+ set->set_exp = exp;
+ set->set_md = lsm;
+ set->set_oa = src_oa;
+
+ loi = lsm->lsm_oinfo;
+ for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+ struct lov_request *req;
+
+ if (lov->tgts[loi->loi_ost_idx].active == 0) {
+ CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+ continue;
+ }
+
+ OBD_ALLOC(req, sizeof(*req));
+ if (req == NULL)
+ GOTO(out_set, rc = -ENOMEM);
+
+ req->rq_stripe = i;
+ req->rq_idx = loi->loi_ost_idx;
+
+ req->rq_oa = obdo_alloc();
+ if (req->rq_oa == NULL)
+ GOTO(out_set, rc = -ENOMEM);
+ memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
+ req->rq_oa->o_id = loi->loi_id;
+
+ lov_set_add_req(req, set);
+ }
+ if (!set->set_count)
+ GOTO(out_set, rc = -EIO);
+ *reqset = set;
+ RETURN(rc);
+out_set:
+ lov_fini_getattr_set(set);
+ RETURN(rc);
+}
+
+int lov_fini_destroy_set(struct lov_request_set *set)
+{
+ ENTRY;
+
+ LASSERT(set->set_exp);
+ if (set == NULL)
+ RETURN(0);
+ if (set->set_completes) {
+ /* FIXME update qos data here */
+ }
+
+ if (atomic_dec_and_test(&set->set_refcount))
+ lov_finish_set(set);
+
+ RETURN(0);
+}
+
+int lov_prep_destroy_set(struct obd_export *exp, struct obdo *src_oa,
+ struct lov_stripe_md *lsm,
+ struct obd_trans_info *oti,
+ struct lov_request_set **reqset)
+{
+ struct lov_request_set *set;
+ struct lov_oinfo *loi = NULL;
+ struct lov_obd *lov = &exp->exp_obd->u.lov;
+ int rc = 0, cookie_set = 0, i;
+ ENTRY;
+
+ OBD_ALLOC(set, sizeof(*set));
+ if (set == NULL)
+ RETURN(-ENOMEM);
+ lov_init_set(set);
+
+ set->set_exp = exp;
+ set->set_md = lsm;
+ set->set_oa = src_oa;
+ set->set_oti = oti;
+ if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
+ set->set_cookies = oti->oti_logcookies;
+
+ loi = lsm->lsm_oinfo;
+ for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+ struct lov_request *req;
+
+ if (lov->tgts[loi->loi_ost_idx].active == 0) {
+ CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+ continue;
+ }
+
+ OBD_ALLOC(req, sizeof(*req));
+ if (req == NULL)
+ GOTO(out_set, rc = -ENOMEM);
+
+ req->rq_stripe = i;
+ req->rq_idx = loi->loi_ost_idx;
+
+ req->rq_oa = obdo_alloc();
+ if (req->rq_oa == NULL)
+ GOTO(out_set, rc = -ENOMEM);
+ memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
+ req->rq_oa->o_id = loi->loi_id;
+
+ /* Setup the first request's cookie position */
+ if (!cookie_set && set->set_cookies) {
+ oti->oti_logcookies = set->set_cookies + i;
+ cookie_set = 1;
+ }
+ lov_set_add_req(req, set);
+ }
+ if (!set->set_count)
+ GOTO(out_set, rc = -EIO);
+ *reqset = set;
+ RETURN(rc);
+out_set:
+ lov_fini_destroy_set(set);
+ RETURN(rc);
+}
+
+static int setattr_done(struct lov_request_set *set)
+{
+ return common_attr_done(set);
+}
+
+int lov_fini_setattr_set(struct lov_request_set *set)
+{
+ int rc = 0;
+ ENTRY;
+
+ LASSERT(set->set_exp);
+ if (set == NULL)
+ RETURN(0);
+ if (set->set_completes) {
+ rc = setattr_done(set);
+ /* FIXME update qos data here */
+ }
+
+ if (atomic_dec_and_test(&set->set_refcount))
+ lov_finish_set(set);
+ RETURN(rc);
+}
+
+int lov_prep_setattr_set(struct obd_export *exp, struct obdo *src_oa,
+ struct lov_stripe_md *lsm, struct obd_trans_info *oti,
+ struct lov_request_set **reqset)
+{
+ struct lov_request_set *set;
+ struct lov_oinfo *loi = NULL;
+ struct lov_obd *lov = &exp->exp_obd->u.lov;
+ int rc = 0, i;
+ ENTRY;
+
+ OBD_ALLOC(set, sizeof(*set));
+ if (set == NULL)
+ RETURN(-ENOMEM);
+ lov_init_set(set);
+
+ set->set_exp = exp;
+ set->set_md = lsm;
+ set->set_oa = src_oa;
+
+ loi = lsm->lsm_oinfo;
+ for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+ struct lov_request *req;
+
+ if (lov->tgts[loi->loi_ost_idx].active == 0) {
+ CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+ continue;
+ }
+
+ OBD_ALLOC(req, sizeof(*req));
+ if (req == NULL)
+ GOTO(out_set, rc = -ENOMEM);
+ req->rq_stripe = i;
+ req->rq_idx = loi->loi_ost_idx;
+
+ req->rq_oa = obdo_alloc();
+ if (req->rq_oa == NULL)
+ GOTO(out_set, rc = -ENOMEM);
+ memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
+ req->rq_oa->o_id = loi->loi_id;
+
+ if (src_oa->o_valid & OBD_MD_FLSIZE) {
+ if (lov_stripe_offset(lsm, src_oa->o_size, i,
+ &req->rq_oa->o_size) < 0 &&
+ req->rq_oa->o_size)
+ req->rq_oa->o_size--;
+ CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
+ i, req->rq_oa->o_size, src_oa->o_size);
+ }
+ lov_set_add_req(req, set);
+ }
+ if (!set->set_count)
+ GOTO(out_set, rc = -EIO);
+ *reqset = set;
+ RETURN(rc);
+out_set:
+ lov_fini_setattr_set(set);
+ RETURN(rc);
+}
+
+int lov_update_punch_set(struct lov_request_set *set, struct lov_request *req,
+ int rc)
+{
+ struct lov_stripe_md *lsm = set->set_md;
+ struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
+ ENTRY;
+
+ lov_update_set(set, req, rc);
+ if (rc == 0) {
+ struct lov_oinfo *loi = &lsm->lsm_oinfo[req->rq_stripe];
+ loi->loi_kms = loi->loi_rss = req->rq_extent.start;
+ }
+ if (rc && !lov->tgts[req->rq_idx].active)
+ rc = 0;
+ /* FIXME in raid1 regime, should return 0 */
+ RETURN(rc);
+}
+
+int lov_fini_punch_set(struct lov_request_set *set)
+{
+ int rc = 0;
+ ENTRY;
+
+ LASSERT(set->set_exp);
+ if (set == NULL)
+ RETURN(0);
+ if (set->set_completes) {
+ if (!set->set_success)
+ rc = -EIO;
+ /* FIXME update qos data here */
+ }
+
+ if (atomic_dec_and_test(&set->set_refcount))
+ lov_finish_set(set);
+
+ RETURN(rc);
+}
+
+int lov_prep_punch_set(struct obd_export *exp, struct obdo *src_oa,
+ struct lov_stripe_md *lsm, obd_off start,
+ obd_off end, struct obd_trans_info *oti,
+ struct lov_request_set **reqset)
+{
+ struct lov_request_set *set;
+ struct lov_oinfo *loi = NULL;
+ struct lov_obd *lov = &exp->exp_obd->u.lov;
+ int rc = 0, i;
+ ENTRY;
+
+ OBD_ALLOC(set, sizeof(*set));
+ if (set == NULL)
+ RETURN(-ENOMEM);
+ lov_init_set(set);
+
+ set->set_exp = exp;
+ set->set_md = lsm;
+ set->set_oa = src_oa;
+
+ loi = lsm->lsm_oinfo;
+ for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+ struct lov_request *req;
+ obd_off rs, re;
+
+ if (lov->tgts[loi->loi_ost_idx].active == 0) {
+ CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+ continue;
+ }
+
+ if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
+ continue;
+
+ OBD_ALLOC(req, sizeof(*req));
+ if (req == NULL)
+ GOTO(out_set, rc = -ENOMEM);
+ req->rq_stripe = i;
+ req->rq_idx = loi->loi_ost_idx;
+
+ req->rq_oa = obdo_alloc();
+ if (req->rq_oa == NULL)
+ GOTO(out_set, rc = -ENOMEM);
+ memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
+ req->rq_oa->o_id = loi->loi_id;
+
+ req->rq_extent.start = rs;
+ req->rq_extent.end = re;
+
+ lov_set_add_req(req, set);
+ }
+ if (!set->set_count)
+ GOTO(out_set, rc = -EIO);
+ *reqset = set;
+ RETURN(rc);
+out_set:
+ lov_fini_punch_set(set);
+ RETURN(rc);
+}
+
+int lov_fini_sync_set(struct lov_request_set *set)
+{
+ int rc = 0;
+ ENTRY;
+
+ LASSERT(set->set_exp);
+ if (set == NULL)
+ RETURN(0);
+ if (set->set_completes) {
+ if (!set->set_success)
+ rc = -EIO;
+ /* FIXME update qos data here */
+ }
+
+ if (atomic_dec_and_test(&set->set_refcount))
+ lov_finish_set(set);
+
+ RETURN(rc);
+}
+
+int lov_prep_sync_set(struct obd_export *exp, struct obdo *src_oa,
+ struct lov_stripe_md *lsm, obd_off start,
+ obd_off end, struct lov_request_set **reqset)
+{
+ struct lov_request_set *set;
+ struct lov_oinfo *loi = NULL;
+ struct lov_obd *lov = &exp->exp_obd->u.lov;
+ int rc = 0, i;
+ ENTRY;
+
+ OBD_ALLOC(set, sizeof(*set));
+ if (set == NULL)
+ RETURN(-ENOMEM);
+ lov_init_set(set);
+
+ set->set_exp = exp;
+ set->set_md = lsm;
+ set->set_oa = src_oa;
+
+ loi = lsm->lsm_oinfo;
+ for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+ struct lov_request *req;
+ obd_off rs, re;
+
+ if (lov->tgts[loi->loi_ost_idx].active == 0) {
+ CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+ continue;
+ }
+
+ if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
+ continue;
+
+ OBD_ALLOC(req, sizeof(*req));
+ if (req == NULL)
+ GOTO(out_set, rc = -ENOMEM);
+ req->rq_stripe = i;
+ req->rq_idx = loi->loi_ost_idx;
+
+ req->rq_oa = obdo_alloc();
+ if (req->rq_oa == NULL)
+ GOTO(out_set, rc = -ENOMEM);
+ memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
+ req->rq_oa->o_id = loi->loi_id;
+
+ req->rq_extent.start = rs;
+ req->rq_extent.end = re;
+
+ lov_set_add_req(req, set);
+ }
+ if (!set->set_count)
+ GOTO(out_set, rc = -EIO);
+ *reqset = set;
+ RETURN(rc);
+out_set:
+ lov_fini_sync_set(set);
+ RETURN(rc);
+}