lov_update_set(set, req, rc);
/* grace error on inactive ost */
- if (rc && !(lov->lov_tgts[req->rq_idx] &&
+ if (rc && !(lov->lov_tgts[req->rq_idx] &&
lov->lov_tgts[req->rq_idx]->ltd_active))
rc = 0;
set->set_count++;
}
+extern void osc_update_enqueue(struct lustre_handle *lov_lockhp,
+ struct lov_oinfo *loi, int flags,
+ struct ost_lvb *lvb, __u32 mode, int rc);
+
+static int lov_update_enqueue_lov(struct obd_export *exp,
+ struct lustre_handle *lov_lockhp,
+ struct lov_oinfo *loi, int flags, int idx,
+ __u64 oid, int rc)
+{
+ struct lov_obd *lov = &exp->exp_obd->u.lov;
+
+ if (rc != ELDLM_OK &&
+ !(rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT))) {
+ memset(lov_lockhp, 0, sizeof(*lov_lockhp));
+ if (lov->lov_tgts[idx] && lov->lov_tgts[idx]->ltd_active) {
+ /* -EUSERS used by OST to report file contention */
+ if (rc != -EINTR && rc != -EUSERS)
+ CERROR("enqueue objid "LPX64" subobj "
+ LPX64" on OST idx %d: rc %d\n",
+ oid, loi->loi_id, loi->loi_ost_idx, rc);
+ } else
+ rc = ELDLM_OK;
+ }
+ return rc;
+}
+
int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
{
struct lov_request_set *set = req->rq_rqset;
struct lustre_handle *lov_lockhp;
+ struct obd_info *oi = set->set_oi;
struct lov_oinfo *loi;
ENTRY;
- LASSERT(set != NULL);
- LASSERT(set->set_oi != NULL);
+ LASSERT(oi != NULL);
lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
- loi = set->set_oi->oi_md->lsm_oinfo[req->rq_stripe];
+ loi = oi->oi_md->lsm_oinfo[req->rq_stripe];
/* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
* and that copy can be arbitrarily out of date.
* The LOV API is due for a serious rewriting anyways, and this
* can be addressed then. */
- if (rc == ELDLM_OK) {
- struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
- __u64 tmp;
-
- LASSERT(lock != NULL);
- lov_stripe_lock(set->set_oi->oi_md);
- loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
- tmp = loi->loi_lvb.lvb_size;
- /* Extend KMS up to the end of this lock and no further
- * A lock on [x,y] means a KMS of up to y + 1 bytes! */
- if (tmp > lock->l_policy_data.l_extent.end)
- tmp = lock->l_policy_data.l_extent.end + 1;
- if (tmp >= loi->loi_kms) {
- LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
- ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
- loi->loi_kms = tmp;
- loi->loi_kms_valid = 1;
- } else {
- LDLM_DEBUG(lock, "lock acquired, setting rss="
- LPU64"; leaving kms="LPU64", end="LPU64,
- loi->loi_lvb.lvb_size, loi->loi_kms,
- lock->l_policy_data.l_extent.end);
- }
- lov_stripe_unlock(set->set_oi->oi_md);
- ldlm_lock_allow_match(lock);
- LDLM_LOCK_PUT(lock);
- } else if ((rc == ELDLM_LOCK_ABORTED) &&
- (set->set_oi->oi_flags & LDLM_FL_HAS_INTENT)) {
- memset(lov_lockhp, 0, sizeof(*lov_lockhp));
- lov_stripe_lock(set->set_oi->oi_md);
- loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
- lov_stripe_unlock(set->set_oi->oi_md);
- CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
- " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
- rc = ELDLM_OK;
- } else {
- struct obd_export *exp = set->set_exp;
- struct lov_obd *lov = &exp->exp_obd->u.lov;
-
- memset(lov_lockhp, 0, sizeof(*lov_lockhp));
- if (lov->lov_tgts[req->rq_idx] &&
- lov->lov_tgts[req->rq_idx]->ltd_active) {
- /* -EUSERS used by OST to report file contention */
- if (rc != -EINTR && rc != -EUSERS)
- CERROR("enqueue objid "LPX64" subobj "
- LPX64" on OST idx %d: rc %d\n",
- set->set_oi->oi_md->lsm_object_id,
- loi->loi_id, loi->loi_ost_idx, rc);
- } else {
- rc = ELDLM_OK;
- }
- }
+ lov_stripe_lock(oi->oi_md);
+ osc_update_enqueue(lov_lockhp, loi, oi->oi_flags,
+ &req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb, mode, rc);
+ if (rc == ELDLM_LOCK_ABORTED && (oi->oi_flags & LDLM_FL_HAS_INTENT))
+ memset(lov_lockhp, 0, sizeof *lov_lockhp);
+ rc = lov_update_enqueue_lov(set->set_exp, lov_lockhp, loi, oi->oi_flags,
+ req->rq_idx, oi->oi_md->lsm_object_id, rc);
+ lov_stripe_unlock(oi->oi_md);
lov_update_set(set, req, rc);
RETURN(rc);
}
/* The callback for osc_enqueue that updates lov info for every OSC request. */
-static int cb_update_enqueue(struct obd_info *oinfo, int rc)
+static int cb_update_enqueue(void *cookie, int rc)
{
+ struct obd_info *oinfo = cookie;
struct ldlm_enqueue_info *einfo;
struct lov_request *lovreq;
if (info[i].count == 0)
continue;
-
+
loi = oinfo->oi_md->lsm_oinfo[i];
- if (!lov->lov_tgts[loi->loi_ost_idx] ||
+ if (!lov->lov_tgts[loi->loi_ost_idx] ||
!lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
GOTO(out, rc = -EIO);
/* The callback for osc_getattr_async that finilizes a request info when a
* response is recieved. */
-static int cb_getattr_update(struct obd_info *oinfo, int rc)
+static int cb_getattr_update(void *cookie, int rc)
{
+ struct obd_info *oinfo = cookie;
struct lov_request *lovreq;
lovreq = container_of(oinfo, struct lov_request, rq_oi);
return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
struct lov_request *req;
loi = lsm->lsm_oinfo[i];
- if (!lov->lov_tgts[loi->loi_ost_idx] ||
+ if (!lov->lov_tgts[loi->loi_ost_idx] ||
!lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
continue;
lov_update_set(set, req, rc);
/* grace error on inactive ost */
- if (rc && !(lov->lov_tgts[req->rq_idx] &&
+ if (rc && !(lov->lov_tgts[req->rq_idx] &&
lov->lov_tgts[req->rq_idx]->ltd_active))
rc = 0;
/* The callback for osc_setattr_async that finilizes a request info when a
* response is recieved. */
-static int cb_setattr_update(struct obd_info *oinfo, int rc)
+static int cb_setattr_update(void *cookie, int rc)
{
+ struct obd_info *oinfo = cookie;
struct lov_request *lovreq;
lovreq = container_of(oinfo, struct lov_request, rq_oi);
return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
sizeof(*req->rq_oi.oi_oa));
req->rq_oi.oi_oa->o_id = loi->loi_id;
- LASSERT(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP)
+ LASSERT(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP)
|| req->rq_oi.oi_oa->o_gr>0);
req->rq_oi.oi_oa->o_stripe_idx = i;
req->rq_oi.oi_cb_up = cb_setattr_update;
/* The callback for osc_punch that finilizes a request info when a response
* is recieved. */
-static int cb_update_punch(struct obd_info *oinfo, int rc)
+static int cb_update_punch(void *cookie, int rc)
{
+ struct obd_info *oinfo = cookie;
struct lov_request *lovreq;
lovreq = container_of(oinfo, struct lov_request, rq_oi);
return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
/* The callback for osc_statfs_async that finilizes a request info when a
* response is recieved. */
-static int cb_statfs_update(struct obd_info *oinfo, int rc)
+static int cb_statfs_update(void *cookie, int rc)
{
+ struct obd_info *oinfo = cookie;
struct lov_request *lovreq;
struct obd_statfs *osfs, *lov_sfs;
struct obd_device *obd;