/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
* vim:expandtab:shiftwidth=8:tabstop=8:
*
- * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
+ * GPL HEADER START
*
- * This file is part of the Lustre file system, http://www.lustre.org
- * Lustre is a trademark of Cluster File Systems, Inc.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
- * You may have signed or agreed to another license before downloading
- * this software. If so, you are bound by the terms and conditions
- * of that agreement, and the following does not apply to you. See the
- * LICENSE file included with this distribution for more information.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
*
- * If you did not agree to a different license, then this copy of Lustre
- * is open source software; you can redistribute it and/or modify it
- * under the terms of version 2 of the GNU General Public License as
- * published by the Free Software Foundation.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
*
- * In either case, Lustre is distributed in the hope that it will be
- * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
- * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * license text for more details.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright 2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
*/
#ifndef EXPORT_SYMTAB
set->set_success = 0;
set->set_cookies = 0;
CFS_INIT_LIST_HEAD(&set->set_list);
- atomic_set(&set->set_refcount, 1);
+ cfs_atomic_set(&set->set_refcount, 1);
+ cfs_waitq_init(&set->set_waitq);
+ cfs_spin_lock_init(&set->set_lock);
}
-static void lov_finish_set(struct lov_request_set *set)
+void lov_finish_set(struct lov_request_set *set)
{
- struct list_head *pos, *n;
+ cfs_list_t *pos, *n;
ENTRY;
LASSERT(set);
- list_for_each_safe(pos, n, &set->set_list) {
- struct lov_request *req = list_entry(pos, struct lov_request,
- rq_link);
- list_del_init(&req->rq_link);
+ cfs_list_for_each_safe(pos, n, &set->set_list) {
+ struct lov_request *req = cfs_list_entry(pos,
+ struct lov_request,
+ rq_link);
+ cfs_list_del_init(&req->rq_link);
if (req->rq_oi.oi_oa)
OBDO_FREE(req->rq_oi.oi_oa);
EXIT;
}
+int lov_finished_set(struct lov_request_set *set)
+{
+ CDEBUG(D_INFO, "check set %d/%d\n", set->set_completes,
+ set->set_count);
+ return set->set_completes == set->set_count;
+}
+
+
void lov_update_set(struct lov_request_set *set,
struct lov_request *req, int rc)
{
set->set_completes++;
if (rc == 0)
set->set_success++;
+
+ cfs_waitq_signal(&set->set_waitq);
}
int lov_update_common_set(struct lov_request_set *set,
lov_update_set(set, req, rc);
/* grace error on inactive ost */
- if (rc && !(lov->lov_tgts[req->rq_idx] &&
+ if (rc && !(lov->lov_tgts[req->rq_idx] &&
lov->lov_tgts[req->rq_idx]->ltd_active))
rc = 0;
void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
{
- list_add_tail(&req->rq_link, &set->set_list);
+ cfs_list_add_tail(&req->rq_link, &set->set_list);
set->set_count++;
+ req->rq_rqset = set;
+}
+
+extern void osc_update_enqueue(struct lustre_handle *lov_lockhp,
+ struct lov_oinfo *loi, int flags,
+ struct ost_lvb *lvb, __u32 mode, int rc);
+
+static int lov_update_enqueue_lov(struct obd_export *exp,
+ struct lustre_handle *lov_lockhp,
+ struct lov_oinfo *loi, int flags, int idx,
+ __u64 oid, int rc)
+{
+ struct lov_obd *lov = &exp->exp_obd->u.lov;
+
+ if (rc != ELDLM_OK &&
+ !(rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT))) {
+ memset(lov_lockhp, 0, sizeof(*lov_lockhp));
+ if (lov->lov_tgts[idx] && lov->lov_tgts[idx]->ltd_active) {
+ /* -EUSERS used by OST to report file contention */
+ if (rc != -EINTR && rc != -EUSERS)
+ CERROR("enqueue objid "LPX64" subobj "
+ LPX64" on OST idx %d: rc %d\n",
+ oid, loi->loi_id, loi->loi_ost_idx, rc);
+ } else
+ rc = ELDLM_OK;
+ }
+ return rc;
}
int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
{
struct lov_request_set *set = req->rq_rqset;
struct lustre_handle *lov_lockhp;
+ struct obd_info *oi = set->set_oi;
struct lov_oinfo *loi;
ENTRY;
- LASSERT(set != NULL);
- LASSERT(set->set_oi != NULL);
+ LASSERT(oi != NULL);
lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
- loi = set->set_oi->oi_md->lsm_oinfo[req->rq_stripe];
+ loi = oi->oi_md->lsm_oinfo[req->rq_stripe];
/* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
* and that copy can be arbitrarily out of date.
* The LOV API is due for a serious rewriting anyways, and this
* can be addressed then. */
- if (rc == ELDLM_OK) {
- struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
- __u64 tmp;
-
- LASSERT(lock != NULL);
- lov_stripe_lock(set->set_oi->oi_md);
- loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
- tmp = loi->loi_lvb.lvb_size;
- /* Extend KMS up to the end of this lock and no further
- * A lock on [x,y] means a KMS of up to y + 1 bytes! */
- if (tmp > lock->l_policy_data.l_extent.end)
- tmp = lock->l_policy_data.l_extent.end + 1;
- if (tmp >= loi->loi_kms) {
- LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
- ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
- loi->loi_kms = tmp;
- loi->loi_kms_valid = 1;
- } else {
- LDLM_DEBUG(lock, "lock acquired, setting rss="
- LPU64"; leaving kms="LPU64", end="LPU64,
- loi->loi_lvb.lvb_size, loi->loi_kms,
- lock->l_policy_data.l_extent.end);
- }
- lov_stripe_unlock(set->set_oi->oi_md);
- ldlm_lock_allow_match(lock);
- LDLM_LOCK_PUT(lock);
- } else if ((rc == ELDLM_LOCK_ABORTED) &&
- (set->set_oi->oi_flags & LDLM_FL_HAS_INTENT)) {
- memset(lov_lockhp, 0, sizeof(*lov_lockhp));
- lov_stripe_lock(set->set_oi->oi_md);
- loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
- lov_stripe_unlock(set->set_oi->oi_md);
- CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
- " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
- rc = ELDLM_OK;
- } else {
- struct obd_export *exp = set->set_exp;
- struct lov_obd *lov = &exp->exp_obd->u.lov;
-
- memset(lov_lockhp, 0, sizeof(*lov_lockhp));
- if (lov->lov_tgts[req->rq_idx] &&
- lov->lov_tgts[req->rq_idx]->ltd_active) {
- /* -EUSERS used by OST to report file contention */
- if (rc != -EINTR && rc != -EUSERS)
- CERROR("enqueue objid "LPX64" subobj "
- LPX64" on OST idx %d: rc %d\n",
- set->set_oi->oi_md->lsm_object_id,
- loi->loi_id, loi->loi_ost_idx, rc);
- } else {
- rc = ELDLM_OK;
- }
- }
+ lov_stripe_lock(oi->oi_md);
+ osc_update_enqueue(lov_lockhp, loi, oi->oi_flags,
+ &req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb, mode, rc);
+ if (rc == ELDLM_LOCK_ABORTED && (oi->oi_flags & LDLM_FL_HAS_INTENT))
+ memset(lov_lockhp, 0, sizeof *lov_lockhp);
+ rc = lov_update_enqueue_lov(set->set_exp, lov_lockhp, loi, oi->oi_flags,
+ req->rq_idx, oi->oi_md->lsm_object_id, rc);
+ lov_stripe_unlock(oi->oi_md);
lov_update_set(set, req, rc);
RETURN(rc);
}
/* The callback for osc_enqueue that updates lov info for every OSC request. */
-static int cb_update_enqueue(struct obd_info *oinfo, int rc)
+static int cb_update_enqueue(void *cookie, int rc)
{
+ struct obd_info *oinfo = cookie;
struct ldlm_enqueue_info *einfo;
struct lov_request *lovreq;
RETURN(0);
/* cancel enqueued/matched locks */
- list_for_each_entry(req, &set->set_list, rq_link) {
+ cfs_list_for_each_entry(req, &set->set_list, rq_link) {
struct lustre_handle *lov_lockhp;
if (!req->rq_complete || req->rq_rc)
} else if (set->set_lockh)
lov_llh_put(set->set_lockh);
- if (atomic_dec_and_test(&set->set_refcount))
- lov_finish_set(set);
+ lov_put_reqset(set);
RETURN(rc ? rc : ret);
}
((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
sizeof(struct lov_oinfo *);
-
- req->rq_rqset = set;
/* Set lov request specific parameters. */
req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
req->rq_oi.oi_cb_up = cb_update_enqueue;
/* XXX LOV STACKING: submd should be from the subobj */
req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
- req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
+ req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
req->rq_oi.oi_md->lsm_stripe_count = 0;
req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
loi->loi_kms_valid;
(flags & LDLM_FL_TEST_LOCK))
lov_llh_put(set->set_lockh);
- if (atomic_dec_and_test(&set->set_refcount))
- lov_finish_set(set);
+ lov_put_reqset(set);
RETURN(rc);
}
/* XXX LOV STACKING: submd should be from the subobj */
req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
- req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr;
+ req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
req->rq_oi.oi_md->lsm_stripe_count = 0;
lov_set_add_req(req, set);
if (set->set_lockh)
lov_llh_put(set->set_lockh);
- if (atomic_dec_and_test(&set->set_refcount))
- lov_finish_set(set);
+ lov_put_reqset(set);
RETURN(rc);
}
lov_lockhp = set->set_lockh->llh_handles + i;
if (!lustre_handle_is_used(lov_lockhp)) {
- CDEBUG(D_RPCTRACE,"lov idx %d subobj "LPX64" no lock\n",
+ CDEBUG(D_INFO, "lov idx %d subobj "LPX64" no lock\n",
loi->loi_ost_idx, loi->loi_id);
continue;
}
/* XXX LOV STACKING: submd should be from the subobj */
req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
- req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr;
+ req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
req->rq_oi.oi_md->lsm_stripe_count = 0;
lov_set_add_req(req, set);
/* try alloc objects on other osts if osc_create fails for
* exceptions: RPC failure, ENOSPC, etc */
if (set->set_count != set->set_success) {
- list_for_each_entry (req, &set->set_list, rq_link) {
+ cfs_list_for_each_entry (req, &set->set_list, rq_link) {
if (req->rq_rc == 0)
continue;
rc = qos_remedy_create(set, req);
lov_update_create_set(set, req, rc);
-
- if (rc)
- break;
}
}
if (set->set_success == 0)
GOTO(cleanup, rc);
- /* If there was an explicit stripe set, fail. Otherwise, we
- * got some objects and that's not bad. */
if (set->set_count != set->set_success) {
- if (*lsmp)
- GOTO(cleanup, rc);
set->set_count = set->set_success;
qos_shrink_lsm(set);
}
if (ret_oa == NULL)
GOTO(cleanup, rc = -ENOMEM);
- list_for_each_entry(req, &set->set_list, rq_link) {
+ cfs_list_for_each_entry(req, &set->set_list, rq_link) {
if (!req->rq_complete || req->rq_rc)
continue;
lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
LBUG();
}
ret_oa->o_id = src_oa->o_id;
- ret_oa->o_gr = src_oa->o_gr;
+ ret_oa->o_seq = src_oa->o_seq;
ret_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
memcpy(src_oa, ret_oa, sizeof(*src_oa));
OBDO_FREE(ret_oa);
GOTO(done, rc = 0);
cleanup:
- list_for_each_entry(req, &set->set_list, rq_link) {
+ cfs_list_for_each_entry(req, &set->set_list, rq_link) {
struct obd_export *sub_exp;
int err = 0;
continue;
sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
- err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL);
+ err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL,
+ NULL);
if (err)
CERROR("Failed to uncreate objid "LPX64" subobj "
LPX64" on OST idx %d: rc = %d\n",
if (set->set_completes)
rc = create_done(set->set_exp, set, lsmp);
- if (atomic_dec_and_test(&set->set_refcount))
- lov_finish_set(set);
-
+ lov_put_reqset(set);
RETURN(rc);
}
struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
ENTRY;
- req->rq_stripe = set->set_success;
- loi = lsm->lsm_oinfo[req->rq_stripe];
-
if (rc && lov->lov_tgts[req->rq_idx] &&
lov->lov_tgts[req->rq_idx]->ltd_active) {
CERROR("error creating fid "LPX64" sub-object"
rc = -EIO;
}
}
- lov_update_set(set, req, rc);
- if (rc)
+
+ cfs_spin_lock(&set->set_lock);
+ req->rq_stripe = set->set_success;
+ loi = lsm->lsm_oinfo[req->rq_stripe];
+ if (rc) {
+ lov_update_set(set, req, rc);
+ cfs_spin_unlock(&set->set_lock);
RETURN(rc);
+ }
loi->loi_id = req->rq_oi.oi_oa->o_id;
- loi->loi_gr = req->rq_oi.oi_oa->o_gr;
+ loi->loi_seq = req->rq_oi.oi_oa->o_seq;
loi->loi_ost_idx = req->rq_idx;
- CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
- lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
loi_init(loi);
if (oti && set->set_cookies)
if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
set->set_cookie_sent++;
- RETURN(0);
+ lov_update_set(set, req, rc);
+ cfs_spin_unlock(&set->set_lock);
+
+ CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
+ lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
+ RETURN(rc);
}
+int cb_create_update(void *cookie, int rc)
+{
+ struct obd_info *oinfo = cookie;
+ struct lov_request *lovreq;
+
+ lovreq = container_of(oinfo, struct lov_request, rq_oi);
+ rc= lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
+ if (lov_finished_set(lovreq->rq_rqset))
+ lov_put_reqset(lovreq->rq_rqset);
+ return rc;
+}
+
+
int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
struct lov_stripe_md **lsmp, struct obdo *src_oa,
struct obd_trans_info *oti,
set->set_oi->oi_md = *lsmp;
set->set_oi->oi_oa = src_oa;
set->set_oti = oti;
+ lov_get_reqset(set);
rc = qos_prep_create(exp, set);
- if (rc)
+ /* qos_shrink_lsm() may have allocated a new lsm */
+ *lsmp = oinfo->oi_md;
+ if (rc) {
lov_fini_create_set(set, lsmp);
- else
+ lov_put_reqset(set);
+ } else {
*reqset = set;
+ }
RETURN(rc);
}
static int common_attr_done(struct lov_request_set *set)
{
- struct list_head *pos;
+ cfs_list_t *pos;
struct lov_request *req;
struct obdo *tmp_oa;
int rc = 0, attrset = 0;
if (tmp_oa == NULL)
GOTO(out, rc = -ENOMEM);
- list_for_each (pos, &set->set_list) {
- req = list_entry(pos, struct lov_request, rq_link);
+ cfs_list_for_each (pos, &set->set_list) {
+ req = cfs_list_entry(pos, struct lov_request, rq_link);
if (!req->rq_complete || req->rq_rc)
continue;
CERROR("No stripes had valid attrs\n");
rc = -EIO;
}
+ if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) &&
+ (set->set_oi->oi_md->lsm_stripe_count != attrset)) {
+ /* When we take attributes of some epoch, we require all the
+ * ost to be active. */
+ CERROR("Not all the stripes had valid attrs\n");
+ GOTO(out, rc = -EIO);
+ }
+
tmp_oa->o_id = set->set_oi->oi_oa->o_id;
memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
out:
{
struct lov_stripe_md *lsm = set->set_oi->oi_md;
struct lov_oinfo *loi = NULL;
- struct list_head *pos;
+ cfs_list_t *pos;
struct lov_request *req;
ENTRY;
- list_for_each (pos, &set->set_list) {
- req = list_entry(pos, struct lov_request, rq_link);
+ cfs_list_for_each (pos, &set->set_list) {
+ req = cfs_list_entry(pos, struct lov_request, rq_link);
if (!req->rq_complete || req->rq_rc)
continue;
rc = brw_done(set);
/* FIXME update qos data here */
}
- if (atomic_dec_and_test(&set->set_refcount))
- lov_finish_set(set);
+ lov_put_reqset(set);
RETURN(rc);
}
if (info[i].count == 0)
continue;
-
+
loi = oinfo->oi_md->lsm_oinfo[i];
- if (!lov->lov_tgts[loi->loi_ost_idx] ||
+ if (!lov->lov_tgts[loi->loi_ost_idx] ||
!lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
GOTO(out, rc = -EIO);
sizeof(*req->rq_oi.oi_oa));
}
req->rq_oi.oi_oa->o_id = loi->loi_id;
+ req->rq_oi.oi_oa->o_seq = loi->loi_seq;
req->rq_oi.oi_oa->o_stripe_idx = i;
req->rq_buflen = sizeof(*req->rq_oi.oi_md);
/* XXX LOV STACKING */
req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
- req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
+ req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
req->rq_oabufs = info[i].count;
req->rq_pgaidx = shift;
shift += req->rq_oabufs;
if (set->set_completes)
rc = common_attr_done(set);
- if (atomic_dec_and_test(&set->set_refcount))
- lov_finish_set(set);
+ lov_put_reqset(set);
RETURN(rc);
}
/* The callback for osc_getattr_async that finilizes a request info when a
- * response is recieved. */
-static int cb_getattr_update(struct obd_info *oinfo, int rc)
+ * response is received. */
+static int cb_getattr_update(void *cookie, int rc)
{
+ struct obd_info *oinfo = cookie;
struct lov_request *lovreq;
lovreq = container_of(oinfo, struct lov_request, rq_oi);
return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
if (!lov->lov_tgts[loi->loi_ost_idx] ||
!lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+ if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH)
+ /* SOM requires all the OSTs to be active. */
+ GOTO(out_set, rc = -EIO);
continue;
}
memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
sizeof(*req->rq_oi.oi_oa));
req->rq_oi.oi_oa->o_id = loi->loi_id;
+ req->rq_oi.oi_oa->o_seq = loi->loi_seq;
req->rq_oi.oi_cb_up = cb_getattr_update;
req->rq_oi.oi_capa = oinfo->oi_capa;
- req->rq_rqset = set;
lov_set_add_req(req, set);
}
/* FIXME update qos data here */
}
- if (atomic_dec_and_test(&set->set_refcount))
- lov_finish_set(set);
+ lov_put_reqset(set);
RETURN(0);
}
struct lov_request *req;
loi = lsm->lsm_oinfo[i];
- if (!lov->lov_tgts[loi->loi_ost_idx] ||
+ if (!lov->lov_tgts[loi->loi_ost_idx] ||
!lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
continue;
}
memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
req->rq_oi.oi_oa->o_id = loi->loi_id;
+ req->rq_oi.oi_oa->o_seq = loi->loi_seq;
lov_set_add_req(req, set);
}
if (!set->set_count)
/* FIXME update qos data here */
}
- if (atomic_dec_and_test(&set->set_refcount))
- lov_finish_set(set);
+ lov_put_reqset(set);
RETURN(rc);
}
lov_update_set(set, req, rc);
/* grace error on inactive ost */
- if (rc && !(lov->lov_tgts[req->rq_idx] &&
+ if (rc && !(lov->lov_tgts[req->rq_idx] &&
lov->lov_tgts[req->rq_idx]->ltd_active))
rc = 0;
}
/* The callback for osc_setattr_async that finilizes a request info when a
- * response is recieved. */
-static int cb_setattr_update(struct obd_info *oinfo, int rc)
+ * response is received. */
+static int cb_setattr_update(void *cookie, int rc)
{
+ struct obd_info *oinfo = cookie;
struct lov_request *lovreq;
lovreq = container_of(oinfo, struct lov_request, rq_oi);
return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
sizeof(*req->rq_oi.oi_oa));
req->rq_oi.oi_oa->o_id = loi->loi_id;
- LASSERT(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP)
- || req->rq_oi.oi_oa->o_gr>0);
+ req->rq_oi.oi_oa->o_seq= loi->loi_seq;
req->rq_oi.oi_oa->o_stripe_idx = i;
req->rq_oi.oi_cb_up = cb_setattr_update;
req->rq_oi.oi_capa = oinfo->oi_capa;
- req->rq_rqset = set;
if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
int off = lov_stripe_offset(oinfo->oi_md,
rc = common_attr_done(set);
}
- if (atomic_dec_and_test(&set->set_refcount))
- lov_finish_set(set);
+ lov_put_reqset(set);
RETURN(rc);
}
}
/* The callback for osc_punch that finilizes a request info when a response
- * is recieved. */
-static int cb_update_punch(struct obd_info *oinfo, int rc)
+ * is received. */
+static int cb_update_punch(void *cookie, int rc)
{
+ struct obd_info *oinfo = cookie;
struct lov_request *lovreq;
lovreq = container_of(oinfo, struct lov_request, rq_oi);
return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
struct lov_request *req;
obd_off rs, re;
- if (!lov->lov_tgts[loi->loi_ost_idx] ||
- !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
- CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
- continue;
- }
-
if (!lov_stripe_intersects(oinfo->oi_md, i,
oinfo->oi_policy.l_extent.start,
oinfo->oi_policy.l_extent.end,
&rs, &re))
continue;
+ if (!lov->lov_tgts[loi->loi_ost_idx] ||
+ !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
+ CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+ GOTO(out_set, rc = -EIO);
+ }
+
OBD_ALLOC(req, sizeof(*req));
if (req == NULL)
GOTO(out_set, rc = -ENOMEM);
memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
sizeof(*req->rq_oi.oi_oa));
req->rq_oi.oi_oa->o_id = loi->loi_id;
- req->rq_oi.oi_oa->o_gr = loi->loi_gr;
+ req->rq_oi.oi_oa->o_seq = loi->loi_seq;
req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP;
req->rq_oi.oi_oa->o_stripe_idx = i;
req->rq_oi.oi_cb_up = cb_update_punch;
- req->rq_rqset = set;
req->rq_oi.oi_policy.l_extent.start = rs;
req->rq_oi.oi_policy.l_extent.end = re;
/* FIXME update qos data here */
}
- if (atomic_dec_and_test(&set->set_refcount))
- lov_finish_set(set);
+ lov_put_reqset(set);
RETURN(rc);
}
}
memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
req->rq_oi.oi_oa->o_id = loi->loi_id;
+ req->rq_oi.oi_oa->o_seq = loi->loi_seq;
req->rq_oi.oi_oa->o_stripe_idx = i;
req->rq_oi.oi_policy.l_extent.start = rs;
if (osfs->os_ffree != LOV_U64_MAX)
do_div(osfs->os_ffree, expected_stripes);
- spin_lock(&obd->obd_osfs_lock);
+ cfs_spin_lock(&obd->obd_osfs_lock);
memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
- obd->obd_osfs_age = get_jiffies_64();
- spin_unlock(&obd->obd_osfs_lock);
+ obd->obd_osfs_age = cfs_time_current_64();
+ cfs_spin_unlock(&obd->obd_osfs_lock);
RETURN(0);
}
rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
set->set_success);
}
-
- if (atomic_dec_and_test(&set->set_refcount))
- lov_finish_set(set);
-
+ lov_put_reqset(set);
RETURN(rc);
}
-void lov_update_statfs(struct obd_device *obd, struct obd_statfs *osfs,
- struct obd_statfs *lov_sfs, int success)
+void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
+ int success)
{
int shift = 0, quit = 0;
__u64 tmp;
- spin_lock(&obd->obd_osfs_lock);
- memcpy(&obd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
- obd->obd_osfs_age = get_jiffies_64();
- spin_unlock(&obd->obd_osfs_lock);
if (success == 0) {
memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
}
/* The callback for osc_statfs_async that finilizes a request info when a
- * response is recieved. */
-static int cb_statfs_update(struct obd_info *oinfo, int rc)
+ * response is received. */
+static int cb_statfs_update(void *cookie, int rc)
{
+ struct obd_info *oinfo = cookie;
struct lov_request *lovreq;
struct obd_statfs *osfs, *lov_sfs;
- struct obd_device *obd;
struct lov_obd *lov;
+ struct lov_tgt_desc *tgt;
+ struct obd_device *lovobd, *tgtobd;
int success;
ENTRY;
lovreq = container_of(oinfo, struct lov_request, rq_oi);
- lov = &lovreq->rq_rqset->set_obd->u.lov;
- obd = class_exp2obd(lov->lov_tgts[lovreq->rq_idx]->ltd_exp);
-
+ lovobd = lovreq->rq_rqset->set_obd;
+ lov = &lovobd->u.lov;
osfs = lovreq->rq_rqset->set_oi->oi_osfs;
lov_sfs = oinfo->oi_osfs;
-
success = lovreq->rq_rqset->set_success;
-
/* XXX: the same is done in lov_update_common_set, however
lovset->set_exp is not initialized. */
lov_update_set(lovreq->rq_rqset, lovreq, rc);
- if (rc) {
- if (rc && !(lov->lov_tgts[lovreq->rq_idx] &&
- lov->lov_tgts[lovreq->rq_idx]->ltd_active))
- rc = 0;
- RETURN(rc);
- }
-
- lov_update_statfs(obd, osfs, lov_sfs, success);
+ if (rc)
+ GOTO(out, rc);
+
+ obd_getref(lovobd);
+ tgt = lov->lov_tgts[lovreq->rq_idx];
+ if (!tgt || !tgt->ltd_active)
+ GOTO(out_update, rc);
+
+ tgtobd = class_exp2obd(tgt->ltd_exp);
+ cfs_spin_lock(&tgtobd->obd_osfs_lock);
+ memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
+ if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
+ tgtobd->obd_osfs_age = cfs_time_current_64();
+ cfs_spin_unlock(&tgtobd->obd_osfs_lock);
+
+out_update:
+ lov_update_statfs(osfs, lov_sfs, success);
qos_update(lov);
+ obd_putref(lovobd);
+
+out:
+ if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
+ lov_finished_set(lovreq->rq_rqset)) {
+ lov_statfs_interpret(NULL, lovreq->rq_rqset,
+ lovreq->rq_rqset->set_success !=
+ lovreq->rq_rqset->set_count);
+ qos_statfs_done(lov);
+ }
RETURN(0);
}
for (i = 0; i < lov->desc.ld_tgt_count; i++) {
struct lov_request *req;
- if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active) {
+ if (!lov->lov_tgts[i] || (!lov->lov_tgts[i]->ltd_active
+ && (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
CDEBUG(D_HA, "lov idx %d inactive\n", i);
continue;
}
+ /* skip targets that have been explicitely disabled by the
+ * administrator */
+ if (!lov->lov_tgts[i]->ltd_exp) {
+ CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
+ continue;
+ }
+
OBD_ALLOC(req, sizeof(*req));
if (req == NULL)
GOTO(out_set, rc = -ENOMEM);
req->rq_idx = i;
req->rq_oi.oi_cb_up = cb_statfs_update;
- req->rq_rqset = set;
+ req->rq_oi.oi_flags = oinfo->oi_flags;
lov_set_add_req(req, set);
}