diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 529b064..8ce49e2 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -1,29 +1,37 @@ /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * - * Copyright (C) 2001-2003 Cluster File Systems, Inc. - * Author Peter Braam + * GPL HEADER START * - * This file is part of Lustre, http://www.lustre.org. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * - * Lustre is free software; you can redistribute it and/or - * modify it under the terms of version 2 of the GNU General Public - * License as published by the Free Software Foundation. + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. * - * Lustre is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). * - * You should have received a copy of the GNU General Public License - * along with Lustre; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf * - * For testing and management it is treated as an obd_device, - * although * it does not export a full OBD method table (the - * requests are coming * in over the wire, so object target modules - * do not have a full * method table.) + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. * + * GPL HEADER END + */ +/* + * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Use is subject to license terms. + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. 
*/ #ifndef EXPORT_SYMTAB @@ -31,44 +39,39 @@ #endif #define DEBUG_SUBSYSTEM S_OSC -#ifdef __KERNEL__ -# include -# include -# include -# include -# include -# include -# if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)) -# include -# include -# else -# include -# endif -#else /* __KERNEL__ */ +#include + +#ifndef __KERNEL__ # include #endif -#include -#include -#include -#include +#include +#include #include -#include -#include +#include +#include +#include #ifdef __CYGWIN__ # include #endif -#include -#include -#include -#include -#include -#include - +#include +#include +#include +#include +#include +#include #include "osc_internal.h" +static quota_interface_t *quota_interface = NULL; +extern quota_interface_t osc_quota_interface; + +static void osc_release_ppga(struct brw_page **ppga, obd_count count); +static int brw_interpret(const struct lu_env *env, + struct ptlrpc_request *req, void *data, int rc); +int osc_cleanup(struct obd_device *obd); + /* Pack OSC object metadata for disk storage (LE byte order). */ static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp, struct lov_stripe_md *lsm) @@ -128,6 +131,7 @@ static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, RETURN(lsm_size); if (*lsmp != NULL && lmm == NULL) { + OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo)); OBD_FREE(*lsmp, lsm_size); *lsmp = NULL; RETURN(0); @@ -137,7 +141,12 @@ static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, OBD_ALLOC(*lsmp, lsm_size); if (*lsmp == NULL) RETURN(-ENOMEM); - loi_init((*lsmp)->lsm_oinfo); + OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo)); + if ((*lsmp)->lsm_oinfo[0] == NULL) { + OBD_FREE(*lsmp, lsm_size); + RETURN(-ENOMEM); + } + loi_init((*lsmp)->lsm_oinfo[0]); } if (lmm != NULL) { @@ -153,155 +162,265 @@ static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, RETURN(lsm_size); } -static int osc_getattr_interpret(struct ptlrpc_request *req, - struct osc_getattr_async_args *aa, int rc) +static inline void osc_pack_capa(struct ptlrpc_request *req, + struct ost_body *body, void *capa) +{ + struct obd_capa *oc = (struct obd_capa *)capa; + struct lustre_capa *c; + + if (!capa) + return; + + c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1); + LASSERT(c); + capa_cpy(c, oc); + body->oa.o_valid |= OBD_MD_FLOSSCAPA; + DEBUG_CAPA(D_SEC, c, "pack"); +} + +static inline void osc_pack_req_body(struct ptlrpc_request *req, + struct obd_info *oinfo) +{ + struct ost_body *body; + + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); + LASSERT(body); + + body->oa = *oinfo->oi_oa; + osc_pack_capa(req, body, oinfo->oi_capa); +} + +static inline void osc_set_capa_size(struct ptlrpc_request *req, + const struct req_msg_field *field, + struct obd_capa *oc) +{ + if (oc == NULL) + req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0); + else + /* it is already calculated as sizeof struct obd_capa */ + ; +} + +static int osc_getattr_interpret(const struct lu_env *env, + struct ptlrpc_request *req, + struct osc_async_args *aa, int rc) { struct ost_body *body; ENTRY; if (rc != 0) - RETURN(rc); + GOTO(out, rc); - body = lustre_swab_repbuf(req, 0, sizeof(*body), lustre_swab_ost_body); + body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body), + lustre_swab_ost_body); if (body) { CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode); - memcpy(aa->aa_oa, &body->oa, sizeof(*aa->aa_oa)); + memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa)); /* This should really be sent by the OST */ - 
aa->aa_oa->o_blksize = PTLRPC_MAX_BRW_SIZE; - aa->aa_oa->o_valid |= OBD_MD_FLBLKSZ; + aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE; + aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ; } else { - CERROR("can't unpack ost_body\n"); + CDEBUG(D_INFO, "can't unpack ost_body\n"); rc = -EPROTO; - aa->aa_oa->o_valid = 0; + aa->aa_oi->oi_oa->o_valid = 0; } - +out: + rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc); RETURN(rc); } -static int osc_getattr_async(struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md *md, +static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo, struct ptlrpc_request_set *set) { - struct ptlrpc_request *request; - struct ost_body *body; - int size = sizeof(*body); - struct osc_getattr_async_args *aa; + struct ptlrpc_request *req; + struct osc_async_args *aa; + int rc; ENTRY; - request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION, - OST_GETATTR, 1, &size, NULL); - if (!request) + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR); + if (req == NULL) RETURN(-ENOMEM); - body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body)); - memcpy(&body->oa, oa, sizeof(*oa)); + osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + + osc_pack_req_body(req, oinfo); - request->rq_replen = lustre_msg_size(1, &size); - request->rq_interpret_reply = osc_getattr_interpret; + ptlrpc_request_set_replen(req); + req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret; - LASSERT (sizeof (*aa) <= sizeof (request->rq_async_args)); - aa = (struct osc_getattr_async_args *)&request->rq_async_args; - aa->aa_oa = oa; + CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); + aa = ptlrpc_req_async_args(req); + aa->aa_oi = oinfo; - ptlrpc_set_add_req (set, request); - RETURN (0); + ptlrpc_set_add_req(set, req); + RETURN(0); } -static int osc_getattr(struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md *md) +static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo) { - struct ptlrpc_request *request; - struct ost_body *body; - int rc, size = sizeof(*body); + struct ptlrpc_request *req; + struct ost_body *body; + int rc; ENTRY; - request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION, - OST_GETATTR, 1, &size, NULL); - if (!request) + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR); + if (req == NULL) RETURN(-ENOMEM); - body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body)); - memcpy(&body->oa, oa, sizeof(*oa)); + osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + + osc_pack_req_body(req, oinfo); - request->rq_replen = lustre_msg_size(1, &size); + ptlrpc_request_set_replen(req); - rc = ptlrpc_queue_wait(request); - if (rc) { - CERROR("%s failed: rc = %d\n", __FUNCTION__, rc); + rc = ptlrpc_queue_wait(req); + if (rc) GOTO(out, rc); - } - body = lustre_swab_repbuf(request, 0, sizeof (*body), - lustre_swab_ost_body); - if (body == NULL) { - CERROR ("can't unpack ost_body\n"); - GOTO (out, rc = -EPROTO); - } + body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + if (body == NULL) + GOTO(out, rc = -EPROTO); CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode); - memcpy(oa, &body->oa, sizeof(*oa)); + *oinfo->oi_oa = body->oa; /* This should really be sent by the OST */ - oa->o_blksize = PTLRPC_MAX_BRW_SIZE; - oa->o_valid |= 
OBD_MD_FLBLKSZ; + oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE; + oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ; EXIT; out: - ptlrpc_req_finished(request); + ptlrpc_req_finished(req); return rc; } -static int osc_setattr(struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md *md, struct obd_trans_info *oti, - struct lustre_capa *capa) +static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo, + struct obd_trans_info *oti) +{ + struct ptlrpc_request *req; + struct ost_body *body; + int rc; + ENTRY; + + LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) || + oinfo->oi_oa->o_gr > 0); + + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR); + if (req == NULL) + RETURN(-ENOMEM); + + osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + + osc_pack_req_body(req, oinfo); + + ptlrpc_request_set_replen(req); + + rc = ptlrpc_queue_wait(req); + if (rc) + GOTO(out, rc); + + body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + if (body == NULL) + GOTO(out, rc = -EPROTO); + + *oinfo->oi_oa = body->oa; + + EXIT; +out: + ptlrpc_req_finished(req); + RETURN(rc); +} + +static int osc_setattr_interpret(const struct lu_env *env, + struct ptlrpc_request *req, + struct osc_async_args *aa, int rc) { - struct ptlrpc_request *request; struct ost_body *body; - int rc, size = sizeof(*body); ENTRY; - LASSERT(!(oa->o_valid & OBD_MD_FLGROUP) || oa->o_gr > 0); + if (rc != 0) + GOTO(out, rc); + + body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + if (body == NULL) + GOTO(out, rc = -EPROTO); + + *aa->aa_oi->oi_oa = body->oa; +out: + rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc); + RETURN(rc); +} + +static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo, + struct obd_trans_info *oti, + struct ptlrpc_request_set *rqset) +{ + struct ptlrpc_request *req; + struct osc_async_args *aa; + int rc; + ENTRY; + + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR); + if (req == NULL) + RETURN(-ENOMEM); + + osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } - request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION, - OST_SETATTR, 1, &size, NULL); - if (!request) - RETURN(-ENOMEM); + osc_pack_req_body(req, oinfo); - body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof(*body)); - memcpy(&body->oa, oa, sizeof(*oa)); + ptlrpc_request_set_replen(req); - request->rq_replen = lustre_msg_size(1, &size); + if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) { + LASSERT(oti); + oinfo->oi_oa->o_lcookie = *oti->oti_logcookies; + } - if (oti != NULL && (oti->oti_flags & OBD_MODE_ASYNC)) { - ptlrpcd_add_req(request); - rc = 0; + /* do mds to ost setattr asynchronously */ + if (!rqset) { + /* Do not wait for response. 
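+ * The request is handed straight to the ptlrpcd daemon and no reply is
+ * propagated back to the caller; with a request set, by contrast, the
+ * reply is funnelled through osc_setattr_interpret() below.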
*/ + ptlrpcd_add_req(req); } else { - rc = ptlrpc_queue_wait(request); - if (rc) - GOTO(out, rc); + req->rq_interpret_reply = + (ptlrpc_interpterer_t)osc_setattr_interpret; - body = lustre_swab_repbuf(request, 0, sizeof(*body), - lustre_swab_ost_body); - if (body == NULL) - GOTO(out, rc = -EPROTO); + CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args)); + aa = ptlrpc_req_async_args(req); + aa->aa_oi = oinfo; - memcpy(oa, &body->oa, sizeof(*oa)); + ptlrpc_set_add_req(rqset, req); } - EXIT; -out: - ptlrpc_req_finished(request); + RETURN(0); } int osc_real_create(struct obd_export *exp, struct obdo *oa, struct lov_stripe_md **ea, struct obd_trans_info *oti) { - struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc; - struct ptlrpc_request *request; - struct ost_body *body; - struct lov_stripe_md *lsm; - int rc, size = sizeof(*body); + struct ptlrpc_request *req; + struct ost_body *body; + struct lov_stripe_md *lsm; + int rc; ENTRY; LASSERT(oa); @@ -314,46 +433,39 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa, RETURN(rc); } - request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION, - OST_CREATE, 1, &size, NULL); - if (!request) + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE); + if (req == NULL) GOTO(out, rc = -ENOMEM); - body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body)); - memcpy(&body->oa, oa, sizeof(body->oa)); + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE); + if (rc) { + ptlrpc_request_free(req); + GOTO(out, rc); + } + + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); + LASSERT(body); + body->oa = *oa; - request->rq_replen = lustre_msg_size(1, &size); - if (oa->o_valid & OBD_MD_FLINLINE) { - LASSERT((oa->o_valid & OBD_MD_FLFLAGS) && - oa->o_flags == OBD_FL_DELORPHAN); - DEBUG_REQ(D_HA, request, + ptlrpc_request_set_replen(req); + + if ((oa->o_valid & OBD_MD_FLFLAGS) && + oa->o_flags == OBD_FL_DELORPHAN) { + DEBUG_REQ(D_HA, req, "delorphan from OST integration"); - /* Don't resend the delorphan request */ - request->rq_no_resend = request->rq_no_delay = 1; + /* Don't resend the delorphan req */ + req->rq_no_resend = req->rq_no_delay = 1; } - rc = ptlrpc_queue_wait(request); + rc = ptlrpc_queue_wait(req); if (rc) GOTO(out_req, rc); - body = lustre_swab_repbuf(request, 0, sizeof(*body), - lustre_swab_ost_body); - if (body == NULL) { - CERROR ("can't unpack ost_body\n"); - GOTO (out_req, rc = -EPROTO); - } + body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + if (body == NULL) + GOTO(out_req, rc = -EPROTO); - if ((oa->o_valid & OBD_MD_FLFLAGS) && oa->o_flags == OBD_FL_DELORPHAN) { - struct obd_import *imp = class_exp2cliimp(exp); - /* MDS declares last known object, OSS responses - * with next possible object -bzzz */ - spin_lock(&oscc->oscc_lock); - oscc->oscc_next_id = body->oa.o_id; - spin_unlock(&oscc->oscc_lock); - CDEBUG(D_HA, "%s: set nextid "LPD64" after recovery\n", - imp->imp_target_uuid.uuid, oa->o_id); - } - memcpy(oa, &body->oa, sizeof(*oa)); + *oa = body->oa; /* This should really be sent by the OST */ oa->o_blksize = PTLRPC_MAX_BRW_SIZE; @@ -368,221 +480,356 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa, *ea = lsm; if (oti != NULL) { - oti->oti_transno = request->rq_repmsg->transno; + oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg); if (oa->o_valid & OBD_MD_FLCOOKIE) { if (!oti->oti_logcookies) oti_alloc_cookies(oti, 1); - memcpy(oti->oti_logcookies, obdo_logcookie(oa), - sizeof(oti->oti_onecookie)); + *oti->oti_logcookies = oa->o_lcookie; } } - 
CDEBUG(D_HA, "transno: "LPD64"\n", request->rq_repmsg->transno); - EXIT; + CDEBUG(D_HA, "transno: "LPD64"\n", + lustre_msg_get_transno(req->rq_repmsg)); out_req: - ptlrpc_req_finished(request); + ptlrpc_req_finished(req); out: if (rc && !*ea) obd_free_memmd(exp, &lsm); - return rc; + RETURN(rc); } -static int osc_punch(struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md *md, obd_size start, - obd_size end, struct obd_trans_info *oti, - struct lustre_capa *capa) +static int osc_punch_interpret(const struct lu_env *env, + struct ptlrpc_request *req, + struct osc_async_args *aa, int rc) { - struct ptlrpc_request *request; struct ost_body *body; - struct lustre_capa *req_capa; - int bufcnt = 0; - int rc, size[2] = { sizeof(*body), sizeof(*capa) }; ENTRY; - if (!oa) { - CERROR("oa NULL\n"); + if (rc != 0) + GOTO(out, rc); + + body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + if (body == NULL) + GOTO(out, rc = -EPROTO); + + *aa->aa_oi->oi_oa = body->oa; +out: + rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc); + RETURN(rc); +} + +static int osc_punch(struct obd_export *exp, struct obd_info *oinfo, + struct obd_trans_info *oti, + struct ptlrpc_request_set *rqset) +{ + struct ptlrpc_request *req; + struct osc_async_args *aa; + struct ost_body *body; + int rc; + ENTRY; + + if (!oinfo->oi_oa) { + CDEBUG(D_INFO, "oa NULL\n"); RETURN(-EINVAL); } - request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION, - OST_PUNCH, capa ? 2 : 1, size, NULL); - if (!request) + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH); + if (req == NULL) RETURN(-ENOMEM); - body = lustre_msg_buf(request->rq_reqmsg, bufcnt++, sizeof (*body)); - - memcpy(&body->oa, oa, sizeof(*oa)); + osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */ + ptlrpc_at_set_req_timeout(req); + osc_pack_req_body(req, oinfo); /* overload the size and blocks fields in the oa with start/end */ - body->oa.o_size = start; - body->oa.o_blocks = end; + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); + LASSERT(body); + body->oa.o_size = oinfo->oi_policy.l_extent.start; + body->oa.o_blocks = oinfo->oi_policy.l_extent.end; body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS); + ptlrpc_request_set_replen(req); - if (capa) { - req_capa = lustre_msg_buf(request->rq_reqmsg, bufcnt++, - sizeof(*capa)); - capa_dup2(req_capa, capa); - body->oa.o_valid |= OBD_MD_CAPA; - } - - request->rq_replen = lustre_msg_size(1, size); - - rc = ptlrpc_queue_wait(request); - if (rc) - GOTO(out, rc); - - body = lustre_swab_repbuf (request, 0, sizeof (*body), - lustre_swab_ost_body); - if (body == NULL) { - CERROR ("can't unpack ost_body\n"); - GOTO (out, rc = -EPROTO); - } - memcpy(oa, &body->oa, sizeof(*oa)); + req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret; + CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args)); + aa = ptlrpc_req_async_args(req); + aa->aa_oi = oinfo; + ptlrpc_set_add_req(rqset, req); - EXIT; - out: - ptlrpc_req_finished(request); - return rc; + RETURN(0); } static int osc_sync(struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md *md, obd_size start, - obd_size end) + struct lov_stripe_md *md, obd_size start, obd_size end, + void *capa) { - struct ptlrpc_request *request; - struct ost_body *body; - int rc, size = sizeof(*body); + struct ptlrpc_request *req; + struct ost_body *body; + int rc; ENTRY; if 
(!oa) { - CERROR("oa NULL\n"); + CDEBUG(D_INFO, "oa NULL\n"); RETURN(-EINVAL); } - request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION, - OST_SYNC, 1, &size, NULL); - if (!request) + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC); + if (req == NULL) RETURN(-ENOMEM); - body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body)); - memcpy(&body->oa, oa, sizeof(*oa)); + osc_set_capa_size(req, &RMF_CAPA1, capa); + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } /* overload the size and blocks fields in the oa with start/end */ + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); + LASSERT(body); + body->oa = *oa; body->oa.o_size = start; body->oa.o_blocks = end; body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS); + osc_pack_capa(req, body, capa); - request->rq_replen = lustre_msg_size(1, &size); + ptlrpc_request_set_replen(req); - rc = ptlrpc_queue_wait(request); + rc = ptlrpc_queue_wait(req); if (rc) GOTO(out, rc); - body = lustre_swab_repbuf(request, 0, sizeof(*body), - lustre_swab_ost_body); - if (body == NULL) { - CERROR ("can't unpack ost_body\n"); - GOTO (out, rc = -EPROTO); - } + body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + if (body == NULL) + GOTO(out, rc = -EPROTO); - memcpy(oa, &body->oa, sizeof(*oa)); + *oa = body->oa; EXIT; out: - ptlrpc_req_finished(request); + ptlrpc_req_finished(req); return rc; } +/* Find and cancel locally locks matched by @mode in the resource found by + * @objid. Found locks are added into @cancel list. Returns the amount of + * locks added to @cancels list. */ +static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa, + struct list_head *cancels, ldlm_mode_t mode, + int lock_flags) +{ + struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; + struct ldlm_res_id res_id; + struct ldlm_resource *res; + int count; + ENTRY; + + osc_build_res_name(oa->o_id, oa->o_gr, &res_id); + res = ldlm_resource_get(ns, NULL, &res_id, 0, 0); + if (res == NULL) + RETURN(0); + + LDLM_RESOURCE_ADDREF(res); + count = ldlm_cancel_resource_local(res, cancels, NULL, mode, + lock_flags, 0, NULL); + LDLM_RESOURCE_DELREF(res); + ldlm_resource_putref(res); + RETURN(count); +} + +static int osc_destroy_interpret(const struct lu_env *env, + struct ptlrpc_request *req, void *data, + int rc) +{ + struct client_obd *cli = &req->rq_import->imp_obd->u.cli; + + atomic_dec(&cli->cl_destroy_in_flight); + cfs_waitq_signal(&cli->cl_destroy_waitq); + return 0; +} + +static int osc_can_send_destroy(struct client_obd *cli) +{ + if (atomic_inc_return(&cli->cl_destroy_in_flight) <= + cli->cl_max_rpcs_in_flight) { + /* The destroy request can be sent */ + return 1; + } + if (atomic_dec_return(&cli->cl_destroy_in_flight) < + cli->cl_max_rpcs_in_flight) { + /* + * The counter has been modified between the two atomic + * operations. + */ + cfs_waitq_signal(&cli->cl_destroy_waitq); + } + return 0; +} + +/* Destroy requests can be async always on the client, and we don't even really + * care about the return code since the client cannot do anything at all about + * a destroy failure. + * When the MDS is unlinking a filename, it saves the file objects into a + * recovery llog, and these object records are cancelled when the OST reports + * they were destroyed and sync'd to disk (i.e. transaction committed). 
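+ * In other words, an unlink record is only cleared once the OST has
+ * confirmed the destroy on stable storage, so a crash between the
+ * unlink and the destroy cannot silently leak the object.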
+ * If the client dies, or the OST is down when the object should be destroyed, + * the records are not cancelled, and when the OST reconnects to the MDS next, + * it will retrieve the llog unlink logs and then sends the log cancellation + * cookies to the MDS after committing destroy transactions. */ static int osc_destroy(struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md *ea, struct obd_trans_info *oti) + struct lov_stripe_md *ea, struct obd_trans_info *oti, + struct obd_export *md_export) { - struct ptlrpc_request *request; - struct ost_body *body; - int rc, size = sizeof(*body); + struct client_obd *cli = &exp->exp_obd->u.cli; + struct ptlrpc_request *req; + struct ost_body *body; + CFS_LIST_HEAD(cancels); + int rc, count; ENTRY; if (!oa) { - CERROR("oa NULL\n"); + CDEBUG(D_INFO, "oa NULL\n"); RETURN(-EINVAL); } - request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION, - OST_DESTROY, 1, &size, NULL); - if (!request) - RETURN(-ENOMEM); - request->rq_request_portal = OST_DESTROY_PORTAL; - - body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body)); + count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW, + LDLM_FL_DISCARD_DATA); - if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) { - memcpy(obdo_logcookie(oa), oti->oti_logcookies, - sizeof(*oti->oti_logcookies)); - oti->oti_logcookies++; + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY); + if (req == NULL) { + ldlm_lock_list_put(&cancels, l_bl_ast, count); + RETURN(-ENOMEM); } - memcpy(&body->oa, oa, sizeof(*oa)); - request->rq_replen = lustre_msg_size(1, &size); + rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY, + 0, &cancels, count); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } - if (oti != NULL && (oti->oti_flags & OBD_MODE_ASYNC)) { - ptlrpcd_add_req(request); - rc = 0; - } else { - rc = ptlrpc_queue_wait(request); + req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */ + req->rq_interpret_reply = osc_destroy_interpret; + ptlrpc_at_set_req_timeout(req); - if (rc == -ENOENT) - rc = 0; + if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) + oa->o_lcookie = *oti->oti_logcookies; + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); + LASSERT(body); + body->oa = *oa; - if (rc) { - ptlrpc_req_finished(request); - RETURN(rc); - } + ptlrpc_request_set_replen(req); - body = lustre_swab_repbuf(request, 0, sizeof(*body), - lustre_swab_ost_body); - if (body == NULL) { - CERROR ("Can't unpack body\n"); - ptlrpc_req_finished(request); - RETURN(-EPROTO); - } + if (!osc_can_send_destroy(cli)) { + struct l_wait_info lwi = { 0 }; - memcpy(oa, &body->oa, sizeof(*oa)); - ptlrpc_req_finished(request); + /* + * Wait until the number of on-going destroy RPCs drops + * under max_rpc_in_flight + */ + l_wait_event_exclusive(cli->cl_destroy_waitq, + osc_can_send_destroy(cli), &lwi); } - RETURN(rc); + + /* Do not wait for response */ + ptlrpcd_add_req(req); + RETURN(0); } static void osc_announce_cached(struct client_obd *cli, struct obdo *oa, long writing_bytes) { - obd_valid bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT; + obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT; LASSERT(!(oa->o_valid & bits)); oa->o_valid |= bits; - spin_lock(&cli->cl_loi_list_lock); + client_obd_list_lock(&cli->cl_loi_list_lock); oa->o_dirty = cli->cl_dirty; - oa->o_undirty = cli->cl_dirty_max - oa->o_dirty; + if (cli->cl_dirty > cli->cl_dirty_max) { + CERROR("dirty %lu > dirty_max %lu\n", + cli->cl_dirty, cli->cl_dirty_max); + oa->o_undirty = 0; + } else if (atomic_read(&obd_dirty_pages) > 
obd_max_dirty_pages) { + CERROR("dirty %d > system dirty_max %d\n", + atomic_read(&obd_dirty_pages), obd_max_dirty_pages); + oa->o_undirty = 0; + } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) { + CERROR("dirty %lu - dirty_max %lu too big???\n", + cli->cl_dirty, cli->cl_dirty_max); + oa->o_undirty = 0; + } else { + long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)* + (cli->cl_max_rpcs_in_flight + 1); + oa->o_undirty = max(cli->cl_dirty_max, max_in_flight); + } oa->o_grant = cli->cl_avail_grant; oa->o_dropped = cli->cl_lost_grant; cli->cl_lost_grant = 0; - spin_unlock(&cli->cl_loi_list_lock); + client_obd_list_unlock(&cli->cl_loi_list_lock); CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n", oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant); } /* caller must hold loi_list_lock */ static void osc_consume_write_grant(struct client_obd *cli, - struct osc_async_page *oap) + struct brw_page *pga) { - cli->cl_dirty += PAGE_SIZE; - cli->cl_avail_grant -= PAGE_SIZE; - oap->oap_brw_flags |= OBD_BRW_FROM_GRANT; - CDEBUG(D_CACHE, "using %lu grant credits for oap %p\n", PAGE_SIZE, oap); + atomic_inc(&obd_dirty_pages); + cli->cl_dirty += CFS_PAGE_SIZE; + cli->cl_avail_grant -= CFS_PAGE_SIZE; + pga->flag |= OBD_BRW_FROM_GRANT; + CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n", + CFS_PAGE_SIZE, pga, pga->pg); LASSERT(cli->cl_avail_grant >= 0); } +/* the companion to osc_consume_write_grant, called when a brw has completed. + * must be called with the loi lock held. */ +static void osc_release_write_grant(struct client_obd *cli, + struct brw_page *pga, int sent) +{ + int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096; + ENTRY; + + if (!(pga->flag & OBD_BRW_FROM_GRANT)) { + EXIT; + return; + } + + pga->flag &= ~OBD_BRW_FROM_GRANT; + atomic_dec(&obd_dirty_pages); + cli->cl_dirty -= CFS_PAGE_SIZE; + if (!sent) { + cli->cl_lost_grant += CFS_PAGE_SIZE; + CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n", + cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty); + } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) { + /* For short writes we shouldn't count parts of pages that + * span a whole block on the OST side, or our accounting goes + * wrong. Should match the code in filter_grant_check. 
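+ * For example, with CFS_PAGE_SIZE of 4096 and a reported blocksize of
+ * 1024, a 100-byte write at offset 0 still consumes one whole block on
+ * the server, so count rounds up to 1024 and 4096 - 1024 = 3072 bytes
+ * are charged to cl_lost_grant.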
*/ + int offset = pga->off & ~CFS_PAGE_MASK; + int count = pga->count + (offset & (blocksize - 1)); + int end = (offset + pga->count) & (blocksize - 1); + if (end) + count += blocksize - end; + + cli->cl_lost_grant += CFS_PAGE_SIZE - count; + CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n", + CFS_PAGE_SIZE - count, cli->cl_lost_grant, + cli->cl_avail_grant, cli->cl_dirty); + } + + EXIT; +} + static unsigned long rpcs_in_flight(struct client_obd *cli) { return cli->cl_r_in_flight + cli->cl_w_in_flight; @@ -594,43 +841,61 @@ void osc_wake_cache_waiters(struct client_obd *cli) struct list_head *l, *tmp; struct osc_cache_waiter *ocw; + ENTRY; list_for_each_safe(l, tmp, &cli->cl_cache_waiters) { /* if we can't dirty more, we must wait until some is written */ - if (cli->cl_dirty + PAGE_SIZE > cli->cl_dirty_max) { - CDEBUG(D_CACHE, "no dirty room: dirty: %ld max %ld\n", - cli->cl_dirty, cli->cl_dirty_max); + if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) || + (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) { + CDEBUG(D_CACHE, "no dirty room: dirty: %ld " + "osc max %ld, sys max %d\n", cli->cl_dirty, + cli->cl_dirty_max, obd_max_dirty_pages); return; } /* if still dirty cache but no grant wait for pending RPCs that * may yet return us some grant before doing sync writes */ - if (cli->cl_w_in_flight && cli->cl_avail_grant < PAGE_SIZE) { + if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) { CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n", cli->cl_w_in_flight); + return; } + ocw = list_entry(l, struct osc_cache_waiter, ocw_entry); list_del_init(&ocw->ocw_entry); - if (cli->cl_avail_grant < PAGE_SIZE) { + if (cli->cl_avail_grant < CFS_PAGE_SIZE) { /* no more RPCs in flight to return grant, do sync IO */ ocw->ocw_rc = -EDQUOT; CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap); } else { - osc_consume_write_grant(cli, ocw->ocw_oap); + osc_consume_write_grant(cli, + &ocw->ocw_oap->oap_brw_page); } - wake_up(&ocw->ocw_waitq); + cfs_waitq_signal(&ocw->ocw_waitq); } EXIT; } +static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd) +{ + client_obd_list_lock(&cli->cl_loi_list_lock); + cli->cl_avail_grant = ocd->ocd_grant; + client_obd_list_unlock(&cli->cl_loi_list_lock); + + CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n", + cli->cl_avail_grant, cli->cl_lost_grant); + LASSERT(cli->cl_avail_grant >= 0); +} + static void osc_update_grant(struct client_obd *cli, struct ost_body *body) { - spin_lock(&cli->cl_loi_list_lock); + client_obd_list_lock(&cli->cl_loi_list_lock); CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant); - cli->cl_avail_grant += body->oa.o_grant; - /* waiters are woken in brw_interpret_oap */ - spin_unlock(&cli->cl_loi_list_lock); + if (body->oa.o_valid & OBD_MD_FLGRANT) + cli->cl_avail_grant += body->oa.o_grant; + /* waiters are woken in brw_interpret */ + client_obd_list_unlock(&cli->cl_loi_list_lock); } /* We assume that the reason this OSC got a short read is because it read @@ -638,69 +903,71 @@ static void osc_update_grant(struct client_obd *cli, struct ost_body *body) * via the LOV, and it _knows_ it's reading inside the file, it's just that * this stripe never got written at or beyond this stripe offset yet. 
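 * For example, if three 4096-byte pages were requested but the OST
 * returned only 5000 bytes, page 0 is left intact, page 1 keeps its
 * first 904 bytes and has the remainder zero-filled, and page 2 is
 * zeroed entirely.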
*/ static void handle_short_read(int nob_read, obd_count page_count, - struct brw_page *pga) + struct brw_page **pga) { char *ptr; + int i = 0; /* skip bytes read OK */ while (nob_read > 0) { LASSERT (page_count > 0); - if (pga->count > nob_read) { + if (pga[i]->count > nob_read) { /* EOF inside this page */ - ptr = kmap(pga->pg) + (pga->page_offset & ~PAGE_MASK); - memset(ptr + nob_read, 0, pga->count - nob_read); - kunmap(pga->pg); + ptr = cfs_kmap(pga[i]->pg) + + (pga[i]->off & ~CFS_PAGE_MASK); + memset(ptr + nob_read, 0, pga[i]->count - nob_read); + cfs_kunmap(pga[i]->pg); page_count--; - pga++; + i++; break; } - nob_read -= pga->count; + nob_read -= pga[i]->count; page_count--; - pga++; + i++; } /* zero remaining pages */ while (page_count-- > 0) { - ptr = kmap(pga->pg) + (pga->page_offset & ~PAGE_MASK); - memset(ptr, 0, pga->count); - kunmap(pga->pg); - pga++; + ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK); + memset(ptr, 0, pga[i]->count); + cfs_kunmap(pga[i]->pg); + i++; } } -static int check_write_rcs(struct ptlrpc_request *request, +static int check_write_rcs(struct ptlrpc_request *req, int requested_nob, int niocount, - obd_count page_count, struct brw_page *pga) + obd_count page_count, struct brw_page **pga) { - int *remote_rcs, i; + int *remote_rcs, i; /* return error if any niobuf was in error */ - remote_rcs = lustre_swab_repbuf(request, 1, + remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1, sizeof(*remote_rcs) * niocount, NULL); if (remote_rcs == NULL) { - CERROR("Missing/short RC vector on BRW_WRITE reply\n"); + CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n"); return(-EPROTO); } - if (lustre_msg_swabbed(request->rq_repmsg)) + if (lustre_msg_swabbed(req->rq_repmsg)) for (i = 0; i < niocount; i++) - __swab32s((__u32 *)&remote_rcs[i]); + __swab32s(&remote_rcs[i]); for (i = 0; i < niocount; i++) { if (remote_rcs[i] < 0) return(remote_rcs[i]); if (remote_rcs[i] != 0) { - CERROR("rc[%d] invalid (%d) req %p\n", - i, remote_rcs[i], request); + CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n", + i, remote_rcs[i], req); return(-EPROTO); } } - if (request->rq_bulk->bd_nob_transferred != requested_nob) { + if (req->rq_bulk->bd_nob_transferred != requested_nob) { CERROR("Unexpected # bytes transferred: %d (requested %d)\n", - requested_nob, request->rq_bulk->bd_nob_transferred); + req->rq_bulk->bd_nob_transferred, requested_nob); return(-EPROTO); } @@ -720,238 +987,343 @@ static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2) return 0; } - return (p1->disk_offset + p1->count == p2->disk_offset); + return (p1->off + p1->count == p2->off); } -#if CHECKSUM_BULK -static obd_count cksum_pages(int nob, obd_count page_count, - struct brw_page *pga) +static obd_count osc_checksum_bulk(int nob, obd_count pg_count, + struct brw_page **pga, int opc, + cksum_type_t cksum_type) { - obd_count cksum = 0; - char *ptr; - - while (nob > 0) { - LASSERT (page_count > 0); - - ptr = kmap(pga->pg); - ost_checksum(&cksum, ptr + (pga->off & (PAGE_SIZE - 1)), - pga->count > nob ? nob : pga->count); - kunmap(pga->pg); - - nob -= pga->count; - page_count--; - pga++; + __u32 cksum; + int i = 0; + + LASSERT (pg_count > 0); + cksum = init_checksum(cksum_type); + while (nob > 0 && pg_count > 0) { + unsigned char *ptr = cfs_kmap(pga[i]->pg); + int off = pga[i]->off & ~CFS_PAGE_MASK; + int count = pga[i]->count > nob ? 
nob : pga[i]->count; + + /* corrupt the data before we compute the checksum, to + * simulate an OST->client data error */ + if (i == 0 && opc == OST_READ && + OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) + memcpy(ptr + off, "bad1", min(4, nob)); + cksum = compute_checksum(cksum, ptr + off, count, cksum_type); + cfs_kunmap(pga[i]->pg); + LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n", + off, cksum); + + nob -= pga[i]->count; + pg_count--; + i++; } + /* For sending we only compute the wrong checksum instead + * of corrupting the data so it is still correct on a redo */ + if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND)) + cksum++; - return (cksum); + return cksum; } -#endif -#define osc_encrypt_page(page, off, count) \ - osc_crypt_page(page, off, count, ENCRYPT_DATA) -#define osc_decrypt_page(page, off, count) \ - osc_crypt_page(page, off, count, DECRYPT_DATA) - -/*Put a global call back var here is Ugly, but put it to client_obd - *also seems not a good idea, WangDi*/ -crypt_cb_t osc_crypt_cb = NULL; - -static int osc_crypt_page(struct page *page, obd_off page_off, obd_off count, - int flags) -{ - int rc = 0; - ENTRY; - - if (osc_crypt_cb != NULL) - rc = osc_crypt_cb(page, page_off, count, flags); - if (rc != 0) - CERROR("crypt page error %d \n", rc); - RETURN(rc); -} - -static int osc_brw_prep_request(int cmd, struct obd_import *imp,struct obdo *oa, +static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, struct lov_stripe_md *lsm, obd_count page_count, - struct brw_page *pga, int *requested_nobp, - int *niocountp, struct ptlrpc_request **reqp) + struct brw_page **pga, + struct ptlrpc_request **reqp, + struct obd_capa *ocapa) { struct ptlrpc_request *req; struct ptlrpc_bulk_desc *desc; - struct client_obd *cli = &imp->imp_obd->u.cli; struct ost_body *body; - struct lustre_id *raw_id = obdo_id(oa); - struct obd_capa *ocapa = NULL; - struct lustre_capa *capa = NULL; struct obd_ioobj *ioobj; struct niobuf_remote *niobuf; - int niocount; - int size[4]; - int i, bufcnt = 0; - int requested_nob; - int opc; - int capa_op; - int rc; - - opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ; - - for (niocount = i = 1; i < page_count; i++) - if (!can_merge_pages(&pga[i - 1], &pga[i])) - niocount++; + int niocount, i, requested_nob, opc, rc; + struct osc_brw_async_args *aa; + struct req_capsule *pill; + struct brw_page *pg_prev; - capa_op = (opc == OST_WRITE) ? 
MAY_WRITE : MAY_READ; -get_capa: - ocapa = capa_get(oa->o_fsuid, capa_op, raw_id->li_fid.lf_group, - raw_id->li_stc.u.e3s.l3s_ino, CLIENT_CAPA, - NULL, NULL, NULL); - if (!ocapa) { - if (opc == OST_READ && capa_op == MAY_READ) { - /* partial write might cause read, MAY_WRITE capability - * should be used here */ - capa_op = MAY_WRITE; - goto get_capa; - } + ENTRY; + if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ)) + RETURN(-ENOMEM); /* Recoverable */ + if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2)) + RETURN(-EINVAL); /* Fatal */ + + if ((cmd & OBD_BRW_WRITE) != 0) { + opc = OST_WRITE; + req = ptlrpc_request_alloc_pool(cli->cl_import, + cli->cl_import->imp_rq_pool, + &RQF_OST_BRW); + } else { + opc = OST_READ; + req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW); } - size[bufcnt++] = sizeof(*body); - size[bufcnt++] = sizeof(*ioobj); - if (ocapa) - size[bufcnt++] = sizeof(*capa); - size[bufcnt++] = niocount * sizeof(*niobuf); - - req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, opc, bufcnt, size, NULL); if (req == NULL) - return (-ENOMEM); + RETURN(-ENOMEM); + + for (niocount = i = 1; i < page_count; i++) { + if (!can_merge_pages(pga[i - 1], pga[i])) + niocount++; + } + + pill = &req->rq_pill; + req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT, + niocount * sizeof(*niobuf)); + osc_set_capa_size(req, &RMF_CAPA1, ocapa); + + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */ + ptlrpc_at_set_req_timeout(req); if (opc == OST_WRITE) - desc = ptlrpc_prep_bulk_imp (req, page_count, - BULK_GET_SOURCE, OST_BULK_PORTAL); + desc = ptlrpc_prep_bulk_imp(req, page_count, + BULK_GET_SOURCE, OST_BULK_PORTAL); else - desc = ptlrpc_prep_bulk_imp (req, page_count, - BULK_PUT_SINK, OST_BULK_PORTAL); + desc = ptlrpc_prep_bulk_imp(req, page_count, + BULK_PUT_SINK, OST_BULK_PORTAL); + if (desc == NULL) GOTO(out, rc = -ENOMEM); /* NB request now owns desc and will free it when it gets freed */ - bufcnt = 0; - body = lustre_msg_buf(req->rq_reqmsg, bufcnt++, sizeof(*body)); - ioobj = lustre_msg_buf(req->rq_reqmsg, bufcnt++, sizeof(*ioobj)); - if (ocapa) - capa = lustre_msg_buf(req->rq_reqmsg, bufcnt++, sizeof(*capa)); - niobuf = lustre_msg_buf(req->rq_reqmsg, bufcnt++, - niocount * sizeof(*niobuf)); + body = req_capsule_client_get(pill, &RMF_OST_BODY); + ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ); + niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE); + LASSERT(body && ioobj && niobuf); - memcpy(&body->oa, oa, sizeof(*oa)); + body->oa = *oa; obdo_to_ioobj(oa, ioobj); ioobj->ioo_bufcnt = niocount; - - if (ocapa) { - capa_dup(capa, ocapa); - body->oa.o_valid |= OBD_MD_CAPA; - capa_put(ocapa, CLIENT_CAPA); - } - + osc_pack_capa(req, body, ocapa); LASSERT (page_count > 0); - + pg_prev = pga[0]; for (requested_nob = i = 0; i < page_count; i++, niobuf++) { - struct brw_page *pg = &pga[i]; - struct brw_page *pg_prev = pg - 1; + struct brw_page *pg = pga[i]; LASSERT(pg->count > 0); - LASSERTF((pg->page_offset & ~PAGE_MASK)+ pg->count <= PAGE_SIZE, - "i: %d pg: %p pg_off: "LPU64", count: %u\n", i, pg, - pg->page_offset, pg->count); - LASSERTF(i == 0 || pg->disk_offset > pg_prev->disk_offset, + LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE, + "i: %d pg: %p off: "LPU64", count: %u\n", i, pg, + pg->off, pg->count); +#ifdef __linux__ + LASSERTF(i == 0 || pg->off > pg_prev->off, "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64 " prev_pg %p [pri %lu ind %lu] off 
"LPU64"\n", i, page_count, - pg->pg, pg->pg->private, pg->pg->index, pg->disk_offset, - pg_prev->pg, pg_prev->pg->private, pg_prev->pg->index, - pg_prev->disk_offset); - - if (opc == OST_WRITE) { - rc = osc_encrypt_page(pg->pg, pg->page_offset, pg->count); - if (rc) - GOTO(out, rc); - } + pg->pg, page_private(pg->pg), pg->pg->index, pg->off, + pg_prev->pg, page_private(pg_prev->pg), + pg_prev->pg->index, pg_prev->off); +#else + LASSERTF(i == 0 || pg->off > pg_prev->off, + "i %d p_c %u\n", i, page_count); +#endif + LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) == + (pg->flag & OBD_BRW_SRVLOCK)); - ptlrpc_prep_bulk_page(desc, pg->pg, - pg->page_offset & ~PAGE_MASK, pg->count); + ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK, + pg->count); requested_nob += pg->count; if (i > 0 && can_merge_pages(pg_prev, pg)) { niobuf--; niobuf->len += pg->count; } else { - niobuf->offset = pg->disk_offset; + niobuf->offset = pg->off; niobuf->len = pg->count; niobuf->flags = pg->flag; } + pg_prev = pg; } - LASSERT((void *)(niobuf - niocount) == - lustre_msg_buf(req->rq_reqmsg, bufcnt - 1, - niocount * sizeof(*niobuf))); + LASSERTF((void *)(niobuf - niocount) == + lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2, + niocount * sizeof(*niobuf)), + "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg, + REQ_REC_OFF + 2, niocount * sizeof(*niobuf)), + (void *)(niobuf - niocount)); + osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0); - /* size[0] still sizeof (*body) */ + /* size[REQ_REC_OFF] still sizeof (*body) */ if (opc == OST_WRITE) { -#if CHECKSUM_BULK - body->oa.o_valid |= OBD_MD_FLCKSUM; - body->oa.o_cksum = cksum_pages(requested_nob, page_count, pga); -#endif + if (unlikely(cli->cl_checksum) && + req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) { + /* store cl_cksum_type in a local variable since + * it can be changed via lprocfs */ + cksum_type_t cksum_type = cli->cl_cksum_type; + + if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) + oa->o_flags = body->oa.o_flags = 0; + body->oa.o_flags |= cksum_type_pack(cksum_type); + body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS; + body->oa.o_cksum = osc_checksum_bulk(requested_nob, + page_count, pga, + OST_WRITE, + cksum_type); + CDEBUG(D_PAGE, "checksum at write origin: %x\n", + body->oa.o_cksum); + /* save this in 'oa', too, for later checking */ + oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS; + oa->o_flags |= cksum_type_pack(cksum_type); + } else { + /* clear out the checksum flag, in case this is a + * resend but cl_checksum is no longer set. 
b=11238 */ + oa->o_valid &= ~OBD_MD_FLCKSUM; + } + oa->o_cksum = body->oa.o_cksum; /* 1 RC per niobuf */ - size[1] = sizeof(__u32) * niocount; - req->rq_replen = lustre_msg_size(2, size); + req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, + sizeof(__u32) * niocount); } else { + if (unlikely(cli->cl_checksum) && + req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) { + if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) + body->oa.o_flags = 0; + body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type); + body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS; + } + req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0); /* 1 RC for the whole I/O */ - req->rq_replen = lustre_msg_size(1, size); } + ptlrpc_request_set_replen(req); + + CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); + aa = ptlrpc_req_async_args(req); + aa->aa_oa = oa; + aa->aa_requested_nob = requested_nob; + aa->aa_nio_count = niocount; + aa->aa_page_count = page_count; + aa->aa_resends = 0; + aa->aa_ppga = pga; + aa->aa_cli = cli; + CFS_INIT_LIST_HEAD(&aa->aa_oaps); - *niocountp = niocount; - *requested_nobp = requested_nob; *reqp = req; - return (0); + RETURN(0); out: - ptlrpc_req_finished (req); - return (rc); + ptlrpc_req_finished(req); + RETURN(rc); } -static int osc_brw_fini_request(struct ptlrpc_request *req, struct obdo *oa, - int requested_nob, int niocount, - obd_count page_count, struct brw_page *pga, - int rc) +static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer, + __u32 client_cksum, __u32 server_cksum, int nob, + obd_count page_count, struct brw_page **pga, + cksum_type_t client_cksum_type) { - struct client_obd *cli = &req->rq_import->imp_obd->u.cli; + __u32 new_cksum; + char *msg; + cksum_type_t cksum_type; + + if (server_cksum == client_cksum) { + CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum); + return 0; + } + + if (oa->o_valid & OBD_MD_FLFLAGS) + cksum_type = cksum_type_unpack(oa->o_flags); + else + cksum_type = OBD_CKSUM_CRC32; + + new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE, + cksum_type); + + if (cksum_type != client_cksum_type) + msg = "the server did not use the checksum type specified in " + "the original request - likely a protocol problem"; + else if (new_cksum == server_cksum) + msg = "changed on the client after we checksummed it - " + "likely false positive due to mmap IO (bug 11742)"; + else if (new_cksum == client_cksum) + msg = "changed in transit before arrival at OST"; + else + msg = "changed in transit AND doesn't match the original - " + "likely false positive due to mmap IO (bug 11742)"; + + LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum " + LPU64"/"LPU64" object "LPU64"/"LPU64" extent " + "["LPU64"-"LPU64"]\n", + msg, libcfs_nid2str(peer->nid), + oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0, + oa->o_valid & OBD_MD_FLFID ? oa->o_generation : + (__u64)0, + oa->o_id, + oa->o_valid & OBD_MD_FLGROUP ? 
oa->o_gr : (__u64)0, + pga[0]->off, + pga[page_count-1]->off + pga[page_count-1]->count - 1); + CERROR("original client csum %x (type %x), server csum %x (type %x), " + "client csum now %x\n", client_cksum, client_cksum_type, + server_cksum, cksum_type, new_cksum); + return 1; +} + +/* Note rc enters this function as number of bytes transferred */ +static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) +{ + struct osc_brw_async_args *aa = (void *)&req->rq_async_args; + const lnet_process_id_t *peer = + &req->rq_import->imp_connection->c_peer; + struct client_obd *cli = aa->aa_cli; struct ost_body *body; + __u32 client_cksum = 0; ENTRY; - if (rc < 0) + if (rc < 0 && rc != -EDQUOT) RETURN(rc); - body = lustre_swab_repbuf(req, 0, sizeof(*body), lustre_swab_ost_body); + LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc); + body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body), + lustre_swab_ost_body); if (body == NULL) { - CERROR ("Can't unpack body\n"); + CDEBUG(D_INFO, "Can't unpack body\n"); RETURN(-EPROTO); } + /* set/clear over quota flag for a uid/gid */ + if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && + body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) + lquota_setdq(quota_interface, cli, body->oa.o_uid, + body->oa.o_gid, body->oa.o_valid, + body->oa.o_flags); + + if (rc < 0) + RETURN(rc); + + if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM) + client_cksum = aa->aa_oa->o_cksum; /* save for later */ + osc_update_grant(cli, body); - memcpy(oa, &body->oa, sizeof(*oa)); - if (req->rq_reqmsg->opc == OST_WRITE) { + if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) { if (rc > 0) { - CERROR ("Unexpected +ve rc %d\n", rc); + CERROR("Unexpected +ve rc %d\n", rc); RETURN(-EPROTO); } - LASSERT (req->rq_bulk->bd_nob == requested_nob); - osc_decrypt_page(pga->pg, pga->page_offset, - pga->count); - RETURN(check_write_rcs(req, requested_nob, niocount, - page_count, pga)); + LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob); + + if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum && + check_write_checksum(&body->oa, peer, client_cksum, + body->oa.o_cksum, aa->aa_requested_nob, + aa->aa_page_count, aa->aa_ppga, + cksum_type_unpack(aa->aa_oa->o_flags))) + RETURN(-EAGAIN); + + if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk)) + RETURN(-EAGAIN); + + rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count, + aa->aa_page_count, aa->aa_ppga); + GOTO(out, rc); } - if (rc > requested_nob) { - CERROR("Unexpected rc %d (%d requested)\n", rc, requested_nob); + /* The rest of this function executes only for OST_READs */ + if (rc > aa->aa_requested_nob) { + CERROR("Unexpected rc %d (%d requested)\n", rc, + aa->aa_requested_nob); RETURN(-EPROTO); } @@ -961,126 +1333,268 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, struct obdo *oa, return (-EPROTO); } - if (rc < requested_nob) - handle_short_read(rc, page_count, pga); + if (rc < aa->aa_requested_nob) + handle_short_read(rc, aa->aa_page_count, aa->aa_ppga); -#if CHECKSUM_BULK - if (oa->o_valid & OBD_MD_FLCKSUM) { - const struct ptlrpc_peer *peer = - &req->rq_import->imp_connection->c_peer; + if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count, + aa->aa_ppga)) + GOTO(out, rc = -EAGAIN); + + if (body->oa.o_valid & OBD_MD_FLCKSUM) { static int cksum_counter; - obd_count server_cksum = oa->o_cksum; - obd_count cksum = cksum_pages(rc, page_count, pga); - char str[PTL_NALFMT_SIZE]; + __u32 server_cksum = body->oa.o_cksum; + char *via; + char *router; + cksum_type_t cksum_type; - 
ptlrpc_peernid2str(peer, str); + if (body->oa.o_valid & OBD_MD_FLFLAGS) + cksum_type = cksum_type_unpack(body->oa.o_flags); + else + cksum_type = OBD_CKSUM_CRC32; + client_cksum = osc_checksum_bulk(rc, aa->aa_page_count, + aa->aa_ppga, OST_READ, + cksum_type); - cksum_counter++; - if (server_cksum != cksum) { - CERROR("Bad checksum: server %x, client %x, server NID " - LPX64" (%s)\n", server_cksum, cksum, - peer->peer_id.nid, str); + if (peer->nid == req->rq_bulk->bd_sender) { + via = router = ""; + } else { + via = " via "; + router = libcfs_nid2str(req->rq_bulk->bd_sender); + } + + if (server_cksum == ~0 && rc > 0) { + CERROR("Protocol error: server %s set the 'checksum' " + "bit, but didn't send a checksum. Not fatal, " + "but please notify on http://bugzilla.lustre.org/\n", + libcfs_nid2str(peer->nid)); + } else if (server_cksum != client_cksum) { + LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from " + "%s%s%s inum "LPU64"/"LPU64" object " + LPU64"/"LPU64" extent " + "["LPU64"-"LPU64"]\n", + req->rq_import->imp_obd->obd_name, + libcfs_nid2str(peer->nid), + via, router, + body->oa.o_valid & OBD_MD_FLFID ? + body->oa.o_fid : (__u64)0, + body->oa.o_valid & OBD_MD_FLFID ? + body->oa.o_generation :(__u64)0, + body->oa.o_id, + body->oa.o_valid & OBD_MD_FLGROUP ? + body->oa.o_gr : (__u64)0, + aa->aa_ppga[0]->off, + aa->aa_ppga[aa->aa_page_count-1]->off + + aa->aa_ppga[aa->aa_page_count-1]->count - + 1); + CERROR("client %x, server %x, cksum_type %x\n", + client_cksum, server_cksum, cksum_type); cksum_counter = 0; - oa->o_cksum = cksum; - } else if ((cksum_counter & (-cksum_counter)) == cksum_counter){ - CWARN("Checksum %u from "LPX64" (%s) OK: %x\n", - cksum_counter, peer->peer_id.nid, str, cksum); + aa->aa_oa->o_cksum = client_cksum; + rc = -EAGAIN; + } else { + cksum_counter++; + CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum); + rc = 0; } - } else { + } else if (unlikely(client_cksum)) { static int cksum_missed; cksum_missed++; if ((cksum_missed & (-cksum_missed)) == cksum_missed) - CERROR("Request checksum %u from "LPX64", no reply\n", - cksum_missed, - req->rq_import->imp_connection->c_peer.peer_id.nid); + CERROR("Checksum %u requested from %s but not sent\n", + cksum_missed, libcfs_nid2str(peer->nid)); + } else { + rc = 0; } -#endif - osc_decrypt_page(pga->pg, pga->page_offset, pga->count); - RETURN(0); +out: + if (rc >= 0) + *aa->aa_oa = body->oa; + + RETURN(rc); } -static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa, +static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa, struct lov_stripe_md *lsm, - obd_count page_count, struct brw_page *pga) + obd_count page_count, struct brw_page **pga, + struct obd_capa *ocapa) { - int requested_nob; - int niocount; - struct ptlrpc_request *request; + struct ptlrpc_request *req; int rc; + cfs_waitq_t waitq; + int resends = 0; + struct l_wait_info lwi; + ENTRY; + cfs_waitq_init(&waitq); + restart_bulk: - rc = osc_brw_prep_request(cmd, class_exp2cliimp(exp), oa, lsm, - page_count, pga, &requested_nob, &niocount, - &request); + rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm, + page_count, pga, &req, ocapa); if (rc != 0) return (rc); - rc = ptlrpc_queue_wait(request); + rc = ptlrpc_queue_wait(req); - if (rc == -ETIMEDOUT && request->rq_resend) { - DEBUG_REQ(D_HA, request, "BULK TIMEOUT"); - ptlrpc_req_finished(request); + if (rc == -ETIMEDOUT && req->rq_resend) { + DEBUG_REQ(D_HA, req, "BULK TIMEOUT"); + ptlrpc_req_finished(req); goto restart_bulk; } - rc = 
osc_brw_fini_request(request, oa, requested_nob, niocount, - page_count, pga, rc); + rc = osc_brw_fini_request(req, rc); + + ptlrpc_req_finished(req); + if (osc_recoverable_error(rc)) { + resends++; + if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) { + CERROR("too many resend retries, returning error\n"); + RETURN(-EIO); + } + + lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL); + l_wait_event(waitq, 0, &lwi); + + goto restart_bulk; + } - ptlrpc_req_finished(request); RETURN (rc); } -static int brw_interpret(struct ptlrpc_request *request, - struct osc_brw_async_args *aa, int rc) +int osc_brw_redo_request(struct ptlrpc_request *request, + struct osc_brw_async_args *aa) { - struct obdo *oa = aa->aa_oa; - int requested_nob = aa->aa_requested_nob; - int niocount = aa->aa_nio_count; - obd_count page_count = aa->aa_page_count; - struct brw_page *pga = aa->aa_pga; + struct ptlrpc_request *new_req; + struct ptlrpc_request_set *set = request->rq_set; + struct osc_brw_async_args *new_aa; + struct osc_async_page *oap; + int rc = 0; ENTRY; - rc = osc_brw_fini_request(request, oa, requested_nob, niocount, - page_count, pga, rc); - RETURN (rc); + if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) { + CERROR("too many resend retries, returning error\n"); + RETURN(-EIO); + } + + DEBUG_REQ(D_ERROR, request, "redo for recoverable error"); +/* + body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body)); + if (body->oa.o_valid & OBD_MD_FLOSSCAPA) + ocapa = lustre_unpack_capa(request->rq_reqmsg, + REQ_REC_OFF + 3); +*/ + rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) == + OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ, + aa->aa_cli, aa->aa_oa, + NULL /* lsm unused by osc currently */, + aa->aa_page_count, aa->aa_ppga, + &new_req, NULL /* ocapa */); + if (rc) + RETURN(rc); + + client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock); + + list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) { + if (oap->oap_request != NULL) { + LASSERTF(request == oap->oap_request, + "request %p != oap_request %p\n", + request, oap->oap_request); + if (oap->oap_interrupted) { + client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock); + ptlrpc_req_finished(new_req); + RETURN(-EINTR); + } + } + } + /* New request takes over pga and oaps from old request. + * Note that copying a list_head doesn't work, need to move it... */ + aa->aa_resends++; + new_req->rq_interpret_reply = request->rq_interpret_reply; + new_req->rq_async_args = request->rq_async_args; + new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends; + + new_aa = ptlrpc_req_async_args(new_req); + + CFS_INIT_LIST_HEAD(&new_aa->aa_oaps); + list_splice(&aa->aa_oaps, &new_aa->aa_oaps); + CFS_INIT_LIST_HEAD(&aa->aa_oaps); + + list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) { + if (oap->oap_request) { + ptlrpc_req_finished(oap->oap_request); + oap->oap_request = ptlrpc_request_addref(new_req); + } + } + + /* use ptlrpc_set_add_req is safe because interpret functions work + * in check_set context. 
only one code path can access the request
+ * from a different thread and see -EINTR, and that path is protected by
+ * cl_loi_list_lock */
+        ptlrpc_set_add_req(set, new_req);
+
+        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
+
+        DEBUG_REQ(D_INFO, new_req, "new request");
+        RETURN(0);
 }

 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                           struct lov_stripe_md *lsm, obd_count page_count,
-                          struct brw_page *pga, struct ptlrpc_request_set *set)
+                          struct brw_page **pga, struct ptlrpc_request_set *set,
+                          struct obd_capa *ocapa)
 {
-        struct ptlrpc_request *request;
-        int requested_nob;
-        int nio_count;
+        struct ptlrpc_request *req;
+        struct client_obd *cli = &exp->exp_obd->u.cli;
+        int rc, i;
         struct osc_brw_async_args *aa;
-        int rc;
         ENTRY;

-        rc = osc_brw_prep_request(cmd, class_exp2cliimp(exp), oa, lsm,
-                                  page_count, pga, &requested_nob, &nio_count,
-                                  &request);
-        if (rc == 0) {
-                LASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
-                aa = (struct osc_brw_async_args *)&request->rq_async_args;
-                aa->aa_oa = oa;
-                aa->aa_requested_nob = requested_nob;
-                aa->aa_nio_count = nio_count;
-                aa->aa_page_count = page_count;
-                aa->aa_pga = pga;
+        /* Consume write credits even if doing a sync write -
+         * otherwise we may run out of space on OST due to grant. */
+        if (cmd == OBD_BRW_WRITE) {
+                spin_lock(&cli->cl_loi_list_lock);
+                for (i = 0; i < page_count; i++) {
+                        if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
+                                osc_consume_write_grant(cli, pga[i]);
+                }
+                spin_unlock(&cli->cl_loi_list_lock);
+        }
+
+        rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
+                                  &req, ocapa);

-                request->rq_interpret_reply = brw_interpret;
-                ptlrpc_set_add_req(set, request);
+        aa = ptlrpc_req_async_args(req);
+        if (cmd == OBD_BRW_READ) {
+                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
+                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
+        } else {
+                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
+                lprocfs_oh_tally(&cli->cl_write_rpc_hist,
+                                 cli->cl_w_in_flight);
+        }
+        ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
+
+        LASSERT(list_empty(&aa->aa_oaps));
+        if (rc == 0) {
+                req->rq_interpret_reply = brw_interpret;
+                ptlrpc_set_add_req(set, req);
+                client_obd_list_lock(&cli->cl_loi_list_lock);
+                if (cmd == OBD_BRW_READ)
+                        cli->cl_r_in_flight++;
+                else
+                        cli->cl_w_in_flight++;
+                client_obd_list_unlock(&cli->cl_loi_list_lock);
+                OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DIO_PAUSE, 3);
+        } else if (cmd == OBD_BRW_WRITE) {
+                client_obd_list_lock(&cli->cl_loi_list_lock);
+                for (i = 0; i < page_count; i++)
+                        osc_release_write_grant(cli, pga[i], 0);
+                osc_wake_cache_waiters(cli);
+                client_obd_list_unlock(&cli->cl_loi_list_lock);
         }
         RETURN (rc);
 }

-#ifndef min_t
-#define min_t(type,x,y) \
-        ({ type __x = (x); type __y = (y); __x < __y ? __x: __y; })
-#endif
-
 /*
  * ugh, we want disk allocation on the target to happen in offset order. we'll
  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
@@ -1088,10 +1602,10 @@ static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
  * insertion sort that swaps elements that are strides apart, shrinking the
  * stride down until it's 1 and the array is sorted.
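 *
 * (a worked example of the stride schedule below, assuming num = 64: the
 * for-loop grows the stride 1, 4, 13, 40, 121 via 3h+1 and stops at the
 * first value >= num, and the do-loop then divides by 3 and runs insertion
 * passes with strides 40, 13, 4 and finally 1, after which the array is
 * fully sorted.)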
*/ -static void sort_brw_pages(struct brw_page *array, int num) +static void sort_brw_pages(struct brw_page **array, int num) { int stride, i, j; - struct brw_page tmp; + struct brw_page *tmp; if (num == 1) return; @@ -1103,8 +1617,7 @@ static void sort_brw_pages(struct brw_page *array, int num) for (i = stride ; i < num ; i++) { tmp = array[i]; j = i; - while (j >= stride && array[j - stride].disk_offset > - tmp.disk_offset) { + while (j >= stride && array[j - stride]->off > tmp->off) { array[j] = array[j - stride]; j -= stride; } @@ -1113,114 +1626,285 @@ static void sort_brw_pages(struct brw_page *array, int num) } while (stride > 1); } -/* make sure we the regions we're passing to elan don't violate its '4 - * fragments' constraint. portal headers are a fragment, all full - * PAGE_SIZE long pages count as 1 fragment, and each partial page - * counts as a fragment. I think. see bug 934. */ -static obd_count check_elan_limit(struct brw_page *pg, obd_count pages) +static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages) { - int frags_left = 3; - int saw_whole_frag = 0; - int i; + int count = 1; + int offset; + int i = 0; - for (i = 0 ; frags_left && i < pages ; pg++, i++) { - if (pg->count == PAGE_SIZE) { - if (!saw_whole_frag) { - saw_whole_frag = 1; - frags_left--; - } - } else { - frags_left--; - } + LASSERT (pages > 0); + offset = pg[i]->off & ~CFS_PAGE_MASK; + + for (;;) { + pages--; + if (pages == 0) /* that's all */ + return count; + + if (offset + pg[i]->count < CFS_PAGE_SIZE) + return count; /* doesn't end on page boundary */ + + i++; + offset = pg[i]->off & ~CFS_PAGE_MASK; + if (offset != 0) /* doesn't start on page boundary */ + return count; + + count++; } - return i; } -static int osc_brw(int cmd, struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md *lsm, obd_count page_count, - struct brw_page *pga, struct obd_trans_info *oti) +static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count) { + struct brw_page **ppga; + int i; + + OBD_ALLOC(ppga, sizeof(*ppga) * count); + if (ppga == NULL) + return NULL; + + for (i = 0; i < count; i++) + ppga[i] = pga + i; + return ppga; +} + +static void osc_release_ppga(struct brw_page **ppga, obd_count count) +{ + LASSERT(ppga != NULL); + OBD_FREE(ppga, sizeof(*ppga) * count); +} + +static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo, + obd_count page_count, struct brw_page *pga, + struct obd_trans_info *oti) +{ + struct obdo *saved_oa = NULL; + struct brw_page **ppga, **orig; + struct obd_import *imp = class_exp2cliimp(exp); + struct client_obd *cli = &imp->imp_obd->u.cli; + int rc, page_count_orig; ENTRY; - if (cmd == OBD_BRW_CHECK) { + if (cmd & OBD_BRW_CHECK) { /* The caller just wants to know if there's a chance that this * I/O can succeed */ - struct obd_import *imp = class_exp2cliimp(exp); if (imp == NULL || imp->imp_invalid) RETURN(-EIO); RETURN(0); } + /* test_brw with a failed create can trip this, maybe others. 
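+         *
+         * (max_unfragmented_pages() above cuts the array at the first
+         * fragment gap; as a worked example, assuming CFS_PAGE_SIZE is
+         * 4096: fragments [0-4095], [4096-6143], [8192-12287] yield 2,
+         * because the second fragment ends at offset 2048 within its
+         * page, so the run cannot continue into the third.)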
*/ + LASSERT(cli->cl_max_pages_per_rpc); + + rc = 0; + + orig = ppga = osc_build_ppga(pga, page_count); + if (ppga == NULL) + RETURN(-ENOMEM); + page_count_orig = page_count; + + sort_brw_pages(ppga, page_count); while (page_count) { obd_count pages_per_brw; - int rc; - if (page_count > PTLRPC_MAX_BRW_PAGES) - pages_per_brw = PTLRPC_MAX_BRW_PAGES; + if (page_count > cli->cl_max_pages_per_rpc) + pages_per_brw = cli->cl_max_pages_per_rpc; else pages_per_brw = page_count; - sort_brw_pages(pga, pages_per_brw); - pages_per_brw = check_elan_limit(pga, pages_per_brw); + pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw); + + if (saved_oa != NULL) { + /* restore previously saved oa */ + *oinfo->oi_oa = *saved_oa; + } else if (page_count > pages_per_brw) { + /* save a copy of oa (brw will clobber it) */ + OBDO_ALLOC(saved_oa); + if (saved_oa == NULL) + GOTO(out, rc = -ENOMEM); + *saved_oa = *oinfo->oi_oa; + } - rc = osc_brw_internal(cmd, exp, oa, lsm, pages_per_brw, pga); + rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md, + pages_per_brw, ppga, oinfo->oi_capa); if (rc != 0) - RETURN(rc); + break; page_count -= pages_per_brw; - pga += pages_per_brw; + ppga += pages_per_brw; } - RETURN(0); + +out: + osc_release_ppga(orig, page_count_orig); + + if (saved_oa != NULL) + OBDO_FREE(saved_oa); + + RETURN(rc); } -static int osc_brw_async(int cmd, struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md *lsm, obd_count page_count, - struct brw_page *pga, struct ptlrpc_request_set *set, - struct obd_trans_info *oti) +static int osc_brw_async(int cmd, struct obd_export *exp, + struct obd_info *oinfo, obd_count page_count, + struct brw_page *pga, struct obd_trans_info *oti, + struct ptlrpc_request_set *set) { + struct brw_page **ppga, **orig; + struct client_obd *cli = &exp->exp_obd->u.cli; + int page_count_orig; + int rc = 0; ENTRY; - if (cmd == OBD_BRW_CHECK) { + if (cmd & OBD_BRW_CHECK) { + struct obd_import *imp = class_exp2cliimp(exp); /* The caller just wants to know if there's a chance that this * I/O can succeed */ - struct obd_import *imp = class_exp2cliimp(exp); if (imp == NULL || imp->imp_invalid) RETURN(-EIO); RETURN(0); } + orig = ppga = osc_build_ppga(pga, page_count); + if (ppga == NULL) + RETURN(-ENOMEM); + page_count_orig = page_count; + + sort_brw_pages(ppga, page_count); while (page_count) { + struct brw_page **copy; obd_count pages_per_brw; - int rc; - if (page_count > PTLRPC_MAX_BRW_PAGES) - pages_per_brw = PTLRPC_MAX_BRW_PAGES; - else - pages_per_brw = page_count; + pages_per_brw = min_t(obd_count, page_count, + cli->cl_max_pages_per_rpc); - sort_brw_pages(pga, pages_per_brw); - pages_per_brw = check_elan_limit(pga, pages_per_brw); + pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw); - rc = async_internal(cmd, exp, oa, lsm, pages_per_brw, pga, set); + /* use ppga only if single RPC is going to fly */ + if (pages_per_brw != page_count_orig || ppga != orig) { + OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw); + if (copy == NULL) + GOTO(out, rc = -ENOMEM); + memcpy(copy, ppga, sizeof(*copy) * pages_per_brw); + } else + copy = ppga; - if (rc != 0) - RETURN(rc); + rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md, + pages_per_brw, copy, set, oinfo->oi_capa); + + if (rc != 0) { + if (copy != ppga) + OBD_FREE(copy, sizeof(*copy) * pages_per_brw); + break; + } + if (copy == orig) { + /* we passed it to async_internal() which is + * now responsible for releasing memory */ + orig = NULL; + } page_count -= pages_per_brw; - pga += pages_per_brw; + ppga += 
pages_per_brw;
         }
-        RETURN(0);
+out:
+        if (orig)
+                osc_release_ppga(orig, page_count_orig);
+        RETURN(rc);
 }

 static void osc_check_rpcs(struct client_obd *cli);
+
+/* The companion to osc_enter_cache(), called when @oap is no longer part of
+ * the dirty accounting.  Writeback completes or truncate happens before
+ * writing starts.  Must be called with the loi lock held. */
 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
-                           int sent);
-static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi);
+                           int sent)
+{
+        osc_release_write_grant(cli, &oap->oap_brw_page, sent);
+}
+
+
+/* This maintains the lists of pending pages to read/write for a given object
+ * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
+ * to quickly find objects that are ready to send an RPC. */
+static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
+                         int cmd)
+{
+        int optimal;
+        ENTRY;
+
+        if (lop->lop_num_pending == 0)
+                RETURN(0);
+
+        /* if we have an invalid import we want to drain the queued pages
+         * by forcing them through rpcs that immediately fail and complete
+         * the pages.  recovery relies on this to empty the queued pages
+         * before canceling the locks and evicting down the llite pages */
+        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
+                RETURN(1);
+
+        /* stream rpcs in queue order as long as there is an urgent page
+         * queued.  this is our cheap solution for good batching in the case
+         * where writepage marks some random page in the middle of the file
+         * as urgent because of, say, memory pressure */
+        if (!list_empty(&lop->lop_urgent)) {
+                CDEBUG(D_CACHE, "urgent request forcing RPC\n");
+                RETURN(1);
+        }
+        /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
+        optimal = cli->cl_max_pages_per_rpc;
+        if (cmd & OBD_BRW_WRITE) {
+                /* trigger a write rpc stream as long as there are dirtiers
+                 * waiting for space.  as they're waiting, they're not going to
+                 * create more pages to coalesce with what's waiting.. */
+                if (!list_empty(&cli->cl_cache_waiters)) {
+                        CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
+                        RETURN(1);
+                }
+                /* +16 to avoid triggering rpcs that would want to include pages
+                 * that are being queued but which can't be made ready until
+                 * the queuer finishes with the page.  this is a wart for
+                 * llite::commit_write() */
+                optimal += 16;
+        }
+        if (lop->lop_num_pending >= optimal)
+                RETURN(1);
+
+        RETURN(0);
+}
+
+static void on_list(struct list_head *item, struct list_head *list,
+                    int should_be_on)
+{
+        if (list_empty(item) && should_be_on)
+                list_add_tail(item, list);
+        else if (!list_empty(item) && !should_be_on)
+                list_del_init(item);
+}
+
+/* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
+ * can find pages to build into rpcs quickly */
+static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
+{
+        on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
+                lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
+                lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
+
+        on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
+                loi->loi_write_lop.lop_num_pending);
+
+        on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
+                loi->loi_read_lop.lop_num_pending);
+}
+
 static void lop_update_pending(struct client_obd *cli,
-                               struct loi_oap_pages *lop, int cmd, int delta);
+                               struct loi_oap_pages *lop, int cmd, int delta)
+{
+        lop->lop_num_pending += delta;
+        if (cmd & OBD_BRW_WRITE)
+                cli->cl_pending_w_pages += delta;
+        else
+                cli->cl_pending_r_pages += delta;
+}

 /* this is called when a sync waiter receives an interruption.  Its job is to
  * get the caller woken as soon as possible.  If its page hasn't been put in an
@@ -1237,36 +1921,72 @@ static void osc_occ_interrupted(struct oig_callback_context *occ)
         /* XXX member_of() */
         oap = list_entry(occ, struct osc_async_page, oap_occ);

-        spin_lock(&oap->oap_cli->cl_loi_list_lock);
+        client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);

         oap->oap_interrupted = 1;

-        /* ok, it's been put in an rpc. */
+        /* ok, it's been put in an rpc. only one oap gets a request reference */
         if (oap->oap_request != NULL) {
                 ptlrpc_mark_interrupted(oap->oap_request);
                 ptlrpcd_wake(oap->oap_request);
                 GOTO(unlock, 0);
         }

-        /* we don't get interruption callbacks until osc_trigger_sync_io()
+        /* we don't get interruption callbacks until osc_trigger_group_io()
         * has been called and put the sync oaps in the pending/urgent lists.*/
         if (!list_empty(&oap->oap_pending_item)) {
                 list_del_init(&oap->oap_pending_item);
-                if (oap->oap_async_flags & ASYNC_URGENT)
-                        list_del_init(&oap->oap_urgent_item);
+                list_del_init(&oap->oap_urgent_item);

                 loi = oap->oap_loi;
-                lop = (oap->oap_cmd == OBD_BRW_WRITE) ?
+                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                         &loi->loi_write_lop : &loi->loi_read_lop;
                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                 loi_list_maint(oap->oap_cli, oap->oap_loi);

-                oig_complete_one(oap->oap_oig, &oap->oap_occ, 0);
+                oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
                 oap->oap_oig = NULL;
         }

 unlock:
-        spin_unlock(&oap->oap_cli->cl_loi_list_lock);
+        client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
+}
+
+/* this is trying to propagate async writeback errors back up to the
+ * application.  As an async write fails we record the error code for later if
+ * the app does an fsync.  As long as errors persist we force future rpcs to be
+ * sync so that the app can get a sync error and break the cycle of queueing
+ * pages for which writeback will fail.
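+ *
+ * (worked example: if the write carrying xid 7 fails, osc_process_ar()
+ * below latches the error in ar_rc, sets ar_force_sync and samples
+ * ar_min_xid from the next xid, say 8; subsequent writes then go out
+ * synchronously until one with xid >= 8 completes cleanly, which clears
+ * ar_force_sync while ar_rc stays latched for fsync to report.)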
*/ +static void osc_process_ar(struct osc_async_rc *ar, __u64 xid, + int rc) +{ + if (rc) { + if (!ar->ar_rc) + ar->ar_rc = rc; + + ar->ar_force_sync = 1; + ar->ar_min_xid = ptlrpc_sample_next_xid(); + return; + + } + + if (ar->ar_force_sync && (xid >= ar->ar_min_xid)) + ar->ar_force_sync = 0; +} + +static void osc_oap_to_pending(struct osc_async_page *oap) +{ + struct loi_oap_pages *lop; + + if (oap->oap_cmd & OBD_BRW_WRITE) + lop = &oap->oap_loi->loi_write_lop; + else + lop = &oap->oap_loi->loi_read_lop; + + if (oap->oap_async_flags & ASYNC_URGENT) + list_add(&oap->oap_urgent_item, &lop->lop_urgent); + list_add_tail(&oap->oap_pending_item, &lop->lop_pending); + lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1); } /* this must be called holding the loi list lock to give coverage to exit_cache, @@ -1274,99 +1994,103 @@ unlock: static void osc_ap_completion(struct client_obd *cli, struct obdo *oa, struct osc_async_page *oap, int sent, int rc) { - osc_exit_cache(cli, oap, sent); - oap->oap_async_flags = 0; - oap->oap_interrupted = 0; + __u64 xid = 0; + ENTRY; if (oap->oap_request != NULL) { + xid = ptlrpc_req_xid(oap->oap_request); ptlrpc_req_finished(oap->oap_request); oap->oap_request = NULL; } - if (rc == 0 && oa != NULL) - oap->oap_loi->loi_blocks = oa->o_blocks; + oap->oap_async_flags = 0; + oap->oap_interrupted = 0; + + if (oap->oap_cmd & OBD_BRW_WRITE) { + osc_process_ar(&cli->cl_ar, xid, rc); + osc_process_ar(&oap->oap_loi->loi_ar, xid, rc); + } + + if (rc == 0 && oa != NULL) { + if (oa->o_valid & OBD_MD_FLBLOCKS) + oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks; + if (oa->o_valid & OBD_MD_FLMTIME) + oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime; + if (oa->o_valid & OBD_MD_FLATIME) + oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime; + if (oa->o_valid & OBD_MD_FLCTIME) + oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime; + } if (oap->oap_oig) { + osc_exit_cache(cli, oap, sent); oig_complete_one(oap->oap_oig, &oap->oap_occ, rc); oap->oap_oig = NULL; EXIT; return; } - oap->oap_caller_ops->ap_completion(oap->oap_caller_data, oap->oap_cmd, - oa, rc); + rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data, + oap->oap_cmd, oa, rc); + + /* ll_ap_completion (from llite) drops PG_locked. so, a new + * I/O on the page could start, but OSC calls it under lock + * and thus we can add oap back to pending safely */ + if (rc) + /* upper layer wants to leave the page on pending queue */ + osc_oap_to_pending(oap); + else + osc_exit_cache(cli, oap, sent); + EXIT; } -static int brw_interpret_oap(struct ptlrpc_request *request, - struct osc_brw_async_args *aa, int rc) +static int brw_interpret(const struct lu_env *env, + struct ptlrpc_request *req, void *data, int rc) { - struct osc_async_page *oap; + struct osc_brw_async_args *aa = data; struct client_obd *cli; - struct list_head *pos, *n; - struct timeval now; ENTRY; - do_gettimeofday(&now); - rc = osc_brw_fini_request(request, aa->aa_oa, aa->aa_requested_nob, - aa->aa_nio_count, aa->aa_page_count, - aa->aa_pga, rc); - - CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc); - - cli = aa->aa_cli; - /* in failout recovery we ignore writeback failure and want - * to just tell llite to unlock the page and continue */ - if (request->rq_reqmsg->opc == OST_WRITE && - (cli->cl_import == NULL || cli->cl_import->imp_invalid)) { - CDEBUG(D_INODE, "flipping to rc 0 imp %p inv %d\n", - cli->cl_import, - cli->cl_import ? 
cli->cl_import->imp_invalid : -1); - rc = 0; + rc = osc_brw_fini_request(req, rc); + CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc); + if (osc_recoverable_error(rc)) { + rc = osc_brw_redo_request(req, aa); + if (rc == 0) + RETURN(0); } - spin_lock(&cli->cl_loi_list_lock); - - if (request->rq_reqmsg->opc == OST_WRITE) - lprocfs_stime_record(&cli->cl_write_stime, &now, - &request->rq_rpcd_start); - else - lprocfs_stime_record(&cli->cl_read_stime, &now, - &request->rq_rpcd_start); - + cli = aa->aa_cli; + client_obd_list_lock(&cli->cl_loi_list_lock); /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters * is called so we know whether to go to sync BRWs or wait for more * RPCs to complete */ - if (request->rq_reqmsg->opc == OST_WRITE) + if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) cli->cl_w_in_flight--; else cli->cl_r_in_flight--; - /* the caller may re-use the oap after the completion call so - * we need to clean it up a little */ - list_for_each_safe(pos, n, &aa->aa_oaps) { - oap = list_entry(pos, struct osc_async_page, oap_rpc_item); - - //CDEBUG(D_INODE, "page %p index %lu oap %p\n", - //oap->oap_page, oap->oap_page->index, oap); - - list_del_init(&oap->oap_rpc_item); - osc_ap_completion(cli, aa->aa_oa, oap, 1, rc); + if (!list_empty(&aa->aa_oaps)) { /* from osc_send_oap_rpc() */ + struct osc_async_page *oap, *tmp; + /* the caller may re-use the oap after the completion call so + * we need to clean it up a little */ + list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) { + list_del_init(&oap->oap_rpc_item); + osc_ap_completion(cli, aa->aa_oa, oap, 1, rc); + } + OBDO_FREE(aa->aa_oa); + } else { /* from async_internal() */ + int i; + for (i = 0; i < aa->aa_page_count; i++) + osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1); } - - /* no write RPCs in flight, reset the time */ - if (request->rq_reqmsg->opc == OST_WRITE && cli->cl_w_in_flight == 0) - do_gettimeofday(&cli->cl_last_write_time); - osc_wake_cache_waiters(cli); osc_check_rpcs(cli); - spin_unlock(&cli->cl_loi_list_lock); + client_obd_list_unlock(&cli->cl_loi_list_lock); - obdo_free(aa->aa_oa); - OBD_FREE(aa->aa_pga, aa->aa_page_count * sizeof(struct brw_page)); - - RETURN(0); + osc_release_ppga(aa->aa_ppga, aa->aa_page_count); + RETURN(rc); } static struct ptlrpc_request *osc_build_req(struct client_obd *cli, @@ -1374,118 +2098,124 @@ static struct ptlrpc_request *osc_build_req(struct client_obd *cli, int page_count, int cmd) { struct ptlrpc_request *req; - struct brw_page *pga = NULL; - int requested_nob, nio_count; + struct brw_page **pga = NULL; struct osc_brw_async_args *aa; struct obdo *oa = NULL; struct obd_async_page_ops *ops = NULL; void *caller_data = NULL; - struct list_head *pos; + struct obd_capa *ocapa; + struct osc_async_page *oap; + struct ldlm_lock *lock = NULL; int i, rc; + ENTRY; LASSERT(!list_empty(rpc_list)); OBD_ALLOC(pga, sizeof(*pga) * page_count); if (pga == NULL) RETURN(ERR_PTR(-ENOMEM)); - oa = obdo_alloc(); + OBDO_ALLOC(oa); if (oa == NULL) GOTO(out, req = ERR_PTR(-ENOMEM)); i = 0; - list_for_each(pos, rpc_list) { - struct osc_async_page *oap; - - oap = list_entry(pos, struct osc_async_page, oap_rpc_item); + list_for_each_entry(oap, rpc_list, oap_rpc_item) { if (ops == NULL) { ops = oap->oap_caller_ops; caller_data = oap->oap_caller_data; + lock = oap->oap_ldlm_lock; } - pga[i].disk_offset = oap->oap_obj_off + oap->oap_page_off; - pga[i].page_offset = pga[i].disk_offset; - pga[i].pg = oap->oap_page; - pga[i].count = oap->oap_count; - pga[i].flag = 
oap->oap_brw_flags;
+                pga[i] = &oap->oap_brw_page;
+                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
-                       pga[i].pg, oap->oap_page->index, oap, pga[i].flag);
+                       pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
                 i++;
         }

         /* always get the data for the obdo for the rpc */
         LASSERT(ops != NULL);
         ops->ap_fill_obdo(caller_data, cmd, oa);
+        ocapa = ops->ap_lookup_capa(caller_data, cmd);
+        if (lock) {
+                oa->o_handle = lock->l_remote_handle;
+                oa->o_valid |= OBD_MD_FLHANDLE;
+        }

         sort_brw_pages(pga, page_count);
-        rc = osc_brw_prep_request(cmd, cli->cl_import, oa, NULL, page_count,
-                                  pga, &requested_nob, &nio_count, &req);
+        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
+                                  pga, &req, ocapa);
+        capa_put(ocapa);
         if (rc != 0) {
                 CERROR("prep_req failed: %d\n", rc);
                 GOTO(out, req = ERR_PTR(rc));
         }

-        LASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
-        aa = (struct osc_brw_async_args *)&req->rq_async_args;
-        aa->aa_oa = oa;
-        aa->aa_requested_nob = requested_nob;
-        aa->aa_nio_count = nio_count;
-        aa->aa_page_count = page_count;
-        aa->aa_pga = pga;
-        aa->aa_cli = cli;
+        /* Need to update the timestamps after the request is built in case
+         * we race with setattr (locally or in queue at OST).  If OST gets
+         * later setattr before earlier BRW (as determined by the request xid),
+         * the OST will not use BRW timestamps.  Sadly, there is no obvious
+         * way to do this in a single call.  bug 10150 */
+        ops->ap_update_obdo(caller_data, cmd, oa,
+                            OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
+
+        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
+        aa = ptlrpc_req_async_args(req);
+        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
+        list_splice(rpc_list, &aa->aa_oaps);
+        CFS_INIT_LIST_HEAD(rpc_list);

 out:
         if (IS_ERR(req)) {
                 if (oa)
-                        obdo_free(oa);
+                        OBDO_FREE(oa);
                 if (pga)
                         OBD_FREE(pga, sizeof(*pga) * page_count);
         }
         RETURN(req);
 }

-/* strange write gap too long (15s) */
-#define CLI_ODD_WRITE_GAP 15000000
-
-static void lop_update_pending(struct client_obd *cli,
-                               struct loi_oap_pages *lop, int cmd, int delta)
-{
-        lop->lop_num_pending += delta;
-        if (cmd == OBD_BRW_WRITE)
-                cli->cl_pending_w_pages += delta;
-        else
-                cli->cl_pending_r_pages += delta;
-}
-
-static long timeval_sub(struct timeval *large, struct timeval *small)
-{
-        return (large->tv_sec - small->tv_sec) * 1000000 +
-               (large->tv_usec - small->tv_usec);
-}
-
 /* the loi lock is held across this function but it's allowed to release
  * and reacquire it during its work */
+/**
+ * prepare pages for ASYNC io and put them in the send queue.
+ *
+ * \param cli - client obd device
+ * \param loi - object info for the stripe being sent
+ * \param cmd - OBD_BRW_* flags
+ * \param lop - pending pages of the object
+ *
+ * \return 1 if an RPC was queued, zero if there was nothing to send.
+ * \return a negative errno if an error occurred.
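+ *
+ * (usage sketch, compressed from osc_check_rpcs() below, which alternates
+ * the write and read lops of each ready object:
+ *
+ *         while (rpcs_in_flight(cli) < cli->cl_max_rpcs_in_flight) {
+ *                 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
+ *                                       &loi->loi_write_lop);
+ *                 if (rc <= 0)
+ *                         break;
+ *         }
+ *
+ * i.e. keep sending until the pipe is full or nothing is ready.)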
+ */ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi, int cmd, struct loi_oap_pages *lop) { - struct ptlrpc_request *request; + struct ptlrpc_request *req; obd_count page_count = 0; - struct list_head *tmp, *pos; - struct osc_async_page *oap = NULL; + struct osc_async_page *oap = NULL, *tmp; struct osc_brw_async_args *aa; struct obd_async_page_ops *ops; - LIST_HEAD(rpc_list); + CFS_LIST_HEAD(rpc_list); + unsigned int ending_offset; + unsigned starting_offset = 0; + int srvlock = 0; ENTRY; - LASSERT(lop != LP_POISON); - LASSERT(lop->lop_pending.next != LP_POISON); - /* first we find the pages we're allowed to work with */ - list_for_each_safe(pos, tmp, &lop->lop_pending) { - oap = list_entry(pos, struct osc_async_page, oap_pending_item); + list_for_each_entry_safe(oap, tmp, &lop->lop_pending, + oap_pending_item) { ops = oap->oap_caller_ops; LASSERT(oap->oap_magic == OAP_MAGIC); + if (page_count != 0 && + srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) { + CDEBUG(D_PAGE, "SRVLOCK flag mismatch," + " oap %p, page %p, srvlock %u\n", + oap, oap->oap_brw_page.pg, (unsigned)!srvlock); + break; + } /* in llite being 'ready' equates to the page being locked * until completion unlocks it. commit_write submits a page * as not ready because its unlock will happen unconditionally @@ -1509,7 +2239,7 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi, * break out of the loop so we don't create * a hole in the sequence of pages in the rpc * stream.*/ - pos = NULL; + oap = NULL; break; case -EINTR: /* the io isn't needed.. tell the checks @@ -1527,22 +2257,35 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi, break; } } - if (pos == NULL) + if (oap == NULL) break; /* * Page submitted for IO has to be locked. Either by * ->ap_make_ready() or by higher layers. - * - * XXX nikita: this assertion should be adjusted when lustre - * starts using PG_writeback for pages being written out. */ - LASSERT(PageLocked(oap->oap_page)); +#if defined(__KERNEL__) && defined(__linux__) + if(!(PageLocked(oap->oap_page) && + (CheckWriteback(oap->oap_page, cmd) || oap->oap_oig !=NULL))) { + CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n", + oap->oap_page, (long)oap->oap_page->flags, oap->oap_async_flags); + LBUG(); + } +#endif + /* If there is a gap at the start of this page, it can't merge + * with any previous page, so we'll hand the network a + * "fragmented" page array that it can't transfer in 1 RDMA */ + if (page_count != 0 && oap->oap_page_off != 0) + break; /* take the page out of our book-keeping */ list_del_init(&oap->oap_pending_item); lop_update_pending(cli, lop, cmd, -1); list_del_init(&oap->oap_urgent_item); + if (page_count == 0) + starting_offset = (oap->oap_obj_off+oap->oap_page_off) & + (PTLRPC_MAX_BRW_SIZE - 1); + /* ask the caller for the size of the io as the rpc leaves. */ if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) oap->oap_count = @@ -1556,8 +2299,25 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi, /* now put the page back in our accounting */ list_add_tail(&oap->oap_rpc_item, &rpc_list); + if (page_count == 0) + srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK); if (++page_count >= cli->cl_max_pages_per_rpc) break; + + /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized + * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads + * have the same alignment as the initial writes that allocated + * extents on the server. 
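+                 *
+                 * (worked example, assuming PTLRPC_MAX_BRW_SIZE is 1MB:
+                 * a page run ending at file offset 0x300000 gives
+                 * ending_offset = 0x300000 & 0xfffff == 0, so the RPC is
+                 * cut exactly on a 1MB multiple, while a run ending at
+                 * 0x300400 gives 0x400 and the loop keeps accumulating.)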
*/ + ending_offset = (oap->oap_obj_off + oap->oap_page_off + + oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1); + if (ending_offset == 0) + break; + + /* If there is a gap at the end of this page, it can't merge + * with any subsequent pages, so we'll hand the network a + * "fragmented" page array that it can't transfer in 1 RDMA */ + if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE) + break; } osc_wake_cache_waiters(cli); @@ -1566,171 +2326,76 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi, RETURN(0); loi_list_maint(cli, loi); - spin_unlock(&cli->cl_loi_list_lock); - request = osc_build_req(cli, &rpc_list, page_count, cmd); - if (IS_ERR(request)) { + client_obd_list_unlock(&cli->cl_loi_list_lock); + + req = osc_build_req(cli, &rpc_list, page_count, cmd); + if (IS_ERR(req)) { /* this should happen rarely and is pretty bad, it makes the * pending list not follow the dirty order */ - spin_lock(&cli->cl_loi_list_lock); - list_for_each_safe(pos, tmp, &rpc_list) { - oap = list_entry(pos, struct osc_async_page, - oap_rpc_item); - list_del_init(&oap->oap_rpc_item); - - /* queued sync pages can be torn down while the pages - * were between the pending list and the rpc */ - if (oap->oap_interrupted) { - CDEBUG(D_INODE, "oap %p interrupted\n", oap); - osc_ap_completion(cli, NULL, oap, 0, - oap->oap_count); - continue; - } - - /* put the page back in the loi/lop lists */ - list_add_tail(&oap->oap_pending_item, - &lop->lop_pending); - lop_update_pending(cli, lop, cmd, 1); - if (oap->oap_async_flags & ASYNC_URGENT) - list_add(&oap->oap_urgent_item, - &lop->lop_urgent); - } - loi_list_maint(cli, loi); - RETURN(PTR_ERR(request)); - } - - LASSERT(sizeof(*aa) <= sizeof(request->rq_async_args)); - aa = (struct osc_brw_async_args *)&request->rq_async_args; - INIT_LIST_HEAD(&aa->aa_oaps); - list_splice(&rpc_list, &aa->aa_oaps); - INIT_LIST_HEAD(&rpc_list); - -#ifdef __KERNEL__ - if (cmd == OBD_BRW_READ) { - lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count); - lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight); - } else { - lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count); - lprocfs_oh_tally(&cli->cl_write_rpc_hist, - cli->cl_w_in_flight); - } -#endif - - spin_lock(&cli->cl_loi_list_lock); - - /* collect write gaps and sum of them */ - if (cmd == OBD_BRW_WRITE && cli->cl_w_in_flight == 0) { - struct timeval now; - long diff; - - do_gettimeofday(&now); - - if (cli->cl_last_write_time.tv_sec) { - diff = timeval_sub(&now, &cli->cl_last_write_time); - if (diff < CLI_ODD_WRITE_GAP) { - cli->cl_write_gap_sum += diff; - cli->cl_write_gaps++; - } - } - } - - if (cmd == OBD_BRW_READ) { - cli->cl_r_in_flight++; - cli->cl_read_num++; - } else { - cli->cl_w_in_flight++; - cli->cl_write_num++; - } - - /* queued sync pages can be torn down while the pages - * were between the pending list and the rpc */ - list_for_each(pos, &aa->aa_oaps) { - oap = list_entry(pos, struct osc_async_page, oap_rpc_item); - if (oap->oap_interrupted) { - CDEBUG(D_INODE, "oap %p in req %p interrupted\n", - oap, request); - ptlrpc_mark_interrupted(request); - break; - } - } - - CDEBUG(D_INODE, "req %p: %d pages, aa %p. 
now %dr/%dw in flight\n", - request, page_count, aa, cli->cl_r_in_flight, - cli->cl_w_in_flight); - - oap->oap_request = ptlrpc_request_addref(request); - request->rq_interpret_reply = brw_interpret_oap; - - ptlrpcd_add_req(request); - RETURN(1); -} - -static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop, - int cmd) -{ - int optimal; - ENTRY; - - if (lop->lop_num_pending == 0) - RETURN(0); - - /* if we have an invalid import we want to drain the queued pages - * by forcing them through rpcs that immediately fail and complete - * the pages. recovery relies on this to empty the queued pages - * before canceling the locks and evicting down the llite pages */ - if (cli->cl_import == NULL || cli->cl_import->imp_invalid) - RETURN(1); + client_obd_list_lock(&cli->cl_loi_list_lock); + list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) { + list_del_init(&oap->oap_rpc_item); - /* stream rpcs in queue order as long as as there is an urgent page - * queued. this is our cheap solution for good batching in the case - * where writepage marks some random page in the middle of the file as - * urgent because of, say, memory pressure */ - if (!list_empty(&lop->lop_urgent)) - RETURN(1); + /* queued sync pages can be torn down while the pages + * were between the pending list and the rpc */ + if (oap->oap_interrupted) { + CDEBUG(D_INODE, "oap %p interrupted\n", oap); + osc_ap_completion(cli, NULL, oap, 0, + oap->oap_count); + continue; + } + osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req)); + } + loi_list_maint(cli, loi); + RETURN(PTR_ERR(req)); + } - /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */ - optimal = cli->cl_max_pages_per_rpc; - if (cmd == OBD_BRW_WRITE) { - /* trigger a write rpc stream as long as there are dirtiers - * waiting for space. as they're waiting, they're not going to - * create more pages to coallesce with what's waiting.. */ - if (!list_empty(&cli->cl_cache_waiters)) - RETURN(1); + aa = ptlrpc_req_async_args(req); - /* *2 to avoid triggering rpcs that would want to include pages - * that are being queued but which can't be made ready until - * the queuer finishes with the page. 
this is a wart for - * llite::commit_write() */ - optimal += 16; + if (cmd == OBD_BRW_READ) { + lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count); + lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight); + lprocfs_oh_tally_log2(&cli->cl_read_offset_hist, + (starting_offset >> CFS_PAGE_SHIFT) + 1); + } else { + lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count); + lprocfs_oh_tally(&cli->cl_write_rpc_hist, + cli->cl_w_in_flight); + lprocfs_oh_tally_log2(&cli->cl_write_offset_hist, + (starting_offset >> CFS_PAGE_SHIFT) + 1); } - if (lop->lop_num_pending >= optimal) - RETURN(1); + ptlrpc_lprocfs_brw(req, aa->aa_requested_nob); - RETURN(0); -} + client_obd_list_lock(&cli->cl_loi_list_lock); -static void on_list(struct list_head *item, struct list_head *list, - int should_be_on) -{ - if (list_empty(item) && should_be_on) - list_add_tail(item, list); - else if (!list_empty(item) && !should_be_on) - list_del_init(item); -} + if (cmd == OBD_BRW_READ) + cli->cl_r_in_flight++; + else + cli->cl_w_in_flight++; -/* maintain the loi's cli list membership invariants so that osc_send_oap_rpc - * can find pages to build into rpcs quickly */ -static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi) -{ - on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list, - lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) || - lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)); + /* queued sync pages can be torn down while the pages + * were between the pending list and the rpc */ + tmp = NULL; + list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) { + /* only one oap gets a request reference */ + if (tmp == NULL) + tmp = oap; + if (oap->oap_interrupted && !req->rq_intr) { + CDEBUG(D_INODE, "oap %p in req %p interrupted\n", + oap, req); + ptlrpc_mark_interrupted(req); + } + } + if (tmp != NULL) + tmp->oap_request = ptlrpc_request_addref(req); - on_list(&loi->loi_write_item, &cli->cl_loi_write_list, - loi->loi_write_lop.lop_num_pending); + DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight", + page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight); - on_list(&loi->loi_read_item, &cli->cl_loi_read_list, - loi->loi_read_lop.lop_num_pending); + req->rq_interpret_reply = brw_interpret; + ptlrpcd_add_req(req); + RETURN(1); } #define LOI_DEBUG(LOI, STR, args...) \ @@ -1742,6 +2407,8 @@ static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi) !list_empty(&(LOI)->loi_read_lop.lop_urgent), \ args) \ +/* This is called by osc_check_rpcs() to find which objects have pages that + * we could be sending. These lists are maintained by lop_makes_rpc(). 
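+ *
+ * (selection is a priority walk, roughly -- the exact fallback order is a
+ * sketch here, see the function body:
+ *
+ *         if (!list_empty(&cli->cl_loi_ready_list))
+ *                 return list_entry(cli->cl_loi_ready_list.next,
+ *                                   struct lov_oinfo, loi_cli_item);
+ *
+ * with cl_loi_write_list and cl_loi_read_list consulted when no object is
+ * outright ready.)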
*/ struct lov_oinfo *osc_next_loi(struct client_obd *cli) { ENTRY; @@ -1782,7 +2449,6 @@ static void osc_check_rpcs(struct client_obd *cli) while ((loi = osc_next_loi(cli)) != NULL) { LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli)); - LASSERT(loi->loi_ost_idx != LL_POISON); if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight) break; @@ -1848,9 +2514,9 @@ static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw) { int rc; ENTRY; - spin_lock(&cli->cl_loi_list_lock); + client_obd_list_lock(&cli->cl_loi_list_lock); rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0; - spin_unlock(&cli->cl_loi_list_lock); + client_obd_list_unlock(&cli->cl_loi_list_lock); RETURN(rc); }; @@ -1861,21 +2527,27 @@ static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi, { struct osc_cache_waiter ocw; struct l_wait_info lwi = { 0 }; - struct timeval start, stop; - CDEBUG(D_CACHE, "dirty: %ld dirty_max: %ld dropped: %lu grant: %lu\n", - cli->cl_dirty, cli->cl_dirty_max, cli->cl_lost_grant, - cli->cl_avail_grant); + ENTRY; + + CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu " + "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages), + cli->cl_dirty_max, obd_max_dirty_pages, + cli->cl_lost_grant, cli->cl_avail_grant); - if (cli->cl_dirty_max < PAGE_SIZE) - return(-EDQUOT); + /* force the caller to try sync io. this can jump the list + * of queued writes and create a discontiguous rpc stream */ + if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync || + loi->loi_ar.ar_force_sync) + RETURN(-EDQUOT); /* Hopefully normal case - cache space and write credits available */ - if (cli->cl_dirty + PAGE_SIZE <= cli->cl_dirty_max && - cli->cl_avail_grant >= PAGE_SIZE) { + if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) && + (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) && + (cli->cl_avail_grant >= CFS_PAGE_SIZE)) { /* account for ourselves */ - osc_consume_write_grant(cli, oap); - return(0); + osc_consume_write_grant(cli, &oap->oap_brw_page); + RETURN(0); } /* Make sure that there are write rpcs in flight to wait for. This @@ -1883,24 +2555,18 @@ static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi, * other objects sure might. */ if (cli->cl_w_in_flight) { list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters); - init_waitqueue_head(&ocw.ocw_waitq); + cfs_waitq_init(&ocw.ocw_waitq); ocw.ocw_oap = oap; ocw.ocw_rc = 0; loi_list_maint(cli, loi); osc_check_rpcs(cli); - spin_unlock(&cli->cl_loi_list_lock); + client_obd_list_unlock(&cli->cl_loi_list_lock); - CDEBUG(0, "sleeping for cache space\n"); - do_gettimeofday(&start); + CDEBUG(D_CACHE, "sleeping for cache space\n"); l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi); - do_gettimeofday(&stop); - - cli->cl_cache_wait_num++; - cli->cl_cache_wait_sum += timeval_sub(&stop, &start); - - spin_lock(&cli->cl_loi_list_lock); - lprocfs_stime_record(&cli->cl_enter_stime, &stop, &start); + + client_obd_list_lock(&cli->cl_loi_list_lock); if (!list_empty(&ocw.ocw_entry)) { list_del(&ocw.ocw_entry); RETURN(-EINTR); @@ -1911,42 +2577,90 @@ static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi, RETURN(-EDQUOT); } -/* the companion to enter_cache, called when an oap is no longer part of the - * dirty accounting.. so writeback completes or truncate happens before writing - * starts. must be called with the loi lock held. 
*/ -static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap, - int sent) +/** + * Checks if requested extent lock is compatible with a lock under the page. + * + * Checks if the lock under \a page is compatible with a read or write lock + * (specified by \a rw) for an extent [\a start , \a end]. + * + * \param exp osc export + * \param lsm striping information for the file + * \param res osc_async_page placeholder + * \param rw OBD_BRW_READ if requested for reading, + * OBD_BRW_WRITE if requested for writing + * \param start start of the requested extent + * \param end end of the requested extent + * \param cookie transparent parameter for passing locking context + * + * \post result == 1, *cookie == context, appropriate lock is referenced or + * \post result == 0 + * + * \retval 1 owned lock is reused for the request + * \retval 0 no lock reused for the request + * + * \see osc_release_short_lock + */ +static int osc_reget_short_lock(struct obd_export *exp, + struct lov_stripe_md *lsm, + void **res, int rw, + obd_off start, obd_off end, + void **cookie) { + struct osc_async_page *oap = *res; + int rc; + ENTRY; - if (!(oap->oap_brw_flags & OBD_BRW_FROM_GRANT)) { - EXIT; - return; - } + spin_lock(&oap->oap_lock); + rc = ldlm_lock_fast_match(oap->oap_ldlm_lock, rw, + start, end, cookie); + spin_unlock(&oap->oap_lock); - oap->oap_brw_flags &= ~OBD_BRW_FROM_GRANT; - cli->cl_dirty -= PAGE_SIZE; - if (!sent) { - cli->cl_lost_grant += PAGE_SIZE; - CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n", - cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty); - } + RETURN(rc); +} - EXIT; +/** + * Releases a reference to a lock taken in a "fast" way. + * + * Releases a read or a write (specified by \a rw) lock + * referenced by \a cookie. 
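+ *
+ * A typical caller pairs the two fast-path calls (illustrative sketch,
+ * not code from this patch):
+ *
+ *         void *cookie;
+ *         if (osc_reget_short_lock(exp, lsm, &res, OBD_BRW_READ,
+ *                                  start, end, &cookie) == 1) {
+ *                 ... perform the short I/O under the matched lock ...
+ *                 osc_release_short_lock(exp, lsm, end, cookie,
+ *                                        OBD_BRW_READ);
+ *         }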
+ * + * \param exp osc export + * \param lsm striping information for the file + * \param end end of the locked extent + * \param rw OBD_BRW_READ if requested for reading, + * OBD_BRW_WRITE if requested for writing + * \param cookie transparent parameter for passing locking context + * + * \post appropriate lock is dereferenced + * + * \see osc_reget_short_lock + */ +static int osc_release_short_lock(struct obd_export *exp, + struct lov_stripe_md *lsm, obd_off end, + void *cookie, int rw) +{ + ENTRY; + ldlm_lock_fast_release(cookie, rw); + /* no error could have happened at this layer */ + RETURN(0); } int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm, - struct lov_oinfo *loi, struct page *page, + struct lov_oinfo *loi, cfs_page_t *page, obd_off offset, struct obd_async_page_ops *ops, - void *data, void **res) + void *data, void **res, int nocache, + struct lustre_handle *lockh) { struct osc_async_page *oap; + struct ldlm_res_id oid; + int rc = 0; ENTRY; - OBD_ALLOC(oap, sizeof(*oap)); - if (oap == NULL) - return -ENOMEM; + if (!page) + return size_round(sizeof(*oap)); + oap = *res; oap->oap_magic = OAP_MAGIC; oap->oap_cli = &exp->exp_obd->u.cli; oap->oap_loi = loi; @@ -1957,29 +2671,52 @@ int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm, oap->oap_page = page; oap->oap_obj_off = offset; - INIT_LIST_HEAD(&oap->oap_pending_item); - INIT_LIST_HEAD(&oap->oap_urgent_item); - INIT_LIST_HEAD(&oap->oap_rpc_item); + CFS_INIT_LIST_HEAD(&oap->oap_pending_item); + CFS_INIT_LIST_HEAD(&oap->oap_urgent_item); + CFS_INIT_LIST_HEAD(&oap->oap_rpc_item); + CFS_INIT_LIST_HEAD(&oap->oap_page_list); oap->oap_occ.occ_interrupted = osc_occ_interrupted; + spin_lock_init(&oap->oap_lock); + + /* If the page was marked as notcacheable - don't add to any locks */ + if (!nocache) { + osc_build_res_name(loi->loi_id, loi->loi_gr, &oid); + /* This is the only place where we can call cache_add_extent + without oap_lock, because this page is locked now, and + the lock we are adding it to is referenced, so cannot lose + any pages either. 
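+
+                   (lifecycle sketch, inferred from the entry points in this
+                   file rather than stated by the patch: llite calls
+                   osc_prep_async_page() once per page to set up the oap and
+                   attach it to a cached lock here, osc_queue_async_io() each
+                   time the page is dirtied, and finally
+                   osc_teardown_async_page() when the page is dropped.)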
*/ + rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh); + if (rc) + RETURN(rc); + } + CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset); - *res = oap; RETURN(0); } +struct osc_async_page *oap_from_cookie(void *cookie) +{ + struct osc_async_page *oap = cookie; + if (oap->oap_magic != OAP_MAGIC) + return ERR_PTR(-EINVAL); + return oap; +}; + static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm, struct lov_oinfo *loi, void *cookie, int cmd, obd_off off, int count, - obd_flags brw_flags, enum async_flags async_flags) + obd_flag brw_flags, enum async_flags async_flags) { struct client_obd *cli = &exp->exp_obd->u.cli; struct osc_async_page *oap; - struct loi_oap_pages *lop; - int rc; + int rc = 0; ENTRY; - oap = OAP_FROM_COOKIE(cookie); + oap = oap_from_cookie(cookie); + if (IS_ERR(oap)) + RETURN(PTR_ERR(oap)); if (cli->cl_import == NULL || cli->cl_import->imp_invalid) RETURN(-EIO); @@ -1989,40 +2726,55 @@ static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm, !list_empty(&oap->oap_rpc_item)) RETURN(-EBUSY); + /* check if the file's owner/group is over quota */ +#ifdef HAVE_QUOTA_SUPPORT + if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){ + struct obd_async_page_ops *ops; + struct obdo *oa; + + OBDO_ALLOC(oa); + if (oa == NULL) + RETURN(-ENOMEM); + + ops = oap->oap_caller_ops; + ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa); + if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) == + NO_QUOTA) + rc = -EDQUOT; + + OBDO_FREE(oa); + if (rc) + RETURN(rc); + } +#endif + if (loi == NULL) - loi = &lsm->lsm_oinfo[0]; + loi = lsm->lsm_oinfo[0]; - spin_lock(&cli->cl_loi_list_lock); + client_obd_list_lock(&cli->cl_loi_list_lock); oap->oap_cmd = cmd; - oap->oap_async_flags = async_flags; oap->oap_page_off = off; oap->oap_count = count; oap->oap_brw_flags = brw_flags; + oap->oap_async_flags = async_flags; - if (cmd == OBD_BRW_WRITE) { + if (cmd & OBD_BRW_WRITE) { rc = osc_enter_cache(cli, loi, oap); if (rc) { - spin_unlock(&cli->cl_loi_list_lock); + client_obd_list_unlock(&cli->cl_loi_list_lock); RETURN(rc); } - lop = &loi->loi_write_lop; - } else { - lop = &loi->loi_read_lop; } - if (oap->oap_async_flags & ASYNC_URGENT) - list_add(&oap->oap_urgent_item, &lop->lop_urgent); - list_add_tail(&oap->oap_pending_item, &lop->lop_pending); - lop_update_pending(cli, lop, cmd, 1); - + osc_oap_to_pending(oap); loi_list_maint(cli, loi); LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page, cmd); osc_check_rpcs(cli); - spin_unlock(&cli->cl_loi_list_lock); + client_obd_list_unlock(&cli->cl_loi_list_lock); RETURN(0); } @@ -2033,7 +2785,7 @@ static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm, static int osc_set_async_flags(struct obd_export *exp, struct lov_stripe_md *lsm, struct lov_oinfo *loi, void *cookie, - obd_flags async_flags) + obd_flag async_flags) { struct client_obd *cli = &exp->exp_obd->u.cli; struct loi_oap_pages *lop; @@ -2041,21 +2793,33 @@ static int osc_set_async_flags(struct obd_export *exp, int rc = 0; ENTRY; - oap = OAP_FROM_COOKIE(cookie); + oap = oap_from_cookie(cookie); + if (IS_ERR(oap)) + RETURN(PTR_ERR(oap)); + + /* + * bug 7311: OST-side locking is only supported for liblustre for now + * (and liblustre never calls obd_set_async_flags(). I hope.), generic + * implementation has to handle case where OST-locked page was picked + * up by, e.g., ->writepage(). 
+ */ + LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)); + LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to + * tread here. */ if (cli->cl_import == NULL || cli->cl_import->imp_invalid) RETURN(-EIO); if (loi == NULL) - loi = &lsm->lsm_oinfo[0]; + loi = lsm->lsm_oinfo[0]; - if (oap->oap_cmd == OBD_BRW_WRITE) { + if (oap->oap_cmd & OBD_BRW_WRITE) { lop = &loi->loi_write_lop; } else { lop = &loi->loi_read_lop; } - spin_lock(&cli->cl_loi_list_lock); + client_obd_list_lock(&cli->cl_loi_list_lock); if (list_empty(&oap->oap_pending_item)) GOTO(out, rc = -EINVAL); @@ -2077,7 +2841,7 @@ static int osc_set_async_flags(struct obd_export *exp, oap->oap_async_flags); out: osc_check_rpcs(cli); - spin_unlock(&cli->cl_loi_list_lock); + client_obd_list_unlock(&cli->cl_loi_list_lock); RETURN(rc); } @@ -2085,15 +2849,18 @@ static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm, struct lov_oinfo *loi, struct obd_io_group *oig, void *cookie, int cmd, obd_off off, int count, - obd_flags brw_flags, - obd_flags async_flags) + obd_flag brw_flags, + obd_flag async_flags) { struct client_obd *cli = &exp->exp_obd->u.cli; struct osc_async_page *oap; struct loi_oap_pages *lop; + int rc = 0; ENTRY; - oap = OAP_FROM_COOKIE(cookie); + oap = oap_from_cookie(cookie); + if (IS_ERR(oap)) + RETURN(PTR_ERR(oap)); if (cli->cl_import == NULL || cli->cl_import->imp_invalid) RETURN(-EIO); @@ -2104,9 +2871,9 @@ static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm, RETURN(-EBUSY); if (loi == NULL) - loi = &lsm->lsm_oinfo[0]; + loi = lsm->lsm_oinfo[0]; - spin_lock(&cli->cl_loi_list_lock); + client_obd_list_lock(&cli->cl_loi_list_lock); oap->oap_cmd = cmd; oap->oap_page_off = off; @@ -2114,7 +2881,7 @@ static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm, oap->oap_brw_flags = brw_flags; oap->oap_async_flags = async_flags; - if (cmd == OBD_BRW_WRITE) + if (cmd & OBD_BRW_WRITE) lop = &loi->loi_write_lop; else lop = &loi->loi_read_lop; @@ -2122,14 +2889,15 @@ static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm, list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group); if (oap->oap_async_flags & ASYNC_GROUP_SYNC) { oap->oap_oig = oig; - oig_add_one(oig, &oap->oap_occ); + rc = oig_add_one(oig, &oap->oap_occ); } - LOI_DEBUG(loi, "oap %p page %p on group pending\n", oap, oap->oap_page); + LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n", + oap, oap->oap_page, rc); - spin_unlock(&cli->cl_loi_list_lock); + client_obd_list_unlock(&cli->cl_loi_list_lock); - RETURN(0); + RETURN(rc); } static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi, @@ -2141,9 +2909,7 @@ static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi, list_for_each_safe(pos, tmp, &lop->lop_pending_group) { oap = list_entry(pos, struct osc_async_page, oap_pending_item); list_del(&oap->oap_pending_item); - list_add_tail(&oap->oap_pending_item, &lop->lop_pending); - list_add(&oap->oap_urgent_item, &lop->lop_urgent); - lop_update_pending(cli, lop, cmd, 1); + osc_oap_to_pending(oap); } loi_list_maint(cli, loi); } @@ -2156,347 +2922,113 @@ static int osc_trigger_group_io(struct obd_export *exp, struct client_obd *cli = &exp->exp_obd->u.cli; ENTRY; - if (loi == NULL) - loi = &lsm->lsm_oinfo[0]; - - spin_lock(&cli->cl_loi_list_lock); - - osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE); - osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ); - - osc_check_rpcs(cli); - 
spin_unlock(&cli->cl_loi_list_lock); - - RETURN(0); -} - -static int osc_teardown_async_page(struct obd_export *exp, - struct lov_stripe_md *lsm, - struct lov_oinfo *loi, void *cookie) -{ - struct client_obd *cli = &exp->exp_obd->u.cli; - struct loi_oap_pages *lop; - struct osc_async_page *oap; - int rc = 0; - ENTRY; - - oap = OAP_FROM_COOKIE(cookie); - - if (loi == NULL) - loi = &lsm->lsm_oinfo[0]; - - if (oap->oap_cmd == OBD_BRW_WRITE) { - lop = &loi->loi_write_lop; - } else { - lop = &loi->loi_read_lop; - } - - spin_lock(&cli->cl_loi_list_lock); - - if (!list_empty(&oap->oap_rpc_item)) - GOTO(out, rc = -EBUSY); - - osc_exit_cache(cli, oap, 0); - osc_wake_cache_waiters(cli); - - if (!list_empty(&oap->oap_urgent_item)) { - list_del_init(&oap->oap_urgent_item); - oap->oap_async_flags &= ~ASYNC_URGENT; - } - if (!list_empty(&oap->oap_pending_item)) { - list_del_init(&oap->oap_pending_item); - lop_update_pending(cli, lop, oap->oap_cmd, -1); - } - loi_list_maint(cli, loi); - - LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page); -out: - spin_unlock(&cli->cl_loi_list_lock); - if (rc == 0) - OBD_FREE(oap, sizeof(*oap)); - RETURN(rc); -} - -#ifdef __KERNEL__ -/* Note: caller will lock/unlock, and set uptodate on the pages */ -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) -static int sanosc_brw_read(struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md *lsm, obd_count page_count, - struct brw_page *pga) -{ - struct ptlrpc_request *request = NULL; - struct ost_body *body; - struct niobuf_remote *nioptr; - struct obd_ioobj *iooptr; - int rc, size[3] = {sizeof(*body)}, mapped = 0; - int swab; - ENTRY; - - /* XXX does not handle 'new' brw protocol */ - - size[1] = sizeof(struct obd_ioobj); - size[2] = page_count * sizeof(*nioptr); - - request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION, - OST_SAN_READ, 3, size, NULL); - if (!request) - RETURN(-ENOMEM); - - body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof(*body)); - iooptr = lustre_msg_buf(request->rq_reqmsg, 1, sizeof(*iooptr)); - nioptr = lustre_msg_buf(request->rq_reqmsg, 2, - sizeof(*nioptr) * page_count); - - memcpy(&body->oa, oa, sizeof(body->oa)); - - obdo_to_ioobj(oa, iooptr); - iooptr->ioo_bufcnt = page_count; - - for (mapped = 0; mapped < page_count; mapped++, nioptr++) { - LASSERT(PageLocked(pga[mapped].pg)); - LASSERT(mapped == 0 || - pga[mapped].disk_offset > pga[mapped - 1].disk_offset); - - nioptr->offset = pga[mapped].disk_offset; - nioptr->len = pga[mapped].count; - nioptr->flags = pga[mapped].flag; - } - - size[1] = page_count * sizeof(*nioptr); - request->rq_replen = lustre_msg_size(2, size); - - rc = ptlrpc_queue_wait(request); - if (rc) - GOTO(out_req, rc); - - body = lustre_swab_repbuf(request, 0, sizeof(*body), - lustre_swab_ost_body); - if (body == NULL) { - CERROR("Can't unpack body\n"); - GOTO(out_req, rc = -EPROTO); - } - - memcpy(oa, &body->oa, sizeof(*oa)); - - swab = lustre_msg_swabbed(request->rq_repmsg); - LASSERT_REPSWAB(request, 1); - nioptr = lustre_msg_buf(request->rq_repmsg, 1, size[1]); - if (!nioptr) { - /* nioptr missing or short */ - GOTO(out_req, rc = -EPROTO); - } - - /* actual read */ - for (mapped = 0; mapped < page_count; mapped++, nioptr++) { - struct page *page = pga[mapped].pg; - struct buffer_head *bh; - kdev_t dev; - - if (swab) - lustre_swab_niobuf_remote (nioptr); - - /* got san device associated */ - LASSERT(exp->exp_obd != NULL); - dev = exp->exp_obd->u.cli.cl_sandev; - - /* hole */ - if (!nioptr->offset) { - CDEBUG(D_PAGE, "hole at ino %lu; index 
%ld\n", - page->mapping->host->i_ino, - page->index); - memset(page_address(page), 0, PAGE_SIZE); - continue; - } - - if (!page->buffers) { - create_empty_buffers(page, dev, PAGE_SIZE); - bh = page->buffers; - - clear_bit(BH_New, &bh->b_state); - set_bit(BH_Mapped, &bh->b_state); - bh->b_blocknr = (unsigned long)nioptr->offset; - - clear_bit(BH_Uptodate, &bh->b_state); - - ll_rw_block(READ, 1, &bh); - } else { - bh = page->buffers; - - /* if buffer already existed, it must be the - * one we mapped before, check it */ - LASSERT(!test_bit(BH_New, &bh->b_state)); - LASSERT(test_bit(BH_Mapped, &bh->b_state)); - LASSERT(bh->b_blocknr == (unsigned long)nioptr->offset); - - /* wait it's io completion */ - if (test_bit(BH_Lock, &bh->b_state)) - wait_on_buffer(bh); - - if (!test_bit(BH_Uptodate, &bh->b_state)) - ll_rw_block(READ, 1, &bh); - } - - - /* must do syncronous write here */ - wait_on_buffer(bh); - if (!buffer_uptodate(bh)) { - /* I/O error */ - rc = -EIO; - goto out_req; - } - } - -out_req: - ptlrpc_req_finished(request); - RETURN(rc); -} - -static int sanosc_brw_write(struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md *lsm, obd_count page_count, - struct brw_page *pga) -{ - struct ptlrpc_request *request = NULL; - struct ost_body *body; - struct niobuf_remote *nioptr; - struct obd_ioobj *iooptr; - int rc, size[3] = {sizeof(*body)}, mapped = 0; - int swab; - ENTRY; - - size[1] = sizeof(struct obd_ioobj); - size[2] = page_count * sizeof(*nioptr); - - request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION, - OST_SAN_WRITE, 3, size, NULL); - if (!request) - RETURN(-ENOMEM); - - body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body)); - iooptr = lustre_msg_buf(request->rq_reqmsg, 1, sizeof (*iooptr)); - nioptr = lustre_msg_buf(request->rq_reqmsg, 2, - sizeof (*nioptr) * page_count); - - memcpy(&body->oa, oa, sizeof(body->oa)); - - obdo_to_ioobj(oa, iooptr); - iooptr->ioo_bufcnt = page_count; - - /* pack request */ - for (mapped = 0; mapped < page_count; mapped++, nioptr++) { - LASSERT(PageLocked(pga[mapped].pg)); - LASSERT(mapped == 0 || - pga[mapped].disk_offset > pga[mapped - 1].disk_offset); - - nioptr->offset = pga[mapped].disk_offset; - nioptr->len = pga[mapped].count; - nioptr->flags = pga[mapped].flag; - } - - size[1] = page_count * sizeof(*nioptr); - request->rq_replen = lustre_msg_size(2, size); - - rc = ptlrpc_queue_wait(request); - if (rc) - GOTO(out_req, rc); + if (loi == NULL) + loi = lsm->lsm_oinfo[0]; - swab = lustre_msg_swabbed (request->rq_repmsg); - LASSERT_REPSWAB (request, 1); - nioptr = lustre_msg_buf(request->rq_repmsg, 1, size[1]); - if (!nioptr) { - CERROR("absent/short niobuf array\n"); - GOTO(out_req, rc = -EPROTO); - } + client_obd_list_lock(&cli->cl_loi_list_lock); - /* actual write */ - for (mapped = 0; mapped < page_count; mapped++, nioptr++) { - struct page *page = pga[mapped].pg; - struct buffer_head *bh; - kdev_t dev; + osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE); + osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ); - if (swab) - lustre_swab_niobuf_remote (nioptr); + osc_check_rpcs(cli); + client_obd_list_unlock(&cli->cl_loi_list_lock); - /* got san device associated */ - LASSERT(exp->exp_obd != NULL); - dev = exp->exp_obd->u.cli.cl_sandev; + RETURN(0); +} - if (!page->buffers) { - create_empty_buffers(page, dev, PAGE_SIZE); - } else { - /* checking */ - LASSERT(!test_bit(BH_New, &page->buffers->b_state)); - LASSERT(test_bit(BH_Mapped, &page->buffers->b_state)); - 
LASSERT(page->buffers->b_blocknr == - (unsigned long)nioptr->offset); - } - bh = page->buffers; +static int osc_teardown_async_page(struct obd_export *exp, + struct lov_stripe_md *lsm, + struct lov_oinfo *loi, void *cookie) +{ + struct client_obd *cli = &exp->exp_obd->u.cli; + struct loi_oap_pages *lop; + struct osc_async_page *oap; + int rc = 0; + ENTRY; - LASSERT(bh); + oap = oap_from_cookie(cookie); + if (IS_ERR(oap)) + RETURN(PTR_ERR(oap)); - /* if buffer locked, wait it's io completion */ - if (test_bit(BH_Lock, &bh->b_state)) - wait_on_buffer(bh); + if (loi == NULL) + loi = lsm->lsm_oinfo[0]; - clear_bit(BH_New, &bh->b_state); - set_bit(BH_Mapped, &bh->b_state); + if (oap->oap_cmd & OBD_BRW_WRITE) { + lop = &loi->loi_write_lop; + } else { + lop = &loi->loi_read_lop; + } - /* override the block nr */ - bh->b_blocknr = (unsigned long)nioptr->offset; + client_obd_list_lock(&cli->cl_loi_list_lock); - /* we are about to write it, so set it - * uptodate/dirty - * page lock should garentee no race condition here */ - set_bit(BH_Uptodate, &bh->b_state); - set_bit(BH_Dirty, &bh->b_state); + if (!list_empty(&oap->oap_rpc_item)) + GOTO(out, rc = -EBUSY); - ll_rw_block(WRITE, 1, &bh); + osc_exit_cache(cli, oap, 0); + osc_wake_cache_waiters(cli); - /* must do syncronous write here */ - wait_on_buffer(bh); - if (!buffer_uptodate(bh) || test_bit(BH_Dirty, &bh->b_state)) { - /* I/O error */ - rc = -EIO; - goto out_req; - } + if (!list_empty(&oap->oap_urgent_item)) { + list_del_init(&oap->oap_urgent_item); + oap->oap_async_flags &= ~ASYNC_URGENT; } + if (!list_empty(&oap->oap_pending_item)) { + list_del_init(&oap->oap_pending_item); + lop_update_pending(cli, lop, oap->oap_cmd, -1); + } + loi_list_maint(cli, loi); + cache_remove_extent(cli->cl_cache, oap); -out_req: - ptlrpc_req_finished(request); + LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page); +out: + client_obd_list_unlock(&cli->cl_loi_list_lock); RETURN(rc); } -static int sanosc_brw(int cmd, struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md *lsm, obd_count page_count, - struct brw_page *pga, struct obd_trans_info *oti) +int osc_extent_blocking_cb(struct ldlm_lock *lock, + struct ldlm_lock_desc *new, void *data, + int flag) { + struct lustre_handle lockh = { 0 }; + int rc; ENTRY; - while (page_count) { - obd_count pages_per_brw; - int rc; + if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) { + LDLM_ERROR(lock, "cancelling lock with bad data %p", data); + LBUG(); + } - if (page_count > PTLRPC_MAX_BRW_PAGES) - pages_per_brw = PTLRPC_MAX_BRW_PAGES; - else - pages_per_brw = page_count; + switch (flag) { + case LDLM_CB_BLOCKING: + ldlm_lock2handle(lock, &lockh); + rc = ldlm_cli_cancel(&lockh); + if (rc != ELDLM_OK) + CERROR("ldlm_cli_cancel failed: %d\n", rc); + break; + case LDLM_CB_CANCELING: { - if (cmd & OBD_BRW_WRITE) - rc = sanosc_brw_write(exp, oa, lsm, pages_per_brw,pga); - else - rc = sanosc_brw_read(exp, oa, lsm, pages_per_brw, pga); + ldlm_lock2handle(lock, &lockh); + /* This lock wasn't granted, don't try to do anything */ + if (lock->l_req_mode != lock->l_granted_mode) + RETURN(0); - if (rc != 0) - RETURN(rc); + cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache, + &lockh); - page_count -= pages_per_brw; - pga += pages_per_brw; + if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb) + lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb( + lock, new, data,flag); + break; + } + default: + LBUG(); } + RETURN(0); } -#endif -#endif +EXPORT_SYMBOL(osc_extent_blocking_cb); 
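/*
 * Aside, not part of the patch: osc_extent_blocking_cb() above follows the
 * standard two-phase LDLM AST contract, which is easy to misread. Below is a
 * minimal sketch of that contract, using only APIs that already appear in
 * this file; the names example_blocking_ast() and
 * example_drop_cached_state() are hypothetical.
 */
static void example_drop_cached_state(struct ldlm_lock *lock)
{
        /* local teardown of state cached under the lock, e.g.
         * cache_remove_lock() as osc_extent_blocking_cb() does */
}

static int example_blocking_ast(struct ldlm_lock *lock,
                                struct ldlm_lock_desc *desc,
                                void *data, int flag)
{
        struct lustre_handle lockh;
        int rc;

        switch (flag) {
        case LDLM_CB_BLOCKING:
                /* Another client wants a conflicting lock: start cancelling
                 * ours so theirs can be granted. */
                ldlm_lock2handle(lock, &lockh);
                rc = ldlm_cli_cancel(&lockh);
                if (rc != ELDLM_OK)
                        CERROR("ldlm_cli_cancel failed: %d\n", rc);
                break;
        case LDLM_CB_CANCELING:
                /* The lock is already being destroyed: only local cleanup
                 * may run here, no new RPCs. */
                example_drop_cached_state(lock);
                break;
        default:
                LBUG();
        }
        return 0;
}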
-static void osc_set_data_with_check(struct lustre_handle *lockh, void *data) +static void osc_set_data_with_check(struct lustre_handle *lockh, void *data, + int flags) { struct ldlm_lock *lock = ldlm_handle2lock(lockh); @@ -2504,9 +3036,9 @@ static void osc_set_data_with_check(struct lustre_handle *lockh, void *data) CERROR("lockh %p, data %p - client evicted?\n", lockh, data); return; } - lock_res_and_lock(lock); -#ifdef __KERNEL__ +#if defined (__KERNEL__) && defined (__linux__) + /* Liang XXX: Darwin and Winnt checking should be added */ if (lock->l_ast_data && lock->l_ast_data != data) { struct inode *new_inode = data; struct inode *old_inode = lock->l_ast_data; @@ -2528,60 +3060,113 @@ static void osc_set_data_with_check(struct lustre_handle *lockh, void *data) static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm, ldlm_iterator_t replace, void *data) { - struct ldlm_res_id res_id = { .name = {0} }; + struct ldlm_res_id res_id; struct obd_device *obd = class_exp2obd(exp); - res_id.name[0] = lsm->lsm_object_id; - res_id.name[2] = lsm->lsm_object_gr; - ldlm_change_cbdata(obd->obd_namespace, &res_id, replace, data); + osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id); + ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data); return 0; } -static int osc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm, - __u32 type, ldlm_policy_data_t *policy, __u32 mode, - int *flags, void *bl_cb, void *cp_cb, void *gl_cb, - void *data, __u32 lvb_len, void *lvb_swabber, - struct lustre_handle *lockh) +static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req, + struct obd_info *oinfo, int intent, int rc) +{ + ENTRY; + + if (intent) { + /* The request was created before ldlm_cli_enqueue call. */ + if (rc == ELDLM_LOCK_ABORTED) { + struct ldlm_reply *rep; + rep = req_capsule_server_get(&req->rq_pill, + &RMF_DLM_REP); + + LASSERT(rep != NULL); + if (rep->lock_policy_res1) + rc = rep->lock_policy_res1; + } + } + + if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) { + CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n", + oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size, + oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks, + oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime); + } + + if (!rc) + cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh); + + /* Call the update callback. */ + rc = oinfo->oi_cb_up(oinfo, rc); + RETURN(rc); +} + +static int osc_enqueue_interpret(const struct lu_env *env, + struct ptlrpc_request *req, + struct osc_enqueue_args *aa, int rc) +{ + int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT; + struct lov_stripe_md *lsm = aa->oa_oi->oi_md; + struct ldlm_lock *lock; + + /* ldlm_cli_enqueue is holding a reference on the lock, so it must + * be valid. */ + lock = ldlm_handle2lock(aa->oa_oi->oi_lockh); + + /* Complete obtaining the lock procedure. */ + rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1, + aa->oa_ei->ei_mode, + &aa->oa_oi->oi_flags, + &lsm->lsm_oinfo[0]->loi_lvb, + sizeof(lsm->lsm_oinfo[0]->loi_lvb), + lustre_swab_ost_lvb, + aa->oa_oi->oi_lockh, rc); + + /* Complete osc stuff. */ + rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc); + + /* Release the lock for async request. 
*/ + if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK) + ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode); + + LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n", + aa->oa_oi->oi_lockh, req, aa); + LDLM_LOCK_PUT(lock); + return rc; +} + +/* When enqueuing asynchronously, locks are not ordered; we can obtain a lock + * from the 2nd OSC before a lock from the 1st one. This does not deadlock with + * other synchronous requests; however, holding some locks while trying to + * obtain others may take a considerable amount of time in the case of an OST + * failure, and when a client does not release a lock that other sync requests + * are waiting on, that client is evicted from the cluster -- such scenarios + * make life difficult, so release locks just after they are obtained. */ +static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, + struct ldlm_enqueue_info *einfo, + struct ptlrpc_request_set *rqset) { + struct ldlm_res_id res_id; struct obd_device *obd = exp->exp_obd; - struct ldlm_res_id res_id = { .name = {0} }; - struct ost_lvb lvb; - struct ldlm_reply *rep; struct ptlrpc_request *req = NULL; + int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT; + ldlm_mode_t mode; int rc; ENTRY; - res_id.name[0] = lsm->lsm_object_id; - res_id.name[2] = lsm->lsm_object_gr; + osc_build_res_name(oinfo->oi_md->lsm_object_id, + oinfo->oi_md->lsm_object_gr, &res_id); /* Filesystem lock extents are extended to page boundaries so that * dealing with the page cache is a little smoother. */ - policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK; - policy->l_extent.end |= ~PAGE_MASK; + oinfo->oi_policy.l_extent.start -= + oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK; + oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK; - if (lsm->lsm_oinfo->loi_kms_valid == 0) + if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0) goto no_match; /* Next, search for already existing extent locks that will cover us */ - rc = ldlm_lock_match(obd->obd_namespace, 0, &res_id, type, policy, mode, - lockh); - if (rc == 1) { - if (ptlrpcs_check_cred(obd->u.cli.cl_import)) { - /* return immediately if no credential held */ - ldlm_lock_decref(lockh, mode); - RETURN(-EACCES); - } - - osc_set_data_with_check(lockh, data); - if (*flags & LDLM_FL_HAS_INTENT) { - /* I would like to be able to ASSERT here that rss <= - * kms, but I can't, for reasons which are explained in - * lov_enqueue() */ - } - /* We already have a lock, and it's referenced */ - RETURN(ELDLM_OK); - } - /* If we're trying to read, we also search for an existing PW lock. The * VFS and page cache already protect us locally, so lots of readers/ * writers can share a single PW lock. @@ -2593,84 +3178,85 @@ static int osc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm, * At some point we should cancel the read lock instead of making them * send us a blocking callback, but there are problems with canceling * locks out from other users right now, too. */ + mode = einfo->ei_mode; + if (einfo->ei_mode == LCK_PR) + mode |= LCK_PW; + mode = ldlm_lock_match(obd->obd_namespace, + oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id, + einfo->ei_type, &oinfo->oi_policy, mode, + oinfo->oi_lockh); + if (mode) { + /* addref the lock only for non-async requests, and when a PW lock is + * matched whereas we asked for PR. 
*/ + if (!rqset && einfo->ei_mode != mode) + ldlm_lock_addref(oinfo->oi_lockh, LCK_PR); + osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata, + oinfo->oi_flags); + if (intent) { + /* I would like to be able to ASSERT here that rss <= + * kms, but I can't, for reasons which are explained in + * lov_enqueue() */ + } - if (mode == LCK_PR) { - rc = ldlm_lock_match(obd->obd_namespace, 0, &res_id, type, - policy, LCK_PW, lockh); - if (rc == 1) { - if (ptlrpcs_check_cred(obd->u.cli.cl_import)) { - /* return immediately if no credential held */ - ldlm_lock_decref(lockh, LCK_PW); - RETURN(-EACCES); - } + /* We already have a lock, and it's referenced */ + oinfo->oi_cb_up(oinfo, ELDLM_OK); - /* FIXME: This is not incredibly elegant, but it might - * be more elegant than adding another parameter to - * lock_match. I want a second opinion. */ - ldlm_lock_addref(lockh, LCK_PR); - ldlm_lock_decref(lockh, LCK_PW); - osc_set_data_with_check(lockh, data); - RETURN(ELDLM_OK); - } - } - if (mode == LCK_PW) { - rc = ldlm_lock_match(obd->obd_namespace, 0, &res_id, type, - policy, LCK_PR, lockh); - if (rc == 1) { - rc = ldlm_cli_convert(lockh, mode, flags); - if (!rc) { - /* Update readers/writers accounting */ - ldlm_lock_addref(lockh, LCK_PW); - ldlm_lock_decref(lockh, LCK_PR); - osc_set_data_with_check(lockh, data); - RETURN(ELDLM_OK); - } - /* If the conversion failed, we need to drop refcount - on matched lock before we get new one */ - /* XXX Won't it save us some efforts if we cancel PR - lock here? We are going to take PW lock anyway and it - will invalidate PR lock */ - ldlm_lock_decref(lockh, LCK_PR); - if (rc != EDEADLOCK) { - RETURN(rc); - } - } - } + /* For async requests, decref the lock. */ + if (einfo->ei_mode != mode) + ldlm_lock_decref(oinfo->oi_lockh, LCK_PW); + else if (rqset) + ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode); -no_match: - if (*flags & LDLM_FL_HAS_INTENT) { - int size[2] = {0, sizeof(struct ldlm_request)}; + RETURN(ELDLM_OK); + } - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION, - LDLM_ENQUEUE, 2, size, NULL); + no_match: + if (intent) { + CFS_LIST_HEAD(cancels); + req = ptlrpc_request_alloc(class_exp2cliimp(exp), + &RQF_LDLM_ENQUEUE_LVB); if (req == NULL) RETURN(-ENOMEM); - size[0] = sizeof(*rep); - size[1] = sizeof(lvb); - req->rq_replen = lustre_msg_size(2, size); + rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0); + if (rc) + RETURN(rc); + + req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, + sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb)); + ptlrpc_request_set_replen(req); } - rc = ldlm_cli_enqueue(exp, req, obd->obd_namespace, res_id, type, - policy, mode, flags, bl_cb, cp_cb, gl_cb, data, - &lvb, sizeof(lvb), lustre_swab_ost_lvb, lockh); - if (req != NULL) { - if (rc == ELDLM_LOCK_ABORTED) { - /* swabbed by ldlm_cli_enqueue() */ - LASSERT_REPSWABBED(req, 0); - rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*rep)); - LASSERT(rep != NULL); - if (rep->lock_policy_res1) - rc = rep->lock_policy_res1; + + /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */ + oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED; + + rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, + &oinfo->oi_policy, &oinfo->oi_flags, + &oinfo->oi_md->lsm_oinfo[0]->loi_lvb, + sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb), + lustre_swab_ost_lvb, oinfo->oi_lockh, + rqset ? 
1 : 0); + if (rqset) { + if (!rc) { + struct osc_enqueue_args *aa; + CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args)); + aa = ptlrpc_req_async_args(req); + aa->oa_oi = oinfo; + aa->oa_ei = einfo; + aa->oa_exp = exp; + + req->rq_interpret_reply = + (ptlrpc_interpterer_t)osc_enqueue_interpret; + ptlrpc_set_add_req(rqset, req); + } else if (intent) { + ptlrpc_req_finished(req); } - ptlrpc_req_finished(req); + RETURN(rc); } - if ((*flags & LDLM_FL_HAS_INTENT && rc == ELDLM_LOCK_ABORTED) || !rc) { - CDEBUG(D_INODE, "received kms == "LPU64", blocks == "LPU64"\n", - lvb.lvb_size, lvb.lvb_blocks); - lsm->lsm_oinfo->loi_rss = lvb.lvb_size; - lsm->lsm_oinfo->loi_blocks = lvb.lvb_blocks; - } + rc = osc_enqueue_fini(obd, req, oinfo, intent, rc); + if (intent) + ptlrpc_req_finished(req); RETURN(rc); } @@ -2679,43 +3265,38 @@ static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm, __u32 type, ldlm_policy_data_t *policy, __u32 mode, int *flags, void *data, struct lustre_handle *lockh) { - struct ldlm_res_id res_id = { .name = {0} }; + struct ldlm_res_id res_id; struct obd_device *obd = exp->exp_obd; - int rc; + int lflags = *flags; + ldlm_mode_t rc; ENTRY; - res_id.name[0] = lsm->lsm_object_id; - res_id.name[2] = lsm->lsm_object_gr; + osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id); - OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO); + if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH)) + RETURN(-EIO); /* Filesystem lock extents are extended to page boundaries so that * dealing with the page cache is a little smoother */ - policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK; - policy->l_extent.end |= ~PAGE_MASK; + policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK; + policy->l_extent.end |= ~CFS_PAGE_MASK; /* Next, search for already existing extent locks that will cover us */ - rc = ldlm_lock_match(obd->obd_namespace, *flags, &res_id, type, - policy, mode, lockh); - if (rc) { - // if (!(*flags & LDLM_FL_TEST_LOCK)) - osc_set_data_with_check(lockh, data); - RETURN(rc); - } /* If we're trying to read, we also search for an existing PW lock. The * VFS and page cache already protect us locally, so lots of readers/ * writers can share a single PW lock. */ - if (mode == LCK_PR) { - rc = ldlm_lock_match(obd->obd_namespace, *flags, &res_id, type, - policy, LCK_PW, lockh); - if (rc == 1 && !(*flags & LDLM_FL_TEST_LOCK)) { - /* FIXME: This is not incredibly elegant, but it might - * be more elegant than adding another parameter to - * lock_match. I want a second opinion. 
*/ - osc_set_data_with_check(lockh, data); + rc = mode; + if (mode == LCK_PR) + rc |= LCK_PW; + rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY, + &res_id, type, policy, rc, lockh); + if (rc) { + osc_set_data_with_check(lockh, data, lflags); + if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) { ldlm_lock_addref(lockh, LCK_PR); ldlm_lock_decref(lockh, LCK_PW); } + RETURN(rc); } RETURN(rc); } @@ -2725,7 +3306,7 @@ static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md, { ENTRY; - if (mode == LCK_GROUP) + if (unlikely(mode == LCK_GROUP)) ldlm_lock_decref_and_cancel(lockh, mode); else ldlm_lock_decref(lockh, mode); @@ -2734,59 +3315,143 @@ static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md, } static int osc_cancel_unused(struct obd_export *exp, - struct lov_stripe_md *lsm, - int flags, void *opaque) + struct lov_stripe_md *lsm, int flags, + void *opaque) { struct obd_device *obd = class_exp2obd(exp); - struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL; + struct ldlm_res_id res_id, *resp = NULL; if (lsm != NULL) { - res_id.name[0] = lsm->lsm_object_id; - res_id.name[2] = lsm->lsm_object_gr; - resp = &res_id; + resp = osc_build_res_name(lsm->lsm_object_id, + lsm->lsm_object_gr, &res_id); } return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque); } -static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs, - unsigned long max_age) +static int osc_statfs_interpret(const struct lu_env *env, + struct ptlrpc_request *req, + struct osc_async_args *aa, int rc) { struct obd_statfs *msfs; - struct ptlrpc_request *request; - int rc, size = sizeof(*osfs); ENTRY; + if (rc != 0) + GOTO(out, rc); + + msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS); + if (msfs == NULL) { + GOTO(out, rc = -EPROTO); + } + + *aa->aa_oi->oi_osfs = *msfs; +out: + rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc); + RETURN(rc); +} + +static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo, + __u64 max_age, struct ptlrpc_request_set *rqset) +{ + struct ptlrpc_request *req; + struct osc_async_args *aa; + int rc; + ENTRY; + + /* We could possibly pass max_age in the request (as an absolute + * timestamp or a "seconds.usec ago") so the target can avoid doing + * extra calls into the filesystem if that isn't necessary (e.g. + * during mount that would help a bit). Having relative timestamps + * is not so great if request processing is slow, while absolute + * timestamps are not ideal because they need time synchronization. 
*/ + req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS); + if (req == NULL) + RETURN(-ENOMEM); + + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + ptlrpc_request_set_replen(req); + req->rq_request_portal = OST_CREATE_PORTAL; + ptlrpc_at_set_req_timeout(req); + + if (oinfo->oi_flags & OBD_STATFS_NODELAY) { + /* procfs requests must not wait for stat, to avoid deadlock */ + req->rq_no_resend = 1; + req->rq_no_delay = 1; + } + + req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret; + CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args)); + aa = ptlrpc_req_async_args(req); + aa->aa_oi = oinfo; + + ptlrpc_set_add_req(rqset, req); + RETURN(0); +} + +static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs, + __u64 max_age, __u32 flags) +{ + struct obd_statfs *msfs; + struct ptlrpc_request *req; + struct obd_import *imp = NULL; + int rc; + ENTRY; + + /* Since the request might also come from lprocfs, we need to + * sync this with client_disconnect_export (bug 15684) */ + down_read(&obd->u.cli.cl_sem); + if (obd->u.cli.cl_import) + imp = class_import_get(obd->u.cli.cl_import); + up_read(&obd->u.cli.cl_sem); + if (!imp) + RETURN(-ENODEV); + /* We could possibly pass max_age in the request (as an absolute * timestamp or a "seconds.usec ago") so the target can avoid doing * extra calls into the filesystem if that isn't necessary (e.g. * during mount that would help a bit). Having relative timestamps * is not so great if request processing is slow, while absolute * timestamps are not ideal because they need time synchronization. */ - request = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OBD_VERSION, - OST_STATFS, 0, NULL, NULL); - if (!request) + req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS); + + class_import_put(imp); + + if (req == NULL) RETURN(-ENOMEM); - request->rq_replen = lustre_msg_size(1, &size); - request->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249 + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + ptlrpc_request_set_replen(req); + req->rq_request_portal = OST_CREATE_PORTAL; + ptlrpc_at_set_req_timeout(req); + + if (flags & OBD_STATFS_NODELAY) { + /* procfs requests must not wait for stat, to avoid deadlock */ + req->rq_no_resend = 1; + req->rq_no_delay = 1; + } - rc = ptlrpc_queue_wait(request); + rc = ptlrpc_queue_wait(req); if (rc) GOTO(out, rc); - msfs = lustre_swab_repbuf(request, 0, sizeof(*msfs), - lustre_swab_obd_statfs); + msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS); if (msfs == NULL) { - CERROR("Can't unpack obd_statfs\n"); GOTO(out, rc = -EPROTO); } - memcpy(osfs, msfs, sizeof(*osfs)); + *osfs = *msfs; EXIT; out: - ptlrpc_req_finished(request); + ptlrpc_req_finished(req); return rc; } @@ -2798,30 +3463,45 @@ static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs, */ static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump) { - struct lov_user_md lum, *lumk; - int rc, lum_size; + /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */ + struct lov_user_md_v3 lum, *lumk; + struct lov_user_ost_data_v1 *lmm_objects; + int rc = 0, lum_size; ENTRY; if (!lsm) RETURN(-ENODATA); - rc = copy_from_user(&lum, lump, sizeof(lum)); - if (rc) + /* we only need the header part from user space to get lmm_magic and + * lmm_stripe_count (the header part is common to v1 and v3) */ + lum_size = sizeof(struct lov_user_md_v1); + if 
(copy_from_user(&lum, lump, lum_size)) RETURN(-EFAULT); - if (lum.lmm_magic != LOV_USER_MAGIC) + if ((lum.lmm_magic != LOV_USER_MAGIC_V1) && + (lum.lmm_magic != LOV_USER_MAGIC_V3)) RETURN(-EINVAL); + /* lov_user_md_vX and lov_mds_md_vX must have the same size */ + LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1)); + LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3)); + LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0])); + + /* we can use lov_mds_md_size() to compute lum_size + * because lov_user_md_vX and lov_mds_md_vX have the same size */ if (lum.lmm_stripe_count > 0) { - lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]); + lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic); OBD_ALLOC(lumk, lum_size); if (!lumk) RETURN(-ENOMEM); - lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id; - lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr; + if (lum.lmm_magic == LOV_USER_MAGIC_V1) + lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]); + else + lmm_objects = &(lumk->lmm_objects[0]); + lmm_objects->l_object_id = lsm->lsm_object_id; } else { - lum_size = sizeof(lum); + lum_size = lov_mds_md_size(0, lum.lmm_magic); lumk = &lum; } @@ -2838,6 +3518,7 @@ static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump) RETURN(rc); } + static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, void *karg, void *uarg) { @@ -2846,14 +3527,10 @@ static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, int err = 0; ENTRY; -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) - MOD_INC_USE_COUNT; -#else - if (!try_module_get(THIS_MODULE)) { - CERROR("Can't get module. Is it alive?"); - return -EINVAL; - } -#endif + if (!try_module_get(THIS_MODULE)) { + CERROR("Can't get module. 
Is it alive?"); + return -EINVAL; + } switch (cmd) { case OBD_IOC_LOV_GET_CONFIG: { char *buf; @@ -2868,17 +3545,12 @@ static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, data = (struct obd_ioctl_data *)buf; if (sizeof(*desc) > data->ioc_inllen1) { - OBD_FREE(buf, len); + obd_ioctl_freedata(buf, len); GOTO(out, err = -EINVAL); } if (data->ioc_inllen2 < sizeof(uuid)) { - OBD_FREE(buf, len); - GOTO(out, err = -EINVAL); - } - - if (data->ioc_inllen3 < sizeof(__u32)) { - OBD_FREE(buf, len); + obd_ioctl_freedata(buf, len); GOTO(out, err = -EINVAL); } @@ -2890,8 +3562,8 @@ static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, desc->ld_default_stripe_offset = 0; desc->ld_pattern = 0; memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid)); + memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid)); - *((__u32 *)data->ioc_inlbuf3) = 1; err = copy_to_user((void *)uarg, buf, len); if (err) @@ -2916,277 +3588,249 @@ static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, case IOC_OSC_SET_ACTIVE: err = ptlrpc_set_import_active(obd->u.cli.cl_import, data->ioc_offset); - GOTO(out, err); - case IOC_OSC_CTL_RECOVERY: - err = ptlrpc_import_control_recovery(obd->u.cli.cl_import, - data->ioc_offset); - GOTO(out, err); - default: - CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n", cmd, current->comm); - GOTO(out, err = -ENOTTY); - } -out: -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) - MOD_DEC_USE_COUNT; -#else - module_put(THIS_MODULE); -#endif - return err; -} - -static int osc_get_info(struct obd_export *exp, __u32 keylen, - void *key, __u32 *vallen, void *val) -{ - ENTRY; - if (!vallen || !val) - RETURN(-EFAULT); - - if (keylen > strlen("lock_to_stripe") && - strcmp(key, "lock_to_stripe") == 0) { - __u32 *stripe = val; - *vallen = sizeof(*stripe); - *stripe = 0; - RETURN(0); - } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) { - struct ptlrpc_request *req; - obd_id *reply; - char *bufs[1] = {key}; - int rc; - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION, - OST_GET_INFO, 1, (int *)&keylen, bufs); - if (req == NULL) - RETURN(-ENOMEM); - - req->rq_replen = lustre_msg_size(1, (int *)vallen); - rc = ptlrpc_queue_wait(req); - if (rc) - GOTO(out, rc); - - reply = lustre_swab_repbuf(req, 0, sizeof(*reply), - lustre_swab_ost_last_id); - if (reply == NULL) { - CERROR("Can't unpack OST last ID\n"); - GOTO(out, rc = -EPROTO); - } - *((obd_id *)val) = *reply; - out: - ptlrpc_req_finished(req); - RETURN(rc); - } else if (keylen == 10 && strcmp(key, "client_nid") == 0) { - struct ptlrpc_connection * conn; - ptl_nid_t * nid = val; - ptl_process_id_t id; - int rc; - - *vallen = sizeof(*nid); - conn = class_exp2cliimp(exp)->imp_connection; - - if (!conn || !conn->c_peer.peer_ni) - RETURN(-ENOTCONN); - - rc = PtlGetId(conn->c_peer.peer_ni->pni_ni_h, &id); - if (rc == PTL_OK) - *nid = id.nid; - - RETURN(0); - } - RETURN(-EPROTO); -} - -static int osc_set_info(struct obd_export *exp, obd_count keylen, - void *key, obd_count vallen, void *val) -{ - struct obd_device *obd = exp->exp_obd; - struct obd_import *imp = class_exp2cliimp(exp); - struct llog_ctxt *ctxt; - int rc = 0; - ENTRY; - - if (keylen == strlen("unlinked") && - memcmp(key, "unlinked", keylen) == 0) { - struct osc_creator *oscc = &obd->u.cli.cl_oscc; - spin_lock(&oscc->oscc_lock); - oscc->oscc_flags &= ~OSCC_FLAG_NOSPC; - spin_unlock(&oscc->oscc_lock); - RETURN(0); - } - - if (keylen == strlen("unrecovery") && - memcmp(key, "unrecovery", keylen) == 0) { - 
struct osc_creator *oscc = &obd->u.cli.cl_oscc; - spin_lock(&oscc->oscc_lock); - oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING; - spin_unlock(&oscc->oscc_lock); - RETURN(0); - } - - if (keylen == strlen("initial_recov") && - memcmp(key, "initial_recov", strlen("initial_recov")) == 0) { - struct obd_import *imp = class_exp2cliimp(exp); - if (vallen != sizeof(int)) - RETURN(-EINVAL); - imp->imp_initial_recov = *(int *)val; - CDEBUG(D_HA, "%s: set imp_no_init_recov = %d\n", - exp->exp_obd->obd_name, - imp->imp_initial_recov); - RETURN(0); + GOTO(out, err); + case OBD_IOC_POLL_QUOTACHECK: + err = lquota_poll_check(quota_interface, exp, + (struct if_quotacheck *)karg); + GOTO(out, err); + default: + CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n", + cmd, cfs_curproc_comm()); + GOTO(out, err = -ENOTTY); } +out: + module_put(THIS_MODULE); + return err; +} - if (keylen == strlen("async") && - memcmp(key, "async", keylen) == 0) { - struct client_obd *cl = &obd->u.cli; - if (vallen != sizeof(int)) - RETURN(-EINVAL); - cl->cl_async = *(int *)val; - CDEBUG(D_HA, "%s: set async = %d\n", - obd->obd_name, cl->cl_async); +static int osc_get_info(struct obd_export *exp, obd_count keylen, + void *key, __u32 *vallen, void *val, + struct lov_stripe_md *lsm) +{ + ENTRY; + if (!vallen || !val) + RETURN(-EFAULT); + + if (KEY_IS(KEY_LOCK_TO_STRIPE)) { + __u32 *stripe = val; + *vallen = sizeof(*stripe); + *stripe = 0; RETURN(0); - } - - if (keylen == 5 && strcmp(key, "audit") == 0) { + } else if (KEY_IS(KEY_LAST_ID)) { struct ptlrpc_request *req; - char *bufs[2] = {key, val}; - int size[2] = {keylen, vallen}; + obd_id *reply; + char *tmp; + int rc; - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION, - OST_SET_INFO, 2, size, bufs); + req = ptlrpc_request_alloc(class_exp2cliimp(exp), + &RQF_OST_GET_INFO_LAST_ID); if (req == NULL) RETURN(-ENOMEM); - req->rq_replen = lustre_msg_size(0, size); - lustre_swab_reqbuf(req, 1, sizeof(struct audit_attr_msg), - lustre_swab_audit_attr); + req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY, + RCL_CLIENT, keylen); + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + + tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY); + memcpy(tmp, key, keylen); + + ptlrpc_request_set_replen(req); rc = ptlrpc_queue_wait(req); - + if (rc) + GOTO(out, rc); + + reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID); + if (reply == NULL) + GOTO(out, rc = -EPROTO); + + *((obd_id *)val) = *reply; + out: ptlrpc_req_finished(req); RETURN(rc); - } - - if (keylen == 9 && strcmp(key, "audit_obj") == 0) { + } else if (KEY_IS(KEY_FIEMAP)) { struct ptlrpc_request *req; - char *bufs[2] = {key, val}; - int size[2] = {keylen, vallen}; + struct ll_user_fiemap *reply; + char *tmp; + int rc; - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION, - OST_SET_INFO, 2, size, bufs); + req = ptlrpc_request_alloc(class_exp2cliimp(exp), + &RQF_OST_GET_INFO_FIEMAP); if (req == NULL) RETURN(-ENOMEM); - req->rq_replen = lustre_msg_size(0, size); - lustre_swab_reqbuf(req, 1, sizeof(struct obdo), - lustre_swab_obdo); + req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY, + RCL_CLIENT, keylen); + req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL, + RCL_CLIENT, *vallen); + req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL, + RCL_SERVER, *vallen); + + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + + tmp = 
req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY); + memcpy(tmp, key, keylen); + tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL); + memcpy(tmp, val, *vallen); + + ptlrpc_request_set_replen(req); rc = ptlrpc_queue_wait(req); - + if (rc) + GOTO(out1, rc); + + reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL); + if (reply == NULL) + GOTO(out1, rc = -EPROTO); + + memcpy(val, reply, *vallen); + out1: ptlrpc_req_finished(req); + RETURN(rc); } - if (keylen == 8 && memcmp(key, "auditlog", 8) == 0) { - struct ptlrpc_request *req; - char *bufs[2] = {key, val}; - int size[2] = {keylen, vallen}; + RETURN(-EINVAL); +} - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION, - OST_SET_INFO, 2, size, bufs); - if (req == NULL) - RETURN(-ENOMEM); +static int osc_setinfo_mds_conn_interpret(const struct lu_env *env, + struct ptlrpc_request *req, + void *aa, int rc) +{ + struct llog_ctxt *ctxt; + struct obd_import *imp = req->rq_import; + ENTRY; - req->rq_replen = lustre_msg_size(0, size); - lustre_swab_reqbuf(req, 1, sizeof(struct audit_msg), - lustre_swab_audit_msg); - rc = ptlrpc_queue_wait(req); - - ptlrpc_req_finished(req); + if (rc != 0) RETURN(rc); + + ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT); + if (ctxt) { + if (rc == 0) + rc = llog_initiator_connect(ctxt); + else + CERROR("cannot establish connection for " + "ctxt %p: %d\n", ctxt, rc); } - if (keylen == strlen("sec") && memcmp(key, "sec", keylen) == 0) { - struct client_obd *cli = &exp->exp_obd->u.cli; + llog_ctxt_put(ctxt); + spin_lock(&imp->imp_lock); + imp->imp_server_timeout = 1; + imp->imp_pingable = 1; + spin_unlock(&imp->imp_lock); + CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd)); + + RETURN(rc); +} + +static int osc_set_info_async(struct obd_export *exp, obd_count keylen, + void *key, obd_count vallen, void *val, + struct ptlrpc_request_set *set) +{ + struct ptlrpc_request *req; + struct obd_device *obd = exp->exp_obd; + struct obd_import *imp = class_exp2cliimp(exp); + char *tmp; + int rc; + ENTRY; - cli->cl_sec_flavor = ptlrpcs_name2flavor(val); - if (cli->cl_sec_flavor == PTLRPCS_FLVR_INVALID) { - CERROR("unrecognized security flavor %s\n", (char*) val); + OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10); + + if (KEY_IS(KEY_NEXT_ID)) { + if (vallen != sizeof(obd_id)) + RETURN(-ERANGE); + if (val == NULL) RETURN(-EINVAL); - } + obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1; + CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n", + exp->exp_obd->obd_name, + obd->u.cli.cl_oscc.oscc_next_id); RETURN(0); } - if (keylen == strlen("sec_flags") && - memcmp(key, "sec_flags", keylen) == 0) { - struct client_obd *cli = &exp->exp_obd->u.cli; - - cli->cl_sec_flags = *((unsigned long *) val); + if (KEY_IS(KEY_UNLINKED)) { + struct osc_creator *oscc = &obd->u.cli.cl_oscc; + spin_lock(&oscc->oscc_lock); + oscc->oscc_flags &= ~OSCC_FLAG_NOSPC; + spin_unlock(&oscc->oscc_lock); RETURN(0); } - if (keylen == strlen("flush_cred") && - memcmp(key, "flush_cred", keylen) == 0) { - struct client_obd *cli = &exp->exp_obd->u.cli; + if (KEY_IS(KEY_INIT_RECOV)) { + if (vallen != sizeof(int)) + RETURN(-EINVAL); + spin_lock(&imp->imp_lock); + imp->imp_initial_recov = *(int *)val; + spin_unlock(&imp->imp_lock); + CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n", + exp->exp_obd->obd_name, + imp->imp_initial_recov); + RETURN(0); + } - if (cli->cl_import) - ptlrpcs_import_flush_current_creds(cli->cl_import); + if (KEY_IS(KEY_CHECKSUM)) { + if (vallen != sizeof(int)) + RETURN(-EINVAL); + 
exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0; RETURN(0); } - if (keylen == strlen("crypto_cb") && - memcmp(key, "crypto_cb", keylen) == 0) { - LASSERT(vallen == sizeof(crypt_cb_t)); - osc_crypt_cb = (crypt_cb_t)val; + + if (KEY_IS(KEY_FLUSH_CTX)) { + sptlrpc_import_flush_my_ctx(imp); RETURN(0); } - if (keylen == 8 && memcmp(key, "capa_key", 8) == 0) { - struct ptlrpc_request *req; - char *bufs[2] = {key, val}; - unsigned long irqflags; - int rc, size[2] = {keylen, vallen}; + if (!set) + RETURN(-EINVAL); - LASSERT(vallen == sizeof(struct lustre_capa_key)); + /* We pass all other commands directly to OST. Since nobody calls osc + methods directly and everybody is supposed to go through LOV, we + assume lov checked invalid values for us. + The only recognised values so far are evict_by_nid and mds_conn. + Even if something bad goes through, we'd get a -EINVAL from OST + anyway. */ - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION, - OST_SET_INFO, 2, size, bufs); - if (req == NULL) - RETURN(-ENOMEM); - spin_lock_irqsave (&req->rq_lock, irqflags); - req->rq_replay = 1; - spin_unlock_irqrestore (&req->rq_lock, irqflags); + req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO); + if (req == NULL) + RETURN(-ENOMEM); - req->rq_replen = lustre_msg_size(0, NULL); - rc = ptlrpc_queue_wait(req); - ptlrpc_req_finished(req); + req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY, + RCL_CLIENT, keylen); + req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL, + RCL_CLIENT, vallen); + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO); + if (rc) { + ptlrpc_request_free(req); RETURN(rc); } - if (keylen == strlen("setext") && - memcmp(key, "setext", keylen) == 0) { - struct client_obd *cli = &exp->exp_obd->u.cli; - struct osc_creator *oscc = &cli->cl_oscc; - struct fid_extent *ext = val; - - oscc->oscc_next_id = (obd_id)ext->fe_start; - RETURN(0); - } + tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY); + memcpy(tmp, key, keylen); + tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL); + memcpy(tmp, val, vallen); - if (keylen < strlen("mds_conn") || - memcmp(key, "mds_conn", keylen) != 0) - RETURN(-EINVAL); + if (KEY_IS(KEY_MDS_CONN)) { + struct osc_creator *oscc = &obd->u.cli.cl_oscc; - ctxt = llog_get_context(&exp->exp_obd->obd_llogs, - LLOG_UNLINK_ORIG_CTXT); - if (ctxt) { - if (rc == 0) - rc = llog_initiator_connect(ctxt); - else - CERROR("cannot establish the connect for " - "ctxt %p: %d\n", ctxt, rc); + oscc->oscc_oa.o_gr = (*(__u32 *)val); + oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP; + LASSERT(oscc->oscc_oa.o_gr > 0); + req->rq_interpret_reply = osc_setinfo_mds_conn_interpret; } - imp->imp_server_timeout = 1; - CDEBUG(D_HA, "pinging OST %s\n", imp->imp_target_uuid.uuid); - imp->imp_pingable = 1; + ptlrpc_request_set_replen(req); + ptlrpc_set_add_req(set, req); + ptlrpc_check_set(NULL, set); - RETURN(rc); + RETURN(0); } @@ -3194,70 +3838,113 @@ static struct llog_operations osc_size_repl_logops = { lop_cancel: llog_obd_repl_cancel }; -static struct llog_operations osc_unlink_orig_logops; - -static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs, +static struct llog_operations osc_mds_ost_orig_logops; +static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg, struct obd_device *tgt, int count, - struct llog_catid *catid) + struct llog_catid *catid, struct obd_uuid *uuid) { int rc; ENTRY; - osc_unlink_orig_logops = llog_lvfs_ops; - osc_unlink_orig_logops.lop_setup = llog_obd_origin_setup; - 
osc_unlink_orig_logops.lop_cleanup = llog_catalog_cleanup; - osc_unlink_orig_logops.lop_add = llog_catalog_add; - osc_unlink_orig_logops.lop_connect = llog_origin_connect; + LASSERT(olg == &obd->obd_olg); + spin_lock(&obd->obd_dev_lock); + if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) { + osc_mds_ost_orig_logops = llog_lvfs_ops; + osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup; + osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup; + osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add; + osc_mds_ost_orig_logops.lop_connect = llog_origin_connect; + } + spin_unlock(&obd->obd_dev_lock); - rc = obd_llog_setup(obd, llogs, LLOG_UNLINK_ORIG_CTXT, tgt, count, - &catid->lci_logid, &osc_unlink_orig_logops); - if (rc) - RETURN(rc); + rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count, + &catid->lci_logid, &osc_mds_ost_orig_logops); + if (rc) { + CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n"); + GOTO (out, rc); + } - rc = obd_llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, count, NULL, - &osc_size_repl_logops); - RETURN(rc); + rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count, + NULL, &osc_size_repl_logops); + if (rc) { + struct llog_ctxt *ctxt = + llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT); + if (ctxt) + llog_cleanup(ctxt); + CERROR("failed LLOG_SIZE_REPL_CTXT\n"); + } + GOTO(out, rc); +out: + if (rc) { + CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n", + obd->obd_name, tgt->obd_name, count, catid, rc); + CERROR("logid "LPX64":0x%x\n", + catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen); + } + return rc; } -static int osc_llog_finish(struct obd_device *obd, - struct obd_llogs *llogs, int count) +static int osc_llog_finish(struct obd_device *obd, int count) { - int rc; + struct llog_ctxt *ctxt; + int rc = 0, rc2 = 0; ENTRY; - rc = obd_llog_cleanup(llog_get_context(llogs, LLOG_UNLINK_ORIG_CTXT)); - if (rc) - RETURN(rc); + ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT); + if (ctxt) + rc = llog_cleanup(ctxt); + + ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT); + if (ctxt) + rc2 = llog_cleanup(ctxt); + if (!rc) + rc = rc2; - rc = obd_llog_cleanup(llog_get_context(llogs, LLOG_SIZE_REPL_CTXT)); RETURN(rc); } -static int osc_connect(struct lustre_handle *exph, - struct obd_device *obd, struct obd_uuid *cluuid, - struct obd_connect_data *data, - unsigned long connect_flags) +static int osc_reconnect(const struct lu_env *env, + struct obd_export *exp, struct obd_device *obd, + struct obd_uuid *cluuid, + struct obd_connect_data *data) { - int rc; - ENTRY; - rc = client_connect_import(exph, obd, cluuid, data, connect_flags); - RETURN(rc); + struct client_obd *cli = &obd->u.cli; + + if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) { + long lost_grant; + + client_obd_list_lock(&cli->cl_loi_list_lock); + data->ocd_grant = cli->cl_avail_grant ?: + 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT; + lost_grant = cli->cl_lost_grant; + cli->cl_lost_grant = 0; + client_obd_list_unlock(&cli->cl_loi_list_lock); + + CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld " + "cl_lost_grant: %ld\n", data->ocd_grant, + cli->cl_avail_grant, lost_grant); + CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d" + " ocd_grant: %d\n", data->ocd_connect_flags, + data->ocd_version, data->ocd_grant); + } + + RETURN(0); } -static int osc_disconnect(struct obd_export *exp, unsigned long flags) +static int osc_disconnect(struct obd_export *exp) { struct obd_device *obd = class_exp2obd(exp); - struct llog_ctxt *ctxt; 
+ struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT); int rc; - ENTRY; - ctxt = llog_get_context(&obd->obd_llogs, LLOG_SIZE_REPL_CTXT); if (obd->u.cli.cl_conn_count == 1) /* flush any remaining cancel messages out to the target */ llog_sync(ctxt, exp); - rc = client_disconnect_export(exp, flags); - RETURN(rc); + llog_ctxt_put(ctxt); + + rc = client_disconnect_export(exp); + return rc; } static int osc_import_event(struct obd_device *obd, @@ -3266,8 +3953,8 @@ static int osc_import_event(struct obd_device *obd, { struct client_obd *cli; int rc = 0; - ENTRY; + ENTRY; LASSERT(imp->imp_obd == obd); switch (event) { @@ -3280,11 +3967,15 @@ static int osc_import_event(struct obd_device *obd, oscc->oscc_flags |= OSCC_FLAG_RECOVERING; spin_unlock(&oscc->oscc_lock); } + cli = &obd->u.cli; + client_obd_list_lock(&cli->cl_loi_list_lock); + cli->cl_avail_grant = 0; + cli->cl_lost_grant = 0; + client_obd_list_unlock(&cli->cl_loi_list_lock); break; } case IMP_EVENT_INACTIVE: { - if (obd->obd_observer) - rc = obd_notify(obd->obd_observer, obd, 0, 0); + rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL); break; } case IMP_EVENT_INVALIDATE: { @@ -3292,12 +3983,10 @@ static int osc_import_event(struct obd_device *obd, /* Reset grants */ cli = &obd->u.cli; - spin_lock(&cli->cl_loi_list_lock); - cli->cl_avail_grant = 0; - cli->cl_lost_grant = 0; + client_obd_list_lock(&cli->cl_loi_list_lock); /* all pages go to failing rpcs due to the invalid import */ osc_check_rpcs(cli); - spin_unlock(&cli->cl_loi_list_lock); + client_obd_list_unlock(&cli->cl_loi_list_lock); ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY); @@ -3312,9 +4001,20 @@ static int osc_import_event(struct obd_device *obd, oscc->oscc_flags &= ~OSCC_FLAG_NOSPC; spin_unlock(&oscc->oscc_lock); } + rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL); + break; + } + case IMP_EVENT_OCD: { + struct obd_connect_data *ocd = &imp->imp_connect_data; - if (obd->obd_observer) - rc = obd_notify(obd->obd_observer, obd, 1, 0); + if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT) + osc_init_grant(&obd->u.cli, ocd); + + /* See bug 7198 */ + if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL) + imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL; + + rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL); break; } default: @@ -3324,92 +4024,197 @@ static int osc_import_event(struct obd_device *obd, RETURN(rc); } -static int osc_attach(struct obd_device *dev, obd_count len, void *data) +int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) { - struct lprocfs_static_vars lvars; int rc; ENTRY; - lprocfs_init_vars(osc,&lvars); - rc = lprocfs_obd_attach(dev, lvars.obd_vars); - if (rc < 0) + ENTRY; + rc = ptlrpcd_addref(); + if (rc) RETURN(rc); - rc = lproc_osc_attach_seqstat(dev); - if (rc < 0) { - lprocfs_obd_detach(dev); - RETURN(rc); - } + rc = client_obd_setup(obd, lcfg); + if (rc) { + ptlrpcd_decref(); + } else { + struct lprocfs_static_vars lvars = { 0 }; + struct client_obd *cli = &obd->u.cli; + + lprocfs_osc_init_vars(&lvars); + if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) { + lproc_osc_attach_seqstat(obd); + sptlrpc_lprocfs_cliobd_attach(obd); + ptlrpc_lprocfs_register_obd(obd); + } - ptlrpc_lprocfs_register_obd(dev); - RETURN(0); -} + oscc_init(obd); + /* We need to allocate a few requests more, because + brw_interpret tries to create new requests before freeing + previous ones. 
Ideally we want to have 2x max_rpcs_in_flight + reserved, but I am afraid that might be too much wasted RAM + in practice, so 2 is just my guess and should still work. */ + cli->cl_import->imp_rq_pool = + ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2, + OST_MAXREQSIZE, + ptlrpc_add_rqs_to_pool); + cli->cl_cache = cache_create(obd); + if (!cli->cl_cache) { + osc_cleanup(obd); + rc = -ENOMEM; + } + } - RETURN(rc); } -static int osc_detach(struct obd_device *dev) -{ - ptlrpc_lprocfs_unregister_obd(dev); - return lprocfs_obd_detach(dev); + RETURN(rc); } -static int osc_setup(struct obd_device *obd, obd_count len, void *buf) +static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) { - int rc; + int rc = 0; ENTRY; - rc = ptlrpcd_addref(); - if (rc) - RETURN(rc); - - rc = client_obd_setup(obd, len, buf); - if (rc) - ptlrpcd_decref(); - else - oscc_init(obd); + switch (stage) { + case OBD_CLEANUP_EARLY: { + struct obd_import *imp; + imp = obd->u.cli.cl_import; + CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name); + /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */ + ptlrpc_deactivate_import(imp); + spin_lock(&imp->imp_lock); + imp->imp_pingable = 0; + spin_unlock(&imp->imp_lock); + break; + } + case OBD_CLEANUP_EXPORTS: { + /* If we set up but never connected, the + client import will not have been cleaned. */ + if (obd->u.cli.cl_import) { + struct obd_import *imp; + imp = obd->u.cli.cl_import; + CDEBUG(D_CONFIG, "%s: client import never connected\n", + obd->obd_name); + ptlrpc_invalidate_import(imp); + ptlrpc_free_rq_pool(imp->imp_rq_pool); + class_destroy_import(imp); + obd->u.cli.cl_import = NULL; + } + rc = obd_llog_finish(obd, 0); + if (rc != 0) + CERROR("failed to cleanup llogging subsystems\n"); + break; + } + } RETURN(rc); } -static int osc_cleanup(struct obd_device *obd, int flags) +int osc_cleanup(struct obd_device *obd) { struct osc_creator *oscc = &obd->u.cli.cl_oscc; int rc; - rc = ldlm_cli_cancel_unused(obd->obd_namespace, NULL, - LDLM_FL_CONFIG_CHANGE, NULL); - if (rc) - RETURN(rc); + ENTRY; + ptlrpc_lprocfs_unregister_obd(obd); + lprocfs_obd_cleanup(obd); spin_lock(&oscc->oscc_lock); oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING; oscc->oscc_flags |= OSCC_FLAG_EXITING; spin_unlock(&oscc->oscc_lock); - rc = client_obd_cleanup(obd, flags); + /* free memory of osc quota cache */ + lquota_cleanup(quota_interface, obd); + + cache_destroy(obd->u.cli.cl_cache); + rc = client_obd_cleanup(obd); + ptlrpcd_decref(); RETURN(rc); } - +static int osc_register_page_removal_cb(struct obd_export *exp, + obd_page_removal_cb_t func, + obd_pin_extent_cb pin_cb) +{ + return cache_add_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func, + pin_cb); +} + +static int osc_unregister_page_removal_cb(struct obd_export *exp, + obd_page_removal_cb_t func) +{ + return cache_del_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func); +} + +static int osc_register_lock_cancel_cb(struct obd_export *exp, + obd_lock_cancel_cb cb) +{ + LASSERT(exp->exp_obd->u.cli.cl_ext_lock_cancel_cb == NULL); + + exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = cb; + return 0; +} + +static int osc_unregister_lock_cancel_cb(struct obd_export *exp, + obd_lock_cancel_cb cb) +{ + if (exp->exp_obd->u.cli.cl_ext_lock_cancel_cb != cb) { + CERROR("Unregistering cancel cb %p, while only %p was " + "registered\n", cb, + exp->exp_obd->u.cli.cl_ext_lock_cancel_cb); + RETURN(-EINVAL); + } + + exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = NULL; + return 0; +} + +static int osc_process_config(struct obd_device *obd, obd_count len, void 
*buf) +{ + struct lustre_cfg *lcfg = buf; + struct lprocfs_static_vars lvars = { 0 }; + int rc = 0; + + lprocfs_osc_init_vars(&lvars); + + switch (lcfg->lcfg_command) { + case LCFG_SPTLRPC_CONF: + rc = sptlrpc_cliobd_process_config(obd, lcfg); + break; + default: + rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, + lcfg, obd); + break; + } + + return(rc); +} + struct obd_ops osc_obd_ops = { .o_owner = THIS_MODULE, - .o_attach = osc_attach, - .o_detach = osc_detach, .o_setup = osc_setup, + .o_precleanup = osc_precleanup, .o_cleanup = osc_cleanup, .o_add_conn = client_import_add_conn, .o_del_conn = client_import_del_conn, - .o_connect = osc_connect, + .o_connect = client_connect_import, + .o_reconnect = osc_reconnect, .o_disconnect = osc_disconnect, .o_statfs = osc_statfs, + .o_statfs_async = osc_statfs_async, .o_packmd = osc_packmd, .o_unpackmd = osc_unpackmd, + .o_precreate = osc_precreate, .o_create = osc_create, .o_destroy = osc_destroy, .o_getattr = osc_getattr, .o_getattr_async = osc_getattr_async, .o_setattr = osc_setattr, + .o_setattr_async = osc_setattr_async, .o_brw = osc_brw, .o_brw_async = osc_brw_async, .o_prep_async_page = osc_prep_async_page, + .o_reget_short_lock = osc_reget_short_lock, + .o_release_short_lock = osc_release_short_lock, .o_queue_async_io = osc_queue_async_io, .o_set_async_flags = osc_set_async_flags, .o_queue_group_io = osc_queue_group_io, @@ -3424,71 +4229,37 @@ struct obd_ops osc_obd_ops = { .o_cancel_unused = osc_cancel_unused, .o_iocontrol = osc_iocontrol, .o_get_info = osc_get_info, - .o_set_info = osc_set_info, - .o_import_event = osc_import_event, - .o_llog_init = osc_llog_init, - .o_llog_finish = osc_llog_finish, -}; - -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) -struct obd_ops sanosc_obd_ops = { - .o_owner = THIS_MODULE, - .o_attach = osc_attach, - .o_detach = osc_detach, - .o_cleanup = client_obd_cleanup, - .o_add_conn = client_import_add_conn, - .o_del_conn = client_import_del_conn, - .o_connect = osc_connect, - .o_disconnect = client_disconnect_export, - .o_statfs = osc_statfs, - .o_packmd = osc_packmd, - .o_unpackmd = osc_unpackmd, - .o_create = osc_real_create, - .o_destroy = osc_destroy, - .o_getattr = osc_getattr, - .o_getattr_async = osc_getattr_async, - .o_setattr = osc_setattr, - .o_setup = client_sanobd_setup, - .o_brw = sanosc_brw, - .o_punch = osc_punch, - .o_sync = osc_sync, - .o_enqueue = osc_enqueue, - .o_match = osc_match, - .o_change_cbdata = osc_change_cbdata, - .o_cancel = osc_cancel, - .o_cancel_unused = osc_cancel_unused, - .o_iocontrol = osc_iocontrol, + .o_set_info_async = osc_set_info_async, .o_import_event = osc_import_event, .o_llog_init = osc_llog_init, .o_llog_finish = osc_llog_finish, + .o_process_config = osc_process_config, + .o_register_page_removal_cb = osc_register_page_removal_cb, + .o_unregister_page_removal_cb = osc_unregister_page_removal_cb, + .o_register_lock_cancel_cb = osc_register_lock_cancel_cb, + .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb, }; -#endif int __init osc_init(void) { - struct lprocfs_static_vars lvars; -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) - struct lprocfs_static_vars sanlvars; -#endif + struct lprocfs_static_vars lvars = { 0 }; int rc; ENTRY; - lprocfs_init_vars(osc, &lvars); -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) - lprocfs_init_vars(osc, &sanlvars); -#endif + lprocfs_osc_init_vars(&lvars); + + request_module("lquota"); + quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface); + lquota_init(quota_interface); + 
init_obd_quota_ops(quota_interface, &osc_obd_ops); rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars, - OBD_OSC_DEVICENAME); - if (rc) + LUSTRE_OSC_NAME, NULL); + if (rc) { + if (quota_interface) + PORTAL_SYMBOL_PUT(osc_quota_interface); RETURN(rc); - -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) - rc = class_register_type(&sanosc_obd_ops, NULL, sanlvars.module_vars, - OBD_SANOSC_DEVICENAME); - if (rc) - class_unregister_type(OBD_OSC_DEVICENAME); -#endif + } RETURN(rc); } @@ -3496,16 +4267,16 @@ int __init osc_init(void) #ifdef __KERNEL__ static void /*__exit*/ osc_exit(void) { -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) - class_unregister_type(OBD_SANOSC_DEVICENAME); -#endif - class_unregister_type(OBD_OSC_DEVICENAME); + lquota_exit(quota_interface); + if (quota_interface) + PORTAL_SYMBOL_PUT(osc_quota_interface); + + class_unregister_type(LUSTRE_OSC_NAME); } -MODULE_AUTHOR("Cluster File Systems, Inc. "); +MODULE_AUTHOR("Sun Microsystems, Inc. "); MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)"); MODULE_LICENSE("GPL"); -module_init(osc_init); -module_exit(osc_exit); +cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit); #endif
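/*
 * Aside, not part of the patch: a sketch of how a caller could drive the new
 * osc_statfs_async() path with a request set. In the real tree callers go
 * through the obd method table (.o_statfs_async); the direct call below and
 * the names example_statfs()/example_statfs_done() are hypothetical, and the
 * ptlrpc_prep_set()/ptlrpc_set_wait()/ptlrpc_set_destroy() calls assume the
 * 1.8-era ptlrpc set API. The callback prototype follows how oi_cb_up is
 * invoked from osc_statfs_interpret() above.
 */
static int example_statfs_done(struct obd_info *oinfo, int rc)
{
        /* Called once the OST_STATFS reply has been interpreted; on success
         * *oinfo->oi_osfs has been filled in from the reply. */
        if (rc == 0)
                CDEBUG(D_INODE, "free blocks "LPU64"\n",
                       oinfo->oi_osfs->os_bfree);
        return rc;
}

static int example_statfs(struct obd_device *obd)
{
        struct obd_statfs osfs;
        struct obd_info oinfo = { 0 };
        struct ptlrpc_request_set *set;
        int rc;

        set = ptlrpc_prep_set();
        if (set == NULL)
                return -ENOMEM;

        oinfo.oi_osfs = &osfs;
        oinfo.oi_cb_up = example_statfs_done;
        oinfo.oi_flags = OBD_STATFS_NODELAY; /* fail fast rather than block */

        rc = osc_statfs_async(obd, &oinfo, cfs_time_current_64(), set);
        if (rc == 0)
                rc = ptlrpc_set_wait(set); /* waits for the interpret cb */
        ptlrpc_set_destroy(set);
        return rc;
}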