1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * Copyright (c) 2011 Whamcloud, Inc.
36 * This file is part of Lustre, http://www.lustre.org/
37 * Lustre is a trademark of Sun Microsystems, Inc.
41 # define EXPORT_SYMTAB
43 #define DEBUG_SUBSYSTEM S_OSC
45 #include <libcfs/libcfs.h>
48 # include <liblustre.h>
51 #include <lustre_dlm.h>
52 #include <lustre_net.h>
53 #include <lustre/lustre_user.h>
54 #include <obd_cksum.h>
62 #include <lustre_ha.h>
63 #include <lprocfs_status.h>
64 #include <lustre_log.h>
65 #include <lustre_debug.h>
66 #include <lustre_param.h>
67 #include "osc_internal.h"
69 static quota_interface_t *quota_interface = NULL;
70 extern quota_interface_t osc_quota_interface;
72 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
73 static int brw_interpret(const struct lu_env *env,
74 struct ptlrpc_request *req, void *data, int rc);
75 int osc_cleanup(struct obd_device *obd);
77 /* Pack OSC object metadata for disk storage (LE byte order). */
78 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
79 struct lov_stripe_md *lsm)
84 lmm_size = sizeof(**lmmp);
89 OBD_FREE(*lmmp, lmm_size);
95 OBD_ALLOC(*lmmp, lmm_size);
101 LASSERT(lsm->lsm_object_id);
102 LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
103 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
104 (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
110 /* Unpack OSC object metadata from disk storage (LE byte order). */
111 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
112 struct lov_mds_md *lmm, int lmm_bytes)
115 struct obd_import *imp = class_exp2cliimp(exp);
119 if (lmm_bytes < sizeof (*lmm)) {
120 CERROR("lov_mds_md too small: %d, need %d\n",
121 lmm_bytes, (int)sizeof(*lmm));
124 /* XXX LOV_MAGIC etc check? */
126 if (lmm->lmm_object_id == 0) {
127 CERROR("lov_mds_md: zero lmm_object_id\n");
132 lsm_size = lov_stripe_md_size(1);
136 if (*lsmp != NULL && lmm == NULL) {
137 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
138 OBD_FREE(*lsmp, lsm_size);
144 OBD_ALLOC(*lsmp, lsm_size);
147 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
148 if ((*lsmp)->lsm_oinfo[0] == NULL) {
149 OBD_FREE(*lsmp, lsm_size);
152 loi_init((*lsmp)->lsm_oinfo[0]);
156 /* XXX zero *lsmp? */
157 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
158 (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
159 LASSERT((*lsmp)->lsm_object_id);
160 LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
164 (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
165 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
167 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
172 static inline void osc_pack_capa(struct ptlrpc_request *req,
173 struct ost_body *body, void *capa)
175 struct obd_capa *oc = (struct obd_capa *)capa;
176 struct lustre_capa *c;
181 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
184 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
185 DEBUG_CAPA(D_SEC, c, "pack");
188 static inline void osc_pack_req_body(struct ptlrpc_request *req,
189 struct obd_info *oinfo)
191 struct ost_body *body;
193 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
196 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
197 osc_pack_capa(req, body, oinfo->oi_capa);
200 static inline void osc_set_capa_size(struct ptlrpc_request *req,
201 const struct req_msg_field *field,
205 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
207 /* it is already calculated as sizeof struct obd_capa */
211 static int osc_getattr_interpret(const struct lu_env *env,
212 struct ptlrpc_request *req,
213 struct osc_async_args *aa, int rc)
215 struct ost_body *body;
221 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
223 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
224 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
226 /* This should really be sent by the OST */
227 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
228 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
230 CDEBUG(D_INFO, "can't unpack ost_body\n");
232 aa->aa_oi->oi_oa->o_valid = 0;
235 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
239 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
240 struct ptlrpc_request_set *set)
242 struct ptlrpc_request *req;
243 struct osc_async_args *aa;
247 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
251 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
252 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
254 ptlrpc_request_free(req);
258 osc_pack_req_body(req, oinfo);
260 ptlrpc_request_set_replen(req);
261 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
263 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
264 aa = ptlrpc_req_async_args(req);
267 ptlrpc_set_add_req(set, req);
271 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
273 struct ptlrpc_request *req;
274 struct ost_body *body;
278 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
282 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
283 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
285 ptlrpc_request_free(req);
289 osc_pack_req_body(req, oinfo);
291 ptlrpc_request_set_replen(req);
293 rc = ptlrpc_queue_wait(req);
297 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
299 GOTO(out, rc = -EPROTO);
301 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
302 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
304 /* This should really be sent by the OST */
305 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
306 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
310 ptlrpc_req_finished(req);
314 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
315 struct obd_trans_info *oti)
317 struct ptlrpc_request *req;
318 struct ost_body *body;
322 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
324 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
328 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
329 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
331 ptlrpc_request_free(req);
335 osc_pack_req_body(req, oinfo);
337 ptlrpc_request_set_replen(req);
339 rc = ptlrpc_queue_wait(req);
343 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
345 GOTO(out, rc = -EPROTO);
347 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
351 ptlrpc_req_finished(req);
355 static int osc_setattr_interpret(const struct lu_env *env,
356 struct ptlrpc_request *req,
357 struct osc_setattr_args *sa, int rc)
359 struct ost_body *body;
365 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
367 GOTO(out, rc = -EPROTO);
369 lustre_get_wire_obdo(sa->sa_oa, &body->oa);
371 rc = sa->sa_upcall(sa->sa_cookie, rc);
375 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
376 struct obd_trans_info *oti,
377 obd_enqueue_update_f upcall, void *cookie,
378 struct ptlrpc_request_set *rqset)
380 struct ptlrpc_request *req;
381 struct osc_setattr_args *sa;
385 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
389 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
390 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
392 ptlrpc_request_free(req);
396 if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
397 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
399 osc_pack_req_body(req, oinfo);
401 ptlrpc_request_set_replen(req);
403 /* do MDS to OST setattr asynchronously */
405 /* Do not wait for response. */
406 ptlrpcd_add_req(req, PSCOPE_OTHER);
408 req->rq_interpret_reply =
409 (ptlrpc_interpterer_t)osc_setattr_interpret;
411 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
412 sa = ptlrpc_req_async_args(req);
413 sa->sa_oa = oinfo->oi_oa;
414 sa->sa_upcall = upcall;
415 sa->sa_cookie = cookie;
417 if (rqset == PTLRPCD_SET)
418 ptlrpcd_add_req(req, PSCOPE_OTHER);
420 ptlrpc_set_add_req(rqset, req);
426 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
427 struct obd_trans_info *oti,
428 struct ptlrpc_request_set *rqset)
430 return osc_setattr_async_base(exp, oinfo, oti,
431 oinfo->oi_cb_up, oinfo, rqset);
434 int osc_real_create(struct obd_export *exp, struct obdo *oa,
435 struct lov_stripe_md **ea, struct obd_trans_info *oti)
437 struct ptlrpc_request *req;
438 struct ost_body *body;
439 struct lov_stripe_md *lsm;
448 rc = obd_alloc_memmd(exp, &lsm);
453 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
455 GOTO(out, rc = -ENOMEM);
457 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
459 ptlrpc_request_free(req);
463 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
465 lustre_set_wire_obdo(&body->oa, oa);
467 ptlrpc_request_set_replen(req);
469 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
470 oa->o_flags == OBD_FL_DELORPHAN) {
472 "delorphan from OST integration");
473 /* Don't resend the delorphan req */
474 req->rq_no_resend = req->rq_no_delay = 1;
477 rc = ptlrpc_queue_wait(req);
481 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
483 GOTO(out_req, rc = -EPROTO);
485 lustre_get_wire_obdo(oa, &body->oa);
487 /* This should really be sent by the OST */
488 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
489 oa->o_valid |= OBD_MD_FLBLKSZ;
491 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
492 * have valid lsm_oinfo data structs, so don't go touching that.
493 * This needs to be fixed in a big way.
495 lsm->lsm_object_id = oa->o_id;
496 lsm->lsm_object_seq = oa->o_seq;
500 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
502 if (oa->o_valid & OBD_MD_FLCOOKIE) {
503 if (!oti->oti_logcookies)
504 oti_alloc_cookies(oti, 1);
505 *oti->oti_logcookies = oa->o_lcookie;
509 CDEBUG(D_HA, "transno: "LPD64"\n",
510 lustre_msg_get_transno(req->rq_repmsg));
512 ptlrpc_req_finished(req);
515 obd_free_memmd(exp, &lsm);
519 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
520 obd_enqueue_update_f upcall, void *cookie,
521 struct ptlrpc_request_set *rqset)
523 struct ptlrpc_request *req;
524 struct osc_setattr_args *sa;
525 struct ost_body *body;
529 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
533 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
534 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
536 ptlrpc_request_free(req);
539 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
540 ptlrpc_at_set_req_timeout(req);
542 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
544 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
545 osc_pack_capa(req, body, oinfo->oi_capa);
547 ptlrpc_request_set_replen(req);
550 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
551 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
552 sa = ptlrpc_req_async_args(req);
553 sa->sa_oa = oinfo->oi_oa;
554 sa->sa_upcall = upcall;
555 sa->sa_cookie = cookie;
556 if (rqset == PTLRPCD_SET)
557 ptlrpcd_add_req(req, PSCOPE_OTHER);
559 ptlrpc_set_add_req(rqset, req);
564 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
565 struct obd_trans_info *oti,
566 struct ptlrpc_request_set *rqset)
568 oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
569 oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
570 oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
571 return osc_punch_base(exp, oinfo,
572 oinfo->oi_cb_up, oinfo, rqset);
575 static int osc_sync_interpret(const struct lu_env *env,
576 struct ptlrpc_request *req,
579 struct osc_async_args *aa = arg;
580 struct ost_body *body;
586 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
588 CERROR ("can't unpack ost_body\n");
589 GOTO(out, rc = -EPROTO);
592 *aa->aa_oi->oi_oa = body->oa;
594 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
598 static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
599 obd_size start, obd_size end,
600 struct ptlrpc_request_set *set)
602 struct ptlrpc_request *req;
603 struct ost_body *body;
604 struct osc_async_args *aa;
609 CDEBUG(D_INFO, "oa NULL\n");
613 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
617 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
618 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
620 ptlrpc_request_free(req);
624 /* overload the size and blocks fields in the oa with start/end */
625 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
627 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
628 body->oa.o_size = start;
629 body->oa.o_blocks = end;
630 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
631 osc_pack_capa(req, body, oinfo->oi_capa);
633 ptlrpc_request_set_replen(req);
634 req->rq_interpret_reply = osc_sync_interpret;
636 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
637 aa = ptlrpc_req_async_args(req);
640 ptlrpc_set_add_req(set, req);
644 /* Find and cancel locks held locally that match @mode in the resource found
645 * by @objid. Found locks are added to the @cancels list. Returns the number
646 * of locks added to the @cancels list. */
647 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
649 ldlm_mode_t mode, int lock_flags)
651 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
652 struct ldlm_res_id res_id;
653 struct ldlm_resource *res;
657 osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
658 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
662 LDLM_RESOURCE_ADDREF(res);
663 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
664 lock_flags, 0, NULL);
665 LDLM_RESOURCE_DELREF(res);
666 ldlm_resource_putref(res);
670 static int osc_destroy_interpret(const struct lu_env *env,
671 struct ptlrpc_request *req, void *data,
674 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
676 cfs_atomic_dec(&cli->cl_destroy_in_flight);
677 cfs_waitq_signal(&cli->cl_destroy_waitq);
681 static int osc_can_send_destroy(struct client_obd *cli)
683 if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
684 cli->cl_max_rpcs_in_flight) {
685 /* The destroy request can be sent */
688 if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
689 cli->cl_max_rpcs_in_flight) {
691 * The counter has been modified between the two atomic
694 cfs_waitq_signal(&cli->cl_destroy_waitq);
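/*
 * Editor's illustrative sketch (not part of the original file): the
 * inc-then-recheck throttle used by osc_can_send_destroy() above,
 * reduced to standard C11 atomics.  A sender optimistically bumps the
 * in-flight counter; if that pushed it over the limit it backs out,
 * and the second check catches a racing decrement so no waiter is
 * left sleeping.  All names here are invented for the example.
 */
#if 0
#include <stdatomic.h>
#include <stdbool.h>

static atomic_int in_flight;

/* try to claim one of 'limit' in-flight slots; true on success */
static bool can_send(int limit)
{
        if (atomic_fetch_add(&in_flight, 1) + 1 <= limit)
                return true;                    /* slot claimed */

        /* over the limit: back out; if a racing completion already
         * dropped the counter below the limit, wake a waiter here,
         * as cfs_waitq_signal() does above */
        if (atomic_fetch_sub(&in_flight, 1) - 1 < limit)
                ;                               /* wake_up(&waitq) */
        return false;
}
#endif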
699 /* Destroy requests can always be async on the client, and we don't even really
700 * care about the return code since the client cannot do anything at all about
702 * When the MDS is unlinking a filename, it saves the file objects into a
703 * recovery llog, and these object records are cancelled when the OST reports
704 * they were destroyed and sync'd to disk (i.e. transaction committed).
705 * If the client dies, or the OST is down when the object should be destroyed,
706 * the records are not cancelled, and when the OST reconnects to the MDS next,
707 * it will retrieve the llog unlink logs and then send the log cancellation
708 * cookies to the MDS after committing destroy transactions. */
709 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
710 struct lov_stripe_md *ea, struct obd_trans_info *oti,
711 struct obd_export *md_export, void *capa)
713 struct client_obd *cli = &exp->exp_obd->u.cli;
714 struct ptlrpc_request *req;
715 struct ost_body *body;
716 CFS_LIST_HEAD(cancels);
721 CDEBUG(D_INFO, "oa NULL\n");
725 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
726 LDLM_FL_DISCARD_DATA);
728 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
730 ldlm_lock_list_put(&cancels, l_bl_ast, count);
734 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
735 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
738 ptlrpc_request_free(req);
742 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
743 ptlrpc_at_set_req_timeout(req);
745 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
746 oa->o_lcookie = *oti->oti_logcookies;
747 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
749 lustre_set_wire_obdo(&body->oa, oa);
751 osc_pack_capa(req, body, (struct obd_capa *)capa);
752 ptlrpc_request_set_replen(req);
754 /* don't throttle destroy RPCs for the MDT */
755 if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
756 req->rq_interpret_reply = osc_destroy_interpret;
757 if (!osc_can_send_destroy(cli)) {
758 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
762 * Wait until the number of on-going destroy RPCs drops
763 * under max_rpcs_in_flight
765 l_wait_event_exclusive(cli->cl_destroy_waitq,
766 osc_can_send_destroy(cli), &lwi);
770 /* Do not wait for response */
771 ptlrpcd_add_req(req, PSCOPE_OTHER);
775 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
778 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
780 LASSERT(!(oa->o_valid & bits));
783 client_obd_list_lock(&cli->cl_loi_list_lock);
784 oa->o_dirty = cli->cl_dirty;
785 if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
786 CERROR("dirty %lu - %lu > dirty_max %lu\n",
787 cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
789 } else if (cfs_atomic_read(&obd_dirty_pages) -
790 cfs_atomic_read(&obd_dirty_transit_pages) >
791 obd_max_dirty_pages + 1){
792 /* The cfs_atomic_read() and the cfs_atomic_inc() are
793 * not covered by a lock, thus they may safely race and trip
794 * this CERROR() unless we add in a small fudge factor (+1). */
795 CERROR("dirty %d - %d > system dirty_max %d\n",
796 cfs_atomic_read(&obd_dirty_pages),
797 cfs_atomic_read(&obd_dirty_transit_pages),
798 obd_max_dirty_pages);
800 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
801 CERROR("dirty %lu - dirty_max %lu too big???\n",
802 cli->cl_dirty, cli->cl_dirty_max);
805 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
806 (cli->cl_max_rpcs_in_flight + 1);
807 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
809 oa->o_grant = cli->cl_avail_grant;
810 oa->o_dropped = cli->cl_lost_grant;
811 cli->cl_lost_grant = 0;
812 client_obd_list_unlock(&cli->cl_loi_list_lock);
813 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
814 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
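/*
 * Editor's worked example (not part of the original file): how the
 * o_undirty hint above is sized.  With 4 KiB pages, 256 pages per RPC
 * and 8 RPCs in flight, max_in_flight = 256 * 4096 * (8 + 1) = 9 MiB,
 * so the client advertises max(cl_dirty_max, 9 MiB).  A minimal
 * sketch with assumed values, not taken from a live client_obd.
 */
#if 0
static long undirty_hint(long dirty_max, long pages_per_rpc,
                         long rpcs_in_flight, long page_size)
{
        long max_in_flight = (pages_per_rpc * page_size) *
                             (rpcs_in_flight + 1);

        return dirty_max > max_in_flight ? dirty_max : max_in_flight;
}
#endif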
818 static void osc_update_next_shrink(struct client_obd *cli)
820 cli->cl_next_shrink_grant =
821 cfs_time_shift(cli->cl_grant_shrink_interval);
822 CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
823 cli->cl_next_shrink_grant);
826 /* caller must hold loi_list_lock */
827 static void osc_consume_write_grant(struct client_obd *cli,
828 struct brw_page *pga)
830 LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
831 LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
832 cfs_atomic_inc(&obd_dirty_pages);
833 cli->cl_dirty += CFS_PAGE_SIZE;
834 cli->cl_avail_grant -= CFS_PAGE_SIZE;
835 pga->flag |= OBD_BRW_FROM_GRANT;
836 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
837 CFS_PAGE_SIZE, pga, pga->pg);
838 LASSERT(cli->cl_avail_grant >= 0);
839 osc_update_next_shrink(cli);
842 /* the companion to osc_consume_write_grant, called when a brw has completed.
843 * must be called with the loi lock held. */
844 static void osc_release_write_grant(struct client_obd *cli,
845 struct brw_page *pga, int sent)
847 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
850 LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
851 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
856 pga->flag &= ~OBD_BRW_FROM_GRANT;
857 cfs_atomic_dec(&obd_dirty_pages);
858 cli->cl_dirty -= CFS_PAGE_SIZE;
859 if (pga->flag & OBD_BRW_NOCACHE) {
860 pga->flag &= ~OBD_BRW_NOCACHE;
861 cfs_atomic_dec(&obd_dirty_transit_pages);
862 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
865 cli->cl_lost_grant += CFS_PAGE_SIZE;
866 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
867 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
868 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
869 /* For short writes we shouldn't count parts of pages that
870 * span a whole block on the OST side, or our accounting goes
871 * wrong. Should match the code in filter_grant_check. */
872 int offset = pga->off & ~CFS_PAGE_MASK;
873 int count = pga->count + (offset & (blocksize - 1));
874 int end = (offset + pga->count) & (blocksize - 1);
876 count += blocksize - end;
878 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
879 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
880 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
881 cli->cl_avail_grant, cli->cl_dirty);
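/*
 * Editor's illustrative sketch (not part of the original file): the
 * short-write grant accounting above rounds the written byte range out
 * to OST block boundaries and treats the rest of the page as lost
 * grant.  E.g. with a 4096-byte page, 1024-byte OST blocks and a
 * 100-byte write at page offset 50, the write occupies one block, so
 * 4096 - 1024 = 3072 bytes of grant are lost.  Invented helper name.
 */
#if 0
/* grant bytes lost for a short write of 'count' bytes at page offset
 * 'poff', with OST block size 'bs' and page size 'psz' */
static int lost_grant(int poff, int count, int bs, int psz)
{
        int used = count + (poff & (bs - 1));   /* pad back to block start */
        int end = (poff + count) & (bs - 1);

        if (end)
                used += bs - end;               /* pad out to block end */
        return psz - used;
}
#endif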
887 static unsigned long rpcs_in_flight(struct client_obd *cli)
889 return cli->cl_r_in_flight + cli->cl_w_in_flight;
892 /* caller must hold loi_list_lock */
893 void osc_wake_cache_waiters(struct client_obd *cli)
896 struct osc_cache_waiter *ocw;
899 cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
900 /* if we can't dirty more, we must wait until some is written */
901 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
902 (cfs_atomic_read(&obd_dirty_pages) + 1 >
903 obd_max_dirty_pages)) {
904 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
905 "osc max %ld, sys max %d\n", cli->cl_dirty,
906 cli->cl_dirty_max, obd_max_dirty_pages);
910 /* if the cache is still dirty but there is no grant, wait for pending
911 * RPCs that may yet return us some grant before doing sync writes */
912 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
913 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
914 cli->cl_w_in_flight);
918 ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
919 cfs_list_del_init(&ocw->ocw_entry);
920 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
921 /* no more RPCs in flight to return grant, do sync IO */
922 ocw->ocw_rc = -EDQUOT;
923 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
925 osc_consume_write_grant(cli,
926 &ocw->ocw_oap->oap_brw_page);
929 cfs_waitq_signal(&ocw->ocw_waitq);
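/*
 * Editor's illustrative sketch (not part of the original file): the
 * wake-up policy above for the waiter at the head of the cache-waiter
 * queue, ignoring the global obd_dirty_pages check for brevity.
 * Returns >0 to leave the waiter queued, 0 to wake it with grant
 * consumed on its behalf, or -EDQUOT to wake it for sync IO.
 */
#if 0
#include <errno.h>

static int waiter_policy(long dirty, long dirty_max, long avail_grant,
                         int writes_in_flight, long page_size)
{
        if (dirty + page_size > dirty_max)
                return 1;               /* no dirty room: keep waiting */
        if (writes_in_flight && avail_grant < page_size)
                return 1;               /* in-flight RPCs may return grant */
        if (avail_grant < page_size)
                return -EDQUOT;         /* no grant coming: do sync IO */
        return 0;                       /* consume grant and wake */
}
#endif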
935 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
937 client_obd_list_lock(&cli->cl_loi_list_lock);
938 cli->cl_avail_grant += grant;
939 client_obd_list_unlock(&cli->cl_loi_list_lock);
942 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
944 if (body->oa.o_valid & OBD_MD_FLGRANT) {
945 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
946 __osc_update_grant(cli, body->oa.o_grant);
950 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
951 void *key, obd_count vallen, void *val,
952 struct ptlrpc_request_set *set);
954 static int osc_shrink_grant_interpret(const struct lu_env *env,
955 struct ptlrpc_request *req,
958 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
959 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
960 struct ost_body *body;
963 __osc_update_grant(cli, oa->o_grant);
967 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
969 osc_update_grant(cli, body);
975 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
977 client_obd_list_lock(&cli->cl_loi_list_lock);
978 oa->o_grant = cli->cl_avail_grant / 4;
979 cli->cl_avail_grant -= oa->o_grant;
980 client_obd_list_unlock(&cli->cl_loi_list_lock);
981 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
982 oa->o_valid |= OBD_MD_FLFLAGS;
985 oa->o_flags |= OBD_FL_SHRINK_GRANT;
986 osc_update_next_shrink(cli);
989 /* Shrink the current grant, either from some large amount to enough for a
990 * full set of in-flight RPCs, or if we have already shrunk to that limit
991 * then to enough for a single RPC. This avoids keeping more grant than
992 * needed, and avoids shrinking the grant piecemeal. */
993 static int osc_shrink_grant(struct client_obd *cli)
995 long target = (cli->cl_max_rpcs_in_flight + 1) *
996 cli->cl_max_pages_per_rpc;
998 client_obd_list_lock(&cli->cl_loi_list_lock);
999 if (cli->cl_avail_grant <= target)
1000 target = cli->cl_max_pages_per_rpc;
1001 client_obd_list_unlock(&cli->cl_loi_list_lock);
1003 return osc_shrink_grant_to_target(cli, target);
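/*
 * Editor's illustrative sketch (not part of the original file): the
 * two-step shrink policy above.  First shrink to a full set of
 * in-flight RPCs; once already at or below that, shrink to a single
 * RPC's worth.  Expressed in bytes for simplicity (the driver works in
 * pages); all names are invented.
 */
#if 0
static long shrink_target(long avail, long rpc_bytes, long rpcs_in_flight)
{
        long target = (rpcs_in_flight + 1) * rpc_bytes;

        if (avail <= target)            /* already shrunk that far */
                target = rpc_bytes;     /* keep one RPC's worth */
        return target;
}
#endif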
1006 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
1009 struct ost_body *body;
1012 client_obd_list_lock(&cli->cl_loi_list_lock);
1013 /* Don't shrink if we are already above or below the desired limit.
1014 * We don't want to shrink below a single RPC, as that will negatively
1015 * impact block allocation and long-term performance. */
1016 if (target < cli->cl_max_pages_per_rpc)
1017 target = cli->cl_max_pages_per_rpc;
1019 if (target >= cli->cl_avail_grant) {
1020 client_obd_list_unlock(&cli->cl_loi_list_lock);
1023 client_obd_list_unlock(&cli->cl_loi_list_lock);
1025 OBD_ALLOC_PTR(body);
1029 osc_announce_cached(cli, &body->oa, 0);
1031 client_obd_list_lock(&cli->cl_loi_list_lock);
1032 body->oa.o_grant = cli->cl_avail_grant - target;
1033 cli->cl_avail_grant = target;
1034 client_obd_list_unlock(&cli->cl_loi_list_lock);
1035 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
1036 body->oa.o_valid |= OBD_MD_FLFLAGS;
1037 body->oa.o_flags = 0;
1039 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
1040 osc_update_next_shrink(cli);
1042 rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
1043 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
1044 sizeof(*body), body, NULL);
1046 __osc_update_grant(cli, body->oa.o_grant);
1051 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
1052 static int osc_should_shrink_grant(struct client_obd *client)
1054 cfs_time_t time = cfs_time_current();
1055 cfs_time_t next_shrink = client->cl_next_shrink_grant;
1057 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
1058 OBD_CONNECT_GRANT_SHRINK) == 0)
1061 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1062 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1063 client->cl_avail_grant > GRANT_SHRINK_LIMIT)
1066 osc_update_next_shrink(client);
1071 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1073 struct client_obd *client;
1075 cfs_list_for_each_entry(client, &item->ti_obd_list,
1076 cl_grant_shrink_list) {
1077 if (osc_should_shrink_grant(client))
1078 osc_shrink_grant(client);
1083 static int osc_add_shrink_grant(struct client_obd *client)
1087 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1089 osc_grant_shrink_grant_cb, NULL,
1090 &client->cl_grant_shrink_list);
1092 CERROR("add grant client %s error %d\n",
1093 client->cl_import->imp_obd->obd_name, rc);
1096 CDEBUG(D_CACHE, "add grant client %s\n",
1097 client->cl_import->imp_obd->obd_name);
1098 osc_update_next_shrink(client);
1102 static int osc_del_shrink_grant(struct client_obd *client)
1104 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1108 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1111 * ocd_grant is the total grant amount we're expected to hold: if we've
1112 * been evicted, it's the new avail_grant amount, cl_dirty will drop
1113 * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1115 * race is tolerable here: if we're evicted, but imp_state already
1116 * left EVICTED state, then cl_dirty must be 0 already.
1118 client_obd_list_lock(&cli->cl_loi_list_lock);
1119 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1120 cli->cl_avail_grant = ocd->ocd_grant;
1122 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1124 if (cli->cl_avail_grant < 0) {
1125 CWARN("%s: available grant < 0, the OSS is probably not running"
1126 " with patch from bug20278 (%ld) \n",
1127 cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
1128 /* workaround for 1.6 servers which do not have
1129 * the patch from bug20278 */
1130 cli->cl_avail_grant = ocd->ocd_grant;
1133 client_obd_list_unlock(&cli->cl_loi_list_lock);
1135 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
1136 cli->cl_import->imp_obd->obd_name,
1137 cli->cl_avail_grant, cli->cl_lost_grant);
1139 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1140 cfs_list_empty(&cli->cl_grant_shrink_list))
1141 osc_add_shrink_grant(cli);
1144 /* We assume that the reason this OSC got a short read is that it read
1145 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
1146 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1147 * this stripe never got written at or beyond this stripe offset yet. */
1148 static void handle_short_read(int nob_read, obd_count page_count,
1149 struct brw_page **pga)
1154 /* skip bytes read OK */
1155 while (nob_read > 0) {
1156 LASSERT (page_count > 0);
1158 if (pga[i]->count > nob_read) {
1159 /* EOF inside this page */
1160 ptr = cfs_kmap(pga[i]->pg) +
1161 (pga[i]->off & ~CFS_PAGE_MASK);
1162 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1163 cfs_kunmap(pga[i]->pg);
1169 nob_read -= pga[i]->count;
1174 /* zero remaining pages */
1175 while (page_count-- > 0) {
1176 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1177 memset(ptr, 0, pga[i]->count);
1178 cfs_kunmap(pga[i]->pg);
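/*
 * Editor's illustrative sketch (not part of the original file): the
 * short-read handling above restated over plain buffers.  Bytes that
 * were actually read are kept, the buffer containing EOF is zeroed
 * from the EOF offset, and every later buffer is zeroed whole.  A
 * minimal standalone model of the kmap-based loop, with invented types.
 */
#if 0
#include <string.h>

struct buf {
        char *ptr;
        int count;
};

static void zero_short_read(int nob_read, struct buf *b, int nbufs)
{
        int i = 0;

        while (nob_read > 0 && i < nbufs) {     /* skip bytes read OK */
                if (b[i].count > nob_read) {    /* EOF inside this buf */
                        memset(b[i].ptr + nob_read, 0,
                               b[i].count - nob_read);
                        i++;
                        break;
                }
                nob_read -= b[i].count;
                i++;
        }
        while (i < nbufs) {                     /* zero remaining bufs */
                memset(b[i].ptr, 0, b[i].count);
                i++;
        }
}
#endif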
1183 static int check_write_rcs(struct ptlrpc_request *req,
1184 int requested_nob, int niocount,
1185 obd_count page_count, struct brw_page **pga)
1190 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1191 sizeof(*remote_rcs) *
1193 if (remote_rcs == NULL) {
1194 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1198 /* return error if any niobuf was in error */
1199 for (i = 0; i < niocount; i++) {
1200 if ((int)remote_rcs[i] < 0)
1201 return(remote_rcs[i]);
1203 if (remote_rcs[i] != 0) {
1204 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1205 i, remote_rcs[i], req);
1210 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1211 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1212 req->rq_bulk->bd_nob_transferred, requested_nob);
1219 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1221 if (p1->flag != p2->flag) {
1222 unsigned mask = ~(OBD_BRW_FROM_GRANT|
1223 OBD_BRW_NOCACHE|OBD_BRW_SYNC|OBD_BRW_ASYNC);
1225 /* warn if we try to combine flags that we don't know to be
1226 * safe to combine */
1227 if ((p1->flag & mask) != (p2->flag & mask))
1228 CERROR("is it ok to have flags 0x%x and 0x%x in the "
1229 "same brw?\n", p1->flag, p2->flag);
1233 return (p1->off + p1->count == p2->off);
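/*
 * Editor's illustrative sketch (not part of the original file): how a
 * contiguity test like can_merge_pages() lets the request packer below
 * fold a sorted page array into fewer (offset, length) remote niobufs.
 * Minimal invented types; the real merge also compares brw flags.
 */
#if 0
struct frag {
        long long off;
        int count;
};

struct ext {
        long long off;
        int len;
};

/* returns the number of merged extents written to 'out' */
static int merge_frags(struct frag **pg, int n, struct ext *out)
{
        int i, m = 0;

        for (i = 0; i < n; i++) {
                if (m > 0 &&
                    out[m - 1].off + out[m - 1].len == pg[i]->off) {
                        out[m - 1].len += pg[i]->count; /* contiguous */
                } else {
                        out[m].off = pg[i]->off;
                        out[m].len = pg[i]->count;
                        m++;
                }
        }
        return m;
}
#endif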
1236 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1237 struct brw_page **pga, int opc,
1238 cksum_type_t cksum_type)
1243 LASSERT (pg_count > 0);
1244 cksum = init_checksum(cksum_type);
1245 while (nob > 0 && pg_count > 0) {
1246 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1247 int off = pga[i]->off & ~CFS_PAGE_MASK;
1248 int count = pga[i]->count > nob ? nob : pga[i]->count;
1250 /* corrupt the data before we compute the checksum, to
1251 * simulate an OST->client data error */
1252 if (i == 0 && opc == OST_READ &&
1253 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1254 memcpy(ptr + off, "bad1", min(4, nob));
1255 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1256 cfs_kunmap(pga[i]->pg);
1257 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1260 nob -= pga[i]->count;
1264 /* For sending we only compute the wrong checksum instead
1265 * of corrupting the data, so it is still correct on a redo */
1266 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
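/*
 * Editor's illustrative sketch (not part of the original file):
 * checksumming a scattered page array as one logical byte stream, as
 * the walk above does, with a trivial additive checksum standing in
 * for the pluggable cksum_type algorithms.  The tail clamp
 * 'count = min(fragment bytes, bytes left)' is the detail that matters.
 */
#if 0
struct frag {
        const unsigned char *ptr;
        int count;
};

static unsigned int cksum_frags(int nob, struct frag *f, int nfrags)
{
        unsigned int cksum = 0;
        int i, k;

        for (i = 0; nob > 0 && i < nfrags; i++) {
                int count = f[i].count > nob ? nob : f[i].count;

                for (k = 0; k < count; k++)
                        cksum += f[i].ptr[k];   /* stand-in algorithm */
                nob -= f[i].count;
        }
        return cksum;
}
#endif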
1272 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1273 struct lov_stripe_md *lsm, obd_count page_count,
1274 struct brw_page **pga,
1275 struct ptlrpc_request **reqp,
1276 struct obd_capa *ocapa, int reserve,
1279 struct ptlrpc_request *req;
1280 struct ptlrpc_bulk_desc *desc;
1281 struct ost_body *body;
1282 struct obd_ioobj *ioobj;
1283 struct niobuf_remote *niobuf;
1284 int niocount, i, requested_nob, opc, rc;
1285 struct osc_brw_async_args *aa;
1286 struct req_capsule *pill;
1287 struct brw_page *pg_prev;
1290 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1291 RETURN(-ENOMEM); /* Recoverable */
1292 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1293 RETURN(-EINVAL); /* Fatal */
1295 if ((cmd & OBD_BRW_WRITE) != 0) {
1297 req = ptlrpc_request_alloc_pool(cli->cl_import,
1298 cli->cl_import->imp_rq_pool,
1299 &RQF_OST_BRW_WRITE);
1302 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1307 for (niocount = i = 1; i < page_count; i++) {
1308 if (!can_merge_pages(pga[i - 1], pga[i]))
1312 pill = &req->rq_pill;
1313 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1315 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1316 niocount * sizeof(*niobuf));
1317 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1319 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1321 ptlrpc_request_free(req);
1324 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1325 ptlrpc_at_set_req_timeout(req);
1327 if (opc == OST_WRITE)
1328 desc = ptlrpc_prep_bulk_imp(req, page_count,
1329 BULK_GET_SOURCE, OST_BULK_PORTAL);
1331 desc = ptlrpc_prep_bulk_imp(req, page_count,
1332 BULK_PUT_SINK, OST_BULK_PORTAL);
1335 GOTO(out, rc = -ENOMEM);
1336 /* NB the request now owns desc and will free it when the request is freed */
1338 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1339 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1340 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1341 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1343 lustre_set_wire_obdo(&body->oa, oa);
1345 obdo_to_ioobj(oa, ioobj);
1346 ioobj->ioo_bufcnt = niocount;
1347 osc_pack_capa(req, body, ocapa);
1348 LASSERT (page_count > 0);
1350 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1351 struct brw_page *pg = pga[i];
1352 int poff = pg->off & ~CFS_PAGE_MASK;
1354 LASSERT(pg->count > 0);
1355 /* make sure there is no gap in the middle of page array */
1356 LASSERTF(page_count == 1 ||
1357 (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
1358 ergo(i > 0 && i < page_count - 1,
1359 poff == 0 && pg->count == CFS_PAGE_SIZE) &&
1360 ergo(i == page_count - 1, poff == 0)),
1361 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1362 i, page_count, pg, pg->off, pg->count);
1364 LASSERTF(i == 0 || pg->off > pg_prev->off,
1365 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1366 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1368 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1369 pg_prev->pg, page_private(pg_prev->pg),
1370 pg_prev->pg->index, pg_prev->off);
1372 LASSERTF(i == 0 || pg->off > pg_prev->off,
1373 "i %d p_c %u\n", i, page_count);
1375 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1376 (pg->flag & OBD_BRW_SRVLOCK));
1378 ptlrpc_prep_bulk_page(desc, pg->pg, poff, pg->count);
1379 requested_nob += pg->count;
1381 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1383 niobuf->len += pg->count;
1385 niobuf->offset = pg->off;
1386 niobuf->len = pg->count;
1387 niobuf->flags = pg->flag;
1392 LASSERTF((void *)(niobuf - niocount) ==
1393 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1394 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1395 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1397 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1399 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1400 body->oa.o_valid |= OBD_MD_FLFLAGS;
1401 body->oa.o_flags = 0;
1403 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1406 if (osc_should_shrink_grant(cli))
1407 osc_shrink_grant_local(cli, &body->oa);
1409 /* size[REQ_REC_OFF] still sizeof (*body) */
1410 if (opc == OST_WRITE) {
1411 if (unlikely(cli->cl_checksum) &&
1412 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1413 /* store cl_cksum_type in a local variable since
1414 * it can be changed via lprocfs */
1415 cksum_type_t cksum_type = cli->cl_cksum_type;
1417 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1418 oa->o_flags &= OBD_FL_LOCAL_MASK;
1419 body->oa.o_flags = 0;
1421 body->oa.o_flags |= cksum_type_pack(cksum_type);
1422 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1423 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1427 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1429 /* save this in 'oa', too, for later checking */
1430 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1431 oa->o_flags |= cksum_type_pack(cksum_type);
1433 /* clear out the checksum flag, in case this is a
1434 * resend but cl_checksum is no longer set. b=11238 */
1435 oa->o_valid &= ~OBD_MD_FLCKSUM;
1437 oa->o_cksum = body->oa.o_cksum;
1438 /* 1 RC per niobuf */
1439 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1440 sizeof(__u32) * niocount);
1442 if (unlikely(cli->cl_checksum) &&
1443 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1444 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1445 body->oa.o_flags = 0;
1446 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1447 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1450 ptlrpc_request_set_replen(req);
1452 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1453 aa = ptlrpc_req_async_args(req);
1455 aa->aa_requested_nob = requested_nob;
1456 aa->aa_nio_count = niocount;
1457 aa->aa_page_count = page_count;
1461 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1462 if (ocapa && reserve)
1463 aa->aa_ocapa = capa_get(ocapa);
1469 ptlrpc_req_finished(req);
1473 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1474 __u32 client_cksum, __u32 server_cksum, int nob,
1475 obd_count page_count, struct brw_page **pga,
1476 cksum_type_t client_cksum_type)
1480 cksum_type_t cksum_type;
1482 if (server_cksum == client_cksum) {
1483 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1487 /* If this is an mmapped file, it can be changed at any time */
1488 if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags & OBD_FL_MMAP)
1491 if (oa->o_valid & OBD_MD_FLFLAGS)
1492 cksum_type = cksum_type_unpack(oa->o_flags);
1494 cksum_type = OBD_CKSUM_CRC32;
1496 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1499 if (cksum_type != client_cksum_type)
1500 msg = "the server did not use the checksum type specified in "
1501 "the original request - likely a protocol problem";
1502 else if (new_cksum == server_cksum)
1503 msg = "changed on the client after we checksummed it - "
1504 "likely false positive due to mmap IO (bug 11742)";
1505 else if (new_cksum == client_cksum)
1506 msg = "changed in transit before arrival at OST";
1508 msg = "changed in transit AND doesn't match the original - "
1509 "likely false positive due to mmap IO (bug 11742)";
1511 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1512 " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1513 msg, libcfs_nid2str(peer->nid),
1514 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1515 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1516 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1518 oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1520 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1521 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1522 "client csum now %x\n", client_cksum, client_cksum_type,
1523 server_cksum, cksum_type, new_cksum);
1527 /* Note rc enters this function as the number of bytes transferred */
1528 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1530 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1531 const lnet_process_id_t *peer =
1532 &req->rq_import->imp_connection->c_peer;
1533 struct client_obd *cli = aa->aa_cli;
1534 struct ost_body *body;
1535 __u32 client_cksum = 0;
1538 if (rc < 0 && rc != -EDQUOT) {
1539 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1543 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1544 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1546 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1550 #ifdef HAVE_QUOTA_SUPPORT
1551 /* set/clear over quota flag for a uid/gid */
1552 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1553 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1554 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1556 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1557 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1559 lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
1564 osc_update_grant(cli, body);
1569 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1570 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1572 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1574 CERROR("Unexpected +ve rc %d\n", rc);
1577 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1579 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1582 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1583 check_write_checksum(&body->oa, peer, client_cksum,
1584 body->oa.o_cksum, aa->aa_requested_nob,
1585 aa->aa_page_count, aa->aa_ppga,
1586 cksum_type_unpack(aa->aa_oa->o_flags)))
1589 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1590 aa->aa_page_count, aa->aa_ppga);
1594 /* The rest of this function executes only for OST_READs */
1596 /* if unwrap_bulk failed, return -EAGAIN to retry */
1597 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1599 GOTO(out, rc = -EAGAIN);
1601 if (rc > aa->aa_requested_nob) {
1602 CERROR("Unexpected rc %d (%d requested)\n", rc,
1603 aa->aa_requested_nob);
1607 if (rc != req->rq_bulk->bd_nob_transferred) {
1608 CERROR ("Unexpected rc %d (%d transferred)\n",
1609 rc, req->rq_bulk->bd_nob_transferred);
1613 if (rc < aa->aa_requested_nob)
1614 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1616 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1617 static int cksum_counter;
1618 __u32 server_cksum = body->oa.o_cksum;
1621 cksum_type_t cksum_type;
1623 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1624 cksum_type = cksum_type_unpack(body->oa.o_flags);
1626 cksum_type = OBD_CKSUM_CRC32;
1627 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1628 aa->aa_ppga, OST_READ,
1631 if (peer->nid == req->rq_bulk->bd_sender) {
1635 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1638 if (server_cksum == ~0 && rc > 0) {
1639 CERROR("Protocol error: server %s set the 'checksum' "
1640 "bit, but didn't send a checksum. Not fatal, "
1641 "but please notify on http://bugs.whamcloud.com/\n",
1642 libcfs_nid2str(peer->nid));
1643 } else if (server_cksum != client_cksum) {
1644 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1645 "%s%s%s inode "DFID" object "
1646 LPU64"/"LPU64" extent "
1647 "["LPU64"-"LPU64"]\n",
1648 req->rq_import->imp_obd->obd_name,
1649 libcfs_nid2str(peer->nid),
1651 body->oa.o_valid & OBD_MD_FLFID ?
1652 body->oa.o_parent_seq : (__u64)0,
1653 body->oa.o_valid & OBD_MD_FLFID ?
1654 body->oa.o_parent_oid : 0,
1655 body->oa.o_valid & OBD_MD_FLFID ?
1656 body->oa.o_parent_ver : 0,
1658 body->oa.o_valid & OBD_MD_FLGROUP ?
1659 body->oa.o_seq : (__u64)0,
1660 aa->aa_ppga[0]->off,
1661 aa->aa_ppga[aa->aa_page_count-1]->off +
1662 aa->aa_ppga[aa->aa_page_count-1]->count -
1664 CERROR("client %x, server %x, cksum_type %x\n",
1665 client_cksum, server_cksum, cksum_type);
1667 aa->aa_oa->o_cksum = client_cksum;
1671 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1674 } else if (unlikely(client_cksum)) {
1675 static int cksum_missed;
1678 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1679 CERROR("Checksum %u requested from %s but not sent\n",
1680 cksum_missed, libcfs_nid2str(peer->nid));
1686 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1691 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1692 struct lov_stripe_md *lsm,
1693 obd_count page_count, struct brw_page **pga,
1694 struct obd_capa *ocapa)
1696 struct ptlrpc_request *req;
1700 struct l_wait_info lwi;
1704 cfs_waitq_init(&waitq);
1707 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1708 page_count, pga, &req, ocapa, 0, resends);
1712 rc = ptlrpc_queue_wait(req);
1714 if (rc == -ETIMEDOUT && req->rq_resend) {
1715 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1716 ptlrpc_req_finished(req);
1720 rc = osc_brw_fini_request(req, rc);
1722 ptlrpc_req_finished(req);
1723 if (osc_recoverable_error(rc)) {
1725 if (!client_should_resend(resends, &exp->exp_obd->u.cli)) {
1726 CERROR("too many resend retries, returning error\n");
1730 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1731 l_wait_event(waitq, 0, &lwi);
1739 int osc_brw_redo_request(struct ptlrpc_request *request,
1740 struct osc_brw_async_args *aa)
1742 struct ptlrpc_request *new_req;
1743 struct ptlrpc_request_set *set = request->rq_set;
1744 struct osc_brw_async_args *new_aa;
1745 struct osc_async_page *oap;
1749 if (!client_should_resend(aa->aa_resends, aa->aa_cli)) {
1750 CERROR("too many resent retries, returning error\n");
1754 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1756 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1757 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1758 aa->aa_cli, aa->aa_oa,
1759 NULL /* lsm unused by osc currently */,
1760 aa->aa_page_count, aa->aa_ppga,
1761 &new_req, aa->aa_ocapa, 0, 1);
1765 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1767 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1768 if (oap->oap_request != NULL) {
1769 LASSERTF(request == oap->oap_request,
1770 "request %p != oap_request %p\n",
1771 request, oap->oap_request);
1772 if (oap->oap_interrupted) {
1773 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1774 ptlrpc_req_finished(new_req);
1779 /* New request takes over pga and oaps from old request.
1780 * Note that copying a list_head doesn't work, need to move it... */
1782 new_req->rq_interpret_reply = request->rq_interpret_reply;
1783 new_req->rq_async_args = request->rq_async_args;
1784 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1786 new_aa = ptlrpc_req_async_args(new_req);
1788 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1789 cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1790 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1792 cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1793 if (oap->oap_request) {
1794 ptlrpc_req_finished(oap->oap_request);
1795 oap->oap_request = ptlrpc_request_addref(new_req);
1799 new_aa->aa_ocapa = aa->aa_ocapa;
1800 aa->aa_ocapa = NULL;
1802 /* using ptlrpc_set_add_req() is safe because the interpret functions
1803 * run in check_set context. The only path on which a different thread
1804 * could access the request after getting -EINTR is protected by
1805 * cl_loi_list_lock */
1806 ptlrpc_set_add_req(set, new_req);
1808 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1810 DEBUG_REQ(D_INFO, new_req, "new request");
1815 * ugh, we want disk allocation on the target to happen in offset order. we'll
1816 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1817 * fine for our small page arrays and doesn't require allocation. it's an
1818 * insertion sort that swaps elements that are strides apart, shrinking the
1819 * stride down until it's 1 and the array is sorted.
1821 static void sort_brw_pages(struct brw_page **array, int num)
1824 struct brw_page *tmp;
1828 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1833 for (i = stride ; i < num ; i++) {
1836 while (j >= stride && array[j - stride]->off > tmp->off) {
1837 array[j] = array[j - stride];
1842 } while (stride > 1);
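/*
 * Editor's illustrative sketch (not part of the original file): the
 * same shellsort written standalone over an array of offsets, so the
 * stride sequence (1, 4, 13, 40, ... i.e. h = 3h + 1) is easy to see.
 * In-place, no allocation, fine for small arrays -- the properties the
 * comment above relies on.
 */
#if 0
static void shellsort(long long *off, int num)
{
        int stride, i, j;

        if (num < 2)
                return;
        for (stride = 1; stride < num; stride = (stride * 3) + 1)
                ;
        do {
                stride /= 3;
                for (i = stride; i < num; i++) {
                        long long tmp = off[i];

                        j = i;
                        while (j >= stride && off[j - stride] > tmp) {
                                off[j] = off[j - stride];
                                j -= stride;
                        }
                        off[j] = tmp;
                }
        } while (stride > 1);
}
#endif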
1845 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1851 LASSERT (pages > 0);
1852 offset = pg[i]->off & ~CFS_PAGE_MASK;
1856 if (pages == 0) /* that's all */
1859 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1860 return count; /* doesn't end on page boundary */
1863 offset = pg[i]->off & ~CFS_PAGE_MASK;
1864 if (offset != 0) /* doesn't start on page boundary */
1871 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1873 struct brw_page **ppga;
1876 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1880 for (i = 0; i < count; i++)
1885 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1887 LASSERT(ppga != NULL);
1888 OBD_FREE(ppga, sizeof(*ppga) * count);
1891 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1892 obd_count page_count, struct brw_page *pga,
1893 struct obd_trans_info *oti)
1895 struct obdo *saved_oa = NULL;
1896 struct brw_page **ppga, **orig;
1897 struct obd_import *imp = class_exp2cliimp(exp);
1898 struct client_obd *cli;
1899 int rc, page_count_orig;
1902 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1903 cli = &imp->imp_obd->u.cli;
1905 if (cmd & OBD_BRW_CHECK) {
1906 /* The caller just wants to know if there's a chance that this
1907 * I/O can succeed */
1909 if (imp->imp_invalid)
1914 /* test_brw with a failed create can trip this, maybe others. */
1915 LASSERT(cli->cl_max_pages_per_rpc);
1919 orig = ppga = osc_build_ppga(pga, page_count);
1922 page_count_orig = page_count;
1924 sort_brw_pages(ppga, page_count);
1925 while (page_count) {
1926 obd_count pages_per_brw;
1928 if (page_count > cli->cl_max_pages_per_rpc)
1929 pages_per_brw = cli->cl_max_pages_per_rpc;
1931 pages_per_brw = page_count;
1933 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1935 if (saved_oa != NULL) {
1936 /* restore previously saved oa */
1937 *oinfo->oi_oa = *saved_oa;
1938 } else if (page_count > pages_per_brw) {
1939 /* save a copy of oa (brw will clobber it) */
1940 OBDO_ALLOC(saved_oa);
1941 if (saved_oa == NULL)
1942 GOTO(out, rc = -ENOMEM);
1943 *saved_oa = *oinfo->oi_oa;
1946 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1947 pages_per_brw, ppga, oinfo->oi_capa);
1952 page_count -= pages_per_brw;
1953 ppga += pages_per_brw;
1957 osc_release_ppga(orig, page_count_orig);
1959 if (saved_oa != NULL)
1960 OBDO_FREE(saved_oa);
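/*
 * Editor's illustrative sketch (not part of the original file): the
 * chunking loop above reduced to its control flow -- sort once, then
 * peel off at most max_per_rpc pages (further limited to an
 * unfragmented prefix) per synchronous RPC until the array is
 * consumed.  issue_rpc() is a hypothetical stand-in for
 * osc_brw_internal().
 */
#if 0
static int brw_in_chunks(struct brw_page **ppga, int page_count,
                         int max_per_rpc)
{
        int rc = 0;

        sort_brw_pages(ppga, page_count);
        while (page_count > 0 && rc == 0) {
                int chunk = page_count > max_per_rpc ?
                            max_per_rpc : page_count;

                chunk = max_unfragmented_pages(ppga, chunk);
                rc = issue_rpc(ppga, chunk);    /* hypothetical */
                ppga += chunk;
                page_count -= chunk;
        }
        return rc;
}
#endif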
1965 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1966 * the dirty accounting. Writeback completes or truncate happens before
1967 * writing starts. Must be called with the loi lock held. */
1968 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1971 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1975 /* This maintains the lists of pending pages to read/write for a given object
1976 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1977 * to quickly find objects that are ready to send an RPC. */
1978 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1984 if (lop->lop_num_pending == 0)
1987 /* if we have an invalid import we want to drain the queued pages
1988 * by forcing them through rpcs that immediately fail and complete
1989 * the pages. recovery relies on this to empty the queued pages
1990 * before canceling the locks and evicting down the llite pages */
1991 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1994 /* stream rpcs in queue order as long as there is an urgent page
1995 * queued. this is our cheap solution for good batching in the case
1996 * where writepage marks some random page in the middle of the file
1997 * as urgent because of, say, memory pressure */
1998 if (!cfs_list_empty(&lop->lop_urgent)) {
1999 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
2002 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
2003 optimal = cli->cl_max_pages_per_rpc;
2004 if (cmd & OBD_BRW_WRITE) {
2005 /* trigger a write rpc stream as long as there are dirtiers
2006 * waiting for space. as they're waiting, they're not going to
2007 * create more pages to coalesce with what's waiting. */
2008 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
2009 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
2012 /* +16 to avoid triggering rpcs that would want to include pages
2013 * that are being queued but which can't be made ready until
2014 * the queuer finishes with the page. this is a wart for
2015 * llite::commit_write() */
2018 if (lop->lop_num_pending >= optimal)
2024 static int lop_makes_hprpc(struct loi_oap_pages *lop)
2026 struct osc_async_page *oap;
2029 if (cfs_list_empty(&lop->lop_urgent))
2032 oap = cfs_list_entry(lop->lop_urgent.next,
2033 struct osc_async_page, oap_urgent_item);
2035 if (oap->oap_async_flags & ASYNC_HP) {
2036 CDEBUG(D_CACHE, "hp request forcing RPC\n");
2043 static void on_list(cfs_list_t *item, cfs_list_t *list,
2046 if (cfs_list_empty(item) && should_be_on)
2047 cfs_list_add_tail(item, list);
2048 else if (!cfs_list_empty(item) && !should_be_on)
2049 cfs_list_del_init(item);
2052 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2053 * can find pages to build into rpcs quickly */
2054 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
2056 if (lop_makes_hprpc(&loi->loi_write_lop) ||
2057 lop_makes_hprpc(&loi->loi_read_lop)) {
2059 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2060 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2062 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
2063 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2064 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
2065 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
2068 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2069 loi->loi_write_lop.lop_num_pending);
2071 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2072 loi->loi_read_lop.lop_num_pending);
2075 static void lop_update_pending(struct client_obd *cli,
2076 struct loi_oap_pages *lop, int cmd, int delta)
2078 lop->lop_num_pending += delta;
2079 if (cmd & OBD_BRW_WRITE)
2080 cli->cl_pending_w_pages += delta;
2082 cli->cl_pending_r_pages += delta;
2086 * this is called when a sync waiter receives an interruption. Its job is to
2087 * get the caller woken as soon as possible. If its page hasn't been put in an
2088 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
2089 * desiring interruption which will forcefully complete the rpc once the rpc
2092 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2094 struct loi_oap_pages *lop;
2095 struct lov_oinfo *loi;
2099 LASSERT(!oap->oap_interrupted);
2100 oap->oap_interrupted = 1;
2102 /* ok, it's been put in an rpc. only one oap gets a request reference */
2103 if (oap->oap_request != NULL) {
2104 ptlrpc_mark_interrupted(oap->oap_request);
2105 ptlrpcd_wake(oap->oap_request);
2106 ptlrpc_req_finished(oap->oap_request);
2107 oap->oap_request = NULL;
2111 * page completion may be called only if ->cpo_prep() method was
2112 * executed by osc_io_submit(), that also adds the page to the pending list
2114 if (!cfs_list_empty(&oap->oap_pending_item)) {
2115 cfs_list_del_init(&oap->oap_pending_item);
2116 cfs_list_del_init(&oap->oap_urgent_item);
2119 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2120 &loi->loi_write_lop : &loi->loi_read_lop;
2121 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2122 loi_list_maint(oap->oap_cli, oap->oap_loi);
2123 rc = oap->oap_caller_ops->ap_completion(env,
2124 oap->oap_caller_data,
2125 oap->oap_cmd, NULL, -EINTR);
2131 /* this is trying to propagate async writeback errors back up to the
2132 * application. As an async write fails we record the error code for later if
2133 * the app does an fsync. As long as errors persist we force future rpcs to be
2134 * sync so that the app can get a sync error and break the cycle of queueing
2135 * pages for which writeback will fail. */
2136 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2143 ar->ar_force_sync = 1;
2144 ar->ar_min_xid = ptlrpc_sample_next_xid();
2149 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2150 ar->ar_force_sync = 0;
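/*
 * Editor's illustrative sketch (not part of the original file): the
 * error-latching scheme above, spelled out.  On failure the error is
 * recorded and future writes are forced sync; the latch clears only
 * when a write whose xid is at or past the xid sampled at failure time
 * succeeds, so every request queued before the failure still runs
 * sync.  Types and names invented for the example.
 */
#if 0
struct async_rc {
        int rc;
        int force_sync;
        unsigned long long min_xid;
};

static void process_ar(struct async_rc *ar, unsigned long long xid,
                       unsigned long long next_xid, int rc)
{
        if (rc) {
                ar->rc = rc;            /* reported on the next fsync */
                ar->force_sync = 1;
                ar->min_xid = next_xid; /* first xid that must succeed */
                return;
        }
        if (ar->force_sync && xid >= ar->min_xid)
                ar->force_sync = 0;     /* cycle broken, async again */
}
#endif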
2153 void osc_oap_to_pending(struct osc_async_page *oap)
2155 struct loi_oap_pages *lop;
2157 if (oap->oap_cmd & OBD_BRW_WRITE)
2158 lop = &oap->oap_loi->loi_write_lop;
2160 lop = &oap->oap_loi->loi_read_lop;
2162 if (oap->oap_async_flags & ASYNC_HP)
2163 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2164 else if (oap->oap_async_flags & ASYNC_URGENT)
2165 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2166 cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2167 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2170 /* this must be called holding the loi list lock to give coverage to exit_cache,
2171 * async_flag maintenance, and oap_request */
2172 static void osc_ap_completion(const struct lu_env *env,
2173 struct client_obd *cli, struct obdo *oa,
2174 struct osc_async_page *oap, int sent, int rc)
2179 if (oap->oap_request != NULL) {
2180 xid = ptlrpc_req_xid(oap->oap_request);
2181 ptlrpc_req_finished(oap->oap_request);
2182 oap->oap_request = NULL;
2185 cfs_spin_lock(&oap->oap_lock);
2186 oap->oap_async_flags = 0;
2187 cfs_spin_unlock(&oap->oap_lock);
2188 oap->oap_interrupted = 0;
2190 if (oap->oap_cmd & OBD_BRW_WRITE) {
2191 osc_process_ar(&cli->cl_ar, xid, rc);
2192 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2195 if (rc == 0 && oa != NULL) {
2196 if (oa->o_valid & OBD_MD_FLBLOCKS)
2197 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2198 if (oa->o_valid & OBD_MD_FLMTIME)
2199 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2200 if (oa->o_valid & OBD_MD_FLATIME)
2201 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2202 if (oa->o_valid & OBD_MD_FLCTIME)
2203 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2206 rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2207 oap->oap_cmd, oa, rc);
2209         /* ll_ap_completion (from llite) drops PG_locked, so a new
2210          * I/O on the page could start; but OSC calls it under lock
2211          * and thus we can add the oap back to pending safely */
2213                 /* the upper layer wants to leave the page on the pending queue */
2214 osc_oap_to_pending(oap);
2216 osc_exit_cache(cli, oap, sent);
2220 static int brw_interpret(const struct lu_env *env,
2221 struct ptlrpc_request *req, void *data, int rc)
2223 struct osc_brw_async_args *aa = data;
2224 struct client_obd *cli;
2228 rc = osc_brw_fini_request(req, rc);
2229 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2230 if (osc_recoverable_error(rc)) {
2231                 /* Only retry once for mmapped files, since an mmapped page
2232                  * might be modified at any time. We still have to retry at
2233                  * least once in case the page really WAS corrupted on the
2234                  * network rather than changed by mmap(). Bug 11742 */
2236 if ((rc == -EAGAIN) && (aa->aa_resends > 0) &&
2237 aa->aa_oa->o_valid & OBD_MD_FLFLAGS &&
2238 aa->aa_oa->o_flags & OBD_FL_MMAP) {
2241 rc = osc_brw_redo_request(req, aa);
2248 capa_put(aa->aa_ocapa);
2249 aa->aa_ocapa = NULL;
2254 client_obd_list_lock(&cli->cl_loi_list_lock);
2256 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2257 * is called so we know whether to go to sync BRWs or wait for more
2258 * RPCs to complete */
2259 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2260 cli->cl_w_in_flight--;
2262 cli->cl_r_in_flight--;
2264 async = cfs_list_empty(&aa->aa_oaps);
2265 if (!async) { /* from osc_send_oap_rpc() */
2266 struct osc_async_page *oap, *tmp;
2267 /* the caller may re-use the oap after the completion call so
2268 * we need to clean it up a little */
2269 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
2271 cfs_list_del_init(&oap->oap_rpc_item);
2272 osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2274 OBDO_FREE(aa->aa_oa);
2275 } else { /* from async_internal() */
2277 for (i = 0; i < aa->aa_page_count; i++)
2278 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2280 osc_wake_cache_waiters(cli);
2281 osc_check_rpcs(env, cli);
2282 client_obd_list_unlock(&cli->cl_loi_list_lock);
2284 cl_req_completion(env, aa->aa_clerq, rc);
2285 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
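/*
 * The mmap retry policy above, reduced to a predicate (a sketch with
 * hypothetical demo_* names; the real path also goes through
 * osc_brw_redo_request() and tests OBD_MD_FLFLAGS/OBD_FL_MMAP on the obdo):
 */
#if 0 /* illustration, not compiled with this file */
#include <assert.h>
#include <errno.h>

/* decide whether a failed bulk should be resent; 'resends' counts retries
 * already made.  An mmapped page may legitimately change under us, so after
 * one resend a further -EAGAIN on an mmapped file is not retried again,
 * rather than looping on a checksum that may never stabilize. */
static int demo_should_resend(int rc, int resends, int is_mmapped)
{
        if (rc == 0)
                return 0;                       /* nothing to redo */
        if (rc == -EAGAIN && resends > 0 && is_mmapped)
                return 0;                       /* retried once: give up */
        return 1;                               /* redo the request */
}

int main(void)
{
        assert( demo_should_resend(-EAGAIN, 0, 1)); /* first failure */
        assert(!demo_should_resend(-EAGAIN, 1, 1)); /* mmapped: only once */
        assert( demo_should_resend(-EAGAIN, 5, 0)); /* regular file */
        return 0;
}
#endif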
2290 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2291 struct client_obd *cli,
2292 cfs_list_t *rpc_list,
2293 int page_count, int cmd)
2295 struct ptlrpc_request *req;
2296 struct brw_page **pga = NULL;
2297 struct osc_brw_async_args *aa;
2298 struct obdo *oa = NULL;
2299 const struct obd_async_page_ops *ops = NULL;
2300 void *caller_data = NULL;
2301 struct osc_async_page *oap;
2302 struct osc_async_page *tmp;
2303 struct ost_body *body;
2304 struct cl_req *clerq = NULL;
2305 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2306 struct ldlm_lock *lock = NULL;
2307 struct cl_req_attr crattr;
2308 int i, rc, mpflag = 0;
2311 LASSERT(!cfs_list_empty(rpc_list));
2313 if (cmd & OBD_BRW_MEMALLOC)
2314 mpflag = cfs_memory_pressure_get_and_set();
2316 memset(&crattr, 0, sizeof crattr);
2317 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2319 GOTO(out, req = ERR_PTR(-ENOMEM));
2323 GOTO(out, req = ERR_PTR(-ENOMEM));
2326 cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2327 struct cl_page *page = osc_oap2cl_page(oap);
2329 ops = oap->oap_caller_ops;
2330 caller_data = oap->oap_caller_data;
2332 clerq = cl_req_alloc(env, page, crt,
2333                                  1 /* only 1-object rpcs for now */);
2336 GOTO(out, req = (void *)clerq);
2337 lock = oap->oap_ldlm_lock;
2339 pga[i] = &oap->oap_brw_page;
2340 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2341 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2342 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2344 cl_req_page_add(env, clerq, page);
2347 /* always get the data for the obdo for the rpc */
2348 LASSERT(ops != NULL);
2350 crattr.cra_capa = NULL;
2351 cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2353 oa->o_handle = lock->l_remote_handle;
2354 oa->o_valid |= OBD_MD_FLHANDLE;
2357 rc = cl_req_prep(env, clerq);
2359 CERROR("cl_req_prep failed: %d\n", rc);
2360 GOTO(out, req = ERR_PTR(rc));
2363 sort_brw_pages(pga, page_count);
2364 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2365 pga, &req, crattr.cra_capa, 1, 0);
2367 CERROR("prep_req failed: %d\n", rc);
2368 GOTO(out, req = ERR_PTR(rc));
2371 if (cmd & OBD_BRW_MEMALLOC)
2372 req->rq_memalloc = 1;
2374         /* Need to update the timestamps after the request is built in case
2375          * we race with setattr (locally or in the queue at the OST). If the
2376          * OST gets the later setattr before the earlier BRW (as determined by
2377          * the request xid), the OST will not use the BRW timestamps. Sadly,
2378          * there is no obvious way to do this in a single call. bug 10150 */
2379 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2380 cl_req_attr_set(env, clerq, &crattr,
2381 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2383 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2384 aa = ptlrpc_req_async_args(req);
2385 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2386 cfs_list_splice(rpc_list, &aa->aa_oaps);
2387 CFS_INIT_LIST_HEAD(rpc_list);
2388 aa->aa_clerq = clerq;
2390 if (cmd & OBD_BRW_MEMALLOC)
2391 cfs_memory_pressure_restore(mpflag);
2393 capa_put(crattr.cra_capa);
2398 OBD_FREE(pga, sizeof(*pga) * page_count);
2399         /* this should happen rarely and is pretty bad; it makes the
2400          * pending list stop following the dirty order */
2401 client_obd_list_lock(&cli->cl_loi_list_lock);
2402 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2403 cfs_list_del_init(&oap->oap_rpc_item);
2405                 /* queued sync pages can be torn down while the pages
2406                  * are in transit between the pending list and the rpc */
2407 if (oap->oap_interrupted) {
2408 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2409 osc_ap_completion(env, cli, NULL, oap, 0,
2413 osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2415 if (clerq && !IS_ERR(clerq))
2416 cl_req_completion(env, clerq, PTR_ERR(req));
2422  * Prepare pages for ASYNC io and put them in the send queue.
2424  * \param cmd one of the OBD_BRW_* macros
2425  * \param lop pending pages
2427  * \return zero if no page was added to the send queue.
2428  * \return 1 if pages were successfully added to the send queue.
2429  * \return negative on errors.
2432 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2433 struct lov_oinfo *loi,
2434 int cmd, struct loi_oap_pages *lop)
2436 struct ptlrpc_request *req;
2437 obd_count page_count = 0;
2438 struct osc_async_page *oap = NULL, *tmp;
2439 struct osc_brw_async_args *aa;
2440 const struct obd_async_page_ops *ops;
2441 CFS_LIST_HEAD(rpc_list);
2442 int srvlock = 0, mem_tight = 0;
2443 struct cl_object *clob = NULL;
2444 obd_off starting_offset = OBD_OBJECT_EOF;
2445 unsigned int ending_offset;
2446 int starting_page_off = 0;
2449         /* ASYNC_HP pages first. At present, when the lock covering the pages
2450          * is about to be cancelled, the pages it covers are sent out with
2451          * ASYNC_HP. We have to send them out as soon as possible. */
2452 cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2453 if (oap->oap_async_flags & ASYNC_HP)
2454 cfs_list_move(&oap->oap_pending_item, &lop->lop_pending);
2455 if (++page_count >= cli->cl_max_pages_per_rpc)
2460 /* first we find the pages we're allowed to work with */
2461 cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2463 ops = oap->oap_caller_ops;
2465 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2466 "magic 0x%x\n", oap, oap->oap_magic);
2469 /* pin object in memory, so that completion call-backs
2470 * can be safely called under client_obd_list lock. */
2471 clob = osc_oap2cl_page(oap)->cp_obj;
2472 cl_object_get(clob);
2475 if (page_count != 0 &&
2476 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2477 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2478 " oap %p, page %p, srvlock %u\n",
2479 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2483 /* If there is a gap at the start of this page, it can't merge
2484 * with any previous page, so we'll hand the network a
2485 * "fragmented" page array that it can't transfer in 1 RDMA */
2486 if (oap->oap_obj_off < starting_offset) {
2487 if (starting_page_off != 0)
2490 starting_page_off = oap->oap_page_off;
2491 starting_offset = oap->oap_obj_off + starting_page_off;
2492 } else if (oap->oap_page_off != 0)
2495                 /* in llite, being 'ready' equates to the page being locked
2496                  * until completion unlocks it. commit_write submits a page
2497                  * as not ready because its unlock will happen unconditionally
2498                  * as the call returns. if we race with commit_write handing
2499                  * us that page, we don't want to create a hole in the page
2500                  * stream, so we stop and leave the rpc to be fired by
2501                  * another dirtier or by the kupdated interval (the not-ready
2502                  * page will still be on the dirty list). we could call in
2503                  * at the end of ll_file_write to process the queue again. */
2504 if (!(oap->oap_async_flags & ASYNC_READY)) {
2505 int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2508 CDEBUG(D_INODE, "oap %p page %p returned %d "
2509 "instead of ready\n", oap,
2513 /* llite is telling us that the page is still
2514 * in commit_write and that we should try
2515 * and put it in an rpc again later. we
2516 * break out of the loop so we don't create
2517 * a hole in the sequence of pages in the rpc
2522                                 /* the io isn't needed; tell the checks
2523                                  * below to complete the rpc with EINTR */
2524 cfs_spin_lock(&oap->oap_lock);
2525 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2526 cfs_spin_unlock(&oap->oap_lock);
2527 oap->oap_count = -EINTR;
2530 cfs_spin_lock(&oap->oap_lock);
2531 oap->oap_async_flags |= ASYNC_READY;
2532 cfs_spin_unlock(&oap->oap_lock);
2535 LASSERTF(0, "oap %p page %p returned %d "
2536 "from make_ready\n", oap,
2544 * Page submitted for IO has to be locked. Either by
2545 * ->ap_make_ready() or by higher layers.
2547 #if defined(__KERNEL__) && defined(__linux__)
2549 struct cl_page *page;
2551 page = osc_oap2cl_page(oap);
2553 if (page->cp_type == CPT_CACHEABLE &&
2554 !(PageLocked(oap->oap_page) &&
2555 (CheckWriteback(oap->oap_page, cmd)))) {
2556 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2558 (long)oap->oap_page->flags,
2559 oap->oap_async_flags);
2565 /* take the page out of our book-keeping */
2566 cfs_list_del_init(&oap->oap_pending_item);
2567 lop_update_pending(cli, lop, cmd, -1);
2568 cfs_list_del_init(&oap->oap_urgent_item);
2570 /* ask the caller for the size of the io as the rpc leaves. */
2571 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2573 ops->ap_refresh_count(env, oap->oap_caller_data,
2575 LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2577 if (oap->oap_count <= 0) {
2578 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2580 osc_ap_completion(env, cli, NULL,
2581 oap, 0, oap->oap_count);
2585 /* now put the page back in our accounting */
2586 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2587 if (page_count++ == 0)
2588 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2590 if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
2593 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2594 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2595 * have the same alignment as the initial writes that allocated
2596 * extents on the server. */
2597 ending_offset = oap->oap_obj_off + oap->oap_page_off +
2599 if (!(ending_offset & (PTLRPC_MAX_BRW_SIZE - 1)))
2602 if (page_count >= cli->cl_max_pages_per_rpc)
2605 /* If there is a gap at the end of this page, it can't merge
2606 * with any subsequent pages, so we'll hand the network a
2607 * "fragmented" page array that it can't transfer in 1 RDMA */
2608 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2612 osc_wake_cache_waiters(cli);
2614 loi_list_maint(cli, loi);
2616 client_obd_list_unlock(&cli->cl_loi_list_lock);
2619 cl_object_put(env, clob);
2621 if (page_count == 0) {
2622 client_obd_list_lock(&cli->cl_loi_list_lock);
2626 req = osc_build_req(env, cli, &rpc_list, page_count,
2627 mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
2629 LASSERT(cfs_list_empty(&rpc_list));
2630 loi_list_maint(cli, loi);
2631 RETURN(PTR_ERR(req));
2634 aa = ptlrpc_req_async_args(req);
2636 starting_offset &= PTLRPC_MAX_BRW_SIZE - 1;
2637 if (cmd == OBD_BRW_READ) {
2638 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2639 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2640 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2641 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2643 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2644 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2645 cli->cl_w_in_flight);
2646 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2647 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2649 ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2651 client_obd_list_lock(&cli->cl_loi_list_lock);
2653 if (cmd == OBD_BRW_READ)
2654 cli->cl_r_in_flight++;
2656 cli->cl_w_in_flight++;
2658         /* queued sync pages can be torn down while the pages
2659          * are in transit between the pending list and the rpc */
2661 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2662 /* only one oap gets a request reference */
2665 if (oap->oap_interrupted && !req->rq_intr) {
2666 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2668 ptlrpc_mark_interrupted(req);
2672 tmp->oap_request = ptlrpc_request_addref(req);
2674 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2675 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2677 req->rq_interpret_reply = brw_interpret;
2678 ptlrpcd_add_req(req, PSCOPE_BRW);
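/*
 * The boundary test used at the "End on a PTLRPC_MAX_BRW_SIZE boundary"
 * check above, shown standalone (the 1 MiB value is illustrative; the test
 * only works because the RPC size is a power of two, so size-1 is a mask
 * of the low bits):
 */
#if 0 /* illustration, not compiled with this file */
#include <assert.h>

#define DEMO_MAX_BRW_SIZE (1U << 20) /* assumed 1 MiB max RPC */

static int demo_on_rpc_boundary(unsigned long long end)
{
        return (end & (DEMO_MAX_BRW_SIZE - 1)) == 0;
}

int main(void)
{
        assert( demo_on_rpc_boundary(0));
        assert( demo_on_rpc_boundary(2ULL * DEMO_MAX_BRW_SIZE));
        assert(!demo_on_rpc_boundary(DEMO_MAX_BRW_SIZE + 4096));
        return 0;
}
#endif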
2682 #define LOI_DEBUG(LOI, STR, args...) \
2683 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2684 !cfs_list_empty(&(LOI)->loi_ready_item) || \
2685 !cfs_list_empty(&(LOI)->loi_hp_ready_item), \
2686 (LOI)->loi_write_lop.lop_num_pending, \
2687 !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2688 (LOI)->loi_read_lop.lop_num_pending, \
2689 !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2692 /* This is called by osc_check_rpcs() to find which objects have pages that
2693 * we could be sending. These lists are maintained by lop_makes_rpc(). */
2694 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2698 /* First return objects that have blocked locks so that they
2699 * will be flushed quickly and other clients can get the lock,
2700 * then objects which have pages ready to be stuffed into RPCs */
2701 if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
2702 RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
2703 struct lov_oinfo, loi_hp_ready_item));
2704 if (!cfs_list_empty(&cli->cl_loi_ready_list))
2705 RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
2706 struct lov_oinfo, loi_ready_item));
2708         /* then if we have cache waiters, return all objects with queued
2709          * writes. This is especially important when many small files
2710          * have filled up the cache and not been fired into rpcs because
2711          * they don't pass the nr_pending/object threshold */
2712 if (!cfs_list_empty(&cli->cl_cache_waiters) &&
2713 !cfs_list_empty(&cli->cl_loi_write_list))
2714 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2715 struct lov_oinfo, loi_write_item));
2717 /* then return all queued objects when we have an invalid import
2718 * so that they get flushed */
2719 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2720 if (!cfs_list_empty(&cli->cl_loi_write_list))
2721 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2724 if (!cfs_list_empty(&cli->cl_loi_read_list))
2725 RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
2726 struct lov_oinfo, loi_read_item));
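/*
 * The selection order of osc_next_loi() above, reduced to a priority
 * function over booleans (a sketch; the demo_* names are hypothetical and
 * the real code walks the four cl_loi_*_list heads instead):
 */
#if 0 /* illustration, not compiled with this file */
#include <assert.h>

enum demo_pick { PICK_HP_READY, PICK_READY, PICK_WRITE, PICK_READ, PICK_NONE };

static enum demo_pick demo_next_loi(int hp_ready, int ready,
                                    int queued_write, int queued_read,
                                    int cache_waiters, int import_invalid)
{
        if (hp_ready)
                return PICK_HP_READY;      /* flush blocked locks first */
        if (ready)
                return PICK_READY;         /* pages ready for an rpc */
        if (cache_waiters && queued_write)
                return PICK_WRITE;         /* free cache for the waiters */
        if (import_invalid) {              /* flush everything queued */
                if (queued_write)
                        return PICK_WRITE;
                if (queued_read)
                        return PICK_READ;
        }
        return PICK_NONE;
}

int main(void)
{
        /* an hp-ready object wins even when others are ready */
        assert(demo_next_loi(1, 1, 1, 1, 1, 0) == PICK_HP_READY);
        /* queued writes are picked only when someone waits on the cache */
        assert(demo_next_loi(0, 0, 1, 0, 0, 0) == PICK_NONE);
        assert(demo_next_loi(0, 0, 1, 0, 1, 0) == PICK_WRITE);
        return 0;
}
#endif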
2731 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2733 struct osc_async_page *oap;
2736 if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
2737 oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
2738 struct osc_async_page, oap_urgent_item);
2739 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2742 if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
2743 oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
2744 struct osc_async_page, oap_urgent_item);
2745 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2748 return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
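/*
 * The "+ hprpc" above gives high-priority work exactly one extra rpc slot:
 * !! normalizes the ASYNC_HP test to 0 or 1, which is then added to the
 * cap.  Standalone (demo_* names are hypothetical):
 */
#if 0 /* illustration, not compiled with this file */
#include <assert.h>

static int demo_at_rpc_cap(unsigned in_flight, unsigned max, int has_hp)
{
        return in_flight >= max + !!has_hp;
}

int main(void)
{
        assert( demo_at_rpc_cap(8, 8, 0)); /* at the cap: throttle */
        assert(!demo_at_rpc_cap(8, 8, 4)); /* hp work: one extra slot */
        assert( demo_at_rpc_cap(9, 8, 4));
        return 0;
}
#endif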
2751 /* called with the loi list lock held */
2752 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2754 struct lov_oinfo *loi;
2755 int rc = 0, race_counter = 0;
2758 while ((loi = osc_next_loi(cli)) != NULL) {
2759 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2761 if (osc_max_rpc_in_flight(cli, loi))
2764                 /* attempt some read/write balancing by alternating between
2765                  * reads and writes in an object. The makes_rpc checks here
2766                  * would be redundant if we were getting read/write work items
2767                  * instead of objects. we don't want send_oap_rpc to drain a
2768                  * partially filled read pending queue when we're handed this
2769                  * object to do write io on while there are cache waiters */
2770 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2771 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2772 &loi->loi_write_lop);
2774 CERROR("Write request failed with %d\n", rc);
2776                                 /* osc_send_oap_rpc failed, mostly because of memory pressure.
2779                                  * It can't break here, because if:
2780                                  *  - a page was submitted by osc_io_submit, so the page is locked;
2782                                  *  - no request is in flight;
2783                                  *  - and no subsequent request will be queued,
2784                                  * then the system will be in a live-lock state,
2785                                  * because there is no chance to call
2786                                  * osc_io_unplug() and osc_check_rpcs() any
2787                                  * more. pdflush can't help in this case,
2788                                  * because it might be blocked grabbing
2789                                  * the page lock, as mentioned above.
2791                                  * Anyway, continue to drain pages. */
2800 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2801 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2802 &loi->loi_read_lop);
2804 CERROR("Read request failed with %d\n", rc);
2812 /* attempt some inter-object balancing by issuing rpcs
2813 * for each object in turn */
2814 if (!cfs_list_empty(&loi->loi_hp_ready_item))
2815 cfs_list_del_init(&loi->loi_hp_ready_item);
2816 if (!cfs_list_empty(&loi->loi_ready_item))
2817 cfs_list_del_init(&loi->loi_ready_item);
2818 if (!cfs_list_empty(&loi->loi_write_item))
2819 cfs_list_del_init(&loi->loi_write_item);
2820 if (!cfs_list_empty(&loi->loi_read_item))
2821 cfs_list_del_init(&loi->loi_read_item);
2823 loi_list_maint(cli, loi);
2825 /* send_oap_rpc fails with 0 when make_ready tells it to
2826 * back off. llite's make_ready does this when it tries
2827 * to lock a page queued for write that is already locked.
2828 * we want to try sending rpcs from many objects, but we
2829 * don't want to spin failing with 0. */
2830 if (race_counter == 10)
2836 /* we're trying to queue a page in the osc, so we're subject to the
2837  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2838  * If the osc's queued pages are already at that limit, then we want to sleep
2839  * until there is space in the osc's queue for us. We also may be waiting for
2840  * write credits from the OST if there are RPCs in flight that may return some
2841  * before we fall back to sync writes.
2843  * We need this to know whether our allocation was granted in the presence of signals */
2844 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2848 client_obd_list_lock(&cli->cl_loi_list_lock);
2849 rc = cfs_list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2850 client_obd_list_unlock(&cli->cl_loi_list_lock);
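/*
 * The waiter's ocw_entry is deleted from cl_cache_waiters by whoever grants
 * it, so "my entry is off the list" doubles as the wakeup condition, and it
 * must be sampled under the same lock the granter holds.  A sketch of that
 * check-under-lock shape with pthread stand-ins (demo_* names hypothetical;
 * the kernel code uses client_obd_list_lock() and l_wait_event()):
 */
#if 0 /* illustration, not compiled with this file */
#include <pthread.h>

struct demo_waiter {
        int              granted; /* stands in for list_empty(&ocw_entry) */
        pthread_mutex_t *lock;    /* stands in for cl_loi_list_lock */
};

static int demo_ocw_granted(struct demo_waiter *w)
{
        int rc;

        pthread_mutex_lock(w->lock);   /* sample the state under the lock */
        rc = w->granted;
        pthread_mutex_unlock(w->lock);
        return rc;
}

int main(void)
{
        pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
        struct demo_waiter w = { 0, &m };

        /* granter side: flip the state under the lock, then wake waiters */
        pthread_mutex_lock(&m);
        w.granted = 1;
        pthread_mutex_unlock(&m);

        return demo_ocw_granted(&w) ? 0 : 1;
}
#endif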
2855  * Non-blocking version of osc_enter_cache() that consumes grant only when it has enough to serve the request. */
2858 int osc_enter_cache_try(const struct lu_env *env,
2859 struct client_obd *cli, struct lov_oinfo *loi,
2860 struct osc_async_page *oap, int transient)
2864 has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2866 osc_consume_write_grant(cli, &oap->oap_brw_page);
2868 cli->cl_dirty_transit += CFS_PAGE_SIZE;
2869 cfs_atomic_inc(&obd_dirty_transit_pages);
2870 oap->oap_brw_flags |= OBD_BRW_NOCACHE;
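/*
 * Grant consumption in miniature (a sketch; the real osc_enter_cache_try()
 * above also honors the cl_dirty_max/obd_max_dirty_pages ceilings and the
 * transient-page accounting): take one page of grant if available, else
 * fail so the caller can wait or fall back to sync i/o.
 */
#if 0 /* illustration, not compiled with this file */
#include <assert.h>

#define DEMO_PAGE_SIZE 4096UL

struct demo_cli {
        unsigned long avail_grant; /* bytes the OST has promised us */
        unsigned long dirty;       /* bytes of dirty cached pages */
};

static int demo_enter_cache_try(struct demo_cli *cli)
{
        if (cli->avail_grant < DEMO_PAGE_SIZE)
                return 0;                     /* no credit: caller waits */
        cli->avail_grant -= DEMO_PAGE_SIZE;   /* consume the grant */
        cli->dirty += DEMO_PAGE_SIZE;         /* page is now cached dirty */
        return 1;
}

int main(void)
{
        struct demo_cli cli = { 2 * DEMO_PAGE_SIZE, 0 };

        assert( demo_enter_cache_try(&cli));
        assert( demo_enter_cache_try(&cli));
        assert(!demo_enter_cache_try(&cli));  /* grant exhausted */
        assert(cli.dirty == 2 * DEMO_PAGE_SIZE);
        return 0;
}
#endif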
2876 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2877 * grant or cache space. */
2878 static int osc_enter_cache(const struct lu_env *env,
2879 struct client_obd *cli, struct lov_oinfo *loi,
2880 struct osc_async_page *oap)
2882 struct osc_cache_waiter ocw;
2883 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2887 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2888 "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
2889 cli->cl_dirty_max, obd_max_dirty_pages,
2890 cli->cl_lost_grant, cli->cl_avail_grant);
2892 /* force the caller to try sync io. this can jump the list
2893 * of queued writes and create a discontiguous rpc stream */
2894 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
2895 cli->cl_dirty_max < CFS_PAGE_SIZE ||
2896 cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync)
2899 /* Hopefully normal case - cache space and write credits available */
2900 if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2901 cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2902 osc_enter_cache_try(env, cli, loi, oap, 0))
2905         /* It is safe to block as a cache waiter as long as there is grant
2906          * space available or the hope of additional grant being returned
2907          * when an in-flight write completes. Using the write-back cache
2908          * if possible is preferable to sending the data synchronously
2909          * because write pages can then be merged into large requests.
2910          * The addition of this cache waiter will cause pending write
2911          * pages to be sent immediately. */
2912 if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
2913 cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2914 cfs_waitq_init(&ocw.ocw_waitq);
2918 loi_list_maint(cli, loi);
2919 osc_check_rpcs(env, cli);
2920 client_obd_list_unlock(&cli->cl_loi_list_lock);
2922 CDEBUG(D_CACHE, "sleeping for cache space\n");
2923 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2925 client_obd_list_lock(&cli->cl_loi_list_lock);
2926 if (!cfs_list_empty(&ocw.ocw_entry)) {
2927 cfs_list_del(&ocw.ocw_entry);
2937 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2938 struct lov_oinfo *loi, cfs_page_t *page,
2939 obd_off offset, const struct obd_async_page_ops *ops,
2940 void *data, void **res, int nocache,
2941 struct lustre_handle *lockh)
2943 struct osc_async_page *oap;
2948 return cfs_size_round(sizeof(*oap));
2951 oap->oap_magic = OAP_MAGIC;
2952 oap->oap_cli = &exp->exp_obd->u.cli;
2955 oap->oap_caller_ops = ops;
2956 oap->oap_caller_data = data;
2958 oap->oap_page = page;
2959 oap->oap_obj_off = offset;
2960 if (!client_is_remote(exp) &&
2961 cfs_capable(CFS_CAP_SYS_RESOURCE))
2962 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2964 LASSERT(!(offset & ~CFS_PAGE_MASK));
2966 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2967 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2968 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2969 CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2971 cfs_spin_lock_init(&oap->oap_lock);
2972 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2976 int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp,
2977 struct lov_stripe_md *lsm, struct lov_oinfo *loi,
2978 struct osc_async_page *oap, int cmd, int off,
2979 int count, obd_flag brw_flags, enum async_flags async_flags)
2981 struct client_obd *cli = &exp->exp_obd->u.cli;
2985 if (oap->oap_magic != OAP_MAGIC)
2988 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2991 if (!cfs_list_empty(&oap->oap_pending_item) ||
2992 !cfs_list_empty(&oap->oap_urgent_item) ||
2993 !cfs_list_empty(&oap->oap_rpc_item))
2996 /* check if the file's owner/group is over quota */
2997 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2998 struct cl_object *obj;
2999 struct cl_attr attr; /* XXX put attr into thread info */
3000 unsigned int qid[MAXQUOTAS];
3002 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
3004 cl_object_attr_lock(obj);
3005 rc = cl_object_attr_get(env, obj, &attr);
3006 cl_object_attr_unlock(obj);
3008 qid[USRQUOTA] = attr.cat_uid;
3009 qid[GRPQUOTA] = attr.cat_gid;
3011 lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
3018 loi = lsm->lsm_oinfo[0];
3020 client_obd_list_lock(&cli->cl_loi_list_lock);
3022 LASSERT(off + count <= CFS_PAGE_SIZE);
3024 oap->oap_page_off = off;
3025 oap->oap_count = count;
3026 oap->oap_brw_flags = brw_flags;
3027 /* Give a hint to OST that requests are coming from kswapd - bug19529 */
3028 if (cfs_memory_pressure_get())
3029 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
3030 cfs_spin_lock(&oap->oap_lock);
3031 oap->oap_async_flags = async_flags;
3032 cfs_spin_unlock(&oap->oap_lock);
3034 if (cmd & OBD_BRW_WRITE) {
3035 rc = osc_enter_cache(env, cli, loi, oap);
3037 client_obd_list_unlock(&cli->cl_loi_list_lock);
3042 osc_oap_to_pending(oap);
3043 loi_list_maint(cli, loi);
3045 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
3048 osc_check_rpcs(env, cli);
3049 client_obd_list_unlock(&cli->cl_loi_list_lock);
3054 /* aka (~was & now & flag), but this is more clear :) */
3055 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
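/*
 * An exhaustive check of the identity claimed in the comment above:
 * for single-bit flags, SETTING(was, now, flag) is nonzero exactly when
 * (~was & now & flag) is (illustration only):
 */
#if 0 /* illustration, not compiled with this file */
#include <assert.h>

#define DEMO_SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))

int main(void)
{
        unsigned was, now, flag;

        for (was = 0; was < 4; was++)
                for (now = 0; now < 4; now++)
                        for (flag = 1; flag < 4; flag <<= 1)
                                assert(!!DEMO_SETTING(was, now, flag) ==
                                       !!(~was & now & flag));
        return 0;
}
#endif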
3057 int osc_set_async_flags_base(struct client_obd *cli,
3058 struct lov_oinfo *loi, struct osc_async_page *oap,
3059 obd_flag async_flags)
3061 struct loi_oap_pages *lop;
3065 LASSERT(!cfs_list_empty(&oap->oap_pending_item));
3067 if (oap->oap_cmd & OBD_BRW_WRITE) {
3068 lop = &loi->loi_write_lop;
3070 lop = &loi->loi_read_lop;
3073 if ((oap->oap_async_flags & async_flags) == async_flags)
3076 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3077 flags |= ASYNC_READY;
3079 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3080 cfs_list_empty(&oap->oap_rpc_item)) {
3081 if (oap->oap_async_flags & ASYNC_HP)
3082 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
3084 cfs_list_add_tail(&oap->oap_urgent_item,
3086 flags |= ASYNC_URGENT;
3087 loi_list_maint(cli, loi);
3089 cfs_spin_lock(&oap->oap_lock);
3090 oap->oap_async_flags |= flags;
3091 cfs_spin_unlock(&oap->oap_lock);
3093 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3094 oap->oap_async_flags);
3098 int osc_teardown_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
3099 struct lov_oinfo *loi, struct osc_async_page *oap)
3101 struct client_obd *cli = &exp->exp_obd->u.cli;
3102 struct loi_oap_pages *lop;
3106 if (oap->oap_magic != OAP_MAGIC)
3110 loi = lsm->lsm_oinfo[0];
3112 if (oap->oap_cmd & OBD_BRW_WRITE) {
3113 lop = &loi->loi_write_lop;
3115 lop = &loi->loi_read_lop;
3118 client_obd_list_lock(&cli->cl_loi_list_lock);
3120 if (!cfs_list_empty(&oap->oap_rpc_item))
3121 GOTO(out, rc = -EBUSY);
3123 osc_exit_cache(cli, oap, 0);
3124 osc_wake_cache_waiters(cli);
3126 if (!cfs_list_empty(&oap->oap_urgent_item)) {
3127 cfs_list_del_init(&oap->oap_urgent_item);
3128 cfs_spin_lock(&oap->oap_lock);
3129 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3130 cfs_spin_unlock(&oap->oap_lock);
3132 if (!cfs_list_empty(&oap->oap_pending_item)) {
3133 cfs_list_del_init(&oap->oap_pending_item);
3134 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3136 loi_list_maint(cli, loi);
3137 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3139 client_obd_list_unlock(&cli->cl_loi_list_lock);
3143 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
3144 struct ldlm_enqueue_info *einfo)
3146 void *data = einfo->ei_cbdata;
3149 LASSERT(lock != NULL);
3150 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3151 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3152 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3153 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3155 lock_res_and_lock(lock);
3156 cfs_spin_lock(&osc_ast_guard);
3158 if (lock->l_ast_data == NULL)
3159 lock->l_ast_data = data;
3160 if (lock->l_ast_data == data)
3163 cfs_spin_unlock(&osc_ast_guard);
3164 unlock_res_and_lock(lock);
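/*
 * The l_ast_data handling above is a set-or-verify: bind the data if the
 * lock is unbound, then succeed only if the lock ends up bound to our
 * data.  The same shape without the real locking (a sketch; demo_* names
 * are hypothetical):
 */
#if 0 /* illustration, not compiled with this file */
#include <assert.h>
#include <stddef.h>

static int demo_set_data_with_check(void **slot, void *data)
{
        if (*slot == NULL)
                *slot = data;         /* first user binds the slot */
        return *slot == data;         /* ok only if it is bound to us */
}

int main(void)
{
        void *slot = NULL;
        int a, b;

        assert( demo_set_data_with_check(&slot, &a)); /* binds to a */
        assert( demo_set_data_with_check(&slot, &a)); /* already ours */
        assert(!demo_set_data_with_check(&slot, &b)); /* owned by a */
        return 0;
}
#endif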