1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
29 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_OSC
42 #include <libcfs/libcfs.h>
45 # include <liblustre.h>
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include "osc_internal.h"
66 static quota_interface_t *quota_interface = NULL;
67 extern quota_interface_t osc_quota_interface;
69 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
70 static int brw_interpret(const struct lu_env *env,
71 struct ptlrpc_request *req, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
74 /* Pack OSC object metadata for disk storage (LE byte order). */
75 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
76 struct lov_stripe_md *lsm)
81 lmm_size = sizeof(**lmmp);
86 OBD_FREE(*lmmp, lmm_size);
92 OBD_ALLOC(*lmmp, lmm_size);
98 LASSERT(lsm->lsm_object_id);
99 LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
100 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
101 (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
107 /* Unpack OSC object metadata from disk storage (LE byte order). */
108 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
109 struct lov_mds_md *lmm, int lmm_bytes)
115 if (lmm_bytes < sizeof (*lmm)) {
116 CERROR("lov_mds_md too small: %d, need %d\n",
117 lmm_bytes, (int)sizeof(*lmm));
120 /* XXX LOV_MAGIC etc check? */
122 if (lmm->lmm_object_id == 0) {
123 CERROR("lov_mds_md: zero lmm_object_id\n");
128 lsm_size = lov_stripe_md_size(1);
132 if (*lsmp != NULL && lmm == NULL) {
133 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
134 OBD_FREE(*lsmp, lsm_size);
140 OBD_ALLOC(*lsmp, lsm_size);
143 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
144 if ((*lsmp)->lsm_oinfo[0] == NULL) {
145 OBD_FREE(*lsmp, lsm_size);
148 loi_init((*lsmp)->lsm_oinfo[0]);
152 /* XXX zero *lsmp? */
153 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
154 (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
155 LASSERT((*lsmp)->lsm_object_id);
156 LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
159 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
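/* Pack the capability @capa, if one was given, into the request capsule
 * and flag OBD_MD_FLOSSCAPA in the body so the OST knows it is present. */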
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165 struct ost_body *body, void *capa)
167 struct obd_capa *oc = (struct obd_capa *)capa;
168 struct lustre_capa *c;
173 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
176 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177 DEBUG_CAPA(D_SEC, c, "pack");
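/* Fill the OST request body from @oinfo: convert the obdo to its wire
 * format and pack the capability, if any. */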
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181 struct obd_info *oinfo)
183 struct ost_body *body;
185 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
188 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
189 osc_pack_capa(req, body, oinfo->oi_capa);
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193 const struct req_msg_field *field,
197 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
199 /* it is already calculated as sizeof struct obd_capa */
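/* Reply handler for an asynchronous OST_GETATTR: copy the returned
 * attributes into the caller's obdo and run the oi_cb_up callback. */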
203 static int osc_getattr_interpret(const struct lu_env *env,
204 struct ptlrpc_request *req,
205 struct osc_async_args *aa, int rc)
207 struct ost_body *body;
213 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
215 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
216 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
218 /* This should really be sent by the OST */
219 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
220 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
222 CDEBUG(D_INFO, "can't unpack ost_body\n");
224 aa->aa_oi->oi_oa->o_valid = 0;
227 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
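/* Queue an OST_GETATTR on @set without waiting; the reply is handled by
 * osc_getattr_interpret(). */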
231 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
232 struct ptlrpc_request_set *set)
234 struct ptlrpc_request *req;
235 struct osc_async_args *aa;
239 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
243 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
244 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
246 ptlrpc_request_free(req);
250 osc_pack_req_body(req, oinfo);
252 ptlrpc_request_set_replen(req);
253 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
255 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
256 aa = ptlrpc_req_async_args(req);
259 ptlrpc_set_add_req(set, req);
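/* Synchronous OST_GETATTR: send the request, wait for the reply, and copy
 * the returned attributes into oinfo->oi_oa. */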
263 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
265 struct ptlrpc_request *req;
266 struct ost_body *body;
270 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
274 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
275 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
277 ptlrpc_request_free(req);
281 osc_pack_req_body(req, oinfo);
283 ptlrpc_request_set_replen(req);
285 rc = ptlrpc_queue_wait(req);
289 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
291 GOTO(out, rc = -EPROTO);
293 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
294 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
296 /* This should really be sent by the OST */
297 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
298 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
302 ptlrpc_req_finished(req);
306 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
307 struct obd_trans_info *oti)
309 struct ptlrpc_request *req;
310 struct ost_body *body;
314 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
316 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
320 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
321 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
323 ptlrpc_request_free(req);
327 osc_pack_req_body(req, oinfo);
329 ptlrpc_request_set_replen(req);
331 rc = ptlrpc_queue_wait(req);
335 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
337 GOTO(out, rc = -EPROTO);
339 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
343 ptlrpc_req_finished(req);
347 static int osc_setattr_interpret(const struct lu_env *env,
348 struct ptlrpc_request *req,
349 struct osc_setattr_args *sa, int rc)
351 struct ost_body *body;
357 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
359 GOTO(out, rc = -EPROTO);
361 lustre_get_wire_obdo(sa->sa_oa, &body->oa);
363 rc = sa->sa_upcall(sa->sa_cookie, rc);
367 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
368 struct obd_trans_info *oti,
369 obd_enqueue_update_f upcall, void *cookie,
370 struct ptlrpc_request_set *rqset)
372 struct ptlrpc_request *req;
373 struct osc_setattr_args *sa;
377 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
381 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
382 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
384 ptlrpc_request_free(req);
388 if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
389 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
391 osc_pack_req_body(req, oinfo);
393 ptlrpc_request_set_replen(req);
395 /* do MDS-to-OST setattr asynchronously */
397 /* Do not wait for response. */
398 ptlrpcd_add_req(req, PSCOPE_OTHER);
400 req->rq_interpret_reply =
401 (ptlrpc_interpterer_t)osc_setattr_interpret;
403 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
404 sa = ptlrpc_req_async_args(req);
405 sa->sa_oa = oinfo->oi_oa;
406 sa->sa_upcall = upcall;
407 sa->sa_cookie = cookie;
409 if (rqset == PTLRPCD_SET)
410 ptlrpcd_add_req(req, PSCOPE_OTHER);
412 ptlrpc_set_add_req(rqset, req);
418 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
419 struct obd_trans_info *oti,
420 struct ptlrpc_request_set *rqset)
422 return osc_setattr_async_base(exp, oinfo, oti,
423 oinfo->oi_cb_up, oinfo, rqset);
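/* Create an object on the OST: allocate a temporary lov_stripe_md if the
 * caller did not pass one, issue a synchronous OST_CREATE, and copy the
 * new object id/seq from the reply back into the obdo and *ea. */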
426 int osc_real_create(struct obd_export *exp, struct obdo *oa,
427 struct lov_stripe_md **ea, struct obd_trans_info *oti)
429 struct ptlrpc_request *req;
430 struct ost_body *body;
431 struct lov_stripe_md *lsm;
440 rc = obd_alloc_memmd(exp, &lsm);
445 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
447 GOTO(out, rc = -ENOMEM);
449 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
451 ptlrpc_request_free(req);
455 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
457 lustre_set_wire_obdo(&body->oa, oa);
459 ptlrpc_request_set_replen(req);
461 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
462 oa->o_flags == OBD_FL_DELORPHAN) {
464 "delorphan from OST integration");
465 /* Don't resend the delorphan req */
466 req->rq_no_resend = req->rq_no_delay = 1;
469 rc = ptlrpc_queue_wait(req);
473 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
475 GOTO(out_req, rc = -EPROTO);
477 lustre_get_wire_obdo(oa, &body->oa);
479 /* This should really be sent by the OST */
480 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
481 oa->o_valid |= OBD_MD_FLBLKSZ;
483 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
484 * have valid lsm_oinfo data structs, so don't go touching that.
485 * This needs to be fixed in a big way.
487 lsm->lsm_object_id = oa->o_id;
488 lsm->lsm_object_seq = oa->o_seq;
492 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
494 if (oa->o_valid & OBD_MD_FLCOOKIE) {
495 if (!oti->oti_logcookies)
496 oti_alloc_cookies(oti, 1);
497 *oti->oti_logcookies = oa->o_lcookie;
501 CDEBUG(D_HA, "transno: "LPD64"\n",
502 lustre_msg_get_transno(req->rq_repmsg));
504 ptlrpc_req_finished(req);
507 obd_free_memmd(exp, &lsm);
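/* Truncate an object with OST_PUNCH; the extent being punched travels in
 * the o_size/o_blocks fields of the obdo (see osc_punch() below). */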
511 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
512 obd_enqueue_update_f upcall, void *cookie,
513 struct ptlrpc_request_set *rqset)
515 struct ptlrpc_request *req;
516 struct osc_setattr_args *sa;
517 struct ost_body *body;
521 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
525 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
526 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
528 ptlrpc_request_free(req);
531 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
532 ptlrpc_at_set_req_timeout(req);
534 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
536 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
537 osc_pack_capa(req, body, oinfo->oi_capa);
539 ptlrpc_request_set_replen(req);
542 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
543 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
544 sa = ptlrpc_req_async_args(req);
545 sa->sa_oa = oinfo->oi_oa;
546 sa->sa_upcall = upcall;
547 sa->sa_cookie = cookie;
548 if (rqset == PTLRPCD_SET)
549 ptlrpcd_add_req(req, PSCOPE_OTHER);
551 ptlrpc_set_add_req(rqset, req);
556 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
557 struct obd_trans_info *oti,
558 struct ptlrpc_request_set *rqset)
560 oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
561 oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
562 oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
563 return osc_punch_base(exp, oinfo,
564 oinfo->oi_cb_up, oinfo, rqset);
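/* Flush the byte range [start, end] of an object to disk with OST_SYNC;
 * start/end are overloaded into the o_size/o_blocks fields of the obdo. */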
567 static int osc_sync(struct obd_export *exp, struct obdo *oa,
568 struct lov_stripe_md *md, obd_size start, obd_size end,
571 struct ptlrpc_request *req;
572 struct ost_body *body;
577 CDEBUG(D_INFO, "oa NULL\n");
581 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
585 osc_set_capa_size(req, &RMF_CAPA1, capa);
586 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
588 ptlrpc_request_free(req);
592 /* overload the size and blocks fields in the oa with start/end */
593 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
595 lustre_set_wire_obdo(&body->oa, oa);
596 body->oa.o_size = start;
597 body->oa.o_blocks = end;
598 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
599 osc_pack_capa(req, body, capa);
601 ptlrpc_request_set_replen(req);
603 rc = ptlrpc_queue_wait(req);
607 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
609 GOTO(out, rc = -EPROTO);
611 lustre_get_wire_obdo(oa, &body->oa);
615 ptlrpc_req_finished(req);
619 /* Find and cancel local locks matched by @mode on the resource named by
620  * @objid. Found locks are added to the @cancels list. Returns the number of
621  * locks added to the @cancels list. */
622 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
624 ldlm_mode_t mode, int lock_flags)
626 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
627 struct ldlm_res_id res_id;
628 struct ldlm_resource *res;
632 osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
633 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
637 LDLM_RESOURCE_ADDREF(res);
638 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
639 lock_flags, 0, NULL);
640 LDLM_RESOURCE_DELREF(res);
641 ldlm_resource_putref(res);
645 static int osc_destroy_interpret(const struct lu_env *env,
646 struct ptlrpc_request *req, void *data,
649 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
651 cfs_atomic_dec(&cli->cl_destroy_in_flight);
652 cfs_waitq_signal(&cli->cl_destroy_waitq);
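/* Return 1 if another destroy RPC may be sent without exceeding
 * cl_max_rpcs_in_flight; otherwise undo the increment, wake a waiter if
 * the counter changed in between, and return 0. */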
656 static int osc_can_send_destroy(struct client_obd *cli)
658 if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
659 cli->cl_max_rpcs_in_flight) {
660 /* The destroy request can be sent */
663 if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
664 cli->cl_max_rpcs_in_flight) {
666 * The counter has been modified between the two atomic
 * operations. */
669 cfs_waitq_signal(&cli->cl_destroy_waitq);
674 /* Destroy requests can always be async on the client, and we don't even really
675  * care about the return code since the client cannot do anything at all about
 * a failed destroy.
677  * When the MDS is unlinking a filename, it saves the file objects into a
678  * recovery llog, and these object records are cancelled when the OST reports
679  * they were destroyed and sync'd to disk (i.e. transaction committed).
680  * If the client dies, or the OST is down when the object should be destroyed,
681  * the records are not cancelled, and when the OST next reconnects to the MDS,
682  * it will retrieve the llog unlink logs and then send the log cancellation
683  * cookies to the MDS after committing the destroy transactions. */
684 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
685 struct lov_stripe_md *ea, struct obd_trans_info *oti,
686 struct obd_export *md_export, void *capa)
688 struct client_obd *cli = &exp->exp_obd->u.cli;
689 struct ptlrpc_request *req;
690 struct ost_body *body;
691 CFS_LIST_HEAD(cancels);
696 CDEBUG(D_INFO, "oa NULL\n");
700 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
701 LDLM_FL_DISCARD_DATA);
703 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
705 ldlm_lock_list_put(&cancels, l_bl_ast, count);
709 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
710 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
713 ptlrpc_request_free(req);
717 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
718 ptlrpc_at_set_req_timeout(req);
720 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
721 oa->o_lcookie = *oti->oti_logcookies;
722 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
724 lustre_set_wire_obdo(&body->oa, oa);
726 osc_pack_capa(req, body, (struct obd_capa *)capa);
727 ptlrpc_request_set_replen(req);
729 /* don't throttle destroy RPCs for the MDT */
730 if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
731 req->rq_interpret_reply = osc_destroy_interpret;
732 if (!osc_can_send_destroy(cli)) {
733 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
737 * Wait until the number of on-going destroy RPCs drops
738 * under max_rpc_in_flight
740 l_wait_event_exclusive(cli->cl_destroy_waitq,
741 osc_can_send_destroy(cli), &lwi);
745 /* Do not wait for response */
746 ptlrpcd_add_req(req, PSCOPE_OTHER);
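/* Piggy-back the client's dirty page and grant accounting (o_dirty,
 * o_undirty, o_grant, o_dropped) onto an outgoing obdo so the OST can keep
 * its view of this client's cache in sync. */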
750 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
753 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
755 LASSERT(!(oa->o_valid & bits));
758 client_obd_list_lock(&cli->cl_loi_list_lock);
759 oa->o_dirty = cli->cl_dirty;
760 if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
761 CERROR("dirty %lu - %lu > dirty_max %lu\n",
762 cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
764 } else if (cfs_atomic_read(&obd_dirty_pages) -
765 cfs_atomic_read(&obd_dirty_transit_pages) >
766 obd_max_dirty_pages + 1){
767 /* The cfs_atomic_read() and the cfs_atomic_inc() it allows are
768  * not covered by one lock, thus they may safely race and trip
769  * this CERROR() unless we add in a small fudge factor (+1). */
770 CERROR("dirty %d - %d > system dirty_max %d\n",
771 cfs_atomic_read(&obd_dirty_pages),
772 cfs_atomic_read(&obd_dirty_transit_pages),
773 obd_max_dirty_pages);
775 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
776 CERROR("dirty %lu - dirty_max %lu too big???\n",
777 cli->cl_dirty, cli->cl_dirty_max);
780 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
781 (cli->cl_max_rpcs_in_flight + 1);
782 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
784 oa->o_grant = cli->cl_avail_grant;
785 oa->o_dropped = cli->cl_lost_grant;
786 cli->cl_lost_grant = 0;
787 client_obd_list_unlock(&cli->cl_loi_list_lock);
788 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
789 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
793 static void osc_update_next_shrink(struct client_obd *cli)
795 cli->cl_next_shrink_grant =
796 cfs_time_shift(cli->cl_grant_shrink_interval);
797 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
798 cli->cl_next_shrink_grant);
801 /* caller must hold loi_list_lock */
802 static void osc_consume_write_grant(struct client_obd *cli,
803 struct brw_page *pga)
805 LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
806 LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
807 cfs_atomic_inc(&obd_dirty_pages);
808 cli->cl_dirty += CFS_PAGE_SIZE;
809 cli->cl_avail_grant -= CFS_PAGE_SIZE;
810 pga->flag |= OBD_BRW_FROM_GRANT;
811 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
812 CFS_PAGE_SIZE, pga, pga->pg);
813 LASSERT(cli->cl_avail_grant >= 0);
814 osc_update_next_shrink(cli);
817 /* the companion to osc_consume_write_grant, called when a brw has completed.
818 * must be called with the loi lock held. */
819 static void osc_release_write_grant(struct client_obd *cli,
820 struct brw_page *pga, int sent)
822 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
825 LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
826 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
831 pga->flag &= ~OBD_BRW_FROM_GRANT;
832 cfs_atomic_dec(&obd_dirty_pages);
833 cli->cl_dirty -= CFS_PAGE_SIZE;
834 if (pga->flag & OBD_BRW_NOCACHE) {
835 pga->flag &= ~OBD_BRW_NOCACHE;
836 cfs_atomic_dec(&obd_dirty_transit_pages);
837 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
840 cli->cl_lost_grant += CFS_PAGE_SIZE;
841 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
842 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
843 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
844 /* For short writes we shouldn't count parts of pages that
845 * span a whole block on the OST side, or our accounting goes
846 * wrong. Should match the code in filter_grant_check. */
847 int offset = pga->off & ~CFS_PAGE_MASK;
848 int count = pga->count + (offset & (blocksize - 1));
849 int end = (offset + pga->count) & (blocksize - 1);
851 count += blocksize - end;
853 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
854 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
855 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
856 cli->cl_avail_grant, cli->cl_dirty);
862 static unsigned long rpcs_in_flight(struct client_obd *cli)
864 return cli->cl_r_in_flight + cli->cl_w_in_flight;
867 int osc_wake_sync_fs(struct client_obd *cli)
870 if (cfs_list_empty(&cli->cl_loi_sync_fs_list) &&
871 cli->cl_sf_wait.started) {
872 cli->cl_sf_wait.sfw_upcall(cli->cl_sf_wait.sfw_oi, 0);
873 cli->cl_sf_wait.started = 0;
878 /* caller must hold loi_list_lock */
879 void osc_wake_cache_waiters(struct client_obd *cli)
882 struct osc_cache_waiter *ocw;
885 cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
886 /* if we can't dirty more, we must wait until some is written */
887 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
888 (cfs_atomic_read(&obd_dirty_pages) + 1 >
889 obd_max_dirty_pages)) {
890 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
891 "osc max %ld, sys max %d\n", cli->cl_dirty,
892 cli->cl_dirty_max, obd_max_dirty_pages);
896 /* if the cache is still dirty but we have no grant, wait for pending RPCs
897  * that may yet return us some grant before doing sync writes */
898 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
899 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
900 cli->cl_w_in_flight);
904 ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
905 cfs_list_del_init(&ocw->ocw_entry);
906 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
907 /* no more RPCs in flight to return grant, do sync IO */
908 ocw->ocw_rc = -EDQUOT;
909 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
911 osc_consume_write_grant(cli,
912 &ocw->ocw_oap->oap_brw_page);
915 cfs_waitq_signal(&ocw->ocw_waitq);
921 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
923 client_obd_list_lock(&cli->cl_loi_list_lock);
924 cli->cl_avail_grant += grant;
925 client_obd_list_unlock(&cli->cl_loi_list_lock);
928 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
930 if (body->oa.o_valid & OBD_MD_FLGRANT) {
931 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
932 __osc_update_grant(cli, body->oa.o_grant);
936 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
937 void *key, obd_count vallen, void *val,
938 struct ptlrpc_request_set *set);
940 static int osc_shrink_grant_interpret(const struct lu_env *env,
941 struct ptlrpc_request *req,
944 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
945 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
946 struct ost_body *body;
949 __osc_update_grant(cli, oa->o_grant);
953 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
955 osc_update_grant(cli, body);
961 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
963 client_obd_list_lock(&cli->cl_loi_list_lock);
964 oa->o_grant = cli->cl_avail_grant / 4;
965 cli->cl_avail_grant -= oa->o_grant;
966 client_obd_list_unlock(&cli->cl_loi_list_lock);
967 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
968 oa->o_valid |= OBD_MD_FLFLAGS;
971 oa->o_flags |= OBD_FL_SHRINK_GRANT;
972 osc_update_next_shrink(cli);
975 /* Shrink the current grant, either from some large amount to enough for a
976 * full set of in-flight RPCs, or if we have already shrunk to that limit
977 * then to enough for a single RPC. This avoids keeping more grant than
978 * needed, and avoids shrinking the grant piecemeal. */
979 static int osc_shrink_grant(struct client_obd *cli)
981 long target = (cli->cl_max_rpcs_in_flight + 1) *
982 cli->cl_max_pages_per_rpc;
984 client_obd_list_lock(&cli->cl_loi_list_lock);
985 if (cli->cl_avail_grant <= target)
986 target = cli->cl_max_pages_per_rpc;
987 client_obd_list_unlock(&cli->cl_loi_list_lock);
989 return osc_shrink_grant_to_target(cli, target);
992 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
995 struct ost_body *body;
998 client_obd_list_lock(&cli->cl_loi_list_lock);
999 /* Don't shrink if we are already above or below the desired limit.
1000  * We don't want to shrink below a single RPC, as that will negatively
1001  * impact block allocation and long-term performance. */
1002 if (target < cli->cl_max_pages_per_rpc)
1003 target = cli->cl_max_pages_per_rpc;
1005 if (target >= cli->cl_avail_grant) {
1006 client_obd_list_unlock(&cli->cl_loi_list_lock);
1009 client_obd_list_unlock(&cli->cl_loi_list_lock);
1011 OBD_ALLOC_PTR(body);
1015 osc_announce_cached(cli, &body->oa, 0);
1017 client_obd_list_lock(&cli->cl_loi_list_lock);
1018 body->oa.o_grant = cli->cl_avail_grant - target;
1019 cli->cl_avail_grant = target;
1020 client_obd_list_unlock(&cli->cl_loi_list_lock);
1021 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
1022 body->oa.o_valid |= OBD_MD_FLFLAGS;
1023 body->oa.o_flags = 0;
1025 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
1026 osc_update_next_shrink(cli);
1028 rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
1029 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
1030 sizeof(*body), body, NULL);
1032 __osc_update_grant(cli, body->oa.o_grant);
1037 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
1038 static int osc_should_shrink_grant(struct client_obd *client)
1040 cfs_time_t time = cfs_time_current();
1041 cfs_time_t next_shrink = client->cl_next_shrink_grant;
1043 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
1044 OBD_CONNECT_GRANT_SHRINK) == 0)
1047 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1048 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1049 client->cl_avail_grant > GRANT_SHRINK_LIMIT)
1052 osc_update_next_shrink(client);
1057 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1059 struct client_obd *client;
1061 cfs_list_for_each_entry(client, &item->ti_obd_list,
1062 cl_grant_shrink_list) {
1063 if (osc_should_shrink_grant(client))
1064 osc_shrink_grant(client);
1069 static int osc_add_shrink_grant(struct client_obd *client)
1073 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1075 osc_grant_shrink_grant_cb, NULL,
1076 &client->cl_grant_shrink_list);
1078 CERROR("add grant client %s error %d\n",
1079 client->cl_import->imp_obd->obd_name, rc);
1082 CDEBUG(D_CACHE, "add grant client %s \n",
1083 client->cl_import->imp_obd->obd_name);
1084 osc_update_next_shrink(client);
1088 static int osc_del_shrink_grant(struct client_obd *client)
1090 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1094 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1097 * ocd_grant is the total grant amount we expect to hold: if we've
1098 * been evicted, it's the new avail_grant amount, cl_dirty will drop
1099 * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1101 * race is tolerable here: if we're evicted, but imp_state already
1102 * left EVICTED state, then cl_dirty must be 0 already.
1104 client_obd_list_lock(&cli->cl_loi_list_lock);
1105 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1106 cli->cl_avail_grant = ocd->ocd_grant;
1108 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1110 if (cli->cl_avail_grant < 0) {
1111 CWARN("%s: available grant < 0, the OSS is probably not running"
1112 " with patch from bug20278 (%ld) \n",
1113 cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
1114 /* workaround for 1.6 servers which do not have
1115 * the patch from bug20278 */
1116 cli->cl_avail_grant = ocd->ocd_grant;
1119 client_obd_list_unlock(&cli->cl_loi_list_lock);
1121 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1122 cli->cl_import->imp_obd->obd_name,
1123 cli->cl_avail_grant, cli->cl_lost_grant);
1125 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1126 cfs_list_empty(&cli->cl_grant_shrink_list))
1127 osc_add_shrink_grant(cli);
1130 /* We assume that the reason this OSC got a short read is that it read
1131  * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
1132  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1133  * this stripe never got written at or beyond this stripe offset yet. */
1134 static void handle_short_read(int nob_read, obd_count page_count,
1135 struct brw_page **pga)
1140 /* skip bytes read OK */
1141 while (nob_read > 0) {
1142 LASSERT (page_count > 0);
1144 if (pga[i]->count > nob_read) {
1145 /* EOF inside this page */
1146 ptr = cfs_kmap(pga[i]->pg) +
1147 (pga[i]->off & ~CFS_PAGE_MASK);
1148 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1149 cfs_kunmap(pga[i]->pg);
1155 nob_read -= pga[i]->count;
1160 /* zero remaining pages */
1161 while (page_count-- > 0) {
1162 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1163 memset(ptr, 0, pga[i]->count);
1164 cfs_kunmap(pga[i]->pg);
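/* Check the per-niobuf return codes and the total number of bytes
 * transferred in a BRW_WRITE reply; returns the first negative rc found,
 * or -EPROTO if the reply is malformed. */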
1169 static int check_write_rcs(struct ptlrpc_request *req,
1170 int requested_nob, int niocount,
1171 obd_count page_count, struct brw_page **pga)
1176 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1177 sizeof(*remote_rcs) *
1179 if (remote_rcs == NULL) {
1180 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1184 /* return error if any niobuf was in error */
1185 for (i = 0; i < niocount; i++) {
1186 if (remote_rcs[i] < 0)
1187 return(remote_rcs[i]);
1189 if (remote_rcs[i] != 0) {
1190 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1191 i, remote_rcs[i], req);
1196 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1197 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1198 req->rq_bulk->bd_nob_transferred, requested_nob);
1205 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1207 if (p1->flag != p2->flag) {
1208 unsigned mask = ~(OBD_BRW_FROM_GRANT|
1209 OBD_BRW_NOCACHE|OBD_BRW_SYNC|OBD_BRW_ASYNC);
1211 /* warn if we try to combine flags that we don't know to be
1212 * safe to combine */
1213 if ((p1->flag & mask) != (p2->flag & mask))
1214 CERROR("is it ok to have flags 0x%x and 0x%x in the "
1215 "same brw?\n", p1->flag, p2->flag);
1219 return (p1->off + p1->count == p2->off);
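/* Compute a checksum of type @cksum_type over the first @nob bytes of the
 * bulk pages in @pga. */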
1222 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1223 struct brw_page **pga, int opc,
1224 cksum_type_t cksum_type)
1229 LASSERT (pg_count > 0);
1230 cksum = init_checksum(cksum_type);
1231 while (nob > 0 && pg_count > 0) {
1232 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1233 int off = pga[i]->off & ~CFS_PAGE_MASK;
1234 int count = pga[i]->count > nob ? nob : pga[i]->count;
1236 /* corrupt the data before we compute the checksum, to
1237 * simulate an OST->client data error */
1238 if (i == 0 && opc == OST_READ &&
1239 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1240 memcpy(ptr + off, "bad1", min(4, nob));
1241 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1242 cfs_kunmap(pga[i]->pg);
1243 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1246 nob -= pga[i]->count;
1250 /* For sends we only compute a wrong checksum instead of
1251  * corrupting the data, so it is still correct on a redo */
1252 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1258 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1259 struct lov_stripe_md *lsm, obd_count page_count,
1260 struct brw_page **pga,
1261 struct ptlrpc_request **reqp,
1262 struct obd_capa *ocapa, int reserve,
1265 struct ptlrpc_request *req;
1266 struct ptlrpc_bulk_desc *desc;
1267 struct ost_body *body;
1268 struct obd_ioobj *ioobj;
1269 struct niobuf_remote *niobuf;
1270 int niocount, i, requested_nob, opc, rc;
1271 struct osc_brw_async_args *aa;
1272 struct req_capsule *pill;
1273 struct brw_page *pg_prev;
1276 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1277 RETURN(-ENOMEM); /* Recoverable */
1278 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1279 RETURN(-EINVAL); /* Fatal */
1281 if ((cmd & OBD_BRW_WRITE) != 0) {
1283 req = ptlrpc_request_alloc_pool(cli->cl_import,
1284 cli->cl_import->imp_rq_pool,
1285 &RQF_OST_BRW_WRITE);
1288 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1293 for (niocount = i = 1; i < page_count; i++) {
1294 if (!can_merge_pages(pga[i - 1], pga[i]))
1298 pill = &req->rq_pill;
1299 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1301 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1302 niocount * sizeof(*niobuf));
1303 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1305 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1307 ptlrpc_request_free(req);
1310 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1311 ptlrpc_at_set_req_timeout(req);
1313 if (opc == OST_WRITE)
1314 desc = ptlrpc_prep_bulk_imp(req, page_count,
1315 BULK_GET_SOURCE, OST_BULK_PORTAL);
1317 desc = ptlrpc_prep_bulk_imp(req, page_count,
1318 BULK_PUT_SINK, OST_BULK_PORTAL);
1321 GOTO(out, rc = -ENOMEM);
1322 /* NB request now owns desc and will free it when it gets freed */
1324 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1325 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1326 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1327 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1329 lustre_set_wire_obdo(&body->oa, oa);
1331 obdo_to_ioobj(oa, ioobj);
1332 ioobj->ioo_bufcnt = niocount;
1333 osc_pack_capa(req, body, ocapa);
1334 LASSERT (page_count > 0);
1336 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1337 struct brw_page *pg = pga[i];
1339 LASSERT(pg->count > 0);
1340 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1341 "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1342 pg->off, pg->count);
1344 LASSERTF(i == 0 || pg->off > pg_prev->off,
1345 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1346 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1348 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1349 pg_prev->pg, page_private(pg_prev->pg),
1350 pg_prev->pg->index, pg_prev->off);
1352 LASSERTF(i == 0 || pg->off > pg_prev->off,
1353 "i %d p_c %u\n", i, page_count);
1355 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1356 (pg->flag & OBD_BRW_SRVLOCK));
1358 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1360 requested_nob += pg->count;
1362 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1364 niobuf->len += pg->count;
1366 niobuf->offset = pg->off;
1367 niobuf->len = pg->count;
1368 niobuf->flags = pg->flag;
1373 LASSERTF((void *)(niobuf - niocount) ==
1374 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1375 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1376 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1378 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1380 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1381 body->oa.o_valid |= OBD_MD_FLFLAGS;
1382 body->oa.o_flags = 0;
1384 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1387 if (osc_should_shrink_grant(cli))
1388 osc_shrink_grant_local(cli, &body->oa);
1390 /* size[REQ_REC_OFF] still sizeof (*body) */
1391 if (opc == OST_WRITE) {
1392 if (unlikely(cli->cl_checksum) &&
1393 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1394 /* store cl_cksum_type in a local variable since
1395 * it can be changed via lprocfs */
1396 cksum_type_t cksum_type = cli->cl_cksum_type;
1398 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1399 oa->o_flags &= OBD_FL_LOCAL_MASK;
1400 body->oa.o_flags = 0;
1402 body->oa.o_flags |= cksum_type_pack(cksum_type);
1403 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1404 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1408 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1410 /* save this in 'oa', too, for later checking */
1411 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1412 oa->o_flags |= cksum_type_pack(cksum_type);
1414 /* clear out the checksum flag, in case this is a
1415 * resend but cl_checksum is no longer set. b=11238 */
1416 oa->o_valid &= ~OBD_MD_FLCKSUM;
1418 oa->o_cksum = body->oa.o_cksum;
1419 /* 1 RC per niobuf */
1420 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1421 sizeof(__u32) * niocount);
1423 if (unlikely(cli->cl_checksum) &&
1424 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1425 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1426 body->oa.o_flags = 0;
1427 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1428 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1431 ptlrpc_request_set_replen(req);
1433 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1434 aa = ptlrpc_req_async_args(req);
1436 aa->aa_requested_nob = requested_nob;
1437 aa->aa_nio_count = niocount;
1438 aa->aa_page_count = page_count;
1442 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1443 if (ocapa && reserve)
1444 aa->aa_ocapa = capa_get(ocapa);
1450 ptlrpc_req_finished(req);
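/* A BRW write reply carried a checksum that does not match the one we
 * sent: recompute it locally to guess whether the pages changed on the
 * client, in transit, or the server used a different checksum type.
 * Returns 1 unless the checksums turn out to match after all (or the file
 * is mmapped and may legitimately change underneath us). */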
1454 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1455 __u32 client_cksum, __u32 server_cksum, int nob,
1456 obd_count page_count, struct brw_page **pga,
1457 cksum_type_t client_cksum_type)
1461 cksum_type_t cksum_type;
1463 if (server_cksum == client_cksum) {
1464 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1468 /* If this is an mmapped file, it can be changed at any time */
1469 if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags & OBD_FL_MMAP)
1472 if (oa->o_valid & OBD_MD_FLFLAGS)
1473 cksum_type = cksum_type_unpack(oa->o_flags);
1475 cksum_type = OBD_CKSUM_CRC32;
1477 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1480 if (cksum_type != client_cksum_type)
1481 msg = "the server did not use the checksum type specified in "
1482 "the original request - likely a protocol problem";
1483 else if (new_cksum == server_cksum)
1484 msg = "changed on the client after we checksummed it - "
1485 "likely false positive due to mmap IO (bug 11742)";
1486 else if (new_cksum == client_cksum)
1487 msg = "changed in transit before arrival at OST";
1489 msg = "changed in transit AND doesn't match the original - "
1490 "likely false positive due to mmap IO (bug 11742)";
1492 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1493 " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1494 msg, libcfs_nid2str(peer->nid),
1495 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1496 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1497 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1499 oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1501 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1502 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1503 "client csum now %x\n", client_cksum, client_cksum_type,
1504 server_cksum, cksum_type, new_cksum);
1508 /* Note rc enters this function as number of bytes transferred */
1509 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1511 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1512 const lnet_process_id_t *peer =
1513 &req->rq_import->imp_connection->c_peer;
1514 struct client_obd *cli = aa->aa_cli;
1515 struct ost_body *body;
1516 __u32 client_cksum = 0;
1519 if (rc < 0 && rc != -EDQUOT) {
1520 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1524 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1525 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1527 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1531 #ifdef HAVE_QUOTA_SUPPORT
1532 /* set/clear over quota flag for a uid/gid */
1533 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1534 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1535 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1537 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1538 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1540 lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
1545 osc_update_grant(cli, body);
1550 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1551 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1553 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1555 CERROR("Unexpected +ve rc %d\n", rc);
1558 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1560 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1563 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1564 check_write_checksum(&body->oa, peer, client_cksum,
1565 body->oa.o_cksum, aa->aa_requested_nob,
1566 aa->aa_page_count, aa->aa_ppga,
1567 cksum_type_unpack(aa->aa_oa->o_flags)))
1570 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1571 aa->aa_page_count, aa->aa_ppga);
1575 /* The rest of this function executes only for OST_READs */
1577 /* if unwrap_bulk failed, return -EAGAIN to retry */
1578 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1580 GOTO(out, rc = -EAGAIN);
1582 if (rc > aa->aa_requested_nob) {
1583 CERROR("Unexpected rc %d (%d requested)\n", rc,
1584 aa->aa_requested_nob);
1588 if (rc != req->rq_bulk->bd_nob_transferred) {
1589 CERROR ("Unexpected rc %d (%d transferred)\n",
1590 rc, req->rq_bulk->bd_nob_transferred);
1594 if (rc < aa->aa_requested_nob)
1595 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1597 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1598 static int cksum_counter;
1599 __u32 server_cksum = body->oa.o_cksum;
1602 cksum_type_t cksum_type;
1604 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1605 cksum_type = cksum_type_unpack(body->oa.o_flags);
1607 cksum_type = OBD_CKSUM_CRC32;
1608 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1609 aa->aa_ppga, OST_READ,
1612 if (peer->nid == req->rq_bulk->bd_sender) {
1616 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1619 if (server_cksum == ~0 && rc > 0) {
1620 CERROR("Protocol error: server %s set the 'checksum' "
1621 "bit, but didn't send a checksum. Not fatal, "
1622 "but please notify on http://bugzilla.lustre.org/\n",
1623 libcfs_nid2str(peer->nid));
1624 } else if (server_cksum != client_cksum) {
1625 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1626 "%s%s%s inode "DFID" object "
1627 LPU64"/"LPU64" extent "
1628 "["LPU64"-"LPU64"]\n",
1629 req->rq_import->imp_obd->obd_name,
1630 libcfs_nid2str(peer->nid),
1632 body->oa.o_valid & OBD_MD_FLFID ?
1633 body->oa.o_parent_seq : (__u64)0,
1634 body->oa.o_valid & OBD_MD_FLFID ?
1635 body->oa.o_parent_oid : 0,
1636 body->oa.o_valid & OBD_MD_FLFID ?
1637 body->oa.o_parent_ver : 0,
1639 body->oa.o_valid & OBD_MD_FLGROUP ?
1640 body->oa.o_seq : (__u64)0,
1641 aa->aa_ppga[0]->off,
1642 aa->aa_ppga[aa->aa_page_count-1]->off +
1643 aa->aa_ppga[aa->aa_page_count-1]->count -
1645 CERROR("client %x, server %x, cksum_type %x\n",
1646 client_cksum, server_cksum, cksum_type);
1648 aa->aa_oa->o_cksum = client_cksum;
1652 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1655 } else if (unlikely(client_cksum)) {
1656 static int cksum_missed;
1659 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1660 CERROR("Checksum %u requested from %s but not sent\n",
1661 cksum_missed, libcfs_nid2str(peer->nid));
1667 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1672 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1673 struct lov_stripe_md *lsm,
1674 obd_count page_count, struct brw_page **pga,
1675 struct obd_capa *ocapa)
1677 struct ptlrpc_request *req;
1681 struct l_wait_info lwi;
1685 cfs_waitq_init(&waitq);
1688 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1689 page_count, pga, &req, ocapa, 0, resends);
1693 rc = ptlrpc_queue_wait(req);
1695 if (rc == -ETIMEDOUT && req->rq_resend) {
1696 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1697 ptlrpc_req_finished(req);
1701 rc = osc_brw_fini_request(req, rc);
1703 ptlrpc_req_finished(req);
1704 if (osc_recoverable_error(rc)) {
1706 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1707 CERROR("too many resend retries, returning error\n");
1711 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1712 l_wait_event(waitq, 0, &lwi);
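/* Rebuild and resend a bulk request after a recoverable error; the new
 * request takes over the pages and async page state of the old one. */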
1720 int osc_brw_redo_request(struct ptlrpc_request *request,
1721 struct osc_brw_async_args *aa)
1723 struct ptlrpc_request *new_req;
1724 struct ptlrpc_request_set *set = request->rq_set;
1725 struct osc_brw_async_args *new_aa;
1726 struct osc_async_page *oap;
1730 if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1731 CERROR("too many resent retries, returning error\n");
1735 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1737 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1738 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1739 aa->aa_cli, aa->aa_oa,
1740 NULL /* lsm unused by osc currently */,
1741 aa->aa_page_count, aa->aa_ppga,
1742 &new_req, aa->aa_ocapa, 0, 1);
1746 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1748 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1749 if (oap->oap_request != NULL) {
1750 LASSERTF(request == oap->oap_request,
1751 "request %p != oap_request %p\n",
1752 request, oap->oap_request);
1753 if (oap->oap_interrupted) {
1754 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1755 ptlrpc_req_finished(new_req);
1760 /* New request takes over pga and oaps from old request.
1761 * Note that copying a list_head doesn't work, need to move it... */
1763 new_req->rq_interpret_reply = request->rq_interpret_reply;
1764 new_req->rq_async_args = request->rq_async_args;
1765 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1767 new_aa = ptlrpc_req_async_args(new_req);
1769 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1770 cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1771 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1773 cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1774 if (oap->oap_request) {
1775 ptlrpc_req_finished(oap->oap_request);
1776 oap->oap_request = ptlrpc_request_addref(new_req);
1780 new_aa->aa_ocapa = aa->aa_ocapa;
1781 aa->aa_ocapa = NULL;
1783 /* using ptlrpc_set_add_req() is safe because interpret functions work
1784  * in check_set context. The only path on which another thread can access
1785  * the request and get -EINTR is protected by
1786  * cl_loi_list_lock */
1787 ptlrpc_set_add_req(set, new_req);
1789 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1791 DEBUG_REQ(D_INFO, new_req, "new request");
1796 * ugh, we want disk allocation on the target to happen in offset order. We'll
1797  * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
1798  * fine for our small page arrays and doesn't require allocation. It's an
1799  * insertion sort that swaps elements that are strides apart, shrinking the
1800  * stride down until it's '1' and the array is sorted.
1802 static void sort_brw_pages(struct brw_page **array, int num)
1805 struct brw_page *tmp;
1809 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1814 for (i = stride ; i < num ; i++) {
1817 while (j >= stride && array[j - stride]->off > tmp->off) {
1818 array[j] = array[j - stride];
1823 } while (stride > 1);
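/* Return how many leading pages of @pg can go into one BRW without
 * fragmenting the bulk: stop at the first page that does not end, or the
 * next page that does not start, on a page boundary. */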
1826 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1832 LASSERT (pages > 0);
1833 offset = pg[i]->off & ~CFS_PAGE_MASK;
1837 if (pages == 0) /* that's all */
1840 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1841 return count; /* doesn't end on page boundary */
1844 offset = pg[i]->off & ~CFS_PAGE_MASK;
1845 if (offset != 0) /* doesn't start on page boundary */
1852 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1854 struct brw_page **ppga;
1857 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1861 for (i = 0; i < count; i++)
1866 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1868 LASSERT(ppga != NULL);
1869 OBD_FREE(ppga, sizeof(*ppga) * count);
1872 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1873 obd_count page_count, struct brw_page *pga,
1874 struct obd_trans_info *oti)
1876 struct obdo *saved_oa = NULL;
1877 struct brw_page **ppga, **orig;
1878 struct obd_import *imp = class_exp2cliimp(exp);
1879 struct client_obd *cli;
1880 int rc, page_count_orig;
1883 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1884 cli = &imp->imp_obd->u.cli;
1886 if (cmd & OBD_BRW_CHECK) {
1887 /* The caller just wants to know if there's a chance that this
1888 * I/O can succeed */
1890 if (imp->imp_invalid)
1895 /* test_brw with a failed create can trip this, maybe others. */
1896 LASSERT(cli->cl_max_pages_per_rpc);
1900 orig = ppga = osc_build_ppga(pga, page_count);
1903 page_count_orig = page_count;
1905 sort_brw_pages(ppga, page_count);
1906 while (page_count) {
1907 obd_count pages_per_brw;
1909 if (page_count > cli->cl_max_pages_per_rpc)
1910 pages_per_brw = cli->cl_max_pages_per_rpc;
1912 pages_per_brw = page_count;
1914 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1916 if (saved_oa != NULL) {
1917 /* restore previously saved oa */
1918 *oinfo->oi_oa = *saved_oa;
1919 } else if (page_count > pages_per_brw) {
1920 /* save a copy of oa (brw will clobber it) */
1921 OBDO_ALLOC(saved_oa);
1922 if (saved_oa == NULL)
1923 GOTO(out, rc = -ENOMEM);
1924 *saved_oa = *oinfo->oi_oa;
1927 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1928 pages_per_brw, ppga, oinfo->oi_capa);
1933 page_count -= pages_per_brw;
1934 ppga += pages_per_brw;
1938 osc_release_ppga(orig, page_count_orig);
1940 if (saved_oa != NULL)
1941 OBDO_FREE(saved_oa);
1946 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1947 * the dirty accounting. Writeback completes or truncate happens before
1948 * writing starts. Must be called with the loi lock held. */
1949 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1952 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1955 static int lop_makes_syncfs_rpc(struct loi_oap_pages *lop)
1957 struct osc_async_page *oap;
1960 if (cfs_list_empty(&lop->lop_urgent))
1963 oap = cfs_list_entry(lop->lop_urgent.next,
1964 struct osc_async_page, oap_urgent_item);
1966 if (oap->oap_async_flags & ASYNC_SYNCFS) {
1967 CDEBUG(D_CACHE, "syncfs request forcing RPC\n");
1974 /* This maintains the lists of pending pages to read/write for a given object
1975 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1976 * to quickly find objects that are ready to send an RPC. */
1977 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1983 if (lop->lop_num_pending == 0)
1986 /* if we have an invalid import we want to drain the queued pages
1987 * by forcing them through rpcs that immediately fail and complete
1988 * the pages. recovery relies on this to empty the queued pages
1989 * before canceling the locks and evicting down the llite pages */
1990 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1993 /* stream rpcs in queue order as long as there is an urgent page
1994 * queued. this is our cheap solution for good batching in the case
1995 * where writepage marks some random page in the middle of the file
1996 * as urgent because of, say, memory pressure */
1997 if (!cfs_list_empty(&lop->lop_urgent)) {
1998 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
2001 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
2002 optimal = cli->cl_max_pages_per_rpc;
2003 if (cmd & OBD_BRW_WRITE) {
2004 /* trigger a write rpc stream as long as there are dirtiers
2005 * waiting for space. as they're waiting, they're not going to
2006 * create more pages to coalesce with what's waiting.. */
2007 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
2008 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
2011 /* +16 to avoid triggering rpcs that would want to include pages
2012 * that are being queued but which can't be made ready until
2013 * the queuer finishes with the page. this is a wart for
2014 * llite::commit_write() */
2017 if (lop->lop_num_pending >= optimal)
2023 static int lop_makes_hprpc(struct loi_oap_pages *lop)
2025 struct osc_async_page *oap;
2028 if (cfs_list_empty(&lop->lop_urgent))
2031 oap = cfs_list_entry(lop->lop_urgent.next,
2032 struct osc_async_page, oap_urgent_item);
2034 if (oap->oap_async_flags & ASYNC_HP) {
2035 CDEBUG(D_CACHE, "hp request forcing RPC\n");
2042 static void on_list(cfs_list_t *item, cfs_list_t *list,
2045 if (cfs_list_empty(item) && should_be_on)
2046 cfs_list_add_tail(item, list);
2047 else if (!cfs_list_empty(item) && !should_be_on)
2048 cfs_list_del_init(item);
2051 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2052 * can find pages to build into rpcs quickly */
2053 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
2055 if (lop_makes_hprpc(&loi->loi_write_lop) ||
2056 lop_makes_hprpc(&loi->loi_read_lop)) {
2058 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2059 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2061 if (lop_makes_syncfs_rpc(&loi->loi_write_lop)) {
2062 on_list(&loi->loi_sync_fs_item,
2063 &cli->cl_loi_sync_fs_list,
2064 loi->loi_write_lop.lop_num_pending);
2066 on_list(&loi->loi_hp_ready_item,
2067 &cli->cl_loi_hp_ready_list, 0);
2068 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2069 lop_makes_rpc(cli, &loi->loi_write_lop,
2071 lop_makes_rpc(cli, &loi->loi_read_lop,
2076 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2077 loi->loi_write_lop.lop_num_pending);
2079 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2080 loi->loi_read_lop.lop_num_pending);
2083 static void lop_update_pending(struct client_obd *cli,
2084 struct loi_oap_pages *lop, int cmd, int delta)
2086 lop->lop_num_pending += delta;
2087 if (cmd & OBD_BRW_WRITE)
2088 cli->cl_pending_w_pages += delta;
2090 cli->cl_pending_r_pages += delta;
2094 * this is called when a sync waiter receives an interruption. Its job is to
2095 * get the caller woken as soon as possible. If its page hasn't been put in an
2096 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
2097 * desiring interruption which will forcefully complete the rpc once the rpc
 * has been completed.
2100 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2102 struct loi_oap_pages *lop;
2103 struct lov_oinfo *loi;
2107 LASSERT(!oap->oap_interrupted);
2108 oap->oap_interrupted = 1;
2110 /* ok, it's been put in an rpc. only one oap gets a request reference */
2111 if (oap->oap_request != NULL) {
2112 ptlrpc_mark_interrupted(oap->oap_request);
2113 ptlrpcd_wake(oap->oap_request);
2114 ptlrpc_req_finished(oap->oap_request);
2115 oap->oap_request = NULL;
2119 * page completion may be called only if the ->cpo_prep() method was
2120  * executed by osc_io_submit(), which also adds the page to the pending list
2122 if (!cfs_list_empty(&oap->oap_pending_item)) {
2123 cfs_list_del_init(&oap->oap_pending_item);
2124 cfs_list_del_init(&oap->oap_urgent_item);
2127 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2128 &loi->loi_write_lop : &loi->loi_read_lop;
2129 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2130 loi_list_maint(oap->oap_cli, oap->oap_loi);
2131 rc = oap->oap_caller_ops->ap_completion(env,
2132 oap->oap_caller_data,
2133 oap->oap_cmd, NULL, -EINTR);
2139 /* this is trying to propagate async writeback errors back up to the
2140 * application. As an async write fails we record the error code for later if
2141 * the app does an fsync. As long as errors persist we force future rpcs to be
2142 * sync so that the app can get a sync error and break the cycle of queueing
2143 * pages for which writeback will fail. */
2144 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2151 ar->ar_force_sync = 1;
2152 ar->ar_min_xid = ptlrpc_sample_next_xid();
2157 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2158 ar->ar_force_sync = 0;
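/* Add @oap to the urgent list of @lop, skipping it if it is already
 * queued and putting ASYNC_HP pages at the head of the list. */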
2161 static int osc_add_to_lop_urgent(struct loi_oap_pages *lop,
2162 struct osc_async_page *oap,
2163 obd_flag async_flags)
2166 /* If true, then already present in lop urgent */
2167 if (!cfs_list_empty(&oap->oap_urgent_item)) {
2168 CWARN("Request to add duplicate oap_urgent for flag = %d\n",
2169 oap->oap_async_flags);
2173 /* item from sync_fs, to avoid duplicates check the existing flags */
2174 if (async_flags & ASYNC_SYNCFS) {
2175 cfs_list_add_tail(&oap->oap_urgent_item,
2180 if (oap->oap_async_flags & ASYNC_HP)
2181 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2182 else if (oap->oap_async_flags & ASYNC_URGENT ||
2183 async_flags & ASYNC_URGENT)
2184 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2189 void osc_oap_to_pending(struct osc_async_page *oap)
2191 struct loi_oap_pages *lop;
2193 if (oap->oap_cmd & OBD_BRW_WRITE)
2194 lop = &oap->oap_loi->loi_write_lop;
2196 lop = &oap->oap_loi->loi_read_lop;
2198 osc_add_to_lop_urgent(lop, oap, 0);
2199 cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2200 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2203 /* This must be called holding the loi list lock so that exit_cache,
2204 * async_flag maintenance, and oap_request are all covered by it */
2205 static void osc_ap_completion(const struct lu_env *env,
2206 struct client_obd *cli, struct obdo *oa,
2207 struct osc_async_page *oap, int sent, int rc)
2212 if (oap->oap_request != NULL) {
2213 xid = ptlrpc_req_xid(oap->oap_request);
2214 ptlrpc_req_finished(oap->oap_request);
2215 oap->oap_request = NULL;
2218 cfs_spin_lock(&oap->oap_lock);
2219 oap->oap_async_flags = 0;
2220 cfs_spin_unlock(&oap->oap_lock);
2221 oap->oap_interrupted = 0;
2223 if (oap->oap_cmd & OBD_BRW_WRITE) {
2224 osc_process_ar(&cli->cl_ar, xid, rc);
2225 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2228 if (rc == 0 && oa != NULL) {
2229 if (oa->o_valid & OBD_MD_FLBLOCKS)
2230 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2231 if (oa->o_valid & OBD_MD_FLMTIME)
2232 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2233 if (oa->o_valid & OBD_MD_FLATIME)
2234 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2235 if (oa->o_valid & OBD_MD_FLCTIME)
2236 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2239 rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2240 oap->oap_cmd, oa, rc);
2242 /* ll_ap_completion (from llite) drops PG_locked, so a new
2243 * I/O on the page could start; but OSC calls it under lock
2244 * and thus we can add the oap back to pending safely */
2246 /* upper layer wants to leave the page on pending queue */
2247 osc_oap_to_pending(oap);
2249 osc_exit_cache(cli, oap, sent);
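/* Interpret callback for a bulk BRW request: finish the request, retry
 * recoverable errors, decrement the read/write in-flight counter, complete
 * the oaps attached to the RPC (or, for the async_internal() case, release
 * their write grants), wake any waiters and try to issue further RPCs. */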
2253 static int brw_interpret(const struct lu_env *env,
2254 struct ptlrpc_request *req, void *data, int rc)
2256 struct osc_brw_async_args *aa = data;
2257 struct client_obd *cli;
2261 rc = osc_brw_fini_request(req, rc);
2262 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2263 if (osc_recoverable_error(rc)) {
2264 /* Only retry once for mmapped files since the mmapped page
2265 * might be modified at any time. We have to retry at least
2266 * once in case there really WAS a corruption of the page
2267 * on the network that was not caused by mmap() modifying
2268 * the page. Bug11742 */
2269 if ((rc == -EAGAIN) && (aa->aa_resends > 0) &&
2270 aa->aa_oa->o_valid & OBD_MD_FLFLAGS &&
2271 aa->aa_oa->o_flags & OBD_FL_MMAP) {
2274 rc = osc_brw_redo_request(req, aa);
2281 capa_put(aa->aa_ocapa);
2282 aa->aa_ocapa = NULL;
2287 client_obd_list_lock(&cli->cl_loi_list_lock);
2289 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2290 * is called so we know whether to go to sync BRWs or wait for more
2291 * RPCs to complete */
2292 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2293 cli->cl_w_in_flight--;
2295 cli->cl_r_in_flight--;
2297 async = cfs_list_empty(&aa->aa_oaps);
2298 if (!async) { /* from osc_send_oap_rpc() */
2299 struct osc_async_page *oap, *tmp;
2300 /* the caller may re-use the oap after the completion call so
2301 * we need to clean it up a little */
2302 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
2304 cfs_list_del_init(&oap->oap_rpc_item);
2305 osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2307 OBDO_FREE(aa->aa_oa);
2308 } else { /* from async_internal() */
2310 for (i = 0; i < aa->aa_page_count; i++)
2311 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2313 osc_wake_cache_waiters(cli);
2314 osc_wake_sync_fs(cli);
2315 osc_check_rpcs(env, cli);
2316 client_obd_list_unlock(&cli->cl_loi_list_lock);
2318 cl_req_completion(env, aa->aa_clerq, rc);
2319 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
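/* Build a single BRW ptlrpc request from the oaps on @rpc_list: gather the
 * pages into a brw_page array, fill in the obdo and cl_req attributes, and
 * splice the list onto the request's async args. On failure every queued
 * page is completed with the error instead. */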
2324 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2325 struct client_obd *cli,
2326 cfs_list_t *rpc_list,
2327 int page_count, int cmd)
2329 struct ptlrpc_request *req;
2330 struct brw_page **pga = NULL;
2331 struct osc_brw_async_args *aa;
2332 struct obdo *oa = NULL;
2333 const struct obd_async_page_ops *ops = NULL;
2334 void *caller_data = NULL;
2335 struct osc_async_page *oap;
2336 struct osc_async_page *tmp;
2337 struct ost_body *body;
2338 struct cl_req *clerq = NULL;
2339 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2340 struct ldlm_lock *lock = NULL;
2341 struct cl_req_attr crattr;
2342 int i, rc, mpflag = 0;
2345 LASSERT(!cfs_list_empty(rpc_list));
2347 if (cmd & OBD_BRW_MEMALLOC)
2348 mpflag = cfs_memory_pressure_get_and_set();
2350 memset(&crattr, 0, sizeof crattr);
2351 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2353 GOTO(out, req = ERR_PTR(-ENOMEM));
2357 GOTO(out, req = ERR_PTR(-ENOMEM));
2360 cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2361 struct cl_page *page = osc_oap2cl_page(oap);
2363 ops = oap->oap_caller_ops;
2364 caller_data = oap->oap_caller_data;
2366 clerq = cl_req_alloc(env, page, crt,
2367 1 /* only 1-object rpcs for
2370 GOTO(out, req = (void *)clerq);
2371 lock = oap->oap_ldlm_lock;
2373 pga[i] = &oap->oap_brw_page;
2374 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2375 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2376 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2378 cl_req_page_add(env, clerq, page);
2381 /* always get the data for the obdo for the rpc */
2382 LASSERT(ops != NULL);
2384 crattr.cra_capa = NULL;
2385 cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2387 oa->o_handle = lock->l_remote_handle;
2388 oa->o_valid |= OBD_MD_FLHANDLE;
2391 rc = cl_req_prep(env, clerq);
2393 CERROR("cl_req_prep failed: %d\n", rc);
2394 GOTO(out, req = ERR_PTR(rc));
2397 sort_brw_pages(pga, page_count);
2398 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2399 pga, &req, crattr.cra_capa, 1, 0);
2401 CERROR("prep_req failed: %d\n", rc);
2402 GOTO(out, req = ERR_PTR(rc));
2405 if (cmd & OBD_BRW_MEMALLOC)
2406 req->rq_memalloc = 1;
2408 /* Need to update the timestamps after the request is built in case
2409 * we race with setattr (locally or in queue at the OST). If the OST gets
2410 * a later setattr before an earlier BRW (as determined by the request xid),
2411 * the OST will not use BRW timestamps. Sadly, there is no obvious
2412 * way to do this in a single call. bug 10150 */
2413 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2414 cl_req_attr_set(env, clerq, &crattr,
2415 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2417 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2418 aa = ptlrpc_req_async_args(req);
2419 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2420 cfs_list_splice(rpc_list, &aa->aa_oaps);
2421 CFS_INIT_LIST_HEAD(rpc_list);
2422 aa->aa_clerq = clerq;
2424 if (cmd & OBD_BRW_MEMALLOC)
2425 cfs_memory_pressure_restore(mpflag);
2427 capa_put(crattr.cra_capa);
2432 OBD_FREE(pga, sizeof(*pga) * page_count);
2433 /* This should happen rarely and is pretty bad; it makes the
2434 * pending list not follow the dirty order */
2435 client_obd_list_lock(&cli->cl_loi_list_lock);
2436 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2437 cfs_list_del_init(&oap->oap_rpc_item);
2439 /* queued sync pages can be torn down while the pages
2440 * are between the pending list and the rpc */
2441 if (oap->oap_interrupted) {
2442 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2443 osc_ap_completion(env, cli, NULL, oap, 0,
2447 osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2449 if (clerq && !IS_ERR(clerq))
2450 cl_req_completion(env, clerq, PTR_ERR(req));
2456 * Prepare pages for ASYNC io and put pages in the send queue.
2458 * \param cmd OBD_BRW_* macros
2459 * \param lop pending pages
2461 * \return zero if no page was added to the send queue.
2462 * \return 1 if pages were successfully added to the send queue.
2463 * \return negative on errors.
2466 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2467 struct lov_oinfo *loi,
2468 int cmd, struct loi_oap_pages *lop)
2470 struct ptlrpc_request *req;
2471 obd_count page_count = 0;
2472 struct osc_async_page *oap = NULL, *tmp;
2473 struct osc_brw_async_args *aa;
2474 const struct obd_async_page_ops *ops;
2475 CFS_LIST_HEAD(rpc_list);
2476 CFS_LIST_HEAD(tmp_list);
2477 unsigned int ending_offset;
2478 unsigned starting_offset = 0;
2479 int srvlock = 0, mem_tight = 0;
2480 struct cl_object *clob = NULL;
2483 /* ASYNC_HP pages first. At present, when the lock covering the pages
2484 * is to be cancelled, the pages covered by the lock will be sent out
2485 * with ASYNC_HP. We have to send them out as soon as possible. */
2486 cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2487 if (oap->oap_async_flags & ASYNC_HP)
2488 cfs_list_move(&oap->oap_pending_item, &tmp_list);
2490 cfs_list_move_tail(&oap->oap_pending_item, &tmp_list);
2491 if (++page_count >= cli->cl_max_pages_per_rpc)
2495 cfs_list_splice(&tmp_list, &lop->lop_pending);
2498 /* first we find the pages we're allowed to work with */
2499 cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2501 ops = oap->oap_caller_ops;
2503 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2504 "magic 0x%x\n", oap, oap->oap_magic);
2507 /* pin object in memory, so that completion call-backs
2508 * can be safely called under client_obd_list lock. */
2509 clob = osc_oap2cl_page(oap)->cp_obj;
2510 cl_object_get(clob);
2513 if (page_count != 0 &&
2514 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2515 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2516 " oap %p, page %p, srvlock %u\n",
2517 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2521 /* If there is a gap at the start of this page, it can't merge
2522 * with any previous page, so we'll hand the network a
2523 * "fragmented" page array that it can't transfer in 1 RDMA */
2524 if (page_count != 0 && oap->oap_page_off != 0)
2527 /* in llite being 'ready' equates to the page being locked
2528 * until completion unlocks it. commit_write submits a page
2529 * as not ready because its unlock will happen unconditionally
2530 * as the call returns. if we race with commit_write giving
2531 * us that page we don't want to create a hole in the page
2532 * stream, so we stop and leave the rpc to be fired by
2533 * another dirtier or kupdated interval (the not ready page
2534 * will still be on the dirty list). we could call in
2535 * at the end of ll_file_write to process the queue again. */
2536 if (!(oap->oap_async_flags & ASYNC_READY)) {
2537 int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2540 CDEBUG(D_INODE, "oap %p page %p returned %d "
2541 "instead of ready\n", oap,
2545 /* llite is telling us that the page is still
2546 * in commit_write and that we should try
2547 * and put it in an rpc again later. we
2548 * break out of the loop so we don't create
2549 * a hole in the sequence of pages in the rpc
2554 /* the io isn't needed; tell the checks
2555 * below to complete the rpc with EINTR */
2556 cfs_spin_lock(&oap->oap_lock);
2557 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2558 cfs_spin_unlock(&oap->oap_lock);
2559 oap->oap_count = -EINTR;
2562 cfs_spin_lock(&oap->oap_lock);
2563 oap->oap_async_flags |= ASYNC_READY;
2564 cfs_spin_unlock(&oap->oap_lock);
2567 LASSERTF(0, "oap %p page %p returned %d "
2568 "from make_ready\n", oap,
2576 * A page submitted for IO has to be locked, either by
2577 * ->ap_make_ready() or by higher layers.
2579 #if defined(__KERNEL__) && defined(__linux__)
2581 struct cl_page *page;
2583 page = osc_oap2cl_page(oap);
2585 if (page->cp_type == CPT_CACHEABLE &&
2586 !(PageLocked(oap->oap_page) &&
2587 (CheckWriteback(oap->oap_page, cmd)))) {
2588 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2590 (long)oap->oap_page->flags,
2591 oap->oap_async_flags);
2597 /* take the page out of our book-keeping */
2598 cfs_list_del_init(&oap->oap_pending_item);
2599 lop_update_pending(cli, lop, cmd, -1);
2600 cfs_list_del_init(&oap->oap_urgent_item);
2602 if (page_count == 0)
2603 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2604 (PTLRPC_MAX_BRW_SIZE - 1);
2606 /* ask the caller for the size of the io as the rpc leaves. */
2607 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2609 ops->ap_refresh_count(env, oap->oap_caller_data,
2611 LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2613 if (oap->oap_count <= 0) {
2614 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2616 osc_ap_completion(env, cli, NULL,
2617 oap, 0, oap->oap_count);
2621 /* now put the page back in our accounting */
2622 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2623 if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
2625 if (page_count == 0)
2626 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2627 if (++page_count >= cli->cl_max_pages_per_rpc)
2630 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2631 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2632 * have the same alignment as the initial writes that allocated
2633 * extents on the server. */
2634 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2635 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2636 if (ending_offset == 0)
2639 /* If there is a gap at the end of this page, it can't merge
2640 * with any subsequent pages, so we'll hand the network a
2641 * "fragmented" page array that it can't transfer in 1 RDMA */
2642 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2646 osc_wake_cache_waiters(cli);
2647 osc_wake_sync_fs(cli);
2648 loi_list_maint(cli, loi);
2650 client_obd_list_unlock(&cli->cl_loi_list_lock);
2653 cl_object_put(env, clob);
2655 if (page_count == 0) {
2656 client_obd_list_lock(&cli->cl_loi_list_lock);
2660 req = osc_build_req(env, cli, &rpc_list, page_count,
2661 mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
2663 LASSERT(cfs_list_empty(&rpc_list));
2664 loi_list_maint(cli, loi);
2665 RETURN(PTR_ERR(req));
2668 aa = ptlrpc_req_async_args(req);
2670 if (cmd == OBD_BRW_READ) {
2671 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2672 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2673 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2674 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2676 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2677 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2678 cli->cl_w_in_flight);
2679 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2680 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2682 ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2684 client_obd_list_lock(&cli->cl_loi_list_lock);
2686 if (cmd == OBD_BRW_READ)
2687 cli->cl_r_in_flight++;
2689 cli->cl_w_in_flight++;
2691 /* queued sync pages can be torn down while the pages
2692 * are between the pending list and the rpc */
2694 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2695 /* only one oap gets a request reference */
2698 if (oap->oap_interrupted && !req->rq_intr) {
2699 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2701 ptlrpc_mark_interrupted(req);
2705 tmp->oap_request = ptlrpc_request_addref(req);
2707 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2708 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2710 req->rq_interpret_reply = brw_interpret;
2711 ptlrpcd_add_req(req, PSCOPE_BRW);
2715 #define LOI_DEBUG(LOI, STR, args...) \
2716 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2717 !cfs_list_empty(&(LOI)->loi_ready_item) || \
2718 !cfs_list_empty(&(LOI)->loi_hp_ready_item), \
2719 (LOI)->loi_write_lop.lop_num_pending, \
2720 !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2721 (LOI)->loi_read_lop.lop_num_pending, \
2722 !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2725 /* This is called by osc_check_rpcs() to find which objects have pages that
2726 * we could be sending. These lists are maintained by lop_makes_rpc(). */
2727 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2731 /* First return objects that have blocked locks so that they
2732 * will be flushed quickly and other clients can get the lock,
2733 * then objects which have pages ready to be stuffed into RPCs */
2734 if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
2735 RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
2736 struct lov_oinfo, loi_hp_ready_item));
2737 if (!cfs_list_empty(&cli->cl_loi_ready_list))
2738 RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
2739 struct lov_oinfo, loi_ready_item));
2740 if (!cfs_list_empty(&cli->cl_loi_sync_fs_list))
2741 RETURN(cfs_list_entry(cli->cl_loi_sync_fs_list.next,
2742 struct lov_oinfo, loi_sync_fs_item));
2744 /* then if we have cache waiters, return all objects with queued
2745 * writes. This is especially important when many small files
2746 * have filled up the cache and not been fired into rpcs because
2747 * they don't pass the nr_pending/object threshold */
2748 if (!cfs_list_empty(&cli->cl_cache_waiters) &&
2749 !cfs_list_empty(&cli->cl_loi_write_list))
2750 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2751 struct lov_oinfo, loi_write_item));
2753 /* then return all queued objects when we have an invalid import
2754 * so that they get flushed */
2755 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2756 if (!cfs_list_empty(&cli->cl_loi_write_list))
2757 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2760 if (!cfs_list_empty(&cli->cl_loi_read_list))
2761 RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
2762 struct lov_oinfo, loi_read_item));
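/* Return true when this client already has its maximum number of RPCs in
 * flight; an object whose urgent list starts with an ASYNC_HP page is
 * allowed one RPC beyond cl_max_rpcs_in_flight. */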
2767 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2769 struct osc_async_page *oap;
2772 if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
2773 oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
2774 struct osc_async_page, oap_urgent_item);
2775 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2778 if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
2779 oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
2780 struct osc_async_page, oap_urgent_item);
2781 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2784 return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2787 /* called with the loi list lock held */
2788 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2790 struct lov_oinfo *loi;
2791 int rc = 0, race_counter = 0;
2794 while ((loi = osc_next_loi(cli)) != NULL) {
2795 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2797 if (osc_max_rpc_in_flight(cli, loi))
2800 /* attempt some read/write balancing by alternating between
2801 * reads and writes in an object. The makes_rpc checks here
2802 * would be redundant if we were getting read/write work items
2803 * instead of objects. We don't want send_oap_rpc to drain a
2804 * partial read pending queue when we're given this object to
2805 * do io for writes while there are cache waiters */
2806 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2807 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2808 &loi->loi_write_lop);
2810 CERROR("Write request failed with %d\n", rc);
2812 /* osc_send_oap_rpc failed, mostly because of
2815 * We can't break out of the loop here, because if:
2816 * - a page was submitted by osc_io_submit, so
2818 * - no request in flight
2819 * - no subsequent request
2820 * The system will be in live-lock state,
2821 * because there is no chance to call
2822 * osc_io_unplug() and osc_check_rpcs() any
2823 * more. pdflush can't help in this case,
2824 * because it might be blocked at grabbing
2825 * the page lock as we mentioned.
2827 * Anyway, continue to drain pages. */
2836 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2837 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2838 &loi->loi_read_lop);
2840 CERROR("Read request failed with %d\n", rc);
2848 /* attempt some inter-object balancing by issuing rpcs
2849 * for each object in turn */
2850 if (!cfs_list_empty(&loi->loi_hp_ready_item))
2851 cfs_list_del_init(&loi->loi_hp_ready_item);
2852 if (!cfs_list_empty(&loi->loi_ready_item))
2853 cfs_list_del_init(&loi->loi_ready_item);
2854 if (!cfs_list_empty(&loi->loi_write_item))
2855 cfs_list_del_init(&loi->loi_write_item);
2856 if (!cfs_list_empty(&loi->loi_read_item))
2857 cfs_list_del_init(&loi->loi_read_item);
2858 if (!cfs_list_empty(&loi->loi_sync_fs_item))
2859 cfs_list_del_init(&loi->loi_sync_fs_item);
2861 loi_list_maint(cli, loi);
2863 /* send_oap_rpc fails with 0 when make_ready tells it to
2864 * back off. llite's make_ready does this when it tries
2865 * to lock a page queued for write that is already locked.
2866 * we want to try sending rpcs from many objects, but we
2867 * don't want to spin failing with 0. */
2868 if (race_counter == 10)
2874 /* we're trying to queue a page in the osc so we're subject to the
2875 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2876 * If the osc's queued pages are already at that limit, then we want to sleep
2877 * until there is space in the osc's queue for us. We also may be waiting for
2878 * write credits from the OST if there are RPCs in flight that may return some
2879 * before we fall back to sync writes.
2881 * We need this to know whether our allocation was granted in the presence of signals */
2882 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2886 client_obd_list_lock(&cli->cl_loi_list_lock);
2887 rc = cfs_list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2888 client_obd_list_unlock(&cli->cl_loi_list_lock);
2893 * Non-blocking version of osc_enter_cache() that consumes grant only when it
2896 int osc_enter_cache_try(const struct lu_env *env,
2897 struct client_obd *cli, struct lov_oinfo *loi,
2898 struct osc_async_page *oap, int transient)
2902 has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2904 osc_consume_write_grant(cli, &oap->oap_brw_page);
2906 cli->cl_dirty_transit += CFS_PAGE_SIZE;
2907 cfs_atomic_inc(&obd_dirty_transit_pages);
2908 oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2914 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2915 * grant or cache space. */
2916 static int osc_enter_cache(const struct lu_env *env,
2917 struct client_obd *cli, struct lov_oinfo *loi,
2918 struct osc_async_page *oap)
2920 struct osc_cache_waiter ocw;
2921 struct l_wait_info lwi = { 0 };
2925 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2926 "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
2927 cli->cl_dirty_max, obd_max_dirty_pages,
2928 cli->cl_lost_grant, cli->cl_avail_grant);
2930 /* force the caller to try sync io. this can jump the list
2931 * of queued writes and create a discontiguous rpc stream */
2932 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2933 loi->loi_ar.ar_force_sync)
2936 /* Hopefully normal case - cache space and write credits available */
2937 if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2938 cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2939 osc_enter_cache_try(env, cli, loi, oap, 0))
2942 /* It is safe to block as a cache waiter as long as there is grant
2943 * space available or the hope of additional grant being returned
2944 * when an in flight write completes. Using the write back cache
2945 * if possible is preferable to sending the data synchronously
2946 * because write pages can then be merged in to large requests.
2947 * The addition of this cache waiter will cause pending write
2948 * pages to be sent immediately. */
2949 if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
2950 cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2951 cfs_waitq_init(&ocw.ocw_waitq);
2955 loi_list_maint(cli, loi);
2956 osc_check_rpcs(env, cli);
2957 client_obd_list_unlock(&cli->cl_loi_list_lock);
2959 CDEBUG(D_CACHE, "sleeping for cache space\n");
2960 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2962 client_obd_list_lock(&cli->cl_loi_list_lock);
2963 if (!cfs_list_empty(&ocw.ocw_entry)) {
2964 cfs_list_del(&ocw.ocw_entry);
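/* Set up the osc_async_page for @page at object offset @offset: record the
 * caller's ops and private data, set OBD_BRW_NOQUOTA for capable local
 * clients, and initialize the oap's list heads and spinlock. */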
2974 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2975 struct lov_oinfo *loi, cfs_page_t *page,
2976 obd_off offset, const struct obd_async_page_ops *ops,
2977 void *data, void **res, int nocache,
2978 struct lustre_handle *lockh)
2980 struct osc_async_page *oap;
2985 return cfs_size_round(sizeof(*oap));
2988 oap->oap_magic = OAP_MAGIC;
2989 oap->oap_cli = &exp->exp_obd->u.cli;
2992 oap->oap_caller_ops = ops;
2993 oap->oap_caller_data = data;
2995 oap->oap_page = page;
2996 oap->oap_obj_off = offset;
2997 if (!client_is_remote(exp) &&
2998 cfs_capable(CFS_CAP_SYS_RESOURCE))
2999 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
3001 LASSERT(!(offset & ~CFS_PAGE_MASK));
3003 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
3004 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
3005 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
3006 CFS_INIT_LIST_HEAD(&oap->oap_page_list);
3008 cfs_spin_lock_init(&oap->oap_lock);
3009 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
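/* Convert an opaque page cookie back into its osc_async_page, validating the
 * OAP magic before use. */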
3013 struct osc_async_page *oap_from_cookie(void *cookie)
3015 struct osc_async_page *oap = cookie;
3016 if (oap->oap_magic != OAP_MAGIC)
3017 return ERR_PTR(-EINVAL);
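/* Queue an async page for IO: validate the oap and the import, and for cached
 * writes check the owner's quota and enter the cache (which may wait for
 * grant); then move the oap to its pending list and poke the RPC engine. */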
3021 int osc_queue_async_io(const struct lu_env *env,
3022 struct obd_export *exp, struct lov_stripe_md *lsm,
3023 struct lov_oinfo *loi, void *cookie,
3024 int cmd, obd_off off, int count,
3025 obd_flag brw_flags, enum async_flags async_flags)
3027 struct client_obd *cli = &exp->exp_obd->u.cli;
3028 struct osc_async_page *oap;
3032 oap = oap_from_cookie(cookie);
3034 RETURN(PTR_ERR(oap));
3036 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
3039 if (!cfs_list_empty(&oap->oap_pending_item) ||
3040 !cfs_list_empty(&oap->oap_urgent_item) ||
3041 !cfs_list_empty(&oap->oap_rpc_item))
3044 /* check if the file's owner/group is over quota */
3045 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
3046 struct cl_object *obj;
3047 struct cl_attr attr; /* XXX put attr into thread info */
3048 unsigned int qid[MAXQUOTAS];
3050 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
3052 cl_object_attr_lock(obj);
3053 rc = cl_object_attr_get(env, obj, &attr);
3054 cl_object_attr_unlock(obj);
3056 qid[USRQUOTA] = attr.cat_uid;
3057 qid[GRPQUOTA] = attr.cat_gid;
3059 lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
3066 loi = lsm->lsm_oinfo[0];
3068 client_obd_list_lock(&cli->cl_loi_list_lock);
3070 LASSERT(off + count <= CFS_PAGE_SIZE);
3072 oap->oap_page_off = off;
3073 oap->oap_count = count;
3074 oap->oap_brw_flags = brw_flags;
3075 /* Give a hint to OST that requests are coming from kswapd - bug19529 */
3076 if (cfs_memory_pressure_get())
3077 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
3078 cfs_spin_lock(&oap->oap_lock);
3079 oap->oap_async_flags = async_flags;
3080 cfs_spin_unlock(&oap->oap_lock);
3082 if (cmd & OBD_BRW_WRITE) {
3083 rc = osc_enter_cache(env, cli, loi, oap);
3085 client_obd_list_unlock(&cli->cl_loi_list_lock);
3090 osc_oap_to_pending(oap);
3091 loi_list_maint(cli, loi);
3093 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
3096 osc_check_rpcs(env, cli);
3097 client_obd_list_unlock(&cli->cl_loi_list_lock);
3102 /* aka (~was & now & flag), but this is more clear :) */
3103 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
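/* Merge newly requested @async_flags into @oap: a page that newly gains
 * ASYNC_SYNCFS or ASYNC_URGENT is also moved onto the urgent list of its
 * read/write lop and the object's lists are re-maintained. */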
3105 int osc_set_async_flags_base(struct client_obd *cli,
3106 struct lov_oinfo *loi, struct osc_async_page *oap,
3107 obd_flag async_flags)
3109 struct loi_oap_pages *lop;
3113 LASSERT(!cfs_list_empty(&oap->oap_pending_item));
3115 if (oap->oap_cmd & OBD_BRW_WRITE) {
3116 lop = &loi->loi_write_lop;
3118 lop = &loi->loi_read_lop;
3121 if ((oap->oap_async_flags & async_flags) == async_flags)
3124 /* XXX: This introduces a tiny, insignificant race for the case where this
3125 * loi already had other urgent items.
3127 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_SYNCFS) &&
3128 cfs_list_empty(&oap->oap_rpc_item) &&
3129 cfs_list_empty(&oap->oap_urgent_item)) {
3130 osc_add_to_lop_urgent(lop, oap, ASYNC_SYNCFS);
3131 flags |= ASYNC_SYNCFS;
3132 cfs_spin_lock(&oap->oap_lock);
3133 oap->oap_async_flags |= flags;
3134 cfs_spin_unlock(&oap->oap_lock);
3135 loi_list_maint(cli, loi);
3139 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3140 flags |= ASYNC_READY;
3142 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3143 cfs_list_empty(&oap->oap_rpc_item)) {
3144 osc_add_to_lop_urgent(lop, oap, ASYNC_URGENT);
3145 flags |= ASYNC_URGENT;
3146 loi_list_maint(cli, loi);
3148 cfs_spin_lock(&oap->oap_lock);
3149 oap->oap_async_flags |= flags;
3150 cfs_spin_unlock(&oap->oap_lock);
3152 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3153 oap->oap_async_flags);
3157 int osc_teardown_async_page(struct obd_export *exp,
3158 struct lov_stripe_md *lsm,
3159 struct lov_oinfo *loi, void *cookie)
3161 struct client_obd *cli = &exp->exp_obd->u.cli;