/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
* vim:expandtab:shiftwidth=8:tabstop=8:
*
- * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
+ * GPL HEADER START
*
- * This file is part of the Lustre file system, http://www.lustre.org
- * Lustre is a trademark of Cluster File Systems, Inc.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
- * You may have signed or agreed to another license before downloading
- * this software. If so, you are bound by the terms and conditions
- * of that agreement, and the following does not apply to you. See the
- * LICENSE file included with this distribution for more information.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
*
- * If you did not agree to a different license, then this copy of Lustre
- * is open source software; you can redistribute it and/or modify it
- * under the terms of version 2 of the GNU General Public License as
- * published by the Free Software Foundation.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
*
- * In either case, Lustre is distributed in the hope that it will be
- * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
- * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * license text for more details.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright 2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
*/
#ifndef EXPORT_SYMTAB
lov_update_set(set, req, rc);
/* grace error on inactive ost */
- if (rc && !(lov->lov_tgts[req->rq_idx] &&
+ if (rc && !(lov->lov_tgts[req->rq_idx] &&
lov->lov_tgts[req->rq_idx]->ltd_active))
rc = 0;
set->set_count++;
}
+extern void osc_update_enqueue(struct lustre_handle *lov_lockhp,
+ struct lov_oinfo *loi, int flags,
+ struct ost_lvb *lvb, __u32 mode, int rc);
+
+static int lov_update_enqueue_lov(struct obd_export *exp,
+ struct lustre_handle *lov_lockhp,
+ struct lov_oinfo *loi, int flags, int idx,
+ __u64 oid, int rc)
+{
+ struct lov_obd *lov = &exp->exp_obd->u.lov;
+
+ if (rc != ELDLM_OK &&
+ !(rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT))) {
+ memset(lov_lockhp, 0, sizeof(*lov_lockhp));
+ if (lov->lov_tgts[idx] && lov->lov_tgts[idx]->ltd_active) {
+ /* -EUSERS used by OST to report file contention */
+ if (rc != -EINTR && rc != -EUSERS)
+ CERROR("enqueue objid "LPX64" subobj "
+ LPX64" on OST idx %d: rc %d\n",
+ oid, loi->loi_id, loi->loi_ost_idx, rc);
+ } else
+ rc = ELDLM_OK;
+ }
+ return rc;
+}
+
int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
{
struct lov_request_set *set = req->rq_rqset;
struct lustre_handle *lov_lockhp;
+ struct obd_info *oi = set->set_oi;
struct lov_oinfo *loi;
ENTRY;
- LASSERT(set != NULL);
- LASSERT(set->set_oi != NULL);
+ LASSERT(oi != NULL);
lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
- loi = set->set_oi->oi_md->lsm_oinfo[req->rq_stripe];
+ loi = oi->oi_md->lsm_oinfo[req->rq_stripe];
/* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
* and that copy can be arbitrarily out of date.
* The LOV API is due for a serious rewriting anyways, and this
* can be addressed then. */
- if (rc == ELDLM_OK) {
- struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
- __u64 tmp;
-
- LASSERT(lock != NULL);
- lov_stripe_lock(set->set_oi->oi_md);
- loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
- tmp = loi->loi_lvb.lvb_size;
- /* Extend KMS up to the end of this lock and no further
- * A lock on [x,y] means a KMS of up to y + 1 bytes! */
- if (tmp > lock->l_policy_data.l_extent.end)
- tmp = lock->l_policy_data.l_extent.end + 1;
- if (tmp >= loi->loi_kms) {
- LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
- ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
- loi->loi_kms = tmp;
- loi->loi_kms_valid = 1;
- } else {
- LDLM_DEBUG(lock, "lock acquired, setting rss="
- LPU64"; leaving kms="LPU64", end="LPU64,
- loi->loi_lvb.lvb_size, loi->loi_kms,
- lock->l_policy_data.l_extent.end);
- }
- lov_stripe_unlock(set->set_oi->oi_md);
- ldlm_lock_allow_match(lock);
- LDLM_LOCK_PUT(lock);
- } else if ((rc == ELDLM_LOCK_ABORTED) &&
- (set->set_oi->oi_flags & LDLM_FL_HAS_INTENT)) {
- memset(lov_lockhp, 0, sizeof(*lov_lockhp));
- lov_stripe_lock(set->set_oi->oi_md);
- loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
- lov_stripe_unlock(set->set_oi->oi_md);
- CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
- " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
- rc = ELDLM_OK;
- } else {
- struct obd_export *exp = set->set_exp;
- struct lov_obd *lov = &exp->exp_obd->u.lov;
-
- memset(lov_lockhp, 0, sizeof(*lov_lockhp));
- if (lov->lov_tgts[req->rq_idx] &&
- lov->lov_tgts[req->rq_idx]->ltd_active) {
- if (rc != -EINTR)
- CERROR("enqueue objid "LPX64" subobj "
- LPX64" on OST idx %d: rc %d\n",
- set->set_oi->oi_md->lsm_object_id,
- loi->loi_id, loi->loi_ost_idx, rc);
- } else {
- rc = ELDLM_OK;
- }
- }
+ lov_stripe_lock(oi->oi_md);
+ osc_update_enqueue(lov_lockhp, loi, oi->oi_flags,
+ &req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb, mode, rc);
+ if (rc == ELDLM_LOCK_ABORTED && (oi->oi_flags & LDLM_FL_HAS_INTENT))
+ memset(lov_lockhp, 0, sizeof *lov_lockhp);
+ rc = lov_update_enqueue_lov(set->set_exp, lov_lockhp, loi, oi->oi_flags,
+ req->rq_idx, oi->oi_md->lsm_object_id, rc);
+ lov_stripe_unlock(oi->oi_md);
lov_update_set(set, req, rc);
RETURN(rc);
}
/* The callback for osc_enqueue that updates lov info for every OSC request. */
-static int cb_update_enqueue(struct obd_info *oinfo, int rc)
+static int cb_update_enqueue(void *cookie, int rc)
{
+ struct obd_info *oinfo = cookie;
struct ldlm_enqueue_info *einfo;
struct lov_request *lovreq;
int ret = rc;
ENTRY;
- if (rc == 1)
+ if (rc > 0)
ret = 0;
else if (rc == 0)
ret = 1;
lov_lockhp = set->set_lockh->llh_handles + i;
if (!lustre_handle_is_used(lov_lockhp)) {
- CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
+ CDEBUG(D_RPCTRACE,"lov idx %d subobj "LPX64" no lock\n",
loi->loi_ost_idx, loi->loi_id);
continue;
}
continue;
sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
- err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL);
+ err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL,
+ NULL);
if (err)
CERROR("Failed to uncreate objid "LPX64" subobj "
LPX64" on OST idx %d: rc = %d\n",
if (rc)
RETURN(rc);
- if (oti && oti->oti_objid)
- oti->oti_objid[req->rq_idx] = req->rq_oi.oi_oa->o_id;
-
loi->loi_id = req->rq_oi.oi_oa->o_id;
loi->loi_gr = req->rq_oi.oi_oa->o_gr;
loi->loi_ost_idx = req->rq_idx;
if (info[i].count == 0)
continue;
-
+
loi = oinfo->oi_md->lsm_oinfo[i];
- if (!lov->lov_tgts[loi->loi_ost_idx] ||
+ if (!lov->lov_tgts[loi->loi_ost_idx] ||
!lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
GOTO(out, rc = -EIO);
/* The callback for osc_getattr_async that finilizes a request info when a
* response is recieved. */
-static int cb_getattr_update(struct obd_info *oinfo, int rc)
+static int cb_getattr_update(void *cookie, int rc)
{
+ struct obd_info *oinfo = cookie;
struct lov_request *lovreq;
lovreq = container_of(oinfo, struct lov_request, rq_oi);
return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
struct lov_request *req;
loi = lsm->lsm_oinfo[i];
- if (!lov->lov_tgts[loi->loi_ost_idx] ||
+ if (!lov->lov_tgts[loi->loi_ost_idx] ||
!lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
continue;
lov_update_set(set, req, rc);
/* grace error on inactive ost */
- if (rc && !(lov->lov_tgts[req->rq_idx] &&
+ if (rc && !(lov->lov_tgts[req->rq_idx] &&
lov->lov_tgts[req->rq_idx]->ltd_active))
rc = 0;
/* The callback for osc_setattr_async that finilizes a request info when a
* response is recieved. */
-static int cb_setattr_update(struct obd_info *oinfo, int rc)
+static int cb_setattr_update(void *cookie, int rc)
{
+ struct obd_info *oinfo = cookie;
struct lov_request *lovreq;
lovreq = container_of(oinfo, struct lov_request, rq_oi);
return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
sizeof(*req->rq_oi.oi_oa));
req->rq_oi.oi_oa->o_id = loi->loi_id;
- LASSERT(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP)
- || req->rq_oi.oi_oa->o_gr>0);
+ LASSERTF(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP) ||
+ CHECK_MDS_GROUP(req->rq_oi.oi_oa->o_gr),
+ "req->rq_oi.oi_oa->o_valid="LPX64" "
+ "req->rq_oi.oi_oa->o_gr="LPU64"\n",
+ req->rq_oi.oi_oa->o_valid, req->rq_oi.oi_oa->o_gr);
req->rq_oi.oi_oa->o_stripe_idx = i;
req->rq_oi.oi_cb_up = cb_setattr_update;
req->rq_oi.oi_capa = oinfo->oi_capa;
/* The callback for osc_punch that finilizes a request info when a response
* is recieved. */
-static int cb_update_punch(struct obd_info *oinfo, int rc)
+static int cb_update_punch(void *cookie, int rc)
{
+ struct obd_info *oinfo = cookie;
struct lov_request *lovreq;
lovreq = container_of(oinfo, struct lov_request, rq_oi);
return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
req->rq_oi.oi_policy.l_extent.start = rs;
req->rq_oi.oi_policy.l_extent.end = re;
req->rq_oi.oi_policy.l_extent.gid = -1;
+ req->rq_rqset = set;
lov_set_add_req(req, set);
}
spin_lock(&obd->obd_osfs_lock);
memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
- obd->obd_osfs_age = get_jiffies_64();
+ obd->obd_osfs_age = cfs_time_current_64();
spin_unlock(&obd->obd_osfs_lock);
RETURN(0);
}
RETURN(rc);
}
-void lov_update_statfs(struct obd_device *obd, struct obd_statfs *osfs,
- struct obd_statfs *lov_sfs, int success)
+void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
+ int success)
{
int shift = 0, quit = 0;
__u64 tmp;
- spin_lock(&obd->obd_osfs_lock);
- memcpy(&obd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
- obd->obd_osfs_age = get_jiffies_64();
- spin_unlock(&obd->obd_osfs_lock);
if (success == 0) {
memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
/* The callback for osc_statfs_async that finilizes a request info when a
* response is recieved. */
-static int cb_statfs_update(struct obd_info *oinfo, int rc)
+static int cb_statfs_update(void *cookie, int rc)
{
+ struct obd_info *oinfo = cookie;
struct lov_request *lovreq;
struct obd_statfs *osfs, *lov_sfs;
struct obd_device *obd;
RETURN(rc);
}
- lov_update_statfs(obd, osfs, lov_sfs, success);
+ spin_lock(&obd->obd_osfs_lock);
+ memcpy(&obd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
+ if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
+ obd->obd_osfs_age = cfs_time_current_64();
+ spin_unlock(&obd->obd_osfs_lock);
+
+ lov_update_statfs(osfs, lov_sfs, success);
qos_update(lov);
RETURN(0);
req->rq_idx = i;
req->rq_oi.oi_cb_up = cb_statfs_update;
+ req->rq_oi.oi_flags = oinfo->oi_flags;
req->rq_rqset = set;
lov_set_add_req(req, set);