* in the LICENSE file that accompanied this code).
*
* You should have received a copy of the GNU General Public License
- * version 2 along with this program; If not, see [sun.com URL with a
- * copy of GPLv2].
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
*
* Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
* CA 95054 USA or visit www.sun.com if you need additional information or
set->set_cookies = 0;
CFS_INIT_LIST_HEAD(&set->set_list);
atomic_set(&set->set_refcount, 1);
+ cfs_waitq_init(&set->set_waitq);
}
static void lov_finish_set(struct lov_request_set *set)
EXIT;
}
+int lov_finished_set(struct lov_request_set *set)
+{
+ CDEBUG(D_INFO, "check set %d/%d\n", set->set_completes,
+ set->set_count);
+ return set->set_completes == set->set_count;
+}
+
+
void lov_update_set(struct lov_request_set *set,
struct lov_request *req, int rc)
{
set->set_completes++;
if (rc == 0)
set->set_success++;
+
+ cfs_waitq_signal(&set->set_waitq);
}
int lov_update_common_set(struct lov_request_set *set,
lov_update_set(set, req, rc);
/* grace error on inactive ost */
- if (rc && !(lov->lov_tgts[req->rq_idx] &&
+ if (rc && !(lov->lov_tgts[req->rq_idx] &&
lov->lov_tgts[req->rq_idx]->ltd_active))
rc = 0;
{
list_add_tail(&req->rq_link, &set->set_list);
set->set_count++;
+ req->rq_rqset = set;
+}
+
+extern void osc_update_enqueue(struct lustre_handle *lov_lockhp,
+ struct lov_oinfo *loi, int flags,
+ struct ost_lvb *lvb, __u32 mode, int rc);
+
+static int lov_update_enqueue_lov(struct obd_export *exp,
+ struct lustre_handle *lov_lockhp,
+ struct lov_oinfo *loi, int flags, int idx,
+ __u64 oid, int rc)
+{
+ struct lov_obd *lov = &exp->exp_obd->u.lov;
+
+ if (rc != ELDLM_OK &&
+ !(rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT))) {
+ memset(lov_lockhp, 0, sizeof(*lov_lockhp));
+ if (lov->lov_tgts[idx] && lov->lov_tgts[idx]->ltd_active) {
+ /* -EUSERS used by OST to report file contention */
+ if (rc != -EINTR && rc != -EUSERS)
+ CERROR("enqueue objid "LPX64" subobj "
+ LPX64" on OST idx %d: rc %d\n",
+ oid, loi->loi_id, loi->loi_ost_idx, rc);
+ } else
+ rc = ELDLM_OK;
+ }
+ return rc;
}
int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
{
struct lov_request_set *set = req->rq_rqset;
struct lustre_handle *lov_lockhp;
+ struct obd_info *oi = set->set_oi;
struct lov_oinfo *loi;
ENTRY;
- LASSERT(set != NULL);
- LASSERT(set->set_oi != NULL);
+ LASSERT(oi != NULL);
lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
- loi = set->set_oi->oi_md->lsm_oinfo[req->rq_stripe];
+ loi = oi->oi_md->lsm_oinfo[req->rq_stripe];
/* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
* and that copy can be arbitrarily out of date.
* The LOV API is due for a serious rewriting anyways, and this
* can be addressed then. */
- if (rc == ELDLM_OK) {
- struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
- __u64 tmp;
-
- LASSERT(lock != NULL);
- lov_stripe_lock(set->set_oi->oi_md);
- loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
- tmp = loi->loi_lvb.lvb_size;
- /* Extend KMS up to the end of this lock and no further
- * A lock on [x,y] means a KMS of up to y + 1 bytes! */
- if (tmp > lock->l_policy_data.l_extent.end)
- tmp = lock->l_policy_data.l_extent.end + 1;
- if (tmp >= loi->loi_kms) {
- LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
- ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
- loi->loi_kms = tmp;
- loi->loi_kms_valid = 1;
- } else {
- LDLM_DEBUG(lock, "lock acquired, setting rss="
- LPU64"; leaving kms="LPU64", end="LPU64,
- loi->loi_lvb.lvb_size, loi->loi_kms,
- lock->l_policy_data.l_extent.end);
- }
- lov_stripe_unlock(set->set_oi->oi_md);
- ldlm_lock_allow_match(lock);
- LDLM_LOCK_PUT(lock);
- } else if ((rc == ELDLM_LOCK_ABORTED) &&
- (set->set_oi->oi_flags & LDLM_FL_HAS_INTENT)) {
- memset(lov_lockhp, 0, sizeof(*lov_lockhp));
- lov_stripe_lock(set->set_oi->oi_md);
- loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
- lov_stripe_unlock(set->set_oi->oi_md);
- CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
- " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
- rc = ELDLM_OK;
- } else {
- struct obd_export *exp = set->set_exp;
- struct lov_obd *lov = &exp->exp_obd->u.lov;
-
- memset(lov_lockhp, 0, sizeof(*lov_lockhp));
- if (lov->lov_tgts[req->rq_idx] &&
- lov->lov_tgts[req->rq_idx]->ltd_active) {
- /* -EUSERS used by OST to report file contention */
- if (rc != -EINTR && rc != -EUSERS)
- CERROR("enqueue objid "LPX64" subobj "
- LPX64" on OST idx %d: rc %d\n",
- set->set_oi->oi_md->lsm_object_id,
- loi->loi_id, loi->loi_ost_idx, rc);
- } else {
- rc = ELDLM_OK;
- }
- }
+ lov_stripe_lock(oi->oi_md);
+ osc_update_enqueue(lov_lockhp, loi, oi->oi_flags,
+ &req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb, mode, rc);
+ if (rc == ELDLM_LOCK_ABORTED && (oi->oi_flags & LDLM_FL_HAS_INTENT))
+ memset(lov_lockhp, 0, sizeof *lov_lockhp);
+ rc = lov_update_enqueue_lov(set->set_exp, lov_lockhp, loi, oi->oi_flags,
+ req->rq_idx, oi->oi_md->lsm_object_id, rc);
+ lov_stripe_unlock(oi->oi_md);
lov_update_set(set, req, rc);
RETURN(rc);
}
/* The callback for osc_enqueue that updates lov info for every OSC request. */
-static int cb_update_enqueue(struct obd_info *oinfo, int rc)
+static int cb_update_enqueue(void *cookie, int rc)
{
+ struct obd_info *oinfo = cookie;
struct ldlm_enqueue_info *einfo;
struct lov_request *lovreq;
((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
sizeof(struct lov_oinfo *);
-
- req->rq_rqset = set;
/* Set lov request specific parameters. */
req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
req->rq_oi.oi_cb_up = cb_update_enqueue;
rc = qos_remedy_create(set, req);
lov_update_create_set(set, req, rc);
-
- if (rc)
- break;
}
}
if (set->set_success == 0)
GOTO(cleanup, rc);
- /* If there was an explicit stripe set, fail. Otherwise, we
- * got some objects and that's not bad. */
if (set->set_count != set->set_success) {
- if (*lsmp)
- GOTO(cleanup, rc);
set->set_count = set->set_success;
qos_shrink_lsm(set);
}
continue;
sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
- err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL);
+ err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL,
+ NULL);
if (err)
CERROR("Failed to uncreate objid "LPX64" subobj "
LPX64" on OST idx %d: rc = %d\n",
RETURN(0);
}
+int cb_create_update(void *cookie, int rc)
+{
+ struct obd_info *oinfo = cookie;
+ struct lov_request *lovreq;
+
+ lovreq = container_of(oinfo, struct lov_request, rq_oi);
+ return lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
+}
+
+
int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
struct lov_stripe_md **lsmp, struct obdo *src_oa,
struct obd_trans_info *oti,
set->set_oti = oti;
rc = qos_prep_create(exp, set);
+ /* qos_shrink_lsm() may have allocated a new lsm */
+ *lsmp = oinfo->oi_md;
if (rc)
lov_fini_create_set(set, lsmp);
else
if (info[i].count == 0)
continue;
-
+
loi = oinfo->oi_md->lsm_oinfo[i];
- if (!lov->lov_tgts[loi->loi_ost_idx] ||
+ if (!lov->lov_tgts[loi->loi_ost_idx] ||
!lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
GOTO(out, rc = -EIO);
}
/* The callback for osc_getattr_async that finilizes a request info when a
- * response is recieved. */
-static int cb_getattr_update(struct obd_info *oinfo, int rc)
+ * response is received. */
+static int cb_getattr_update(void *cookie, int rc)
{
+ struct obd_info *oinfo = cookie;
struct lov_request *lovreq;
lovreq = container_of(oinfo, struct lov_request, rq_oi);
return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
req->rq_oi.oi_oa->o_id = loi->loi_id;
req->rq_oi.oi_cb_up = cb_getattr_update;
req->rq_oi.oi_capa = oinfo->oi_capa;
- req->rq_rqset = set;
lov_set_add_req(req, set);
}
struct lov_request *req;
loi = lsm->lsm_oinfo[i];
- if (!lov->lov_tgts[loi->loi_ost_idx] ||
+ if (!lov->lov_tgts[loi->loi_ost_idx] ||
!lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
continue;
lov_update_set(set, req, rc);
/* grace error on inactive ost */
- if (rc && !(lov->lov_tgts[req->rq_idx] &&
+ if (rc && !(lov->lov_tgts[req->rq_idx] &&
lov->lov_tgts[req->rq_idx]->ltd_active))
rc = 0;
}
/* The callback for osc_setattr_async that finilizes a request info when a
- * response is recieved. */
-static int cb_setattr_update(struct obd_info *oinfo, int rc)
+ * response is received. */
+static int cb_setattr_update(void *cookie, int rc)
{
+ struct obd_info *oinfo = cookie;
struct lov_request *lovreq;
lovreq = container_of(oinfo, struct lov_request, rq_oi);
return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
sizeof(*req->rq_oi.oi_oa));
req->rq_oi.oi_oa->o_id = loi->loi_id;
- LASSERT(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP)
- || req->rq_oi.oi_oa->o_gr>0);
+ LASSERTF(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP) ||
+ CHECK_MDS_GROUP(req->rq_oi.oi_oa->o_gr),
+ "req->rq_oi.oi_oa->o_valid="LPX64" "
+ "req->rq_oi.oi_oa->o_gr="LPU64"\n",
+ req->rq_oi.oi_oa->o_valid, req->rq_oi.oi_oa->o_gr);
req->rq_oi.oi_oa->o_stripe_idx = i;
req->rq_oi.oi_cb_up = cb_setattr_update;
req->rq_oi.oi_capa = oinfo->oi_capa;
- req->rq_rqset = set;
if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
int off = lov_stripe_offset(oinfo->oi_md,
}
/* The callback for osc_punch that finilizes a request info when a response
- * is recieved. */
-static int cb_update_punch(struct obd_info *oinfo, int rc)
+ * is received. */
+static int cb_update_punch(void *cookie, int rc)
{
+ struct obd_info *oinfo = cookie;
struct lov_request *lovreq;
lovreq = container_of(oinfo, struct lov_request, rq_oi);
return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
struct lov_request *req;
obd_off rs, re;
- if (!lov->lov_tgts[loi->loi_ost_idx] ||
- !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
- CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
- continue;
- }
-
if (!lov_stripe_intersects(oinfo->oi_md, i,
oinfo->oi_policy.l_extent.start,
oinfo->oi_policy.l_extent.end,
&rs, &re))
continue;
+ if (!lov->lov_tgts[loi->loi_ost_idx] ||
+ !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
+ CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+ GOTO(out_set, rc = -EIO);
+ }
+
OBD_ALLOC(req, sizeof(*req));
if (req == NULL)
GOTO(out_set, rc = -ENOMEM);
req->rq_oi.oi_oa->o_stripe_idx = i;
req->rq_oi.oi_cb_up = cb_update_punch;
- req->rq_rqset = set;
req->rq_oi.oi_policy.l_extent.start = rs;
req->rq_oi.oi_policy.l_extent.end = re;
spin_lock(&obd->obd_osfs_lock);
memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
- obd->obd_osfs_age = get_jiffies_64();
+ obd->obd_osfs_age = cfs_time_current_64();
spin_unlock(&obd->obd_osfs_lock);
RETURN(0);
}
RETURN(rc);
}
-void lov_update_statfs(struct obd_device *obd, struct obd_statfs *osfs,
- struct obd_statfs *lov_sfs, int success)
+void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
+ int success)
{
int shift = 0, quit = 0;
__u64 tmp;
- spin_lock(&obd->obd_osfs_lock);
- memcpy(&obd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
- obd->obd_osfs_age = get_jiffies_64();
- spin_unlock(&obd->obd_osfs_lock);
if (success == 0) {
memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
}
/* The callback for osc_statfs_async that finilizes a request info when a
- * response is recieved. */
-static int cb_statfs_update(struct obd_info *oinfo, int rc)
+ * response is received. */
+static int cb_statfs_update(void *cookie, int rc)
{
+ struct obd_info *oinfo = cookie;
struct lov_request *lovreq;
struct obd_statfs *osfs, *lov_sfs;
struct obd_device *obd;
lov_sfs = oinfo->oi_osfs;
success = lovreq->rq_rqset->set_success;
-
/* XXX: the same is done in lov_update_common_set, however
lovset->set_exp is not initialized. */
lov_update_set(lovreq->rq_rqset, lovreq, rc);
if (rc) {
+ /* XXX ignore error for disconnected ost ? */
if (rc && !(lov->lov_tgts[lovreq->rq_idx] &&
lov->lov_tgts[lovreq->rq_idx]->ltd_active))
rc = 0;
- RETURN(rc);
+ GOTO(out, rc);
}
- lov_update_statfs(obd, osfs, lov_sfs, success);
+ spin_lock(&obd->obd_osfs_lock);
+ memcpy(&obd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
+ if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
+ obd->obd_osfs_age = cfs_time_current_64();
+ spin_unlock(&obd->obd_osfs_lock);
+
+ lov_update_statfs(osfs, lov_sfs, success);
qos_update(lov);
+out:
+ if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
+ lov_finished_set(lovreq->rq_rqset)) {
+ lov_statfs_interpret(NULL, lovreq->rq_rqset,
+ lovreq->rq_rqset->set_success !=
+ lovreq->rq_rqset->set_count);
+ qos_statfs_done(lov);
+ }
RETURN(0);
}
for (i = 0; i < lov->desc.ld_tgt_count; i++) {
struct lov_request *req;
- if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active) {
+ if (!lov->lov_tgts[i] || (!lov->lov_tgts[i]->ltd_active
+ && (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
CDEBUG(D_HA, "lov idx %d inactive\n", i);
continue;
}
req->rq_idx = i;
req->rq_oi.oi_cb_up = cb_statfs_update;
- req->rq_rqset = set;
+ req->rq_oi.oi_flags = oinfo->oi_flags;
lov_set_add_req(req, set);
}