X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosc%2Fosc_request.c;h=8ce49e27bb16e7d6ae48b70a82031bd987424023;hp=81c3ced89715ed15da34cbf31138cf53eb805b37;hb=b0f15edd90807569acdc50bb973a0e80c87ea78e;hpb=2957787997522e3967176903dd7c6f1e04964dc4

diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c
index 81c3ced..8ce49e2 100644
--- a/lustre/osc/osc_request.c
+++ b/lustre/osc/osc_request.c
@@ -1,32 +1,37 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- * Copyright (C) 2001-2003 Cluster File Systems, Inc.
- * Author Peter Braam
+ * GPL HEADER START
  *
- * This file is part of the Lustre file system, http://www.lustre.org
- * Lustre is a trademark of Cluster File Systems, Inc.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
- * You may have signed or agreed to another license before downloading
- * this software. If so, you are bound by the terms and conditions
- * of that agreement, and the following does not apply to you. See the
- * LICENSE file included with this distribution for more information.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
  *
- * If you did not agree to a different license, then this copy of Lustre
- * is open source software; you can redistribute it and/or modify it
- * under the terms of version 2 of the GNU General Public License as
- * published by the Free Software Foundation.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
  *
- * In either case, Lustre is distributed in the hope that it will be
- * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
- * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * license text for more details.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  *
- * For testing and management it is treated as an obd_device,
- * although * it does not export a full OBD method table (the
- * requests are coming * in over the wire, so object target modules
- * do not have a full * method table.)
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
  *
+ * GPL HEADER END
+ */
+/*
+ * Copyright 2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
*/ #ifndef EXPORT_SYMTAB @@ -34,16 +39,16 @@ #endif #define DEBUG_SUBSYSTEM S_OSC -#ifdef __KERNEL__ -# include -#else /* __KERNEL__ */ +#include + +#ifndef __KERNEL__ # include #endif #include -#include #include #include +#include #include #include @@ -56,15 +61,16 @@ #include #include #include +#include #include "osc_internal.h" static quota_interface_t *quota_interface = NULL; extern quota_interface_t osc_quota_interface; static void osc_release_ppga(struct brw_page **ppga, obd_count count); - -/* by default 10s */ -atomic_t osc_resend_time; +static int brw_interpret(const struct lu_env *env, + struct ptlrpc_request *req, void *data, int rc); +int osc_cleanup(struct obd_device *obd); /* Pack OSC object metadata for disk storage (LE byte order). */ static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp, @@ -156,7 +162,7 @@ static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, RETURN(lsm_size); } -static inline void osc_pack_capa(struct ptlrpc_request *req, int offset, +static inline void osc_pack_capa(struct ptlrpc_request *req, struct ost_body *body, void *capa) { struct obd_capa *oc = (struct obd_capa *)capa; @@ -165,24 +171,38 @@ static inline void osc_pack_capa(struct ptlrpc_request *req, int offset, if (!capa) return; - c = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*c)); + c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1); LASSERT(c); capa_cpy(c, oc); body->oa.o_valid |= OBD_MD_FLOSSCAPA; DEBUG_CAPA(D_SEC, c, "pack"); } -static inline void osc_pack_req_body(struct ptlrpc_request *req, int offset, +static inline void osc_pack_req_body(struct ptlrpc_request *req, struct obd_info *oinfo) { struct ost_body *body; - body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body)); + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); + LASSERT(body); + body->oa = *oinfo->oi_oa; - osc_pack_capa(req, offset + 1, body, oinfo->oi_capa); + osc_pack_capa(req, body, oinfo->oi_capa); +} + +static inline void osc_set_capa_size(struct ptlrpc_request *req, + const struct req_msg_field *field, + struct obd_capa *oc) +{ + if (oc == NULL) + req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0); + else + /* it is already calculated as sizeof struct obd_capa */ + ; } -static int osc_getattr_interpret(struct ptlrpc_request *req, +static int osc_getattr_interpret(const struct lu_env *env, + struct ptlrpc_request *req, struct osc_async_args *aa, int rc) { struct ost_body *body; @@ -201,7 +221,7 @@ static int osc_getattr_interpret(struct ptlrpc_request *req, aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE; aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ; } else { - CERROR("can't unpack ost_body\n"); + CDEBUG(D_INFO, "can't unpack ost_body\n"); rc = -EPROTO; aa->aa_oi->oi_oa->o_valid = 0; } @@ -214,59 +234,63 @@ static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo, struct ptlrpc_request_set *set) { struct ptlrpc_request *req; - struct ost_body *body; - int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) }; struct osc_async_args *aa; + int rc; ENTRY; - size[REQ_REC_OFF + 1] = oinfo->oi_capa ? 
sizeof(struct lustre_capa) : 0; - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION, - OST_GETATTR, 3, size,NULL); - if (!req) + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR); + if (req == NULL) RETURN(-ENOMEM); - osc_pack_req_body(req, REQ_REC_OFF, oinfo); + osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + + osc_pack_req_body(req, oinfo); - ptlrpc_req_set_repsize(req, 2, size); - req->rq_interpret_reply = osc_getattr_interpret; + ptlrpc_request_set_replen(req); + req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret; CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = (struct osc_async_args *)&req->rq_async_args; + aa = ptlrpc_req_async_args(req); aa->aa_oi = oinfo; ptlrpc_set_add_req(set, req); - RETURN (0); + RETURN(0); } static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo) { struct ptlrpc_request *req; - struct ost_body *body; - int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) }; + struct ost_body *body; + int rc; ENTRY; - size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0; - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION, - OST_GETATTR, 3, size, NULL); - if (!req) + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR); + if (req == NULL) RETURN(-ENOMEM); - osc_pack_req_body(req, REQ_REC_OFF, oinfo); + osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + + osc_pack_req_body(req, oinfo); - ptlrpc_req_set_repsize(req, 2, size); + ptlrpc_request_set_replen(req); rc = ptlrpc_queue_wait(req); - if (rc) { - CERROR("%s failed: rc = %d\n", __FUNCTION__, rc); + if (rc) GOTO(out, rc); - } - body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body), - lustre_swab_ost_body); - if (body == NULL) { - CERROR ("can't unpack ost_body\n"); - GOTO (out, rc = -EPROTO); - } + body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + if (body == NULL) + GOTO(out, rc = -EPROTO); CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode); *oinfo->oi_oa = body->oa; @@ -285,28 +309,33 @@ static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo, struct obd_trans_info *oti) { struct ptlrpc_request *req; - struct ost_body *body; - int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) }; + struct ost_body *body; + int rc; ENTRY; LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) || oinfo->oi_oa->o_gr > 0); - size[REQ_REC_OFF + 1] = oinfo->oi_capa ? 
sizeof(struct lustre_capa) : 0; - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION, - OST_SETATTR, 3, size, NULL); - if (!req) + + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR); + if (req == NULL) RETURN(-ENOMEM); - osc_pack_req_body(req, REQ_REC_OFF, oinfo); + osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + + osc_pack_req_body(req, oinfo); - ptlrpc_req_set_repsize(req, 2, size); + ptlrpc_request_set_replen(req); rc = ptlrpc_queue_wait(req); if (rc) GOTO(out, rc); - body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body), - lustre_swab_ost_body); + body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); if (body == NULL) GOTO(out, rc = -EPROTO); @@ -318,7 +347,8 @@ out: RETURN(rc); } -static int osc_setattr_interpret(struct ptlrpc_request *req, +static int osc_setattr_interpret(const struct lu_env *env, + struct ptlrpc_request *req, struct osc_async_args *aa, int rc) { struct ost_body *body; @@ -327,12 +357,9 @@ static int osc_setattr_interpret(struct ptlrpc_request *req, if (rc != 0) GOTO(out, rc); - body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body), - lustre_swab_ost_body); - if (body == NULL) { - CERROR("can't unpack ost_body\n"); + body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + if (body == NULL) GOTO(out, rc = -EPROTO); - } *aa->aa_oi->oi_oa = body->oa; out: @@ -345,32 +372,40 @@ static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo, struct ptlrpc_request_set *rqset) { struct ptlrpc_request *req; - int size[3] = { sizeof(struct ptlrpc_body), sizeof(struct ost_body) }; struct osc_async_args *aa; + int rc; ENTRY; - size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0; - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION, - OST_SETATTR, 3, size, NULL); - if (!req) + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR); + if (req == NULL) RETURN(-ENOMEM); - osc_pack_req_body(req, REQ_REC_OFF, oinfo); + osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + + osc_pack_req_body(req, oinfo); + + ptlrpc_request_set_replen(req); + if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) { LASSERT(oti); - *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies; + oinfo->oi_oa->o_lcookie = *oti->oti_logcookies; } - ptlrpc_req_set_repsize(req, 2, size); - /* do mds to ost setattr asynchronouly */ + /* do mds to ost setattr asynchronously */ if (!rqset) { /* Do not wait for response. 
*/ ptlrpcd_add_req(req); } else { - req->rq_interpret_reply = osc_setattr_interpret; + req->rq_interpret_reply = + (ptlrpc_interpterer_t)osc_setattr_interpret; CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = (struct osc_async_args *)&req->rq_async_args; + aa = ptlrpc_req_async_args(req); aa->aa_oi = oinfo; ptlrpc_set_add_req(rqset, req); @@ -383,9 +418,9 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa, struct lov_stripe_md **ea, struct obd_trans_info *oti) { struct ptlrpc_request *req; - struct ost_body *body; - struct lov_stripe_md *lsm; - int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) }; + struct ost_body *body; + struct lov_stripe_md *lsm; + int rc; ENTRY; LASSERT(oa); @@ -398,18 +433,24 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa, RETURN(rc); } - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION, - OST_CREATE, 2, size, NULL); - if (!req) + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE); + if (req == NULL) GOTO(out, rc = -ENOMEM); - body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body)); + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE); + if (rc) { + ptlrpc_request_free(req); + GOTO(out, rc); + } + + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); + LASSERT(body); body->oa = *oa; - ptlrpc_req_set_repsize(req, 2, size); - if (oa->o_valid & OBD_MD_FLINLINE) { - LASSERT((oa->o_valid & OBD_MD_FLFLAGS) && - oa->o_flags == OBD_FL_DELORPHAN); + ptlrpc_request_set_replen(req); + + if ((oa->o_valid & OBD_MD_FLFLAGS) && + oa->o_flags == OBD_FL_DELORPHAN) { DEBUG_REQ(D_HA, req, "delorphan from OST integration"); /* Don't resend the delorphan req */ @@ -420,12 +461,9 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa, if (rc) GOTO(out_req, rc); - body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body), - lustre_swab_ost_body); - if (body == NULL) { - CERROR ("can't unpack ost_body\n"); - GOTO (out_req, rc = -EPROTO); - } + body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + if (body == NULL) + GOTO(out_req, rc = -EPROTO); *oa = body->oa; @@ -447,22 +485,22 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa, if (oa->o_valid & OBD_MD_FLCOOKIE) { if (!oti->oti_logcookies) oti_alloc_cookies(oti, 1); - *oti->oti_logcookies = *obdo_logcookie(oa); + *oti->oti_logcookies = oa->o_lcookie; } } CDEBUG(D_HA, "transno: "LPD64"\n", lustre_msg_get_transno(req->rq_repmsg)); - EXIT; out_req: ptlrpc_req_finished(req); out: if (rc && !*ea) obd_free_memmd(exp, &lsm); - return rc; + RETURN(rc); } -static int osc_punch_interpret(struct ptlrpc_request *req, +static int osc_punch_interpret(const struct lu_env *env, + struct ptlrpc_request *req, struct osc_async_args *aa, int rc) { struct ost_body *body; @@ -471,12 +509,9 @@ static int osc_punch_interpret(struct ptlrpc_request *req, if (rc != 0) GOTO(out, rc); - body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body), - lustre_swab_ost_body); - if (body == NULL) { - CERROR ("can't unpack ost_body\n"); + body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + if (body == NULL) GOTO(out, rc = -EPROTO); - } *aa->aa_oi->oi_oa = body->oa; out: @@ -490,35 +525,41 @@ static int osc_punch(struct obd_export *exp, struct obd_info *oinfo, { struct ptlrpc_request *req; struct osc_async_args *aa; - struct ost_body *body; - int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) }; + struct ost_body *body; + int rc; ENTRY; if (!oinfo->oi_oa) { - CERROR("oa NULL\n"); + CDEBUG(D_INFO, "oa NULL\n"); 
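
The conversions above all follow one pattern for the simple OST RPCs (getattr, setattr, create, punch): allocate the request from a request format, shrink the optional capability field, pack the request, fill the body through the capsule, and let the format compute the reply length. A condensed sketch of that flow, assembled only from calls that appear in this patch (the function name is illustrative):

static int osc_getattr_sketch(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc;
        ENTRY;

        /* allocate by format; the format lists every request/reply buffer */
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        /* shrink the optional capability buffer to zero when no capa is sent */
        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);   /* not yet packed: free, not finish */
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);      /* fills RMF_OST_BODY via the capsule */
        ptlrpc_request_set_replen(req);     /* reply sizes come from the format */

        rc = ptlrpc_queue_wait(req);
        if (rc == 0) {
                body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
                if (body == NULL)
                        rc = -EPROTO;
                else
                        *oinfo->oi_oa = body->oa;
        }
        ptlrpc_req_finished(req);
        RETURN(rc);
}
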
RETURN(-EINVAL); } - size[REQ_REC_OFF + 1] = oinfo->oi_capa? sizeof(struct lustre_capa) : 0; - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION, - OST_PUNCH, 3, size, NULL); - if (!req) + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH); + if (req == NULL) RETURN(-ENOMEM); + osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */ + ptlrpc_at_set_req_timeout(req); + osc_pack_req_body(req, oinfo); - osc_pack_req_body(req, REQ_REC_OFF, oinfo); /* overload the size and blocks fields in the oa with start/end */ - body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body)); + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); + LASSERT(body); body->oa.o_size = oinfo->oi_policy.l_extent.start; body->oa.o_blocks = oinfo->oi_policy.l_extent.end; body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS); + ptlrpc_request_set_replen(req); - ptlrpc_req_set_repsize(req, 2, size); - req->rq_interpret_reply = osc_punch_interpret; + req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret; CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = (struct osc_async_args *)&req->rq_async_args; + aa = ptlrpc_req_async_args(req); aa->aa_oi = oinfo; ptlrpc_set_add_req(rqset, req); @@ -530,43 +571,44 @@ static int osc_sync(struct obd_export *exp, struct obdo *oa, void *capa) { struct ptlrpc_request *req; - struct ost_body *body; - int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) }; + struct ost_body *body; + int rc; ENTRY; if (!oa) { - CERROR("oa NULL\n"); + CDEBUG(D_INFO, "oa NULL\n"); RETURN(-EINVAL); } - size[REQ_REC_OFF + 1] = capa ? 
sizeof(struct lustre_capa) : 0; - - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION, - OST_SYNC, 3, size, NULL); - if (!req) + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC); + if (req == NULL) RETURN(-ENOMEM); + osc_set_capa_size(req, &RMF_CAPA1, capa); + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + /* overload the size and blocks fields in the oa with start/end */ - body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body)); + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); + LASSERT(body); body->oa = *oa; body->oa.o_size = start; body->oa.o_blocks = end; body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS); + osc_pack_capa(req, body, capa); - osc_pack_capa(req, REQ_REC_OFF + 1, body, capa); - - ptlrpc_req_set_repsize(req, 2, size); + ptlrpc_request_set_replen(req); rc = ptlrpc_queue_wait(req); if (rc) GOTO(out, rc); - body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body), - lustre_swab_ost_body); - if (body == NULL) { - CERROR ("can't unpack ost_body\n"); - GOTO (out, rc = -EPROTO); - } + body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + if (body == NULL) + GOTO(out, rc = -EPROTO); *oa = body->oa; @@ -584,20 +626,53 @@ static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa, int lock_flags) { struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; - struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } }; - struct ldlm_resource *res = ldlm_resource_get(ns, NULL, &res_id, 0, 0); + struct ldlm_res_id res_id; + struct ldlm_resource *res; int count; ENTRY; + osc_build_res_name(oa->o_id, oa->o_gr, &res_id); + res = ldlm_resource_get(ns, NULL, &res_id, 0, 0); if (res == NULL) RETURN(0); + LDLM_RESOURCE_ADDREF(res); count = ldlm_cancel_resource_local(res, cancels, NULL, mode, lock_flags, 0, NULL); + LDLM_RESOURCE_DELREF(res); ldlm_resource_putref(res); RETURN(count); } +static int osc_destroy_interpret(const struct lu_env *env, + struct ptlrpc_request *req, void *data, + int rc) +{ + struct client_obd *cli = &req->rq_import->imp_obd->u.cli; + + atomic_dec(&cli->cl_destroy_in_flight); + cfs_waitq_signal(&cli->cl_destroy_waitq); + return 0; +} + +static int osc_can_send_destroy(struct client_obd *cli) +{ + if (atomic_inc_return(&cli->cl_destroy_in_flight) <= + cli->cl_max_rpcs_in_flight) { + /* The destroy request can be sent */ + return 1; + } + if (atomic_dec_return(&cli->cl_destroy_in_flight) < + cli->cl_max_rpcs_in_flight) { + /* + * The counter has been modified between the two atomic + * operations. + */ + cfs_waitq_signal(&cli->cl_destroy_waitq); + } + return 0; +} + /* Destroy requests can be async always on the client, and we don't even really * care about the return code since the client cannot do anything at all about * a destroy failure. 
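
osc_destroy() below now caps the number of destroy RPCs in flight: osc_can_send_destroy() optimistically increments cl_destroy_in_flight, backs the increment out when the limit is exceeded, and osc_destroy_interpret() decrements the counter and wakes cl_destroy_waitq when a destroy completes, while a blocked sender sleeps in l_wait_event_exclusive() until the predicate holds. A stand-alone model of that admission check, rewritten with C11 atomics purely for illustration (the names and the limit below are assumptions of the model, not Lustre API):

#include <stdatomic.h>
#include <stdbool.h>

static atomic_int destroy_in_flight;
static const int max_rpcs_in_flight = 8;       /* stands in for cl_max_rpcs_in_flight */

static bool can_send_destroy(void)
{
        /* optimistic reservation, mirroring atomic_inc_return() <= limit */
        if (atomic_fetch_add(&destroy_in_flight, 1) + 1 <= max_rpcs_in_flight)
                return true;                   /* slot reserved, RPC may be sent */

        /* over the limit: give the slot back, mirroring atomic_dec_return() */
        if (atomic_fetch_sub(&destroy_in_flight, 1) - 1 < max_rpcs_in_flight) {
                /* a slot was freed between the two atomic operations; this is
                 * where the kernel code signals cl_destroy_waitq so a sleeping
                 * sender re-evaluates the predicate */
        }
        return false;                          /* caller must wait and retry */
}
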
@@ -612,44 +687,58 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa, struct lov_stripe_md *ea, struct obd_trans_info *oti, struct obd_export *md_export) { - CFS_LIST_HEAD(cancels); + struct client_obd *cli = &exp->exp_obd->u.cli; struct ptlrpc_request *req; - struct ost_body *body; - int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 }; - int count, bufcount = 2; + struct ost_body *body; + CFS_LIST_HEAD(cancels); + int rc, count; ENTRY; if (!oa) { - CERROR("oa NULL\n"); + CDEBUG(D_INFO, "oa NULL\n"); RETURN(-EINVAL); } count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW, LDLM_FL_DISCARD_DATA); - if (exp_connect_cancelset(exp) && count) { - bufcount = 3; - size[REQ_REC_OFF + 1] = ldlm_request_bufsize(count,OST_DESTROY); - } - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION, - OST_DESTROY, bufcount, size, NULL); - if (exp_connect_cancelset(exp) && req) - ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 1, 0); - else - ldlm_lock_list_put(&cancels, l_bl_ast, count); - if (!req) + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY); + if (req == NULL) { + ldlm_lock_list_put(&cancels, l_bl_ast, count); RETURN(-ENOMEM); + } + + rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY, + 0, &cancels, count); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */ + req->rq_interpret_reply = osc_destroy_interpret; + ptlrpc_at_set_req_timeout(req); - body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body)); if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) - memcpy(obdo_logcookie(oa), oti->oti_logcookies, - sizeof(*oti->oti_logcookies)); + oa->o_lcookie = *oti->oti_logcookies; + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); + LASSERT(body); body->oa = *oa; - ptlrpc_req_set_repsize(req, 2, size); + ptlrpc_request_set_replen(req); + if (!osc_can_send_destroy(cli)) { + struct l_wait_info lwi = { 0 }; + + /* + * Wait until the number of on-going destroy RPCs drops + * under max_rpc_in_flight + */ + l_wait_event_exclusive(cli->cl_destroy_waitq, + osc_can_send_destroy(cli), &lwi); + } + + /* Do not wait for response */ ptlrpcd_add_req(req); RETURN(0); } @@ -803,8 +892,9 @@ static void osc_update_grant(struct client_obd *cli, struct ost_body *body) { client_obd_list_lock(&cli->cl_loi_list_lock); CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant); - cli->cl_avail_grant += body->oa.o_grant; - /* waiters are woken in brw_interpret_oap */ + if (body->oa.o_valid & OBD_MD_FLGRANT) + cli->cl_avail_grant += body->oa.o_grant; + /* waiters are woken in brw_interpret */ client_obd_list_unlock(&cli->cl_loi_list_lock); } @@ -857,7 +947,7 @@ static int check_write_rcs(struct ptlrpc_request *req, remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1, sizeof(*remote_rcs) * niocount, NULL); if (remote_rcs == NULL) { - CERROR("Missing/short RC vector on BRW_WRITE reply\n"); + CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n"); return(-EPROTO); } if (lustre_msg_swabbed(req->rq_repmsg)) @@ -869,7 +959,7 @@ static int check_write_rcs(struct ptlrpc_request *req, return(remote_rcs[i]); if (remote_rcs[i] != 0) { - CERROR("rc[%d] invalid (%d) req %p\n", + CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n", i, remote_rcs[i], req); return(-EPROTO); } @@ -877,7 +967,7 @@ static int check_write_rcs(struct ptlrpc_request *req, if (req->rq_bulk->bd_nob_transferred != requested_nob) { CERROR("Unexpected # bytes transferred: %d (requested %d)\n", - 
requested_nob, req->rq_bulk->bd_nob_transferred); + req->rq_bulk->bd_nob_transferred, requested_nob); return(-EPROTO); } @@ -901,23 +991,25 @@ static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2) } static obd_count osc_checksum_bulk(int nob, obd_count pg_count, - struct brw_page **pga) + struct brw_page **pga, int opc, + cksum_type_t cksum_type) { - __u32 cksum = ~0; + __u32 cksum; int i = 0; LASSERT (pg_count > 0); + cksum = init_checksum(cksum_type); while (nob > 0 && pg_count > 0) { - char *ptr = cfs_kmap(pga[i]->pg); + unsigned char *ptr = cfs_kmap(pga[i]->pg); int off = pga[i]->off & ~CFS_PAGE_MASK; int count = pga[i]->count > nob ? nob : pga[i]->count; /* corrupt the data before we compute the checksum, to * simulate an OST->client data error */ - if (i == 0 && - OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) + if (i == 0 && opc == OST_READ && + OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) memcpy(ptr + off, "bad1", min(4, nob)); - cksum = crc32_le(cksum, ptr + off, count); + cksum = compute_checksum(cksum, ptr + off, count, cksum_type); cfs_kunmap(pga[i]->pg); LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n", off, cksum); @@ -928,7 +1020,7 @@ static obd_count osc_checksum_bulk(int nob, obd_count pg_count, } /* For sending we only compute the wrong checksum instead * of corrupting the data so it is still correct on a redo */ - if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND)) + if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND)) cksum++; return cksum; @@ -936,7 +1028,7 @@ static obd_count osc_checksum_bulk(int nob, obd_count pg_count, static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, struct lov_stripe_md *lsm, obd_count page_count, - struct brw_page **pga, + struct brw_page **pga, struct ptlrpc_request **reqp, struct obd_capa *ocapa) { @@ -945,77 +1037,79 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, struct ost_body *body; struct obd_ioobj *ioobj; struct niobuf_remote *niobuf; - int size[5] = { sizeof(struct ptlrpc_body), sizeof(*body) }; int niocount, i, requested_nob, opc, rc; - struct ptlrpc_request_pool *pool; - struct lustre_capa *capa; struct osc_brw_async_args *aa; + struct req_capsule *pill; + struct brw_page *pg_prev; ENTRY; - OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */ - OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */ + if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ)) + RETURN(-ENOMEM); /* Recoverable */ + if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2)) + RETURN(-EINVAL); /* Fatal */ if ((cmd & OBD_BRW_WRITE) != 0) { opc = OST_WRITE; - pool = cli->cl_import->imp_rq_pool; + req = ptlrpc_request_alloc_pool(cli->cl_import, + cli->cl_import->imp_rq_pool, + &RQF_OST_BRW); } else { opc = OST_READ; - pool = NULL; + req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW); } + if (req == NULL) + RETURN(-ENOMEM); + for (niocount = i = 1; i < page_count; i++) { if (!can_merge_pages(pga[i - 1], pga[i])) niocount++; } - size[REQ_REC_OFF + 1] = sizeof(*ioobj); - size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf); - if (ocapa) - size[REQ_REC_OFF + 3] = sizeof(*capa); - - req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 5, - size, NULL, pool, NULL); - if (req == NULL) - RETURN (-ENOMEM); + pill = &req->rq_pill; + req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT, + niocount * sizeof(*niobuf)); + osc_set_capa_size(req, &RMF_CAPA1, ocapa); + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc); + 
if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */ + ptlrpc_at_set_req_timeout(req); if (opc == OST_WRITE) - desc = ptlrpc_prep_bulk_imp (req, page_count, - BULK_GET_SOURCE, OST_BULK_PORTAL); + desc = ptlrpc_prep_bulk_imp(req, page_count, + BULK_GET_SOURCE, OST_BULK_PORTAL); else - desc = ptlrpc_prep_bulk_imp (req, page_count, - BULK_PUT_SINK, OST_BULK_PORTAL); + desc = ptlrpc_prep_bulk_imp(req, page_count, + BULK_PUT_SINK, OST_BULK_PORTAL); + if (desc == NULL) GOTO(out, rc = -ENOMEM); /* NB request now owns desc and will free it when it gets freed */ - body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body)); - ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj)); - niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2, - niocount * sizeof(*niobuf)); + body = req_capsule_client_get(pill, &RMF_OST_BODY); + ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ); + niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE); + LASSERT(body && ioobj && niobuf); body->oa = *oa; obdo_to_ioobj(oa, ioobj); ioobj->ioo_bufcnt = niocount; - if (ocapa) { - capa = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 3, - sizeof(*capa)); - capa_cpy(capa, ocapa); - body->oa.o_valid |= OBD_MD_FLOSSCAPA; - } - + osc_pack_capa(req, body, ocapa); LASSERT (page_count > 0); + pg_prev = pga[0]; for (requested_nob = i = 0; i < page_count; i++, niobuf++) { struct brw_page *pg = pga[i]; - struct brw_page *pg_prev = pga[i - 1]; LASSERT(pg->count > 0); LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE, "i: %d pg: %p off: "LPU64", count: %u\n", i, pg, pg->off, pg->count); -#ifdef __LINUX__ +#ifdef __linux__ LASSERTF(i == 0 || pg->off > pg_prev->off, "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n", @@ -1042,23 +1136,39 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, niobuf->len = pg->count; niobuf->flags = pg->flag; } + pg_prev = pg; } - LASSERT((void *)(niobuf - niocount) == + LASSERTF((void *)(niobuf - niocount) == lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2, - niocount * sizeof(*niobuf))); + niocount * sizeof(*niobuf)), + "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg, + REQ_REC_OFF + 2, niocount * sizeof(*niobuf)), + (void *)(niobuf - niocount)); + osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0); /* size[REQ_REC_OFF] still sizeof (*body) */ if (opc == OST_WRITE) { - if (unlikely(cli->cl_checksum)) { - body->oa.o_valid |= OBD_MD_FLCKSUM; + if (unlikely(cli->cl_checksum) && + req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) { + /* store cl_cksum_type in a local variable since + * it can be changed via lprocfs */ + cksum_type_t cksum_type = cli->cl_cksum_type; + + if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) + oa->o_flags = body->oa.o_flags = 0; + body->oa.o_flags |= cksum_type_pack(cksum_type); + body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS; body->oa.o_cksum = osc_checksum_bulk(requested_nob, - page_count, pga); + page_count, pga, + OST_WRITE, + cksum_type); CDEBUG(D_PAGE, "checksum at write origin: %x\n", body->oa.o_cksum); /* save this in 'oa', too, for later checking */ - oa->o_valid |= OBD_MD_FLCKSUM; + oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS; + oa->o_flags |= cksum_type_pack(cksum_type); } else { /* clear out the checksum flag, in case this is a * resend but cl_checksum is no longer set. 
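
osc_checksum_bulk() above walks the brw_page array and feeds each page fragment (the in-page offset plus the byte count of that page's share of the I/O) into a running checksum seeded according to the negotiated cksum_type; the write path then records that type in o_flags via cksum_type_pack() so the server and any resend use the same algorithm. A stand-alone model of the walk itself, with page mapping and the Lustre checksum primitives replaced by plain memory access and a placeholder update function (struct and function names below are illustrative only):

#include <stddef.h>
#include <stdint.h>

struct brw_page_model {                /* models struct brw_page for this sketch */
        const unsigned char *buf;      /* stands in for the kmap()ed page */
        size_t off;                    /* offset of the fragment within the page */
        size_t count;                  /* bytes of this page included in the I/O */
};

/* placeholder for compute_checksum(); the real code uses e.g. CRC32 */
static uint32_t cksum_update(uint32_t c, const unsigned char *p, size_t n)
{
        while (n--)
                c = (c << 1 | c >> 31) ^ *p++;
        return c;
}

static uint32_t checksum_bulk_model(size_t nob, size_t pg_count,
                                    const struct brw_page_model *pga)
{
        uint32_t cksum = 0;            /* init_checksum(cksum_type) in the real code */
        size_t i;

        for (i = 0; nob > 0 && i < pg_count; i++) {
                /* clamp the last fragment to the bytes that remain */
                size_t count = pga[i].count > nob ? nob : pga[i].count;

                cksum = cksum_update(cksum, pga[i].buf + pga[i].off, count);
                nob -= pga[i].count;
        }
        return cksum;
}
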
b=11238 */ @@ -1066,17 +1176,23 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, } oa->o_cksum = body->oa.o_cksum; /* 1 RC per niobuf */ - size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount; - ptlrpc_req_set_repsize(req, 3, size); + req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, + sizeof(__u32) * niocount); } else { - if (unlikely(cli->cl_checksum)) - body->oa.o_valid |= OBD_MD_FLCKSUM; + if (unlikely(cli->cl_checksum) && + req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) { + if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) + body->oa.o_flags = 0; + body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type); + body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS; + } + req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0); /* 1 RC for the whole I/O */ - ptlrpc_req_set_repsize(req, 2, size); } + ptlrpc_request_set_replen(req); CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = (struct osc_brw_async_args *)&req->rq_async_args; + aa = ptlrpc_req_async_args(req); aa->aa_oa = oa; aa->aa_requested_nob = requested_nob; aa->aa_nio_count = niocount; @@ -1084,32 +1200,42 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, aa->aa_resends = 0; aa->aa_ppga = pga; aa->aa_cli = cli; - INIT_LIST_HEAD(&aa->aa_oaps); + CFS_INIT_LIST_HEAD(&aa->aa_oaps); *reqp = req; - RETURN (0); + RETURN(0); out: - ptlrpc_req_finished (req); - RETURN (rc); + ptlrpc_req_finished(req); + RETURN(rc); } static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer, - __u32 client_cksum, __u32 server_cksum, - int nob, obd_count page_count, - struct brw_page **pga) + __u32 client_cksum, __u32 server_cksum, int nob, + obd_count page_count, struct brw_page **pga, + cksum_type_t client_cksum_type) { __u32 new_cksum; char *msg; + cksum_type_t cksum_type; if (server_cksum == client_cksum) { CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum); return 0; } - new_cksum = osc_checksum_bulk(nob, page_count, pga); + if (oa->o_valid & OBD_MD_FLFLAGS) + cksum_type = cksum_type_unpack(oa->o_flags); + else + cksum_type = OBD_CKSUM_CRC32; + + new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE, + cksum_type); - if (new_cksum == server_cksum) + if (cksum_type != client_cksum_type) + msg = "the server did not use the checksum type specified in " + "the original request - likely a protocol problem"; + else if (new_cksum == server_cksum) msg = "changed on the client after we checksummed it - " "likely false positive due to mmap IO (bug 11742)"; else if (new_cksum == client_cksum) @@ -1123,15 +1249,16 @@ static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer, "["LPU64"-"LPU64"]\n", msg, libcfs_nid2str(peer->nid), oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0, - oa->o_valid & OBD_MD_FLFID ? oa->o_generation : + oa->o_valid & OBD_MD_FLFID ? oa->o_generation : (__u64)0, oa->o_id, oa->o_valid & OBD_MD_FLGROUP ? 
oa->o_gr : (__u64)0, pga[0]->off, pga[page_count-1]->off + pga[page_count-1]->count - 1); - CERROR("original client csum %x, server csum %x, client csum now %x\n", - client_cksum, server_cksum, new_cksum); - return 1; + CERROR("original client csum %x (type %x), server csum %x (type %x), " + "client csum now %x\n", client_cksum, client_cksum_type, + server_cksum, cksum_type, new_cksum); + return 1; } /* Note rc enters this function as number of bytes transferred */ @@ -1152,7 +1279,7 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body), lustre_swab_ost_body); if (body == NULL) { - CERROR ("Can't unpack body\n"); + CDEBUG(D_INFO, "Can't unpack body\n"); RETURN(-EPROTO); } @@ -1166,25 +1293,23 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) if (rc < 0) RETURN(rc); - if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM)) + if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM) client_cksum = aa->aa_oa->o_cksum; /* save for later */ osc_update_grant(cli, body); if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) { if (rc > 0) { - CERROR ("Unexpected +ve rc %d\n", rc); + CERROR("Unexpected +ve rc %d\n", rc); RETURN(-EPROTO); } LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob); - if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && - client_cksum && - check_write_checksum(&body->oa, peer, client_cksum, - body->oa.o_cksum, - aa->aa_requested_nob, - aa->aa_page_count, - aa->aa_ppga))) + if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum && + check_write_checksum(&body->oa, peer, client_cksum, + body->oa.o_cksum, aa->aa_requested_nob, + aa->aa_page_count, aa->aa_ppga, + cksum_type_unpack(aa->aa_oa->o_flags))) RETURN(-EAGAIN); if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk)) @@ -1215,14 +1340,20 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) aa->aa_ppga)) GOTO(out, rc = -EAGAIN); - if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) { + if (body->oa.o_valid & OBD_MD_FLCKSUM) { static int cksum_counter; __u32 server_cksum = body->oa.o_cksum; char *via; char *router; + cksum_type_t cksum_type; + if (body->oa.o_valid & OBD_MD_FLFLAGS) + cksum_type = cksum_type_unpack(body->oa.o_flags); + else + cksum_type = OBD_CKSUM_CRC32; client_cksum = osc_checksum_bulk(rc, aa->aa_page_count, - aa->aa_ppga); + aa->aa_ppga, OST_READ, + cksum_type); if (peer->nid == req->rq_bulk->bd_sender) { via = router = ""; @@ -1234,7 +1365,7 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) if (server_cksum == ~0 && rc > 0) { CERROR("Protocol error: server %s set the 'checksum' " "bit, but didn't send a checksum. 
Not fatal, " - "but please tell CFS.\n", + "but please notify on http://bugzilla.lustre.org/\n", libcfs_nid2str(peer->nid)); } else if (server_cksum != client_cksum) { LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from " @@ -1255,8 +1386,8 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) aa->aa_ppga[aa->aa_page_count-1]->off + aa->aa_ppga[aa->aa_page_count-1]->count - 1); - CERROR("client %x, server %x\n", - client_cksum, server_cksum); + CERROR("client %x, server %x, cksum_type %x\n", + client_cksum, server_cksum, cksum_type); cksum_counter = 0; aa->aa_oa->o_cksum = client_cksum; rc = -EAGAIN; @@ -1326,7 +1457,7 @@ restart_bulk: goto restart_bulk; } - + RETURN (rc); } @@ -1344,7 +1475,7 @@ int osc_brw_redo_request(struct ptlrpc_request *request, CERROR("too many resend retries, returning error\n"); RETURN(-EIO); } - + DEBUG_REQ(D_ERROR, request, "redo for recoverable error"); /* body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body)); @@ -1356,13 +1487,13 @@ int osc_brw_redo_request(struct ptlrpc_request *request, OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ, aa->aa_cli, aa->aa_oa, NULL /* lsm unused by osc currently */, - aa->aa_page_count, aa->aa_ppga, + aa->aa_page_count, aa->aa_ppga, &new_req, NULL /* ocapa */); if (rc) RETURN(rc); client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock); - + list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) { if (oap->oap_request != NULL) { LASSERTF(request == oap->oap_request, @@ -1370,7 +1501,7 @@ int osc_brw_redo_request(struct ptlrpc_request *request, request, oap->oap_request); if (oap->oap_interrupted) { client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock); - ptlrpc_req_finished(new_req); + ptlrpc_req_finished(new_req); RETURN(-EINTR); } } @@ -1380,13 +1511,13 @@ int osc_brw_redo_request(struct ptlrpc_request *request, aa->aa_resends++; new_req->rq_interpret_reply = request->rq_interpret_reply; new_req->rq_async_args = request->rq_async_args; - new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends; + new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends; - new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args; + new_aa = ptlrpc_req_async_args(new_req); - INIT_LIST_HEAD(&new_aa->aa_oaps); + CFS_INIT_LIST_HEAD(&new_aa->aa_oaps); list_splice(&aa->aa_oaps, &new_aa->aa_oaps); - INIT_LIST_HEAD(&aa->aa_oaps); + CFS_INIT_LIST_HEAD(&aa->aa_oaps); list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) { if (oap->oap_request) { @@ -1394,39 +1525,17 @@ int osc_brw_redo_request(struct ptlrpc_request *request, oap->oap_request = ptlrpc_request_addref(new_req); } } - client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock); - - DEBUG_REQ(D_INFO, new_req, "new request"); + /* use ptlrpc_set_add_req is safe because interpret functions work + * in check_set context. 
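
osc_brw_redo_request() below clones a failed bulk RPC — the new request inherits the interpret callback and async args, aa_resends delays rq_sent, and the osc_async_pages are re-attached under cl_loi_list_lock — so that recoverable failures, including checksum mismatches surfaced as -EAGAIN, are retried rather than reported. A minimal sketch of the completion-side decision, using the helpers named in this patch (the wrapper name is hypothetical):

static int brw_complete_or_redo(struct ptlrpc_request *req,
                                struct osc_brw_async_args *aa, int rc)
{
        rc = osc_brw_fini_request(req, rc);    /* unpacks reply, verifies checksums */
        if (osc_recoverable_error(rc)) {
                rc = osc_brw_redo_request(req, aa);
                if (rc == 0)
                        return 0;              /* redo queued; pages complete later */
        }
        return rc;                             /* success or a hard error */
}
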
only one way exist with access to request + * from different thread got -EINTR - this way protected with + * cl_loi_list_lock */ ptlrpc_set_add_req(set, new_req); - RETURN(0); -} - -static int brw_interpret(struct ptlrpc_request *req, void *data, int rc) -{ - struct osc_brw_async_args *aa = data; - int i; - int nob = rc; - ENTRY; - - rc = osc_brw_fini_request(req, rc); - if (osc_recoverable_error(rc)) { - rc = osc_brw_redo_request(req, aa); - if (rc == 0) - RETURN(0); - } - if ((rc >= 0) && req->rq_set && req->rq_set->set_countp) - atomic_add(nob, (atomic_t *)req->rq_set->set_countp); - - spin_lock(&aa->aa_cli->cl_loi_list_lock); - for (i = 0; i < aa->aa_page_count; i++) - osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1); - spin_unlock(&aa->aa_cli->cl_loi_list_lock); - - osc_release_ppga(aa->aa_ppga, aa->aa_page_count); + client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock); - RETURN(rc); + DEBUG_REQ(D_INFO, new_req, "new request"); + RETURN(0); } static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa, @@ -1437,6 +1546,7 @@ static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa, struct ptlrpc_request *req; struct client_obd *cli = &exp->exp_obd->u.cli; int rc, i; + struct osc_brw_async_args *aa; ENTRY; /* Consume write credits even if doing a sync write - @@ -1452,14 +1562,35 @@ static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa, rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga, &req, ocapa); + + aa = ptlrpc_req_async_args(req); + if (cmd == OBD_BRW_READ) { + lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count); + lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight); + } else { + lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count); + lprocfs_oh_tally(&cli->cl_write_rpc_hist, + cli->cl_w_in_flight); + } + ptlrpc_lprocfs_brw(req, aa->aa_requested_nob); + + LASSERT(list_empty(&aa->aa_oaps)); if (rc == 0) { req->rq_interpret_reply = brw_interpret; ptlrpc_set_add_req(set, req); + client_obd_list_lock(&cli->cl_loi_list_lock); + if (cmd == OBD_BRW_READ) + cli->cl_r_in_flight++; + else + cli->cl_w_in_flight++; + client_obd_list_unlock(&cli->cl_loi_list_lock); + OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DIO_PAUSE, 3); } else if (cmd == OBD_BRW_WRITE) { - spin_lock(&cli->cl_loi_list_lock); + client_obd_list_lock(&cli->cl_loi_list_lock); for (i = 0; i < page_count; i++) osc_release_write_grant(cli, pga[i], 0); - spin_unlock(&cli->cl_loi_list_lock); + osc_wake_cache_waiters(cli); + client_obd_list_unlock(&cli->cl_loi_list_lock); } RETURN (rc); } @@ -1913,9 +2044,9 @@ static void osc_ap_completion(struct client_obd *cli, struct obdo *oa, EXIT; } -static int brw_interpret_oap(struct ptlrpc_request *req, void *data, int rc) +static int brw_interpret(const struct lu_env *env, + struct ptlrpc_request *req, void *data, int rc) { - struct osc_async_page *oap, *tmp; struct osc_brw_async_args *aa = data; struct client_obd *cli; ENTRY; @@ -1940,20 +2071,24 @@ static int brw_interpret_oap(struct ptlrpc_request *req, void *data, int rc) else cli->cl_r_in_flight--; - /* the caller may re-use the oap after the completion call so - * we need to clean it up a little */ - list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) { - list_del_init(&oap->oap_rpc_item); - osc_ap_completion(cli, aa->aa_oa, oap, 1, rc); + if (!list_empty(&aa->aa_oaps)) { /* from osc_send_oap_rpc() */ + struct osc_async_page *oap, *tmp; + /* the caller may re-use the oap after the completion call so + * we need to clean it up a little */ + 
list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) { + list_del_init(&oap->oap_rpc_item); + osc_ap_completion(cli, aa->aa_oa, oap, 1, rc); + } + OBDO_FREE(aa->aa_oa); + } else { /* from async_internal() */ + int i; + for (i = 0; i < aa->aa_page_count; i++) + osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1); } - osc_wake_cache_waiters(cli); osc_check_rpcs(cli); - client_obd_list_unlock(&cli->cl_loi_list_lock); - OBDO_FREE(aa->aa_oa); - osc_release_ppga(aa->aa_ppga, aa->aa_page_count); RETURN(rc); } @@ -1970,6 +2105,7 @@ static struct ptlrpc_request *osc_build_req(struct client_obd *cli, void *caller_data = NULL; struct obd_capa *ocapa; struct osc_async_page *oap; + struct ldlm_lock *lock = NULL; int i, rc; ENTRY; @@ -1988,6 +2124,7 @@ static struct ptlrpc_request *osc_build_req(struct client_obd *cli, if (ops == NULL) { ops = oap->oap_caller_ops; caller_data = oap->oap_caller_data; + lock = oap->oap_ldlm_lock; } pga[i] = &oap->oap_brw_page; pga[i]->off = oap->oap_obj_off + oap->oap_page_off; @@ -2000,6 +2137,10 @@ static struct ptlrpc_request *osc_build_req(struct client_obd *cli, LASSERT(ops != NULL); ops->ap_fill_obdo(caller_data, cmd, oa); ocapa = ops->ap_lookup_capa(caller_data, cmd); + if (lock) { + oa->o_handle = lock->l_remote_handle; + oa->o_valid |= OBD_MD_FLHANDLE; + } sort_brw_pages(pga, page_count); rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, @@ -2019,10 +2160,10 @@ static struct ptlrpc_request *osc_build_req(struct client_obd *cli, OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME); CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = (struct osc_brw_async_args *)&req->rq_async_args; - INIT_LIST_HEAD(&aa->aa_oaps); + aa = ptlrpc_req_async_args(req); + CFS_INIT_LIST_HEAD(&aa->aa_oaps); list_splice(rpc_list, &aa->aa_oaps); - INIT_LIST_HEAD(rpc_list); + CFS_INIT_LIST_HEAD(rpc_list); out: if (IS_ERR(req)) { @@ -2036,6 +2177,17 @@ out: /* the loi lock is held across this function but it's allowed to release * and reacquire it during its work */ +/** + * prepare pages for ASYNC io and put pages in send queue. + * + * \param cli - + * \param loi - + * \param cmd - OBD_BRW_* macroses + * \param lop - pending pages + * + * \return zero if pages successfully add to send queue. + * \return not zere if error occurring. + */ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi, int cmd, struct loi_oap_pages *lop) { @@ -2047,6 +2199,7 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi, CFS_LIST_HEAD(rpc_list); unsigned int ending_offset; unsigned starting_offset = 0; + int srvlock = 0; ENTRY; /* first we find the pages we're allowed to work with */ @@ -2056,6 +2209,13 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi, LASSERT(oap->oap_magic == OAP_MAGIC); + if (page_count != 0 && + srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) { + CDEBUG(D_PAGE, "SRVLOCK flag mismatch," + " oap %p, page %p, srvlock %u\n", + oap, oap->oap_brw_page.pg, (unsigned)!srvlock); + break; + } /* in llite being 'ready' equates to the page being locked * until completion unlocks it. commit_write submits a page * as not ready because its unlock will happen unconditionally @@ -2102,12 +2262,14 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi, /* * Page submitted for IO has to be locked. Either by * ->ap_make_ready() or by higher layers. - * - * XXX nikita: this assertion should be adjusted when lustre - * starts using PG_writeback for pages being written out. 
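
osc_send_oap_rpc() above also gained a grouping rule: pages queued under a server-side lock (OBD_BRW_SRVLOCK) must not share a BRW RPC with pages queued under client locks, so the first page of an RPC fixes the flavour and the first mismatching page ends the scan. The rule reduces to a small predicate; the helper below is a hypothetical restatement of it, not a function from the patch:

static int oap_fits_current_rpc(int page_count, int srvlock,
                                const struct osc_async_page *oap)
{
        if (page_count == 0)
                return 1;       /* the first page always fits and sets srvlock */
        /* later pages must match the flavour chosen by the first page */
        return srvlock == !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
}
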
*/ -#if defined(__KERNEL__) && defined(__LINUX__) - LASSERT(PageLocked(oap->oap_page)); +#if defined(__KERNEL__) && defined(__linux__) + if(!(PageLocked(oap->oap_page) && + (CheckWriteback(oap->oap_page, cmd) || oap->oap_oig !=NULL))) { + CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n", + oap->oap_page, (long)oap->oap_page->flags, oap->oap_async_flags); + LBUG(); + } #endif /* If there is a gap at the start of this page, it can't merge * with any previous page, so we'll hand the network a @@ -2137,6 +2299,8 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi, /* now put the page back in our accounting */ list_add_tail(&oap->oap_rpc_item, &rpc_list); + if (page_count == 0) + srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK); if (++page_count >= cli->cl_max_pages_per_rpc) break; @@ -2187,22 +2351,21 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi, RETURN(PTR_ERR(req)); } - aa = (struct osc_brw_async_args *)&req->rq_async_args; + aa = ptlrpc_req_async_args(req); if (cmd == OBD_BRW_READ) { lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count); lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight); lprocfs_oh_tally_log2(&cli->cl_read_offset_hist, (starting_offset >> CFS_PAGE_SHIFT) + 1); - ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob); } else { lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count); lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight); lprocfs_oh_tally_log2(&cli->cl_write_offset_hist, (starting_offset >> CFS_PAGE_SHIFT) + 1); - ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob); } + ptlrpc_lprocfs_brw(req, aa->aa_requested_nob); client_obd_list_lock(&cli->cl_loi_list_lock); @@ -2230,7 +2393,7 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi, DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight", page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight); - req->rq_interpret_reply = brw_interpret_oap; + req->rq_interpret_reply = brw_interpret; ptlrpcd_add_req(req); RETURN(1); } @@ -2414,12 +2577,84 @@ static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi, RETURN(-EDQUOT); } +/** + * Checks if requested extent lock is compatible with a lock under the page. + * + * Checks if the lock under \a page is compatible with a read or write lock + * (specified by \a rw) for an extent [\a start , \a end]. + * + * \param exp osc export + * \param lsm striping information for the file + * \param res osc_async_page placeholder + * \param rw OBD_BRW_READ if requested for reading, + * OBD_BRW_WRITE if requested for writing + * \param start start of the requested extent + * \param end end of the requested extent + * \param cookie transparent parameter for passing locking context + * + * \post result == 1, *cookie == context, appropriate lock is referenced or + * \post result == 0 + * + * \retval 1 owned lock is reused for the request + * \retval 0 no lock reused for the request + * + * \see osc_release_short_lock + */ +static int osc_reget_short_lock(struct obd_export *exp, + struct lov_stripe_md *lsm, + void **res, int rw, + obd_off start, obd_off end, + void **cookie) +{ + struct osc_async_page *oap = *res; + int rc; + + ENTRY; + + spin_lock(&oap->oap_lock); + rc = ldlm_lock_fast_match(oap->oap_ldlm_lock, rw, + start, end, cookie); + spin_unlock(&oap->oap_lock); + + RETURN(rc); +} + +/** + * Releases a reference to a lock taken in a "fast" way. + * + * Releases a read or a write (specified by \a rw) lock + * referenced by \a cookie. 
+ * + * \param exp osc export + * \param lsm striping information for the file + * \param end end of the locked extent + * \param rw OBD_BRW_READ if requested for reading, + * OBD_BRW_WRITE if requested for writing + * \param cookie transparent parameter for passing locking context + * + * \post appropriate lock is dereferenced + * + * \see osc_reget_short_lock + */ +static int osc_release_short_lock(struct obd_export *exp, + struct lov_stripe_md *lsm, obd_off end, + void *cookie, int rw) +{ + ENTRY; + ldlm_lock_fast_release(cookie, rw); + /* no error could have happened at this layer */ + RETURN(0); +} + int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm, struct lov_oinfo *loi, cfs_page_t *page, obd_off offset, struct obd_async_page_ops *ops, - void *data, void **res) + void *data, void **res, int nocache, + struct lustre_handle *lockh) { struct osc_async_page *oap; + struct ldlm_res_id oid; + int rc = 0; ENTRY; if (!page) @@ -2439,9 +2674,24 @@ int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm, CFS_INIT_LIST_HEAD(&oap->oap_pending_item); CFS_INIT_LIST_HEAD(&oap->oap_urgent_item); CFS_INIT_LIST_HEAD(&oap->oap_rpc_item); + CFS_INIT_LIST_HEAD(&oap->oap_page_list); oap->oap_occ.occ_interrupted = osc_occ_interrupted; + spin_lock_init(&oap->oap_lock); + + /* If the page was marked as notcacheable - don't add to any locks */ + if (!nocache) { + osc_build_res_name(loi->loi_id, loi->loi_gr, &oid); + /* This is the only place where we can call cache_add_extent + without oap_lock, because this page is locked now, and + the lock we are adding it to is referenced, so cannot lose + any pages either. */ + rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh); + if (rc) + RETURN(rc); + } + CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset); RETURN(0); } @@ -2726,6 +2976,7 @@ static int osc_teardown_async_page(struct obd_export *exp, lop_update_pending(cli, lop, oap->oap_cmd, -1); } loi_list_maint(cli, loi); + cache_remove_extent(cli->cl_cache, oap); LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page); out: @@ -2733,6 +2984,49 @@ out: RETURN(rc); } +int osc_extent_blocking_cb(struct ldlm_lock *lock, + struct ldlm_lock_desc *new, void *data, + int flag) +{ + struct lustre_handle lockh = { 0 }; + int rc; + ENTRY; + + if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) { + LDLM_ERROR(lock, "cancelling lock with bad data %p", data); + LBUG(); + } + + switch (flag) { + case LDLM_CB_BLOCKING: + ldlm_lock2handle(lock, &lockh); + rc = ldlm_cli_cancel(&lockh); + if (rc != ELDLM_OK) + CERROR("ldlm_cli_cancel failed: %d\n", rc); + break; + case LDLM_CB_CANCELING: { + + ldlm_lock2handle(lock, &lockh); + /* This lock wasn't granted, don't try to do anything */ + if (lock->l_req_mode != lock->l_granted_mode) + RETURN(0); + + cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache, + &lockh); + + if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb) + lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb( + lock, new, data,flag); + break; + } + default: + LBUG(); + } + + RETURN(0); +} +EXPORT_SYMBOL(osc_extent_blocking_cb); + static void osc_set_data_with_check(struct lustre_handle *lockh, void *data, int flags) { @@ -2743,8 +3037,7 @@ static void osc_set_data_with_check(struct lustre_handle *lockh, void *data, return; } lock_res_and_lock(lock); -#ifdef __KERNEL__ -#ifdef __LINUX__ +#if defined (__KERNEL__) && defined (__linux__) /* Liang XXX: Darwin and Winnt checking should be added */ if 
(lock->l_ast_data && lock->l_ast_data != data) { struct inode *new_inode = data; @@ -2759,9 +3052,7 @@ static void osc_set_data_with_check(struct lustre_handle *lockh, void *data, new_inode, new_inode->i_ino, new_inode->i_generation); } #endif -#endif lock->l_ast_data = data; - lock->l_flags |= (flags & LDLM_FL_NO_LRU); unlock_res_and_lock(lock); LDLM_LOCK_PUT(lock); } @@ -2769,18 +3060,16 @@ static void osc_set_data_with_check(struct lustre_handle *lockh, void *data, static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm, ldlm_iterator_t replace, void *data) { - struct ldlm_res_id res_id = { .name = {0} }; + struct ldlm_res_id res_id; struct obd_device *obd = class_exp2obd(exp); - res_id.name[0] = lsm->lsm_object_id; - res_id.name[2] = lsm->lsm_object_gr; - + osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id); ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data); return 0; } -static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo, - int intent, int rc) +static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req, + struct obd_info *oinfo, int intent, int rc) { ENTRY; @@ -2788,11 +3077,9 @@ static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo, /* The request was created before ldlm_cli_enqueue call. */ if (rc == ELDLM_LOCK_ABORTED) { struct ldlm_reply *rep; + rep = req_capsule_server_get(&req->rq_pill, + &RMF_DLM_REP); - /* swabbed by ldlm_cli_enqueue() */ - LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF); - rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF, - sizeof(*rep)); LASSERT(rep != NULL); if (rep->lock_policy_res1) rc = rep->lock_policy_res1; @@ -2806,12 +3093,16 @@ static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo, oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime); } + if (!rc) + cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh); + /* Call the update callback. */ rc = oinfo->oi_cb_up(oinfo, rc); RETURN(rc); } -static int osc_enqueue_interpret(struct ptlrpc_request *req, +static int osc_enqueue_interpret(const struct lu_env *env, + struct ptlrpc_request *req, struct osc_enqueue_args *aa, int rc) { int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT; @@ -2832,7 +3123,7 @@ static int osc_enqueue_interpret(struct ptlrpc_request *req, aa->oa_oi->oi_lockh, rc); /* Complete osc stuff. */ - rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc); + rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc); /* Release the lock for async request. */ if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK) @@ -2855,17 +3146,17 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, struct ldlm_enqueue_info *einfo, struct ptlrpc_request_set *rqset) { - struct ldlm_res_id res_id = { .name = {0} }; + struct ldlm_res_id res_id; struct obd_device *obd = exp->exp_obd; - struct ldlm_reply *rep; struct ptlrpc_request *req = NULL; int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT; + ldlm_mode_t mode; int rc; ENTRY; - res_id.name[0] = oinfo->oi_md->lsm_object_id; - res_id.name[2] = oinfo->oi_md->lsm_object_gr; + osc_build_res_name(oinfo->oi_md->lsm_object_id, + oinfo->oi_md->lsm_object_gr, &res_id); /* Filesystem lock extents are extended to page boundaries so that * dealing with the page cache is a little smoother. 
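
Before matching or enqueueing, the extent is widened to whole pages just below, since lock boundaries that cut through a page make page-cache handling awkward. A worked example, assuming CFS_PAGE_SIZE == 4096 and the usual definition CFS_PAGE_MASK == ~((__u64)CFS_PAGE_SIZE - 1):

obd_off start = 5000, end = 6000;

start -= start & ~CFS_PAGE_MASK;        /* 5000 - (5000 & 4095) = 4096 */
end   |=         ~CFS_PAGE_MASK;        /* 6000 | 4095          = 8191 */
/* the lock now covers [4096, 8191], the whole page containing both offsets */
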
*/ oinfo->oi_policy.l_extent.start -= @@ -2876,11 +3167,29 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, goto no_match; /* Next, search for already existing extent locks that will cover us */ - rc = ldlm_lock_match(obd->obd_namespace, - oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id, - einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode, - oinfo->oi_lockh); - if (rc == 1) { + /* If we're trying to read, we also search for an existing PW lock. The + * VFS and page cache already protect us locally, so lots of readers/ + * writers can share a single PW lock. + * + * There are problems with conversion deadlocks, so instead of + * converting a read lock to a write lock, we'll just enqueue a new + * one. + * + * At some point we should cancel the read lock instead of making them + * send us a blocking callback, but there are problems with canceling + * locks out from other users right now, too. */ + mode = einfo->ei_mode; + if (einfo->ei_mode == LCK_PR) + mode |= LCK_PW; + mode = ldlm_lock_match(obd->obd_namespace, + oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id, + einfo->ei_type, &oinfo->oi_policy, mode, + oinfo->oi_lockh); + if (mode) { + /* addref the lock only if not async requests and PW lock is + * matched whereas we asked for PR. */ + if (!rqset && einfo->ei_mode != mode) + ldlm_lock_addref(oinfo->oi_lockh, LCK_PR); osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata, oinfo->oi_flags); if (intent) { @@ -2893,60 +3202,29 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, oinfo->oi_cb_up(oinfo, ELDLM_OK); /* For async requests, decref the lock. */ - if (rqset) + if (einfo->ei_mode != mode) + ldlm_lock_decref(oinfo->oi_lockh, LCK_PW); + else if (rqset) ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode); RETURN(ELDLM_OK); } - /* If we're trying to read, we also search for an existing PW lock. The - * VFS and page cache already protect us locally, so lots of readers/ - * writers can share a single PW lock. - * - * There are problems with conversion deadlocks, so instead of - * converting a read lock to a write lock, we'll just enqueue a new - * one. - * - * At some point we should cancel the read lock instead of making them - * send us a blocking callback, but there are problems with canceling - * locks out from other users right now, too. */ - - if (einfo->ei_mode == LCK_PR) { - rc = ldlm_lock_match(obd->obd_namespace, - oinfo->oi_flags | LDLM_FL_LVB_READY, - &res_id, einfo->ei_type, &oinfo->oi_policy, - LCK_PW, oinfo->oi_lockh); - if (rc == 1) { - /* FIXME: This is not incredibly elegant, but it might - * be more elegant than adding another parameter to - * lock_match. I want a second opinion. */ - /* addref the lock only if not async requests. 
*/ - if (!rqset) - ldlm_lock_addref(oinfo->oi_lockh, LCK_PR); - osc_set_data_with_check(oinfo->oi_lockh, - einfo->ei_cbdata, - oinfo->oi_flags); - oinfo->oi_cb_up(oinfo, ELDLM_OK); - ldlm_lock_decref(oinfo->oi_lockh, LCK_PW); - RETURN(ELDLM_OK); - } - } - no_match: if (intent) { - int size[3] = { - [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), - [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request), - [DLM_LOCKREQ_OFF + 1] = 0 }; - - req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0); + CFS_LIST_HEAD(cancels); + req = ptlrpc_request_alloc(class_exp2cliimp(exp), + &RQF_LDLM_ENQUEUE_LVB); if (req == NULL) RETURN(-ENOMEM); - size[DLM_LOCKREPLY_OFF] = sizeof(*rep); - size[DLM_REPLY_REC_OFF] = - sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb); - ptlrpc_req_set_repsize(req, 3, size); + rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0); + if (rc) + RETURN(rc); + + req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, + sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb)); + ptlrpc_request_set_replen(req); } /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */ @@ -2962,12 +3240,13 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, if (!rc) { struct osc_enqueue_args *aa; CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = (struct osc_enqueue_args *)&req->rq_async_args; + aa = ptlrpc_req_async_args(req); aa->oa_oi = oinfo; aa->oa_ei = einfo; aa->oa_exp = exp; - req->rq_interpret_reply = osc_enqueue_interpret; + req->rq_interpret_reply = + (ptlrpc_interpterer_t)osc_enqueue_interpret; ptlrpc_set_add_req(rqset, req); } else if (intent) { ptlrpc_req_finished(req); @@ -2975,7 +3254,7 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, RETURN(rc); } - rc = osc_enqueue_fini(req, oinfo, intent, rc); + rc = osc_enqueue_fini(obd, req, oinfo, intent, rc); if (intent) ptlrpc_req_finished(req); @@ -2986,16 +3265,16 @@ static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm, __u32 type, ldlm_policy_data_t *policy, __u32 mode, int *flags, void *data, struct lustre_handle *lockh) { - struct ldlm_res_id res_id = { .name = {0} }; + struct ldlm_res_id res_id; struct obd_device *obd = exp->exp_obd; - int rc; int lflags = *flags; + ldlm_mode_t rc; ENTRY; - res_id.name[0] = lsm->lsm_object_id; - res_id.name[2] = lsm->lsm_object_gr; + osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id); - OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO); + if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH)) + RETURN(-EIO); /* Filesystem lock extents are extended to page boundaries so that * dealing with the page cache is a little smoother */ @@ -3003,28 +3282,21 @@ static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm, policy->l_extent.end |= ~CFS_PAGE_MASK; /* Next, search for already existing extent locks that will cover us */ - rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY, - &res_id, type, policy, mode, lockh); - if (rc) { - //if (!(*flags & LDLM_FL_TEST_LOCK)) - osc_set_data_with_check(lockh, data, lflags); - RETURN(rc); - } /* If we're trying to read, we also search for an existing PW lock. The * VFS and page cache already protect us locally, so lots of readers/ * writers can share a single PW lock. */ - if (mode == LCK_PR) { - rc = ldlm_lock_match(obd->obd_namespace, - lflags | LDLM_FL_LVB_READY, &res_id, - type, policy, LCK_PW, lockh); - if (rc == 1 && !(lflags & LDLM_FL_TEST_LOCK)) { - /* FIXME: This is not incredibly elegant, but it might - * be more elegant than adding another parameter to - * lock_match. 
I want a second opinion. */ - osc_set_data_with_check(lockh, data, lflags); + rc = mode; + if (mode == LCK_PR) + rc |= LCK_PW; + rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY, + &res_id, type, policy, rc, lockh); + if (rc) { + osc_set_data_with_check(lockh, data, lflags); + if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) { ldlm_lock_addref(lockh, LCK_PR); ldlm_lock_decref(lockh, LCK_PW); } + RETURN(rc); } RETURN(rc); } @@ -3047,33 +3319,18 @@ static int osc_cancel_unused(struct obd_export *exp, void *opaque) { struct obd_device *obd = class_exp2obd(exp); - struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL; + struct ldlm_res_id res_id, *resp = NULL; if (lsm != NULL) { - res_id.name[0] = lsm->lsm_object_id; - res_id.name[2] = lsm->lsm_object_gr; - resp = &res_id; + resp = osc_build_res_name(lsm->lsm_object_id, + lsm->lsm_object_gr, &res_id); } return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque); } -static int osc_join_lru(struct obd_export *exp, - struct lov_stripe_md *lsm, int join) -{ - struct obd_device *obd = class_exp2obd(exp); - struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL; - - if (lsm != NULL) { - res_id.name[0] = lsm->lsm_object_id; - res_id.name[2] = lsm->lsm_object_gr; - resp = &res_id; - } - - return ldlm_cli_join_lru(obd->obd_namespace, resp, join); -} - -static int osc_statfs_interpret(struct ptlrpc_request *req, +static int osc_statfs_interpret(const struct lu_env *env, + struct ptlrpc_request *req, struct osc_async_args *aa, int rc) { struct obd_statfs *msfs; @@ -3082,14 +3339,12 @@ static int osc_statfs_interpret(struct ptlrpc_request *req, if (rc != 0) GOTO(out, rc); - msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs), - lustre_swab_obd_statfs); + msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS); if (msfs == NULL) { - CERROR("Can't unpack obd_statfs\n"); GOTO(out, rc = -EPROTO); } - memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs)); + *aa->aa_oi->oi_osfs = *msfs; out: rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc); RETURN(rc); @@ -3100,7 +3355,7 @@ static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo, { struct ptlrpc_request *req; struct osc_async_args *aa; - int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) }; + int rc; ENTRY; /* We could possibly pass max_age in the request (as an absolute @@ -3109,17 +3364,28 @@ static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo, * during mount that would help a bit). Having relative timestamps * is not so great if request processing is slow, while absolute * timestamps are not ideal because they need time synchronization. 
*/ - req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION, - OST_STATFS, 1, NULL, NULL); - if (!req) + req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS); + if (req == NULL) RETURN(-ENOMEM); - ptlrpc_req_set_repsize(req, 2, size); - req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249 + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + ptlrpc_request_set_replen(req); + req->rq_request_portal = OST_CREATE_PORTAL; + ptlrpc_at_set_req_timeout(req); + + if (oinfo->oi_flags & OBD_STATFS_NODELAY) { + /* procfs requests not want stat in wait for avoid deadlock */ + req->rq_no_resend = 1; + req->rq_no_delay = 1; + } - req->rq_interpret_reply = osc_statfs_interpret; + req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret; CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = (struct osc_async_args *)&req->rq_async_args; + aa = ptlrpc_req_async_args(req); aa->aa_oi = oinfo; ptlrpc_set_add_req(rqset, req); @@ -3127,39 +3393,61 @@ static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo, } static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs, - __u64 max_age) + __u64 max_age, __u32 flags) { - struct obd_statfs *msfs; + struct obd_statfs *msfs; struct ptlrpc_request *req; - int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) }; + struct obd_import *imp = NULL; + int rc; ENTRY; + /*Since the request might also come from lprocfs, so we need + *sync this with client_disconnect_export Bug15684*/ + down_read(&obd->u.cli.cl_sem); + if (obd->u.cli.cl_import) + imp = class_import_get(obd->u.cli.cl_import); + up_read(&obd->u.cli.cl_sem); + if (!imp) + RETURN(-ENODEV); + /* We could possibly pass max_age in the request (as an absolute * timestamp or a "seconds.usec ago") so the target can avoid doing * extra calls into the filesystem if that isn't necessary (e.g. * during mount that would help a bit). Having relative timestamps * is not so great if request processing is slow, while absolute * timestamps are not ideal because they need time synchronization. 
*/ - req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION, - OST_STATFS, 1, NULL, NULL); - if (!req) + req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS); + + class_import_put(imp); + + if (req == NULL) RETURN(-ENOMEM); - ptlrpc_req_set_repsize(req, 2, size); - req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249 + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + ptlrpc_request_set_replen(req); + req->rq_request_portal = OST_CREATE_PORTAL; + ptlrpc_at_set_req_timeout(req); + + if (flags & OBD_STATFS_NODELAY) { + /* procfs requests not want stat in wait for avoid deadlock */ + req->rq_no_resend = 1; + req->rq_no_delay = 1; + } rc = ptlrpc_queue_wait(req); if (rc) GOTO(out, rc); - msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs), - lustre_swab_obd_statfs); + msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS); if (msfs == NULL) { - CERROR("Can't unpack obd_statfs\n"); GOTO(out, rc = -EPROTO); } - memcpy(osfs, msfs, sizeof(*osfs)); + *osfs = *msfs; EXIT; out: @@ -3175,29 +3463,45 @@ static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs, */ static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump) { - struct lov_user_md lum, *lumk; + /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */ + struct lov_user_md_v3 lum, *lumk; + struct lov_user_ost_data_v1 *lmm_objects; int rc = 0, lum_size; ENTRY; if (!lsm) RETURN(-ENODATA); - if (copy_from_user(&lum, lump, sizeof(lum))) + /* we only need the header part from user space to get lmm_magic and + * lmm_stripe_count, (the header part is common to v1 and v3) */ + lum_size = sizeof(struct lov_user_md_v1); + if (copy_from_user(&lum, lump, lum_size)) RETURN(-EFAULT); - if (lum.lmm_magic != LOV_USER_MAGIC) + if ((lum.lmm_magic != LOV_USER_MAGIC_V1) && + (lum.lmm_magic != LOV_USER_MAGIC_V3)) RETURN(-EINVAL); + /* lov_user_md_vX and lov_mds_md_vX must have the same size */ + LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1)); + LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3)); + LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0])); + + /* we can use lov_mds_md_size() to compute lum_size + * because lov_user_md_vX and lov_mds_md_vX have the same size */ if (lum.lmm_stripe_count > 0) { - lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]); + lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic); OBD_ALLOC(lumk, lum_size); if (!lumk) RETURN(-ENOMEM); - lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id; - lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr; + if (lum.lmm_magic == LOV_USER_MAGIC_V1) + lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]); + else + lmm_objects = &(lumk->lmm_objects[0]); + lmm_objects->l_object_id = lsm->lsm_object_id; } else { - lum_size = sizeof(lum); + lum_size = lov_mds_md_size(0, lum.lmm_magic); lumk = &lum; } @@ -3223,14 +3527,10 @@ static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, int err = 0; ENTRY; -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) - MOD_INC_USE_COUNT; -#else if (!try_module_get(THIS_MODULE)) { CERROR("Can't get module. 
Is it alive?"); return -EINVAL; } -#endif switch (cmd) { case OBD_IOC_LOV_GET_CONFIG: { char *buf; @@ -3299,59 +3599,108 @@ static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, GOTO(out, err = -ENOTTY); } out: -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) - MOD_DEC_USE_COUNT; -#else module_put(THIS_MODULE); -#endif return err; } static int osc_get_info(struct obd_export *exp, obd_count keylen, - void *key, __u32 *vallen, void *val) + void *key, __u32 *vallen, void *val, + struct lov_stripe_md *lsm) { ENTRY; if (!vallen || !val) RETURN(-EFAULT); - if (keylen > strlen("lock_to_stripe") && - strcmp(key, "lock_to_stripe") == 0) { + if (KEY_IS(KEY_LOCK_TO_STRIPE)) { __u32 *stripe = val; *vallen = sizeof(*stripe); *stripe = 0; RETURN(0); - } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) { + } else if (KEY_IS(KEY_LAST_ID)) { struct ptlrpc_request *req; - obd_id *reply; - char *bufs[2] = { NULL, key }; - int rc, size[2] = { sizeof(struct ptlrpc_body), keylen }; + obd_id *reply; + char *tmp; + int rc; - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION, - OST_GET_INFO, 2, size, bufs); + req = ptlrpc_request_alloc(class_exp2cliimp(exp), + &RQF_OST_GET_INFO_LAST_ID); if (req == NULL) RETURN(-ENOMEM); - size[REPLY_REC_OFF] = *vallen; - ptlrpc_req_set_repsize(req, 2, size); + req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY, + RCL_CLIENT, keylen); + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + + tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY); + memcpy(tmp, key, keylen); + + ptlrpc_request_set_replen(req); rc = ptlrpc_queue_wait(req); if (rc) GOTO(out, rc); - reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply), - lustre_swab_ost_last_id); - if (reply == NULL) { - CERROR("Can't unpack OST last ID\n"); + reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID); + if (reply == NULL) GOTO(out, rc = -EPROTO); - } + *((obd_id *)val) = *reply; out: ptlrpc_req_finished(req); RETURN(rc); + } else if (KEY_IS(KEY_FIEMAP)) { + struct ptlrpc_request *req; + struct ll_user_fiemap *reply; + char *tmp; + int rc; + + req = ptlrpc_request_alloc(class_exp2cliimp(exp), + &RQF_OST_GET_INFO_FIEMAP); + if (req == NULL) + RETURN(-ENOMEM); + + req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY, + RCL_CLIENT, keylen); + req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL, + RCL_CLIENT, *vallen); + req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL, + RCL_SERVER, *vallen); + + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + + tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY); + memcpy(tmp, key, keylen); + tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL); + memcpy(tmp, val, *vallen); + + ptlrpc_request_set_replen(req); + rc = ptlrpc_queue_wait(req); + if (rc) + GOTO(out1, rc); + + reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL); + if (reply == NULL) + GOTO(out1, rc = -EPROTO); + + memcpy(val, reply, *vallen); + out1: + ptlrpc_req_finished(req); + + RETURN(rc); } + RETURN(-EINVAL); } -static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req, +static int osc_setinfo_mds_conn_interpret(const struct lu_env *env, + struct ptlrpc_request *req, void *aa, int rc) { struct llog_ctxt *ctxt; @@ -3370,11 +3719,12 @@ static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req, "ctxt %p: %d\n", ctxt, rc); } + llog_ctxt_put(ctxt); 
spin_lock(&imp->imp_lock); imp->imp_server_timeout = 1; imp->imp_pingable = 1; spin_unlock(&imp->imp_lock); - CDEBUG(D_HA, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd)); + CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd)); RETURN(rc); } @@ -3384,16 +3734,18 @@ static int osc_set_info_async(struct obd_export *exp, obd_count keylen, struct ptlrpc_request_set *set) { struct ptlrpc_request *req; - struct obd_device *obd = exp->exp_obd; - struct obd_import *imp = class_exp2cliimp(exp); - int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen }; - char *bufs[3] = { NULL, key, val }; + struct obd_device *obd = exp->exp_obd; + struct obd_import *imp = class_exp2cliimp(exp); + char *tmp; + int rc; ENTRY; OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10); if (KEY_IS(KEY_NEXT_ID)) { if (vallen != sizeof(obd_id)) + RETURN(-ERANGE); + if (val == NULL) RETURN(-EINVAL); obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1; CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n", @@ -3403,7 +3755,7 @@ static int osc_set_info_async(struct obd_export *exp, obd_count keylen, RETURN(0); } - if (KEY_IS("unlinked")) { + if (KEY_IS(KEY_UNLINKED)) { struct osc_creator *oscc = &obd->u.cli.cl_oscc; spin_lock(&oscc->oscc_lock); oscc->oscc_flags &= ~OSCC_FLAG_NOSPC; @@ -3423,7 +3775,7 @@ static int osc_set_info_async(struct obd_export *exp, obd_count keylen, RETURN(0); } - if (KEY_IS("checksum")) { + if (KEY_IS(KEY_CHECKSUM)) { if (vallen != sizeof(int)) RETURN(-EINVAL); exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0; @@ -3445,12 +3797,27 @@ static int osc_set_info_async(struct obd_export *exp, obd_count keylen, Even if something bad goes through, we'd get a -EINVAL from OST anyway. */ - req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size, - bufs); + + req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO); if (req == NULL) RETURN(-ENOMEM); - if (KEY_IS("mds_conn")) { + req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY, + RCL_CLIENT, keylen); + req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL, + RCL_CLIENT, vallen); + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + + tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY); + memcpy(tmp, key, keylen); + tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL); + memcpy(tmp, val, vallen); + + if (KEY_IS(KEY_MDS_CONN)) { struct osc_creator *oscc = &obd->u.cli.cl_oscc; oscc->oscc_oa.o_gr = (*(__u32 *)val); @@ -3459,9 +3826,9 @@ static int osc_set_info_async(struct obd_export *exp, obd_count keylen, req->rq_interpret_reply = osc_setinfo_mds_conn_interpret; } - ptlrpc_req_set_repsize(req, 1, NULL); + ptlrpc_request_set_replen(req); ptlrpc_set_add_req(set, req); - ptlrpc_check_set(set); + ptlrpc_check_set(NULL, set); RETURN(0); } @@ -3472,13 +3839,14 @@ static struct llog_operations osc_size_repl_logops = { }; static struct llog_operations osc_mds_ost_orig_logops; -static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs, +static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg, struct obd_device *tgt, int count, struct llog_catid *catid, struct obd_uuid *uuid) { int rc; ENTRY; + LASSERT(olg == &obd->obd_olg); spin_lock(&obd->obd_dev_lock); if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) { osc_mds_ost_orig_logops = llog_lvfs_ops; @@ -3489,17 +3857,23 @@ static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs, } spin_unlock(&obd->obd_dev_lock); - rc = llog_setup(obd, llogs, 
LLOG_MDS_OST_ORIG_CTXT, tgt, count, + rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count, &catid->lci_logid, &osc_mds_ost_orig_logops); if (rc) { CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n"); GOTO (out, rc); } - rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, count, NULL, - &osc_size_repl_logops); - if (rc) + rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count, + NULL, &osc_size_repl_logops); + if (rc) { + struct llog_ctxt *ctxt = + llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT); + if (ctxt) + llog_cleanup(ctxt); CERROR("failed LLOG_SIZE_REPL_CTXT\n"); + } + GOTO(out, rc); out: if (rc) { CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n", @@ -3507,7 +3881,7 @@ out: CERROR("logid "LPX64":0x%x\n", catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen); } - RETURN(rc); + return rc; } static int osc_llog_finish(struct obd_device *obd, int count) @@ -3529,7 +3903,8 @@ static int osc_llog_finish(struct obd_device *obd, int count) RETURN(rc); } -static int osc_reconnect(struct obd_export *exp, struct obd_device *obd, +static int osc_reconnect(const struct lu_env *env, + struct obd_export *exp, struct obd_device *obd, struct obd_uuid *cluuid, struct obd_connect_data *data) { @@ -3566,6 +3941,8 @@ static int osc_disconnect(struct obd_export *exp) /* flush any remaining cancel messages out to the target */ llog_sync(ctxt, exp); + llog_ctxt_put(ctxt); + rc = client_disconnect_export(exp); return rc; } @@ -3661,18 +4038,19 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) if (rc) { ptlrpcd_decref(); } else { - struct lprocfs_static_vars lvars; + struct lprocfs_static_vars lvars = { 0 }; struct client_obd *cli = &obd->u.cli; - lprocfs_init_vars(osc, &lvars); + lprocfs_osc_init_vars(&lvars); if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) { lproc_osc_attach_seqstat(obd); + sptlrpc_lprocfs_cliobd_attach(obd); ptlrpc_lprocfs_register_obd(obd); } oscc_init(obd); /* We need to allocate a few requests more, because - brw_interpret_oap tries to create new requests before freeing + brw_interpret tries to create new requests before freeing previous ones. Ideally we want to have 2x max_rpcs_in_flight reserved, but I afraid that might be too much wasted RAM in fact, so 2 is just my guess and still should work. 
*/ @@ -3680,6 +4058,11 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2, OST_MAXREQSIZE, ptlrpc_add_rqs_to_pool); + cli->cl_cache = cache_create(obd); + if (!cli->cl_cache) { + osc_cleanup(obd); + rc = -ENOMEM; + } } RETURN(rc); @@ -3715,15 +4098,11 @@ static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) class_destroy_import(imp); obd->u.cli.cl_import = NULL; } - break; - } - case OBD_CLEANUP_SELF_EXP: rc = obd_llog_finish(obd, 0); if (rc != 0) CERROR("failed to cleanup llogging subsystems\n"); break; - case OBD_CLEANUP_OBD: - break; + } } RETURN(rc); } @@ -3745,21 +4124,68 @@ int osc_cleanup(struct obd_device *obd) /* free memory of osc quota cache */ lquota_cleanup(quota_interface, obd); + cache_destroy(obd->u.cli.cl_cache); rc = client_obd_cleanup(obd); ptlrpcd_decref(); RETURN(rc); } +static int osc_register_page_removal_cb(struct obd_export *exp, + obd_page_removal_cb_t func, + obd_pin_extent_cb pin_cb) +{ + return cache_add_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func, + pin_cb); +} + +static int osc_unregister_page_removal_cb(struct obd_export *exp, + obd_page_removal_cb_t func) +{ + return cache_del_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func); +} + +static int osc_register_lock_cancel_cb(struct obd_export *exp, + obd_lock_cancel_cb cb) +{ + LASSERT(exp->exp_obd->u.cli.cl_ext_lock_cancel_cb == NULL); + + exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = cb; + return 0; +} + +static int osc_unregister_lock_cancel_cb(struct obd_export *exp, + obd_lock_cancel_cb cb) +{ + if (exp->exp_obd->u.cli.cl_ext_lock_cancel_cb != cb) { + CERROR("Unregistering cancel cb %p, while only %p was " + "registered\n", cb, + exp->exp_obd->u.cli.cl_ext_lock_cancel_cb); + RETURN(-EINVAL); + } + + exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = NULL; + return 0; +} + static int osc_process_config(struct obd_device *obd, obd_count len, void *buf) { struct lustre_cfg *lcfg = buf; - struct lprocfs_static_vars lvars; + struct lprocfs_static_vars lvars = { 0 }; int rc = 0; - lprocfs_init_vars(osc, &lvars); + lprocfs_osc_init_vars(&lvars); + + switch (lcfg->lcfg_command) { + case LCFG_SPTLRPC_CONF: + rc = sptlrpc_cliobd_process_config(obd, lcfg); + break; + default: + rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, + lcfg, obd); + break; + } - rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd); return(rc); } @@ -3787,6 +4213,8 @@ struct obd_ops osc_obd_ops = { .o_brw = osc_brw, .o_brw_async = osc_brw_async, .o_prep_async_page = osc_prep_async_page, + .o_reget_short_lock = osc_reget_short_lock, + .o_release_short_lock = osc_release_short_lock, .o_queue_async_io = osc_queue_async_io, .o_set_async_flags = osc_set_async_flags, .o_queue_group_io = osc_queue_group_io, @@ -3799,7 +4227,6 @@ struct obd_ops osc_obd_ops = { .o_change_cbdata = osc_change_cbdata, .o_cancel = osc_cancel, .o_cancel_unused = osc_cancel_unused, - .o_join_lru = osc_join_lru, .o_iocontrol = osc_iocontrol, .o_get_info = osc_get_info, .o_set_info_async = osc_set_info_async, @@ -3807,14 +4234,19 @@ struct obd_ops osc_obd_ops = { .o_llog_init = osc_llog_init, .o_llog_finish = osc_llog_finish, .o_process_config = osc_process_config, + .o_register_page_removal_cb = osc_register_page_removal_cb, + .o_unregister_page_removal_cb = osc_unregister_page_removal_cb, + .o_register_lock_cancel_cb = osc_register_lock_cancel_cb, + .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb, }; + int __init osc_init(void) { - struct 
lprocfs_static_vars lvars; + struct lprocfs_static_vars lvars = { 0 }; int rc; ENTRY; - lprocfs_init_vars(osc, &lvars); + lprocfs_osc_init_vars(&lvars); request_module("lquota"); quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface); @@ -3842,7 +4274,7 @@ static void /*__exit*/ osc_exit(void) class_unregister_type(LUSTRE_OSC_NAME); } -MODULE_AUTHOR("Cluster File Systems, Inc. "); +MODULE_AUTHOR("Sun Microsystems, Inc. "); MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)"); MODULE_LICENSE("GPL");
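/*
 * Illustrative sketch (editor's addition, not from the patch): the
 * osc_enqueue() and osc_match() hunks above replace the old "retry the
 * match with LCK_PW" branch by passing a bitmask of acceptable modes
 * (PR|PW) to a single ldlm_lock_match() call, which returns the mode
 * it actually found.  The toy types and the stub try_match() below are
 * invented for this demonstration; only the reference rebalancing
 * mirrors the osc_match() fixup: when an existing PW lock satisfies a
 * PR request, the match's PW reference is converted into a PR
 * reference, so the caller ends up holding exactly the mode it asked
 * for.
 */
#include <stdio.h>

enum demo_mode { DEMO_LCK_PR = 1, DEMO_LCK_PW = 2 };    /* bit flags */

struct demo_lock {
        int pr_refs, pw_refs;
        enum demo_mode granted;         /* mode the existing lock holds */
};

/* stub matcher: succeed if the granted mode is in the accepted set and
 * take a reference in that mode, as a real lock match would */
static enum demo_mode try_match(struct demo_lock *lk, int accepted)
{
        if (!(lk->granted & accepted))
                return 0;
        if (lk->granted == DEMO_LCK_PW)
                lk->pw_refs++;
        else
                lk->pr_refs++;
        return lk->granted;
}

static enum demo_mode demo_match_for_read(struct demo_lock *lk)
{
        /* a reader accepts either a PR lock or an existing PW lock */
        enum demo_mode found = try_match(lk, DEMO_LCK_PR | DEMO_LCK_PW);

        if (found == DEMO_LCK_PW) {
                /* asked for PR, matched PW: addref PR, decref PW */
                lk->pr_refs++;
                lk->pw_refs--;
                found = DEMO_LCK_PR;
        }
        return found;
}

int main(void)
{
        struct demo_lock lk = { .pr_refs = 0, .pw_refs = 0,
                                .granted = DEMO_LCK_PW };

        if (demo_match_for_read(&lk))
                printf("read covered: pr_refs=%d pw_refs=%d\n",
                       lk.pr_refs, lk.pw_refs);
        return 0;
}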