1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_OSC
42 #include <libcfs/libcfs.h>
45 # include <liblustre.h>
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include "osc_internal.h"
66 static quota_interface_t *quota_interface = NULL;
67 extern quota_interface_t osc_quota_interface;
69 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
70 static int brw_interpret(const struct lu_env *env,
71 struct ptlrpc_request *req, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
74 /* Pack OSC object metadata for disk storage (LE byte order). */
75 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
76 struct lov_stripe_md *lsm)
81 lmm_size = sizeof(**lmmp);
86 OBD_FREE(*lmmp, lmm_size);
92 OBD_ALLOC(*lmmp, lmm_size);
98 LASSERT(lsm->lsm_object_id);
99 LASSERT_MDS_GROUP(lsm->lsm_object_gr);
100 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
101 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
107 /* Unpack OSC object metadata from disk storage (LE byte order). */
108 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
109 struct lov_mds_md *lmm, int lmm_bytes)
115 if (lmm_bytes < sizeof (*lmm)) {
116 CERROR("lov_mds_md too small: %d, need %d\n",
117 lmm_bytes, (int)sizeof(*lmm));
120 /* XXX LOV_MAGIC etc check? */
122 if (lmm->lmm_object_id == 0) {
123 CERROR("lov_mds_md: zero lmm_object_id\n");
128 lsm_size = lov_stripe_md_size(1);
132 if (*lsmp != NULL && lmm == NULL) {
133 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
134 OBD_FREE(*lsmp, lsm_size);
140 OBD_ALLOC(*lsmp, lsm_size);
143 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
144 if ((*lsmp)->lsm_oinfo[0] == NULL) {
145 OBD_FREE(*lsmp, lsm_size);
148 loi_init((*lsmp)->lsm_oinfo[0]);
152 /* XXX zero *lsmp? */
153 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
154 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
155 LASSERT((*lsmp)->lsm_object_id);
156 LASSERT_MDS_GROUP((*lsmp)->lsm_object_gr);
159 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165 struct ost_body *body, void *capa)
167 struct obd_capa *oc = (struct obd_capa *)capa;
168 struct lustre_capa *c;
173 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
176 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177 DEBUG_CAPA(D_SEC, c, "pack");
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181 struct obd_info *oinfo)
183 struct ost_body *body;
185 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
188 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
189 osc_pack_capa(req, body, oinfo->oi_capa);
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193 const struct req_msg_field *field,
197 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
199 /* the size is already calculated as sizeof(struct obd_capa) */
203 static int osc_getattr_interpret(const struct lu_env *env,
204 struct ptlrpc_request *req,
205 struct osc_async_args *aa, int rc)
207 struct ost_body *body;
213 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
215 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
216 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
218 /* This should really be sent by the OST */
219 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
220 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
222 CDEBUG(D_INFO, "can't unpack ost_body\n");
224 aa->aa_oi->oi_oa->o_valid = 0;
227 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
231 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
232 struct ptlrpc_request_set *set)
234 struct ptlrpc_request *req;
235 struct osc_async_args *aa;
239 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
243 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
244 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
246 ptlrpc_request_free(req);
250 osc_pack_req_body(req, oinfo);
252 ptlrpc_request_set_replen(req);
253 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
255 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
256 aa = ptlrpc_req_async_args(req);
259 ptlrpc_set_add_req(set, req);
263 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
265 struct ptlrpc_request *req;
266 struct ost_body *body;
270 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
274 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
275 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
277 ptlrpc_request_free(req);
281 osc_pack_req_body(req, oinfo);
283 ptlrpc_request_set_replen(req);
285 rc = ptlrpc_queue_wait(req);
289 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
291 GOTO(out, rc = -EPROTO);
293 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
294 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
296 /* This should really be sent by the OST */
297 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
298 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
302 ptlrpc_req_finished(req);
306 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
307 struct obd_trans_info *oti)
309 struct ptlrpc_request *req;
310 struct ost_body *body;
314 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
316 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
320 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
321 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
323 ptlrpc_request_free(req);
327 osc_pack_req_body(req, oinfo);
329 ptlrpc_request_set_replen(req);
331 rc = ptlrpc_queue_wait(req);
335 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
337 GOTO(out, rc = -EPROTO);
339 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
343 ptlrpc_req_finished(req);
347 static int osc_setattr_interpret(const struct lu_env *env,
348 struct ptlrpc_request *req,
349 struct osc_async_args *aa, int rc)
351 struct ost_body *body;
357 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
359 GOTO(out, rc = -EPROTO);
361 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
363 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
367 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
368 struct obd_trans_info *oti,
369 struct ptlrpc_request_set *rqset)
371 struct ptlrpc_request *req;
372 struct osc_async_args *aa;
376 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
380 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
381 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
383 ptlrpc_request_free(req);
387 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
388 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
390 osc_pack_req_body(req, oinfo);
392 ptlrpc_request_set_replen(req);
394 /* do the MDS-to-OST setattr asynchronously */
396 /* Do not wait for response. */
397 ptlrpcd_add_req(req, PSCOPE_OTHER);
399 req->rq_interpret_reply =
400 (ptlrpc_interpterer_t)osc_setattr_interpret;
402 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
403 aa = ptlrpc_req_async_args(req);
406 ptlrpc_set_add_req(rqset, req);
412 int osc_real_create(struct obd_export *exp, struct obdo *oa,
413 struct lov_stripe_md **ea, struct obd_trans_info *oti)
415 struct ptlrpc_request *req;
416 struct ost_body *body;
417 struct lov_stripe_md *lsm;
426 rc = obd_alloc_memmd(exp, &lsm);
431 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
433 GOTO(out, rc = -ENOMEM);
435 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
437 ptlrpc_request_free(req);
441 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
443 lustre_set_wire_obdo(&body->oa, oa);
445 ptlrpc_request_set_replen(req);
447 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
448 oa->o_flags == OBD_FL_DELORPHAN) {
450 "delorphan from OST integration");
451 /* Don't resend the delorphan req */
452 req->rq_no_resend = req->rq_no_delay = 1;
455 rc = ptlrpc_queue_wait(req);
459 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
461 GOTO(out_req, rc = -EPROTO);
463 lustre_get_wire_obdo(oa, &body->oa);
465 /* This should really be sent by the OST */
466 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
467 oa->o_valid |= OBD_MD_FLBLKSZ;
469 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
470 * have valid lsm_oinfo data structs, so don't go touching that.
471 * This needs to be fixed in a big way.
473 lsm->lsm_object_id = oa->o_id;
474 lsm->lsm_object_gr = oa->o_gr;
478 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
480 if (oa->o_valid & OBD_MD_FLCOOKIE) {
481 if (!oti->oti_logcookies)
482 oti_alloc_cookies(oti, 1);
483 *oti->oti_logcookies = oa->o_lcookie;
487 CDEBUG(D_HA, "transno: "LPD64"\n",
488 lustre_msg_get_transno(req->rq_repmsg));
490 ptlrpc_req_finished(req);
493 obd_free_memmd(exp, &lsm);
497 static int osc_punch_interpret(const struct lu_env *env,
498 struct ptlrpc_request *req,
499 struct osc_punch_args *aa, int rc)
501 struct ost_body *body;
507 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
509 GOTO(out, rc = -EPROTO);
511 lustre_get_wire_obdo(aa->pa_oa, &body->oa);
513 rc = aa->pa_upcall(aa->pa_cookie, rc);
517 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
518 struct obd_capa *capa,
519 obd_enqueue_update_f upcall, void *cookie,
520 struct ptlrpc_request_set *rqset)
522 struct ptlrpc_request *req;
523 struct osc_punch_args *aa;
524 struct ost_body *body;
528 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
532 osc_set_capa_size(req, &RMF_CAPA1, capa);
533 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
535 ptlrpc_request_free(req);
538 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
539 ptlrpc_at_set_req_timeout(req);
541 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
543 lustre_set_wire_obdo(&body->oa, oa);
544 osc_pack_capa(req, body, capa);
546 ptlrpc_request_set_replen(req);
549 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
550 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
551 aa = ptlrpc_req_async_args(req);
553 aa->pa_upcall = upcall;
554 aa->pa_cookie = cookie;
555 if (rqset == PTLRPCD_SET)
556 ptlrpcd_add_req(req, PSCOPE_OTHER);
558 ptlrpc_set_add_req(rqset, req);
563 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
564 struct obd_trans_info *oti,
565 struct ptlrpc_request_set *rqset)
567 oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
568 oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
569 oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
570 return osc_punch_base(exp, oinfo->oi_oa, oinfo->oi_capa,
571 oinfo->oi_cb_up, oinfo, rqset);
574 static int osc_sync(struct obd_export *exp, struct obdo *oa,
575 struct lov_stripe_md *md, obd_size start, obd_size end,
578 struct ptlrpc_request *req;
579 struct ost_body *body;
584 CDEBUG(D_INFO, "oa NULL\n");
588 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
592 osc_set_capa_size(req, &RMF_CAPA1, capa);
593 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
595 ptlrpc_request_free(req);
599 /* overload the size and blocks fields in the oa with start/end */
600 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
602 lustre_set_wire_obdo(&body->oa, oa);
603 body->oa.o_size = start;
604 body->oa.o_blocks = end;
605 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
606 osc_pack_capa(req, body, capa);
608 ptlrpc_request_set_replen(req);
610 rc = ptlrpc_queue_wait(req);
614 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
616 GOTO(out, rc = -EPROTO);
618 lustre_get_wire_obdo(oa, &body->oa);
622 ptlrpc_req_finished(req);
626 /* Locally find and cancel locks matched by @mode on the resource built from
627 * @oa. Found locks are added to the @cancels list. Returns the number of
628 * locks added to the @cancels list. */
629 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
631 ldlm_mode_t mode, int lock_flags)
633 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
634 struct ldlm_res_id res_id;
635 struct ldlm_resource *res;
639 osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
640 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
644 LDLM_RESOURCE_ADDREF(res);
645 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
646 lock_flags, 0, NULL);
647 LDLM_RESOURCE_DELREF(res);
648 ldlm_resource_putref(res);
652 static int osc_destroy_interpret(const struct lu_env *env,
653 struct ptlrpc_request *req, void *data,
656 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
658 cfs_atomic_dec(&cli->cl_destroy_in_flight);
659 cfs_waitq_signal(&cli->cl_destroy_waitq);
663 static int osc_can_send_destroy(struct client_obd *cli)
665 if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
666 cli->cl_max_rpcs_in_flight) {
667 /* The destroy request can be sent */
670 if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
671 cli->cl_max_rpcs_in_flight) {
673 * The counter has been modified between the two atomic
676 cfs_waitq_signal(&cli->cl_destroy_waitq);
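/* Illustrative note (a sketch of the throttling handshake above, with assumed
 * values, not additional protocol): with cl_max_rpcs_in_flight == 2, threads
 * A and B each see the counter go to 1 and 2 after their cfs_atomic_inc() and
 * may send; thread C sees 3, so it must wait and decrements the counter back.
 * If a destroy completed between C's inc and dec, C's cfs_atomic_dec_return()
 * falls below the limit and C re-signals cl_destroy_waitq itself, so a waiter
 * cannot miss the wakeup.  Callers such as osc_destroy() below block on
 * cl_destroy_waitq via l_wait_event_exclusive() until this check succeeds. */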
681 /* Destroy requests can always be async on the client, and we don't even really
682 * care about the return code since the client cannot do anything at all about
684 * When the MDS is unlinking a filename, it saves the file objects into a
685 * recovery llog, and these object records are cancelled when the OST reports
686 * they were destroyed and sync'd to disk (i.e. the transaction committed).
687 * If the client dies, or the OST is down when the object should be destroyed,
688 * the records are not cancelled, and when the OST next reconnects to the MDS,
689 * it will retrieve the llog unlink logs and then send the log cancellation
690 * cookies to the MDS after committing the destroy transactions. */
691 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
692 struct lov_stripe_md *ea, struct obd_trans_info *oti,
693 struct obd_export *md_export, void *capa)
695 struct client_obd *cli = &exp->exp_obd->u.cli;
696 struct ptlrpc_request *req;
697 struct ost_body *body;
698 CFS_LIST_HEAD(cancels);
703 CDEBUG(D_INFO, "oa NULL\n");
707 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
708 LDLM_FL_DISCARD_DATA);
710 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
712 ldlm_lock_list_put(&cancels, l_bl_ast, count);
716 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
717 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
720 ptlrpc_request_free(req);
724 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
725 ptlrpc_at_set_req_timeout(req);
727 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
728 oa->o_lcookie = *oti->oti_logcookies;
729 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
731 lustre_set_wire_obdo(&body->oa, oa);
733 osc_pack_capa(req, body, (struct obd_capa *)capa);
734 ptlrpc_request_set_replen(req);
736 /* don't throttle destroy RPCs for the MDT */
737 if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
738 req->rq_interpret_reply = osc_destroy_interpret;
739 if (!osc_can_send_destroy(cli)) {
740 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
744 * Wait until the number of on-going destroy RPCs drops
745 * under max_rpc_in_flight
747 l_wait_event_exclusive(cli->cl_destroy_waitq,
748 osc_can_send_destroy(cli), &lwi);
752 /* Do not wait for response */
753 ptlrpcd_add_req(req, PSCOPE_OTHER);
757 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
760 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
762 LASSERT(!(oa->o_valid & bits));
765 client_obd_list_lock(&cli->cl_loi_list_lock);
766 oa->o_dirty = cli->cl_dirty;
767 if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
768 CERROR("dirty %lu - %lu > dirty_max %lu\n",
769 cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
771 } else if (cfs_atomic_read(&obd_dirty_pages) -
772 cfs_atomic_read(&obd_dirty_transit_pages) >
773 obd_max_dirty_pages + 1){
774 /* The cfs_atomic_read() calls here and the cfs_atomic_inc() they allow
775 * elsewhere are not covered by a common lock, so they may safely race and
776 * trip this CERROR() unless we add in a small fudge factor (+1). */
777 CERROR("dirty %d - %d > system dirty_max %d\n",
778 cfs_atomic_read(&obd_dirty_pages),
779 cfs_atomic_read(&obd_dirty_transit_pages),
780 obd_max_dirty_pages);
782 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
783 CERROR("dirty %lu - dirty_max %lu too big???\n",
784 cli->cl_dirty, cli->cl_dirty_max);
787 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
788 (cli->cl_max_rpcs_in_flight + 1);
789 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
791 oa->o_grant = cli->cl_avail_grant;
792 oa->o_dropped = cli->cl_lost_grant;
793 cli->cl_lost_grant = 0;
794 client_obd_list_unlock(&cli->cl_loi_list_lock);
795 CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
796 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
800 static void osc_update_next_shrink(struct client_obd *cli)
802 cli->cl_next_shrink_grant =
803 cfs_time_shift(cli->cl_grant_shrink_interval);
804 CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
805 cli->cl_next_shrink_grant);
808 /* caller must hold loi_list_lock */
809 static void osc_consume_write_grant(struct client_obd *cli,
810 struct brw_page *pga)
812 LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
813 LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
814 cfs_atomic_inc(&obd_dirty_pages);
815 cli->cl_dirty += CFS_PAGE_SIZE;
816 cli->cl_avail_grant -= CFS_PAGE_SIZE;
817 pga->flag |= OBD_BRW_FROM_GRANT;
818 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
819 CFS_PAGE_SIZE, pga, pga->pg);
820 LASSERT(cli->cl_avail_grant >= 0);
821 osc_update_next_shrink(cli);
824 /* The companion to osc_consume_write_grant, called when a brw has completed.
825 * Must be called with the loi lock held. */
826 static void osc_release_write_grant(struct client_obd *cli,
827 struct brw_page *pga, int sent)
829 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
832 LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
833 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
838 pga->flag &= ~OBD_BRW_FROM_GRANT;
839 cfs_atomic_dec(&obd_dirty_pages);
840 cli->cl_dirty -= CFS_PAGE_SIZE;
841 if (pga->flag & OBD_BRW_NOCACHE) {
842 pga->flag &= ~OBD_BRW_NOCACHE;
843 cfs_atomic_dec(&obd_dirty_transit_pages);
844 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
847 cli->cl_lost_grant += CFS_PAGE_SIZE;
848 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
849 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
850 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
851 /* For short writes we shouldn't count parts of pages that
852 * span a whole block on the OST side, or our accounting goes
853 * wrong. Should match the code in filter_grant_check. */
854 int offset = pga->off & ~CFS_PAGE_MASK;
855 int count = pga->count + (offset & (blocksize - 1));
856 int end = (offset + pga->count) & (blocksize - 1);
858 count += blocksize - end;
860 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
861 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
862 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
863 cli->cl_avail_grant, cli->cl_dirty);
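/* Worked example of the short-write accounting above (illustrative sizes:
 * 64 KB CFS_PAGE_SIZE, 4 KB OST blocksize): a write of pga->count = 6000
 * bytes at in-page offset 1000 gives count = 6000 + 1000 = 7000 and
 * end = 7000 & 4095 = 2904, so count is rounded up to 8192 -- the two full
 * blocks the OST actually dirtied.  The unused 65536 - 8192 bytes of this
 * page's grant go into cl_lost_grant and are later reported to the server
 * as oa->o_dropped by osc_announce_cached(). */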
869 static unsigned long rpcs_in_flight(struct client_obd *cli)
871 return cli->cl_r_in_flight + cli->cl_w_in_flight;
874 /* caller must hold loi_list_lock */
875 void osc_wake_cache_waiters(struct client_obd *cli)
878 struct osc_cache_waiter *ocw;
881 cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
882 /* if we can't dirty more, we must wait until some is written */
883 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
884 (cfs_atomic_read(&obd_dirty_pages) + 1 >
885 obd_max_dirty_pages)) {
886 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
887 "osc max %ld, sys max %d\n", cli->cl_dirty,
888 cli->cl_dirty_max, obd_max_dirty_pages);
892 /* if the cache is still dirty but there is no grant, wait for pending RPCs
893 * that may yet return us some grant before doing sync writes */
894 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
895 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
896 cli->cl_w_in_flight);
900 ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
901 cfs_list_del_init(&ocw->ocw_entry);
902 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
903 /* no more RPCs in flight to return grant, do sync IO */
904 ocw->ocw_rc = -EDQUOT;
905 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
907 osc_consume_write_grant(cli,
908 &ocw->ocw_oap->oap_brw_page);
911 cfs_waitq_signal(&ocw->ocw_waitq);
917 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
919 client_obd_list_lock(&cli->cl_loi_list_lock);
920 cli->cl_avail_grant += grant;
921 client_obd_list_unlock(&cli->cl_loi_list_lock);
924 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
926 if (body->oa.o_valid & OBD_MD_FLGRANT) {
927 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
928 __osc_update_grant(cli, body->oa.o_grant);
932 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
933 void *key, obd_count vallen, void *val,
934 struct ptlrpc_request_set *set);
936 static int osc_shrink_grant_interpret(const struct lu_env *env,
937 struct ptlrpc_request *req,
940 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
941 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
942 struct ost_body *body;
945 __osc_update_grant(cli, oa->o_grant);
949 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
951 osc_update_grant(cli, body);
957 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
959 client_obd_list_lock(&cli->cl_loi_list_lock);
960 oa->o_grant = cli->cl_avail_grant / 4;
961 cli->cl_avail_grant -= oa->o_grant;
962 client_obd_list_unlock(&cli->cl_loi_list_lock);
963 oa->o_flags |= OBD_FL_SHRINK_GRANT;
964 osc_update_next_shrink(cli);
967 /* Shrink the current grant, either from some large amount to enough for a
968 * full set of in-flight RPCs, or if we have already shrunk to that limit
969 * then to enough for a single RPC. This avoids keeping more grant than
970 * needed, and avoids shrinking the grant piecemeal. */
971 static int osc_shrink_grant(struct client_obd *cli)
973 long target = (cli->cl_max_rpcs_in_flight + 1) *
974 cli->cl_max_pages_per_rpc;
976 client_obd_list_lock(&cli->cl_loi_list_lock);
977 if (cli->cl_avail_grant <= target)
978 target = cli->cl_max_pages_per_rpc;
979 client_obd_list_unlock(&cli->cl_loi_list_lock);
981 return osc_shrink_grant_to_target(cli, target);
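/* Worked example of the shrink target above (illustrative values only): with
 * cl_max_rpcs_in_flight = 8 and cl_max_pages_per_rpc = 256 the target is
 * (8 + 1) * 256 = 2304, i.e. enough grant for a full set of in-flight RPCs
 * plus one; if cl_avail_grant is already at or below that, the target drops
 * to cl_max_pages_per_rpc (256), a single RPC's worth, per the policy in the
 * comment above. */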
984 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
987 struct ost_body *body;
990 client_obd_list_lock(&cli->cl_loi_list_lock);
991 /* Don't shrink if we are already above or below the desired limit.
992 * We don't want to shrink below a single RPC, as that will negatively
993 * impact block allocation and long-term performance. */
994 if (target < cli->cl_max_pages_per_rpc)
995 target = cli->cl_max_pages_per_rpc;
997 if (target >= cli->cl_avail_grant) {
998 client_obd_list_unlock(&cli->cl_loi_list_lock);
1001 client_obd_list_unlock(&cli->cl_loi_list_lock);
1003 OBD_ALLOC_PTR(body);
1007 osc_announce_cached(cli, &body->oa, 0);
1009 client_obd_list_lock(&cli->cl_loi_list_lock);
1010 body->oa.o_grant = cli->cl_avail_grant - target;
1011 cli->cl_avail_grant = target;
1012 client_obd_list_unlock(&cli->cl_loi_list_lock);
1013 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
1014 osc_update_next_shrink(cli);
1016 rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
1017 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
1018 sizeof(*body), body, NULL);
1020 __osc_update_grant(cli, body->oa.o_grant);
1025 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
1026 static int osc_should_shrink_grant(struct client_obd *client)
1028 cfs_time_t time = cfs_time_current();
1029 cfs_time_t next_shrink = client->cl_next_shrink_grant;
1030 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1031 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1032 client->cl_avail_grant > GRANT_SHRINK_LIMIT)
1035 osc_update_next_shrink(client);
1040 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1042 struct client_obd *client;
1044 cfs_list_for_each_entry(client, &item->ti_obd_list,
1045 cl_grant_shrink_list) {
1046 if (osc_should_shrink_grant(client))
1047 osc_shrink_grant(client);
1052 static int osc_add_shrink_grant(struct client_obd *client)
1056 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1058 osc_grant_shrink_grant_cb, NULL,
1059 &client->cl_grant_shrink_list);
1061 CERROR("add grant client %s error %d\n",
1062 client->cl_import->imp_obd->obd_name, rc);
1065 CDEBUG(D_CACHE, "add grant client %s\n",
1066 client->cl_import->imp_obd->obd_name);
1067 osc_update_next_shrink(client);
1071 static int osc_del_shrink_grant(struct client_obd *client)
1073 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1077 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1080 * ocd_grant is the total grant amount we expect to hold: if we've
1081 * been evicted, it's the new avail_grant amount, and cl_dirty will drop
1082 * to 0 as in-flight RPCs fail out; otherwise, it's avail_grant + dirty.
1084 * The race is tolerable here: if we're evicted but imp_state has already
1085 * left the EVICTED state, then cl_dirty must be 0 already.
1087 client_obd_list_lock(&cli->cl_loi_list_lock);
1088 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1089 cli->cl_avail_grant = ocd->ocd_grant;
1091 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1092 client_obd_list_unlock(&cli->cl_loi_list_lock);
1094 CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
1095 cli->cl_avail_grant, cli->cl_lost_grant);
1096 LASSERT(cli->cl_avail_grant >= 0);
1098 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1099 cfs_list_empty(&cli->cl_grant_shrink_list))
1100 osc_add_shrink_grant(cli);
1103 /* We assume that this OSC got a short read because it read beyond the end of
1104 * a stripe file; i.e. Lustre is reading a sparse file via the LOV, and it
1105 * _knows_ it's reading inside the file, it's just that this stripe never got
1106 * written at or beyond this stripe offset yet. */
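/* Worked example (assuming 4 KB pages, for illustration): a 3-page read that
 * transfers nob_read = 5000 bytes is finished off below as follows -- page 0
 * (count 4096) was fully received; page 1 received only 5000 - 4096 = 904
 * bytes, so its bytes 904..4095 are zeroed; page 2 received nothing and is
 * zeroed in full.  The caller therefore always sees fully-initialised pages,
 * even though the OST returned fewer bytes than requested. */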
1107 static void handle_short_read(int nob_read, obd_count page_count,
1108 struct brw_page **pga)
1113 /* skip bytes read OK */
1114 while (nob_read > 0) {
1115 LASSERT (page_count > 0);
1117 if (pga[i]->count > nob_read) {
1118 /* EOF inside this page */
1119 ptr = cfs_kmap(pga[i]->pg) +
1120 (pga[i]->off & ~CFS_PAGE_MASK);
1121 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1122 cfs_kunmap(pga[i]->pg);
1128 nob_read -= pga[i]->count;
1133 /* zero remaining pages */
1134 while (page_count-- > 0) {
1135 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1136 memset(ptr, 0, pga[i]->count);
1137 cfs_kunmap(pga[i]->pg);
1142 static int check_write_rcs(struct ptlrpc_request *req,
1143 int requested_nob, int niocount,
1144 obd_count page_count, struct brw_page **pga)
1149 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1150 sizeof(*remote_rcs) *
1152 if (remote_rcs == NULL) {
1153 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1157 /* return error if any niobuf was in error */
1158 for (i = 0; i < niocount; i++) {
1159 if (remote_rcs[i] < 0)
1160 return(remote_rcs[i]);
1162 if (remote_rcs[i] != 0) {
1163 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1164 i, remote_rcs[i], req);
1169 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1170 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1171 req->rq_bulk->bd_nob_transferred, requested_nob);
1178 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1180 if (p1->flag != p2->flag) {
1181 unsigned mask = ~(OBD_BRW_FROM_GRANT|
1182 OBD_BRW_NOCACHE|OBD_BRW_SYNC);
1184 /* warn if we try to combine flags that we don't know to be
1185 * safe to combine */
1186 if ((p1->flag & mask) != (p2->flag & mask))
1187 CERROR("is it ok to have flags 0x%x and 0x%x in the "
1188 "same brw?\n", p1->flag, p2->flag);
1192 return (p1->off + p1->count == p2->off);
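/* Example (illustrative offsets, assuming both pages carry identical brw
 * flags): a page at off = 0, count = 4096 and a page at off = 4096 are
 * contiguous, so can_merge_pages() returns true and osc_brw_prep_request()
 * below folds the second page into the previous niobuf_remote
 * (niobuf->len += pg->count) instead of emitting a new one. */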
1195 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1196 struct brw_page **pga, int opc,
1197 cksum_type_t cksum_type)
1202 LASSERT (pg_count > 0);
1203 cksum = init_checksum(cksum_type);
1204 while (nob > 0 && pg_count > 0) {
1205 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1206 int off = pga[i]->off & ~CFS_PAGE_MASK;
1207 int count = pga[i]->count > nob ? nob : pga[i]->count;
1209 /* corrupt the data before we compute the checksum, to
1210 * simulate an OST->client data error */
1211 if (i == 0 && opc == OST_READ &&
1212 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1213 memcpy(ptr + off, "bad1", min(4, nob));
1214 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1215 cfs_kunmap(pga[i]->pg);
1216 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1219 nob -= pga[i]->count;
1223 /* For sending we only compute a wrong checksum instead of corrupting
1224 * the data, so the data is still correct on a redo */
1225 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1231 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1232 struct lov_stripe_md *lsm, obd_count page_count,
1233 struct brw_page **pga,
1234 struct ptlrpc_request **reqp,
1235 struct obd_capa *ocapa, int reserve)
1237 struct ptlrpc_request *req;
1238 struct ptlrpc_bulk_desc *desc;
1239 struct ost_body *body;
1240 struct obd_ioobj *ioobj;
1241 struct niobuf_remote *niobuf;
1242 int niocount, i, requested_nob, opc, rc;
1243 struct osc_brw_async_args *aa;
1244 struct req_capsule *pill;
1245 struct brw_page *pg_prev;
1248 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1249 RETURN(-ENOMEM); /* Recoverable */
1250 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1251 RETURN(-EINVAL); /* Fatal */
1253 if ((cmd & OBD_BRW_WRITE) != 0) {
1255 req = ptlrpc_request_alloc_pool(cli->cl_import,
1256 cli->cl_import->imp_rq_pool,
1257 &RQF_OST_BRW_WRITE);
1260 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1265 for (niocount = i = 1; i < page_count; i++) {
1266 if (!can_merge_pages(pga[i - 1], pga[i]))
1270 pill = &req->rq_pill;
1271 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1273 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1274 niocount * sizeof(*niobuf));
1275 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1277 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1279 ptlrpc_request_free(req);
1282 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1283 ptlrpc_at_set_req_timeout(req);
1285 if (opc == OST_WRITE)
1286 desc = ptlrpc_prep_bulk_imp(req, page_count,
1287 BULK_GET_SOURCE, OST_BULK_PORTAL);
1289 desc = ptlrpc_prep_bulk_imp(req, page_count,
1290 BULK_PUT_SINK, OST_BULK_PORTAL);
1293 GOTO(out, rc = -ENOMEM);
1294 /* NB request now owns desc and will free it when it gets freed */
1296 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1297 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1298 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1299 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1301 lustre_set_wire_obdo(&body->oa, oa);
1303 obdo_to_ioobj(oa, ioobj);
1304 ioobj->ioo_bufcnt = niocount;
1305 osc_pack_capa(req, body, ocapa);
1306 LASSERT (page_count > 0);
1308 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1309 struct brw_page *pg = pga[i];
1311 LASSERT(pg->count > 0);
1312 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1313 "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1314 pg->off, pg->count);
1316 LASSERTF(i == 0 || pg->off > pg_prev->off,
1317 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1318 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1320 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1321 pg_prev->pg, page_private(pg_prev->pg),
1322 pg_prev->pg->index, pg_prev->off);
1324 LASSERTF(i == 0 || pg->off > pg_prev->off,
1325 "i %d p_c %u\n", i, page_count);
1327 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1328 (pg->flag & OBD_BRW_SRVLOCK));
1330 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1332 requested_nob += pg->count;
1334 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1336 niobuf->len += pg->count;
1338 niobuf->offset = pg->off;
1339 niobuf->len = pg->count;
1340 niobuf->flags = pg->flag;
1345 LASSERTF((void *)(niobuf - niocount) ==
1346 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1347 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1348 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1350 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1351 if (osc_should_shrink_grant(cli))
1352 osc_shrink_grant_local(cli, &body->oa);
1354 /* size[REQ_REC_OFF] still sizeof (*body) */
1355 if (opc == OST_WRITE) {
1356 if (unlikely(cli->cl_checksum) &&
1357 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1358 /* store cl_cksum_type in a local variable since
1359 * it can be changed via lprocfs */
1360 cksum_type_t cksum_type = cli->cl_cksum_type;
1362 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1363 oa->o_flags &= OBD_FL_LOCAL_MASK;
1364 body->oa.o_flags = 0;
1366 body->oa.o_flags |= cksum_type_pack(cksum_type);
1367 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1368 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1372 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1374 /* save this in 'oa', too, for later checking */
1375 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1376 oa->o_flags |= cksum_type_pack(cksum_type);
1378 /* clear out the checksum flag, in case this is a
1379 * resend but cl_checksum is no longer set. b=11238 */
1380 oa->o_valid &= ~OBD_MD_FLCKSUM;
1382 oa->o_cksum = body->oa.o_cksum;
1383 /* 1 RC per niobuf */
1384 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1385 sizeof(__u32) * niocount);
1387 if (unlikely(cli->cl_checksum) &&
1388 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1389 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1390 body->oa.o_flags = 0;
1391 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1392 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1395 ptlrpc_request_set_replen(req);
1397 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1398 aa = ptlrpc_req_async_args(req);
1400 aa->aa_requested_nob = requested_nob;
1401 aa->aa_nio_count = niocount;
1402 aa->aa_page_count = page_count;
1406 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1407 if (ocapa && reserve)
1408 aa->aa_ocapa = capa_get(ocapa);
1414 ptlrpc_req_finished(req);
1418 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1419 __u32 client_cksum, __u32 server_cksum, int nob,
1420 obd_count page_count, struct brw_page **pga,
1421 cksum_type_t client_cksum_type)
1425 cksum_type_t cksum_type;
1427 if (server_cksum == client_cksum) {
1428 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1432 if (oa->o_valid & OBD_MD_FLFLAGS)
1433 cksum_type = cksum_type_unpack(oa->o_flags);
1435 cksum_type = OBD_CKSUM_CRC32;
1437 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1440 if (cksum_type != client_cksum_type)
1441 msg = "the server did not use the checksum type specified in "
1442 "the original request - likely a protocol problem";
1443 else if (new_cksum == server_cksum)
1444 msg = "changed on the client after we checksummed it - "
1445 "likely false positive due to mmap IO (bug 11742)";
1446 else if (new_cksum == client_cksum)
1447 msg = "changed in transit before arrival at OST";
1449 msg = "changed in transit AND doesn't match the original - "
1450 "likely false positive due to mmap IO (bug 11742)";
1452 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1453 LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1454 "["LPU64"-"LPU64"]\n",
1455 msg, libcfs_nid2str(peer->nid),
1456 oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1457 oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1460 oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1462 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1463 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1464 "client csum now %x\n", client_cksum, client_cksum_type,
1465 server_cksum, cksum_type, new_cksum);
1469 /* Note rc enters this function as number of bytes transferred */
1470 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1472 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1473 const lnet_process_id_t *peer =
1474 &req->rq_import->imp_connection->c_peer;
1475 struct client_obd *cli = aa->aa_cli;
1476 struct ost_body *body;
1477 __u32 client_cksum = 0;
1480 if (rc < 0 && rc != -EDQUOT) {
1481 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1485 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1486 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1488 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1492 #ifdef HAVE_QUOTA_SUPPORT
1493 /* set/clear over quota flag for a uid/gid */
1494 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1495 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1496 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1498 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid %llx, flags %x\n",
1499 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1501 lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
1509 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1510 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1512 osc_update_grant(cli, body);
1514 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1516 CERROR("Unexpected +ve rc %d\n", rc);
1519 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1521 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1524 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1525 check_write_checksum(&body->oa, peer, client_cksum,
1526 body->oa.o_cksum, aa->aa_requested_nob,
1527 aa->aa_page_count, aa->aa_ppga,
1528 cksum_type_unpack(aa->aa_oa->o_flags)))
1531 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1532 aa->aa_page_count, aa->aa_ppga);
1536 /* The rest of this function executes only for OST_READs */
1538 /* if unwrap_bulk failed, return -EAGAIN to retry */
1539 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1541 GOTO(out, rc = -EAGAIN);
1543 if (rc > aa->aa_requested_nob) {
1544 CERROR("Unexpected rc %d (%d requested)\n", rc,
1545 aa->aa_requested_nob);
1549 if (rc != req->rq_bulk->bd_nob_transferred) {
1550 CERROR ("Unexpected rc %d (%d transferred)\n",
1551 rc, req->rq_bulk->bd_nob_transferred);
1555 if (rc < aa->aa_requested_nob)
1556 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1558 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1559 static int cksum_counter;
1560 __u32 server_cksum = body->oa.o_cksum;
1563 cksum_type_t cksum_type;
1565 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1566 cksum_type = cksum_type_unpack(body->oa.o_flags);
1568 cksum_type = OBD_CKSUM_CRC32;
1569 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1570 aa->aa_ppga, OST_READ,
1573 if (peer->nid == req->rq_bulk->bd_sender) {
1577 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1580 if (server_cksum == ~0 && rc > 0) {
1581 CERROR("Protocol error: server %s set the 'checksum' "
1582 "bit, but didn't send a checksum. Not fatal, "
1583 "but please notify on http://bugzilla.lustre.org/\n",
1584 libcfs_nid2str(peer->nid));
1585 } else if (server_cksum != client_cksum) {
1586 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1587 "%s%s%s inum "LPU64"/"LPU64" object "
1588 LPU64"/"LPU64" extent "
1589 "["LPU64"-"LPU64"]\n",
1590 req->rq_import->imp_obd->obd_name,
1591 libcfs_nid2str(peer->nid),
1593 body->oa.o_valid & OBD_MD_FLFID ?
1594 body->oa.o_fid : (__u64)0,
1595 body->oa.o_valid & OBD_MD_FLFID ?
1596 body->oa.o_generation :(__u64)0,
1598 body->oa.o_valid & OBD_MD_FLGROUP ?
1599 body->oa.o_gr : (__u64)0,
1600 aa->aa_ppga[0]->off,
1601 aa->aa_ppga[aa->aa_page_count-1]->off +
1602 aa->aa_ppga[aa->aa_page_count-1]->count -
1604 CERROR("client %x, server %x, cksum_type %x\n",
1605 client_cksum, server_cksum, cksum_type);
1607 aa->aa_oa->o_cksum = client_cksum;
1611 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1614 } else if (unlikely(client_cksum)) {
1615 static int cksum_missed;
1618 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1619 CERROR("Checksum %u requested from %s but not sent\n",
1620 cksum_missed, libcfs_nid2str(peer->nid));
1626 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1631 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1632 struct lov_stripe_md *lsm,
1633 obd_count page_count, struct brw_page **pga,
1634 struct obd_capa *ocapa)
1636 struct ptlrpc_request *req;
1640 struct l_wait_info lwi;
1644 cfs_waitq_init(&waitq);
1647 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1648 page_count, pga, &req, ocapa, 0);
1652 rc = ptlrpc_queue_wait(req);
1654 if (rc == -ETIMEDOUT && req->rq_resend) {
1655 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1656 ptlrpc_req_finished(req);
1660 rc = osc_brw_fini_request(req, rc);
1662 ptlrpc_req_finished(req);
1663 if (osc_recoverable_error(rc)) {
1665 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1666 CERROR("too many resend retries, returning error\n");
1670 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1671 l_wait_event(waitq, 0, &lwi);
1679 int osc_brw_redo_request(struct ptlrpc_request *request,
1680 struct osc_brw_async_args *aa)
1682 struct ptlrpc_request *new_req;
1683 struct ptlrpc_request_set *set = request->rq_set;
1684 struct osc_brw_async_args *new_aa;
1685 struct osc_async_page *oap;
1689 if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1690 CERROR("too many resend retries, returning error\n");
1694 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1696 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1697 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1698 aa->aa_cli, aa->aa_oa,
1699 NULL /* lsm unused by osc currently */,
1700 aa->aa_page_count, aa->aa_ppga,
1701 &new_req, aa->aa_ocapa, 0);
1705 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1707 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1708 if (oap->oap_request != NULL) {
1709 LASSERTF(request == oap->oap_request,
1710 "request %p != oap_request %p\n",
1711 request, oap->oap_request);
1712 if (oap->oap_interrupted) {
1713 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1714 ptlrpc_req_finished(new_req);
1719 /* New request takes over pga and oaps from old request.
1720 * Note that copying a list_head doesn't work, need to move it... */
1722 new_req->rq_interpret_reply = request->rq_interpret_reply;
1723 new_req->rq_async_args = request->rq_async_args;
1724 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1726 new_aa = ptlrpc_req_async_args(new_req);
1728 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1729 cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1730 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1732 cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1733 if (oap->oap_request) {
1734 ptlrpc_req_finished(oap->oap_request);
1735 oap->oap_request = ptlrpc_request_addref(new_req);
1739 new_aa->aa_ocapa = aa->aa_ocapa;
1740 aa->aa_ocapa = NULL;
1742 /* Using ptlrpc_set_add_req() is safe here because interpret functions
1743 * run in check_set context. The only path by which another thread can
1744 * reach the request is the -EINTR case, and that path is protected by
1745 * cl_loi_list_lock. */
1746 ptlrpc_set_add_req(set, new_req);
1748 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1750 DEBUG_REQ(D_INFO, new_req, "new request");
1755 * Ugh, we want disk allocation on the target to happen in offset order. We'll
1756 * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
1757 * fine for our small page arrays and doesn't require allocation. It's an
1758 * insertion sort that swaps elements that are strides apart, shrinking the
1759 * stride down until it's 1 and the array is sorted.
1761 static void sort_brw_pages(struct brw_page **array, int num)
1764 struct brw_page *tmp;
1768 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1773 for (i = stride ; i < num ; i++) {
1776 while (j >= stride && array[j - stride]->off > tmp->off) {
1777 array[j] = array[j - stride];
1782 } while (stride > 1);
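/* Note on the gap sequence (illustration only): the setup loop above grows the
 * stride as 1, 4, 13, 40, ... via stride = stride * 3 + 1, and the do/while
 * then works it back down pass by pass until a final stride-1 pass, which is a
 * plain insertion sort over an already nearly-sorted array.  For the small
 * page arrays built here only a handful of passes are ever needed. */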
1785 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1791 LASSERT (pages > 0);
1792 offset = pg[i]->off & ~CFS_PAGE_MASK;
1796 if (pages == 0) /* that's all */
1799 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1800 return count; /* doesn't end on page boundary */
1803 offset = pg[i]->off & ~CFS_PAGE_MASK;
1804 if (offset != 0) /* doesn't start on page boundary */
1811 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1813 struct brw_page **ppga;
1816 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1820 for (i = 0; i < count; i++)
1825 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1827 LASSERT(ppga != NULL);
1828 OBD_FREE(ppga, sizeof(*ppga) * count);
1831 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1832 obd_count page_count, struct brw_page *pga,
1833 struct obd_trans_info *oti)
1835 struct obdo *saved_oa = NULL;
1836 struct brw_page **ppga, **orig;
1837 struct obd_import *imp = class_exp2cliimp(exp);
1838 struct client_obd *cli;
1839 int rc, page_count_orig;
1842 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1843 cli = &imp->imp_obd->u.cli;
1845 if (cmd & OBD_BRW_CHECK) {
1846 /* The caller just wants to know if there's a chance that this
1847 * I/O can succeed */
1849 if (imp->imp_invalid)
1854 /* test_brw with a failed create can trip this, maybe others. */
1855 LASSERT(cli->cl_max_pages_per_rpc);
1859 orig = ppga = osc_build_ppga(pga, page_count);
1862 page_count_orig = page_count;
1864 sort_brw_pages(ppga, page_count);
1865 while (page_count) {
1866 obd_count pages_per_brw;
1868 if (page_count > cli->cl_max_pages_per_rpc)
1869 pages_per_brw = cli->cl_max_pages_per_rpc;
1871 pages_per_brw = page_count;
1873 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1875 if (saved_oa != NULL) {
1876 /* restore previously saved oa */
1877 *oinfo->oi_oa = *saved_oa;
1878 } else if (page_count > pages_per_brw) {
1879 /* save a copy of oa (brw will clobber it) */
1880 OBDO_ALLOC(saved_oa);
1881 if (saved_oa == NULL)
1882 GOTO(out, rc = -ENOMEM);
1883 *saved_oa = *oinfo->oi_oa;
1886 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1887 pages_per_brw, ppga, oinfo->oi_capa);
1892 page_count -= pages_per_brw;
1893 ppga += pages_per_brw;
1897 osc_release_ppga(orig, page_count_orig);
1899 if (saved_oa != NULL)
1900 OBDO_FREE(saved_oa);
1905 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1906 * the dirty accounting: either writeback completed or a truncate happened
1907 * before writing started. Must be called with the loi lock held. */
1908 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1911 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1915 /* This maintains the lists of pending pages to read/write for a given object
1916 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1917 * to quickly find objects that are ready to send an RPC. */
1918 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1924 if (lop->lop_num_pending == 0)
1927 /* if we have an invalid import we want to drain the queued pages
1928 * by forcing them through RPCs that immediately fail and complete
1929 * the pages. Recovery relies on this to empty the queued pages
1930 * before cancelling the locks and evicting the llite pages */
1931 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1934 /* stream RPCs in queue order as long as there is an urgent page
1935 * queued. This is our cheap solution for good batching in the case
1936 * where writepage marks some random page in the middle of the file
1937 * as urgent because of, say, memory pressure */
1938 if (!cfs_list_empty(&lop->lop_urgent)) {
1939 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1942 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1943 optimal = cli->cl_max_pages_per_rpc;
1944 if (cmd & OBD_BRW_WRITE) {
1945 /* trigger a write RPC stream as long as there are dirtiers
1946 * waiting for space. As they're waiting, they're not going to
1947 * create more pages to coalesce with what's waiting. */
1948 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
1949 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1952 /* +16 to avoid triggering rpcs that would want to include pages
1953 * that are being queued but which can't be made ready until
1954 * the queuer finishes with the page. this is a wart for
1955 * llite::commit_write() */
1958 if (lop->lop_num_pending >= optimal)
1964 static int lop_makes_hprpc(struct loi_oap_pages *lop)
1966 struct osc_async_page *oap;
1969 if (cfs_list_empty(&lop->lop_urgent))
1972 oap = cfs_list_entry(lop->lop_urgent.next,
1973 struct osc_async_page, oap_urgent_item);
1975 if (oap->oap_async_flags & ASYNC_HP) {
1976 CDEBUG(D_CACHE, "hp request forcing RPC\n");
1983 static void on_list(cfs_list_t *item, cfs_list_t *list,
1986 if (cfs_list_empty(item) && should_be_on)
1987 cfs_list_add_tail(item, list);
1988 else if (!cfs_list_empty(item) && !should_be_on)
1989 cfs_list_del_init(item);
1992 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1993 * can find pages to build into rpcs quickly */
1994 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1996 if (lop_makes_hprpc(&loi->loi_write_lop) ||
1997 lop_makes_hprpc(&loi->loi_read_lop)) {
1999 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2000 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2002 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
2003 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2004 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
2005 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
2008 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2009 loi->loi_write_lop.lop_num_pending);
2011 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2012 loi->loi_read_lop.lop_num_pending);
2015 static void lop_update_pending(struct client_obd *cli,
2016 struct loi_oap_pages *lop, int cmd, int delta)
2018 lop->lop_num_pending += delta;
2019 if (cmd & OBD_BRW_WRITE)
2020 cli->cl_pending_w_pages += delta;
2022 cli->cl_pending_r_pages += delta;
2026 * this is called when a sync waiter receives an interruption. Its job is to
2027 * get the caller woken as soon as possible. If its page hasn't been put in an
2028 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
2029 * desiring interruption which will forcefully complete the rpc once the rpc
2032 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2034 struct loi_oap_pages *lop;
2035 struct lov_oinfo *loi;
2039 LASSERT(!oap->oap_interrupted);
2040 oap->oap_interrupted = 1;
2042 /* ok, it's been put in an rpc. only one oap gets a request reference */
2043 if (oap->oap_request != NULL) {
2044 ptlrpc_mark_interrupted(oap->oap_request);
2045 ptlrpcd_wake(oap->oap_request);
2046 ptlrpc_req_finished(oap->oap_request);
2047 oap->oap_request = NULL;
2051 * page completion may be called only if the ->cpo_prep() method was
2052 * executed by osc_io_submit(), which also adds the page to the pending list
2054 if (!cfs_list_empty(&oap->oap_pending_item)) {
2055 cfs_list_del_init(&oap->oap_pending_item);
2056 cfs_list_del_init(&oap->oap_urgent_item);
2059 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2060 &loi->loi_write_lop : &loi->loi_read_lop;
2061 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2062 loi_list_maint(oap->oap_cli, oap->oap_loi);
2063 rc = oap->oap_caller_ops->ap_completion(env,
2064 oap->oap_caller_data,
2065 oap->oap_cmd, NULL, -EINTR);
2071 /* This is trying to propagate async writeback errors back up to the
2072 * application. When an async write fails we record the error code so it can be
2073 * returned later if the app does an fsync. As long as errors persist we force
2074 * future RPCs to be sync so that the app can get a sync error and break the
2075 * cycle of queueing pages for which writeback will fail. */
2076 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2083 ar->ar_force_sync = 1;
2084 ar->ar_min_xid = ptlrpc_sample_next_xid();
2089 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2090 ar->ar_force_sync = 0;
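/* Worked example (illustrative xids): suppose an async write fails while the
 * next xid to be assigned is 100.  The failure path above records
 * ar_min_xid = 100 via ptlrpc_sample_next_xid() and sets ar_force_sync, so
 * later RPCs are forced sync and fsync can observe the saved error.  Once a
 * write whose xid is >= 100 -- i.e. one issued after the failure -- completes
 * successfully, ar_force_sync is cleared above and normal async writeback
 * resumes. */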
2093 void osc_oap_to_pending(struct osc_async_page *oap)
2095 struct loi_oap_pages *lop;
2097 if (oap->oap_cmd & OBD_BRW_WRITE)
2098 lop = &oap->oap_loi->loi_write_lop;
2100 lop = &oap->oap_loi->loi_read_lop;
2102 if (oap->oap_async_flags & ASYNC_HP)
2103 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2104 else if (oap->oap_async_flags & ASYNC_URGENT)
2105 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2106 cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2107 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2110 /* this must be called holding the loi list lock to give coverage to exit_cache,
2111 * async_flag maintenance, and oap_request */
2112 static void osc_ap_completion(const struct lu_env *env,
2113 struct client_obd *cli, struct obdo *oa,
2114 struct osc_async_page *oap, int sent, int rc)
2119 if (oap->oap_request != NULL) {
2120 xid = ptlrpc_req_xid(oap->oap_request);
2121 ptlrpc_req_finished(oap->oap_request);
2122 oap->oap_request = NULL;
2125 cfs_spin_lock(&oap->oap_lock);
2126 oap->oap_async_flags = 0;
2127 cfs_spin_unlock(&oap->oap_lock);
2128 oap->oap_interrupted = 0;
2130 if (oap->oap_cmd & OBD_BRW_WRITE) {
2131 osc_process_ar(&cli->cl_ar, xid, rc);
2132 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2135 if (rc == 0 && oa != NULL) {
2136 if (oa->o_valid & OBD_MD_FLBLOCKS)
2137 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2138 if (oa->o_valid & OBD_MD_FLMTIME)
2139 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2140 if (oa->o_valid & OBD_MD_FLATIME)
2141 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2142 if (oa->o_valid & OBD_MD_FLCTIME)
2143 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2146 rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2147 oap->oap_cmd, oa, rc);
2149 /* ll_ap_completion (from llite) drops PG_locked. so, a new
2150 * I/O on the page could start, but OSC calls it under lock
2151 * and thus we can add oap back to pending safely */
2153 /* upper layer wants to leave the page on pending queue */
2154 osc_oap_to_pending(oap);
2156 osc_exit_cache(cli, oap, sent);
2160 static int brw_interpret(const struct lu_env *env,
2161 struct ptlrpc_request *req, void *data, int rc)
2163 struct osc_brw_async_args *aa = data;
2164 struct client_obd *cli;
2168 rc = osc_brw_fini_request(req, rc);
2169 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2170 if (osc_recoverable_error(rc)) {
2171 rc = osc_brw_redo_request(req, aa);
2177 capa_put(aa->aa_ocapa);
2178 aa->aa_ocapa = NULL;
2183 client_obd_list_lock(&cli->cl_loi_list_lock);
2185 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2186 * is called so we know whether to go to sync BRWs or wait for more
2187 * RPCs to complete */
2188 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2189 cli->cl_w_in_flight--;
2191 cli->cl_r_in_flight--;
2193 async = cfs_list_empty(&aa->aa_oaps);
2194 if (!async) { /* from osc_send_oap_rpc() */
2195 struct osc_async_page *oap, *tmp;
2196 /* the caller may re-use the oap after the completion call so
2197 * we need to clean it up a little */
2198 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
2200 cfs_list_del_init(&oap->oap_rpc_item);
2201 osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2203 OBDO_FREE(aa->aa_oa);
2204 } else { /* from async_internal() */
2206 for (i = 0; i < aa->aa_page_count; i++)
2207 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2209 if (aa->aa_oa->o_flags & OBD_FL_TEMPORARY)
2210 OBDO_FREE(aa->aa_oa);
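/* In-flight counters and grant have been updated above, so wake any
 * cache waiters and see whether more RPCs can be launched before the
 * list lock is dropped. */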
2212 osc_wake_cache_waiters(cli);
2213 osc_check_rpcs(env, cli);
2214 client_obd_list_unlock(&cli->cl_loi_list_lock);
2216 cl_req_completion(env, aa->aa_clerq, rc);
2217 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2221 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2222 struct client_obd *cli,
2223 cfs_list_t *rpc_list,
2224 int page_count, int cmd)
2226 struct ptlrpc_request *req;
2227 struct brw_page **pga = NULL;
2228 struct osc_brw_async_args *aa;
2229 struct obdo *oa = NULL;
2230 const struct obd_async_page_ops *ops = NULL;
2231 void *caller_data = NULL;
2232 struct osc_async_page *oap;
2233 struct osc_async_page *tmp;
2234 struct ost_body *body;
2235 struct cl_req *clerq = NULL;
2236 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2237 struct ldlm_lock *lock = NULL;
2238 struct cl_req_attr crattr;
2242 LASSERT(!cfs_list_empty(rpc_list));
2244 memset(&crattr, 0, sizeof crattr);
2245 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2247 GOTO(out, req = ERR_PTR(-ENOMEM));
2251 GOTO(out, req = ERR_PTR(-ENOMEM));
2254 cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2255 struct cl_page *page = osc_oap2cl_page(oap);
2257 ops = oap->oap_caller_ops;
2258 caller_data = oap->oap_caller_data;
2260 clerq = cl_req_alloc(env, page, crt,
2261 1 /* only 1-object rpcs for
2264 GOTO(out, req = (void *)clerq);
2265 lock = oap->oap_ldlm_lock;
2267 pga[i] = &oap->oap_brw_page;
2268 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2269 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2270 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2272 cl_req_page_add(env, clerq, page);
2275 /* always get the data for the obdo for the rpc */
2276 LASSERT(ops != NULL);
2278 crattr.cra_capa = NULL;
2279 cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2281 oa->o_handle = lock->l_remote_handle;
2282 oa->o_valid |= OBD_MD_FLHANDLE;
2285 rc = cl_req_prep(env, clerq);
2287 CERROR("cl_req_prep failed: %d\n", rc);
2288 GOTO(out, req = ERR_PTR(rc));
2291 sort_brw_pages(pga, page_count);
2292 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2293 pga, &req, crattr.cra_capa, 1);
2295 CERROR("prep_req failed: %d\n", rc);
2296 GOTO(out, req = ERR_PTR(rc));
2299 /* Need to update the timestamps after the request is built in case
2300 * we race with setattr (locally or in queue at the OST). If the OST gets
2301 * a later setattr before an earlier BRW (as determined by the request xid),
2302 * the OST will not use the BRW timestamps. Sadly, there is no obvious
2303 * way to do this in a single call. bug 10150 */
2304 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2305 cl_req_attr_set(env, clerq, &crattr,
2306 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2308 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2309 aa = ptlrpc_req_async_args(req);
2310 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2311 cfs_list_splice(rpc_list, &aa->aa_oaps);
2312 CFS_INIT_LIST_HEAD(rpc_list);
2313 aa->aa_clerq = clerq;
2315 capa_put(crattr.cra_capa);
2320 OBD_FREE(pga, sizeof(*pga) * page_count);
2321 /* This should happen rarely and is pretty bad: it makes the
2322 * pending list not follow the dirty order */
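/* Error path: complete every oap that had been pulled onto the local
 * rpc_list with an error so that no page is left stranded between the
 * pending list and an RPC that will never be sent. */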
2323 client_obd_list_lock(&cli->cl_loi_list_lock);
2324 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2325 cfs_list_del_init(&oap->oap_rpc_item);
2327 /* queued sync pages can be torn down while the pages
2328 * are between the pending list and the rpc */
2329 if (oap->oap_interrupted) {
2330 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2331 osc_ap_completion(env, cli, NULL, oap, 0,
2335 osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2337 if (clerq && !IS_ERR(clerq))
2338 cl_req_completion(env, clerq, PTR_ERR(req));
2344 * Prepare pages for ASYNC I/O and put them in the send queue.
2346 * \param cmd one of the OBD_BRW_* macros
2347 * \param lop pending pages
2349 * \return zero if pages were successfully added to the send queue.
2350 * \return non-zero if an error occurred.
2353 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2354 struct lov_oinfo *loi,
2355 int cmd, struct loi_oap_pages *lop)
2357 struct ptlrpc_request *req;
2358 obd_count page_count = 0;
2359 struct osc_async_page *oap = NULL, *tmp;
2360 struct osc_brw_async_args *aa;
2361 const struct obd_async_page_ops *ops;
2362 CFS_LIST_HEAD(rpc_list);
2363 CFS_LIST_HEAD(tmp_list);
2364 unsigned int ending_offset;
2365 unsigned starting_offset = 0;
2367 struct cl_object *clob = NULL;
2370 /* ASYNC_HP pages first. At present, when the lock covering the pages is
2371 * to be canceled, the pages covered by the lock will be sent out
2372 * with ASYNC_HP. We have to send them out as soon as possible. */
2373 cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2374 if (oap->oap_async_flags & ASYNC_HP)
2375 cfs_list_move(&oap->oap_pending_item, &tmp_list);
2377 cfs_list_move_tail(&oap->oap_pending_item, &tmp_list);
2378 if (++page_count >= cli->cl_max_pages_per_rpc)
2382 cfs_list_splice(&tmp_list, &lop->lop_pending);
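/* tmp_list (with the HP pages at its head) is spliced onto the front of
 * lop_pending, so the scan below naturally picks the high-priority
 * pages up first. */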
2385 /* first we find the pages we're allowed to work with */
2386 cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2388 ops = oap->oap_caller_ops;
2390 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2391 "magic 0x%x\n", oap, oap->oap_magic);
2394 /* pin object in memory, so that completion call-backs
2395 * can be safely called under client_obd_list lock. */
2396 clob = osc_oap2cl_page(oap)->cp_obj;
2397 cl_object_get(clob);
2400 if (page_count != 0 &&
2401 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2402 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2403 " oap %p, page %p, srvlock %u\n",
2404 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2408 /* If there is a gap at the start of this page, it can't merge
2409 * with any previous page, so we'll hand the network a
2410 * "fragmented" page array that it can't transfer in 1 RDMA */
2411 if (page_count != 0 && oap->oap_page_off != 0)
2414 /* In llite, being 'ready' equates to the page being locked
2415 * until completion unlocks it. commit_write submits a page
2416 * as not ready because its unlock will happen unconditionally
2417 * as the call returns. If we race with commit_write giving
2418 * us that page, we don't want to create a hole in the page
2419 * stream, so we stop and leave the rpc to be fired by
2420 * another dirtier or kupdated interval (the not-ready page
2421 * will still be on the dirty list). We could call in
2422 * at the end of ll_file_write to process the queue again. */
2423 if (!(oap->oap_async_flags & ASYNC_READY)) {
2424 int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2427 CDEBUG(D_INODE, "oap %p page %p returned %d "
2428 "instead of ready\n", oap,
2432 /* llite is telling us that the page is still
2433 * in commit_write and that we should try
2434 * to put it in an rpc again later. We
2435 * break out of the loop so we don't create
2436 * a hole in the sequence of pages in the rpc
2441 /* the io isn't needed.. tell the checks
2442 * below to complete the rpc with EINTR */
2443 cfs_spin_lock(&oap->oap_lock);
2444 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2445 cfs_spin_unlock(&oap->oap_lock);
2446 oap->oap_count = -EINTR;
2449 cfs_spin_lock(&oap->oap_lock);
2450 oap->oap_async_flags |= ASYNC_READY;
2451 cfs_spin_unlock(&oap->oap_lock);
2454 LASSERTF(0, "oap %p page %p returned %d "
2455 "from make_ready\n", oap,
2463 * A page submitted for IO has to be locked, either by
2464 * ->ap_make_ready() or by higher layers.
2466 #if defined(__KERNEL__) && defined(__linux__)
2468 struct cl_page *page;
2470 page = osc_oap2cl_page(oap);
2472 if (page->cp_type == CPT_CACHEABLE &&
2473 !(PageLocked(oap->oap_page) &&
2474 (CheckWriteback(oap->oap_page, cmd)))) {
2475 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2477 (long)oap->oap_page->flags,
2478 oap->oap_async_flags);
2484 /* take the page out of our book-keeping */
2485 cfs_list_del_init(&oap->oap_pending_item);
2486 lop_update_pending(cli, lop, cmd, -1);
2487 cfs_list_del_init(&oap->oap_urgent_item);
2489 if (page_count == 0)
2490 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2491 (PTLRPC_MAX_BRW_SIZE - 1);
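/* starting_offset is this RPC's offset within a max-BRW-size window;
 * it is only used further down to feed the per-client read/write
 * offset histograms exported via lprocfs. */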
2493 /* ask the caller for the size of the io as the rpc leaves. */
2494 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2496 ops->ap_refresh_count(env, oap->oap_caller_data,
2498 LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2500 if (oap->oap_count <= 0) {
2501 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2503 osc_ap_completion(env, cli, NULL,
2504 oap, 0, oap->oap_count);
2508 /* now put the page back in our accounting */
2509 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2510 if (page_count == 0)
2511 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2512 if (++page_count >= cli->cl_max_pages_per_rpc)
2515 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2516 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2517 * have the same alignment as the initial writes that allocated
2518 * extents on the server. */
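/* PTLRPC_MAX_BRW_SIZE is a power-of-two constant, so masking with
 * (PTLRPC_MAX_BRW_SIZE - 1) below yields the offset within the current
 * max-BRW window; zero means this page ends exactly on a boundary
 * (e.g. with a 1 MB maximum, 0x100000 & 0xfffff == 0, so the RPC is
 * closed here). */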
2519 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2520 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2521 if (ending_offset == 0)
2524 /* If there is a gap at the end of this page, it can't merge
2525 * with any subsequent pages, so we'll hand the network a
2526 * "fragmented" page array that it can't transfer in 1 RDMA */
2527 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2531 osc_wake_cache_waiters(cli);
2533 loi_list_maint(cli, loi);
2535 client_obd_list_unlock(&cli->cl_loi_list_lock);
2538 cl_object_put(env, clob);
2540 if (page_count == 0) {
2541 client_obd_list_lock(&cli->cl_loi_list_lock);
2545 req = osc_build_req(env, cli, &rpc_list, page_count, cmd);
2547 LASSERT(cfs_list_empty(&rpc_list));
2548 loi_list_maint(cli, loi);
2549 RETURN(PTR_ERR(req));
2552 aa = ptlrpc_req_async_args(req);
2554 if (cmd == OBD_BRW_READ) {
2555 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2556 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2557 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2558 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2560 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2561 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2562 cli->cl_w_in_flight);
2563 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2564 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2566 ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2568 client_obd_list_lock(&cli->cl_loi_list_lock);
2570 if (cmd == OBD_BRW_READ)
2571 cli->cl_r_in_flight++;
2573 cli->cl_w_in_flight++;
2575 /* queued sync pages can be torn down while the pages
2576 * are between the pending list and the rpc */
2578 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2579 /* only one oap gets a request reference */
2582 if (oap->oap_interrupted && !req->rq_intr) {
2583 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2585 ptlrpc_mark_interrupted(req);
2589 tmp->oap_request = ptlrpc_request_addref(req);
2591 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2592 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2594 req->rq_interpret_reply = brw_interpret;
2595 ptlrpcd_add_req(req, PSCOPE_BRW);
2599 #define LOI_DEBUG(LOI, STR, args...) \
2600 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2601 !cfs_list_empty(&(LOI)->loi_ready_item) || \
2602 !cfs_list_empty(&(LOI)->loi_hp_ready_item), \
2603 (LOI)->loi_write_lop.lop_num_pending, \
2604 !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2605 (LOI)->loi_read_lop.lop_num_pending, \
2606 !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2609 /* This is called by osc_check_rpcs() to find which objects have pages that
2610 * we could be sending. These lists are maintained by lop_makes_rpc(). */
2611 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2615 /* First return objects that have blocked locks so that they
2616 * will be flushed quickly and other clients can get the lock,
2617 * then objects which have pages ready to be stuffed into RPCs */
2618 if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
2619 RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
2620 struct lov_oinfo, loi_hp_ready_item));
2621 if (!cfs_list_empty(&cli->cl_loi_ready_list))
2622 RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
2623 struct lov_oinfo, loi_ready_item));
2625 /* then if we have cache waiters, return all objects with queued
2626 * writes. This is especially important when many small files
2627 * have filled up the cache and not been fired into rpcs because
2628 * they don't pass the nr_pending/object threshold */
2629 if (!cfs_list_empty(&cli->cl_cache_waiters) &&
2630 !cfs_list_empty(&cli->cl_loi_write_list))
2631 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2632 struct lov_oinfo, loi_write_item));
2634 /* then return all queued objects when we have an invalid import
2635 * so that they get flushed */
2636 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2637 if (!cfs_list_empty(&cli->cl_loi_write_list))
2638 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2641 if (!cfs_list_empty(&cli->cl_loi_read_list))
2642 RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
2643 struct lov_oinfo, loi_read_item));
2648 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2650 struct osc_async_page *oap;
2653 if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
2654 oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
2655 struct osc_async_page, oap_urgent_item);
2656 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2659 if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
2660 oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
2661 struct osc_async_page, oap_urgent_item);
2662 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
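/* A queued high-priority page buys one extra slot above
 * cl_max_rpcs_in_flight, so urgent (lock cancel) writeback is not
 * throttled behind a full complement of ordinary RPCs. */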
2665 return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2668 /* called with the loi list lock held */
2669 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2671 struct lov_oinfo *loi;
2672 int rc = 0, race_counter = 0;
2675 while ((loi = osc_next_loi(cli)) != NULL) {
2676 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2678 if (osc_max_rpc_in_flight(cli, loi))
2681 /* attempt some read/write balancing by alternating between
2682 * reads and writes in an object. The makes_rpc checks here
2683 * would be redundant if we were getting read/write work items
2684 * instead of objects. We don't want send_oap_rpc to drain a
2685 * partial read pending queue when we're given this object to
2686 * do write io on while there are cache waiters */
2687 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2688 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2689 &loi->loi_write_lop);
2691 CERROR("Write request failed with %d\n", rc);
2693 /* osc_send_oap_rpc failed, mostly because of
2696 * It can't break here, because if:
2697 * - a page was submitted by osc_io_submit, so
2699 * - no request in flight
2700 * - no subsequent request
2701 * The system will be in live-lock state,
2702 * because there is no chance to call
2703 * osc_io_unplug() and osc_check_rpcs() any
2704 * more. pdflush can't help in this case,
2705 * because it might be blocked at grabbing
2706 * the page lock as we mentioned.
2708 * Anyway, continue to drain pages. */
2717 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2718 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2719 &loi->loi_read_lop);
2721 CERROR("Read request failed with %d\n", rc);
2729 /* attempt some inter-object balancing by issuing rpcs
2730 * for each object in turn */
2731 if (!cfs_list_empty(&loi->loi_hp_ready_item))
2732 cfs_list_del_init(&loi->loi_hp_ready_item);
2733 if (!cfs_list_empty(&loi->loi_ready_item))
2734 cfs_list_del_init(&loi->loi_ready_item);
2735 if (!cfs_list_empty(&loi->loi_write_item))
2736 cfs_list_del_init(&loi->loi_write_item);
2737 if (!cfs_list_empty(&loi->loi_read_item))
2738 cfs_list_del_init(&loi->loi_read_item);
2740 loi_list_maint(cli, loi);
2742 /* send_oap_rpc returns 0 when make_ready tells it to
2743 * back off. llite's make_ready does this when it tries
2744 * to lock a page queued for write that is already locked.
2745 * We want to try sending rpcs from many objects, but we
2746 * don't want to spin failing with 0. */
2747 if (race_counter == 10)
2753 /* we're trying to queue a page in the osc so we're subject to the
2754 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2755 * If the osc's queued pages are already at that limit, then we want to sleep
2756 * until there is space in the osc's queue for us. We also may be waiting for
2757 * write credits from the OST if there are RPCs in flight that may return some
2758 * before we fall back to sync writes.
2760 * We need this to know whether our allocation was granted in the presence of signals */
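/* The condition below is the wake-up test for that sleep: either our
 * entry has been removed from cl_cache_waiters (grant/cache space was
 * handed to us), or there are no more RPCs in flight that could return
 * grant, in which case the caller falls back to sync writes. */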
2761 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2765 client_obd_list_lock(&cli->cl_loi_list_lock);
2766 rc = cfs_list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2767 client_obd_list_unlock(&cli->cl_loi_list_lock);
2772 * Non-blocking version of osc_enter_cache() that consumes grant only when it
2775 int osc_enter_cache_try(const struct lu_env *env,
2776 struct client_obd *cli, struct lov_oinfo *loi,
2777 struct osc_async_page *oap, int transient)
2781 has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
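/* Grant is only consumed if a full page's worth is available. In the
 * transient case the page is additionally marked OBD_BRW_NOCACHE and
 * accounted in the dirty-transit counters. */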
2783 osc_consume_write_grant(cli, &oap->oap_brw_page);
2785 cli->cl_dirty_transit += CFS_PAGE_SIZE;
2786 cfs_atomic_inc(&obd_dirty_transit_pages);
2787 oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2793 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2794 * grant or cache space. */
2795 static int osc_enter_cache(const struct lu_env *env,
2796 struct client_obd *cli, struct lov_oinfo *loi,
2797 struct osc_async_page *oap)
2799 struct osc_cache_waiter ocw;
2800 struct l_wait_info lwi = { 0 };
2804 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2805 "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
2806 cli->cl_dirty_max, obd_max_dirty_pages,
2807 cli->cl_lost_grant, cli->cl_avail_grant);
2809 /* force the caller to try sync io. this can jump the list
2810 * of queued writes and create a discontiguous rpc stream */
2811 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2812 loi->loi_ar.ar_force_sync)
2815 /* Hopefully normal case - cache space and write credits available */
2816 if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2817 cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2818 osc_enter_cache_try(env, cli, loi, oap, 0))
2821 /* It is safe to block as a cache waiter as long as there is grant
2822 * space available or the hope of additional grant being returned
2823 * when an in-flight write completes. Using the write-back cache
2824 * if possible is preferable to sending the data synchronously,
2825 * because write pages can then be merged into large requests.
2826 * The addition of this cache waiter will cause pending write
2827 * pages to be sent immediately. */
2828 if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
2829 cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2830 cfs_waitq_init(&ocw.ocw_waitq);
2834 loi_list_maint(cli, loi);
2835 osc_check_rpcs(env, cli);
2836 client_obd_list_unlock(&cli->cl_loi_list_lock);
2838 CDEBUG(D_CACHE, "sleeping for cache space\n");
2839 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2841 client_obd_list_lock(&cli->cl_loi_list_lock);
2842 if (!cfs_list_empty(&ocw.ocw_entry)) {
2843 cfs_list_del(&ocw.ocw_entry);
2853 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2854 struct lov_oinfo *loi, cfs_page_t *page,
2855 obd_off offset, const struct obd_async_page_ops *ops,
2856 void *data, void **res, int nocache,
2857 struct lustre_handle *lockh)
2859 struct osc_async_page *oap;
2864 return cfs_size_round(sizeof(*oap));
2867 oap->oap_magic = OAP_MAGIC;
2868 oap->oap_cli = &exp->exp_obd->u.cli;
2871 oap->oap_caller_ops = ops;
2872 oap->oap_caller_data = data;
2874 oap->oap_page = page;
2875 oap->oap_obj_off = offset;
2876 if (!client_is_remote(exp) &&
2877 cfs_capable(CFS_CAP_SYS_RESOURCE))
2878 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2880 LASSERT(!(offset & ~CFS_PAGE_MASK));
2882 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2883 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2884 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2885 CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2887 cfs_spin_lock_init(&oap->oap_lock);
2888 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2892 struct osc_async_page *oap_from_cookie(void *cookie)
2894 struct osc_async_page *oap = cookie;
2895 if (oap->oap_magic != OAP_MAGIC)
2896 return ERR_PTR(-EINVAL);
2900 int osc_queue_async_io(const struct lu_env *env,
2901 struct obd_export *exp, struct lov_stripe_md *lsm,
2902 struct lov_oinfo *loi, void *cookie,
2903 int cmd, obd_off off, int count,
2904 obd_flag brw_flags, enum async_flags async_flags)
2906 struct client_obd *cli = &exp->exp_obd->u.cli;
2907 struct osc_async_page *oap;
2911 oap = oap_from_cookie(cookie);
2913 RETURN(PTR_ERR(oap));
2915 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2918 if (!cfs_list_empty(&oap->oap_pending_item) ||
2919 !cfs_list_empty(&oap->oap_urgent_item) ||
2920 !cfs_list_empty(&oap->oap_rpc_item))
2923 /* check if the file's owner/group is over quota */
2924 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2925 struct cl_object *obj;
2926 struct cl_attr attr; /* XXX put attr into thread info */
2927 unsigned int qid[MAXQUOTAS];
2929 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
2931 cl_object_attr_lock(obj);
2932 rc = cl_object_attr_get(env, obj, &attr);
2933 cl_object_attr_unlock(obj);
2935 qid[USRQUOTA] = attr.cat_uid;
2936 qid[GRPQUOTA] = attr.cat_gid;
2938 lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
2945 loi = lsm->lsm_oinfo[0];
2947 client_obd_list_lock(&cli->cl_loi_list_lock);
2949 LASSERT(off + count <= CFS_PAGE_SIZE);
2951 oap->oap_page_off = off;
2952 oap->oap_count = count;
2953 oap->oap_brw_flags = brw_flags;
2954 /* Give a hint to OST that requests are coming from kswapd - bug19529 */
2955 if (libcfs_memory_pressure_get())
2956 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2957 cfs_spin_lock(&oap->oap_lock);
2958 oap->oap_async_flags = async_flags;
2959 cfs_spin_unlock(&oap->oap_lock);
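/* Writes must first secure cache space and grant via osc_enter_cache();
 * reads carry no dirty accounting and can be queued directly. */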
2961 if (cmd & OBD_BRW_WRITE) {
2962 rc = osc_enter_cache(env, cli, loi, oap);
2964 client_obd_list_unlock(&cli->cl_loi_list_lock);
2969 osc_oap_to_pending(oap);
2970 loi_list_maint(cli, loi);
2972 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2975 osc_check_rpcs(env, cli);
2976 client_obd_list_unlock(&cli->cl_loi_list_lock);
2981 /* aka (~was & now & flag), but this is more clear :) */
2982 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
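/* e.g. SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) is true
 * only when this call is turning ASYNC_URGENT on, not when the flag was
 * already set or is not being requested. */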
2984 int osc_set_async_flags_base(struct client_obd *cli,
2985 struct lov_oinfo *loi, struct osc_async_page *oap,
2986 obd_flag async_flags)
2988 struct loi_oap_pages *lop;
2992 LASSERT(!cfs_list_empty(&oap->oap_pending_item));
2994 if (oap->oap_cmd & OBD_BRW_WRITE) {
2995 lop = &loi->loi_write_lop;
2997 lop = &loi->loi_read_lop;
3000 if ((oap->oap_async_flags & async_flags) == async_flags)
3003 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3004 flags |= ASYNC_READY;
3006 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3007 cfs_list_empty(&oap->oap_rpc_item)) {
3008 if (oap->oap_async_flags & ASYNC_HP)
3009 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
3011 cfs_list_add_tail(&oap->oap_urgent_item,
3013 flags |= ASYNC_URGENT;
3014 loi_list_maint(cli, loi);
3016 cfs_spin_lock(&oap->oap_lock);
3017 oap->oap_async_flags |= flags;
3018 cfs_spin_unlock(&oap->oap_lock);
3020 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3021 oap->oap_async_flags);
3025 int osc_teardown_async_page(struct obd_export *exp,
3026 struct lov_stripe_md *lsm,
3027 struct lov_oinfo *loi, void *cookie)
3029 struct client_obd *cli = &exp->exp_obd->u.cli;
3030 struct loi_oap_pages *lop;
3031 struct osc_async_page *oap;
3035 oap = oap_from_cookie(cookie);
3037 RETURN(PTR_ERR(oap));
3040 loi = lsm->lsm_oinfo[0];
3042 if (oap->oap_cmd & OBD_BRW_WRITE) {
3043 lop = &loi->loi_write_lop;
3045 lop = &loi->loi_read_lop;
3048 client_obd_list_lock(&cli->cl_loi_list_lock);
3050 if (!cfs_list_empty(&oap->oap_rpc_item))
3051 GOTO(out, rc = -EBUSY);
3053 osc_exit_cache(cli, oap, 0);
3054 osc_wake_cache_waiters(cli);
3056 if (!cfs_list_empty(&oap->oap_urgent_item)) {
3057 cfs_list_del_init(&oap->oap_urgent_item);
3058 cfs_spin_lock(&oap->oap_lock);
3059 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3060 cfs_spin_unlock(&oap->oap_lock);
3062 if (!cfs_list_empty(&oap->oap_pending_item)) {
3063 cfs_list_del_init(&oap->oap_pending_item);
3064 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3066 loi_list_maint(cli, loi);
3067 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3069 client_obd_list_unlock(&cli->cl_loi_list_lock);
3073 static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
3074 struct ldlm_enqueue_info *einfo,
3077 void *data = einfo->ei_cbdata;
3079 LASSERT(lock != NULL);
3080 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3081 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3082 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3083 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
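/* l_ast_data is protected by osc_ast_guard; it may only ever be set to
 * a single value (or re-set to the same one), which the LASSERT below
 * enforces. */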
3085 lock_res_and_lock(lock);
3086 cfs_spin_lock(&osc_ast_guard);
3087 LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
3088 lock->l_ast_data = data;
3089 cfs_spin_unlock(&osc_ast_guard);
3090 unlock_res_and_lock(lock);
3093 static void osc_set_data_with_check(struct lustre_handle *lockh,
3094 struct ldlm_enqueue_info *einfo,
3097 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3100 osc_set_lock_data_with_check(lock, einfo, flags);
3101 LDLM_LOCK_PUT(lock);
3103 CERROR("lockh %p, data %p - client evicted?\n",
3104 lockh, einfo->ei_cbdata);
3107 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3108 ldlm_iterator_t replace, void *data)
3110 struct ldlm_res_id res_id;
3111 struct obd_device *obd = class_exp2obd(exp);
3113 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3114 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3118 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3119 obd_enqueue_update_f upcall, void *cookie,
3122 int intent = *flags & LDLM_FL_HAS_INTENT;
3126 /* The request was created before ldlm_cli_enqueue call. */
3127 if (rc == ELDLM_LOCK_ABORTED) {
3128 struct ldlm_reply *rep;
3129 rep = req_capsule_server_get(&req->rq_pill,
3132 LASSERT(rep != NULL);
3133 if (rep->lock_policy_res1)
3134 rc = rep->lock_policy_res1;
3138 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3139 *flags |= LDLM_FL_LVB_READY;
3140 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3141 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3144 /* Call the update callback. */
3145 rc = (*upcall)(cookie, rc);
3149 static int osc_enqueue_interpret(const struct lu_env *env,
3150 struct ptlrpc_request *req,
3151 struct osc_enqueue_args *aa, int rc)
3153 struct ldlm_lock *lock;
3154 struct lustre_handle handle;
3157 /* Make a local copy of the lock handle and the mode, because aa->oa_*
3158 * might be freed at any time after the lock upcall has been called. */
3159 lustre_handle_copy(&handle, aa->oa_lockh);
3160 mode = aa->oa_ei->ei_mode;
3162 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3164 lock = ldlm_handle2lock(&handle);
3166 /* Take an additional reference so that a blocking AST that
3167 * ldlm_cli_enqueue_fini() might post for a failed lock is guaranteed
3168 * to arrive after an upcall has been executed by
3169 * osc_enqueue_fini(). */
3170 ldlm_lock_addref(&handle, mode);
3172 /* Complete obtaining the lock procedure. */
3173 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,