/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */
#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif

#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#ifndef __KERNEL__
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <obd_cksum.h>
#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include "osc_internal.h"
static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;

static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);

/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
        lmm_size = sizeof(**lmmp);

        OBD_FREE(*lmmp, lmm_size);

        OBD_ALLOC(*lmmp, lmm_size);

        LASSERT(lsm->lsm_object_id);
        LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
        (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
        (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
        if (lmm_bytes < sizeof(*lmm)) {
                CERROR("lov_mds_md too small: %d, need %d\n",
                       lmm_bytes, (int)sizeof(*lmm));

        /* XXX LOV_MAGIC etc check? */
        if (lmm->lmm_object_id == 0) {
                CERROR("lov_mds_md: zero lmm_object_id\n");

        lsm_size = lov_stripe_md_size(1);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);

        OBD_ALLOC(*lsmp, lsm_size);
        OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
        if ((*lsmp)->lsm_oinfo[0] == NULL) {
                OBD_FREE(*lsmp, lsm_size);

        loi_init((*lsmp)->lsm_oinfo[0]);

        /* XXX zero *lsmp? */
        (*lsmp)->lsm_object_id = le64_to_cpu(lmm->lmm_object_id);
        (*lsmp)->lsm_object_seq = le64_to_cpu(lmm->lmm_object_seq);
        LASSERT((*lsmp)->lsm_object_id);
        LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
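
/* Copy the caller's capability (if any) into the request capsule and flag
 * its presence in the obdo so the OST knows to look for it. */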
static inline void osc_pack_capa(struct ptlrpc_request *req,
                                 struct ost_body *body, void *capa)
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
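
/* Fill the OST request body from @oinfo: pack the wire obdo and attach
 * the capability via osc_pack_capa(). */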
static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

static inline void osc_set_capa_size(struct ptlrpc_request *req,
                                     const struct req_msg_field *field,
                                     struct obd_capa *oc)
                req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
                /* it is already calculated as sizeof struct obd_capa */
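
/* Reply interpreter for async getattr: unpack the ost_body from the reply,
 * copy the attributes into the caller's obdo and run the oi_cb_up callback. */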
static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
        struct ost_body *body;

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);

        /* This should really be sent by the OST */
        aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_INFO, "can't unpack ost_body\n");
        aa->aa_oi->oi_oa->o_valid = 0;

        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
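
/* Queue an OST_GETATTR RPC on @set; the reply is handled by
 * osc_getattr_interpret() above. */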
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
        struct ptlrpc_request *req;
        struct osc_async_args *aa;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
                ptlrpc_request_free(req);

        osc_pack_req_body(req, oinfo);
        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);

        ptlrpc_set_add_req(set, req);
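
/* Synchronous getattr: send OST_GETATTR and wait for the reply in place. */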
static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
        struct ptlrpc_request *req;
        struct ost_body *body;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
                ptlrpc_request_free(req);

        osc_pack_req_body(req, oinfo);
        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        ptlrpc_req_finished(req);
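
/* Synchronous setattr: send OST_SETATTR and wait, copying the returned
 * attributes back into oinfo->oi_oa. */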
static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)
        struct ptlrpc_request *req;
        struct ost_body *body;

        LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
                ptlrpc_request_free(req);

        osc_pack_req_body(req, oinfo);
        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        ptlrpc_req_finished(req);

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
        struct ost_body *body;

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(sa->sa_oa, &body->oa);

        rc = sa->sa_upcall(sa->sa_cookie, rc);

int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
                           struct obd_trans_info *oti,
                           obd_enqueue_update_f upcall, void *cookie,
                           struct ptlrpc_request_set *rqset)
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
                ptlrpc_request_free(req);

        if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

        osc_pack_req_body(req, oinfo);
        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oinfo->oi_oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PSCOPE_OTHER);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
        return osc_setattr_async_base(exp, oinfo, oti,
                                      oinfo->oi_cb_up, oinfo, rqset);

int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct lov_stripe_md *lsm;

        rc = obd_alloc_memmd(exp, &lsm);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
                ptlrpc_request_free(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        lustre_set_wire_obdo(&body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                       "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;

        rc = ptlrpc_queue_wait(req);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
                GOTO(out_req, rc = -EPROTO);

        lustre_get_wire_obdo(oa, &body->oa);

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_seq = oa->o_seq;

        oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

        if (oa->o_valid & OBD_MD_FLCOOKIE) {
                if (!oti->oti_logcookies)
                        oti_alloc_cookies(oti, 1);
                *oti->oti_logcookies = oa->o_lcookie;

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));

        ptlrpc_req_finished(req);
        obd_free_memmd(exp, &lsm);
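
/* Send an OST_PUNCH (truncate/extent punch) RPC; completion is reported
 * through @upcall with @cookie, via osc_setattr_interpret(). */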
int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct ost_body *body;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
                ptlrpc_request_free(req);

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa = oinfo->oi_oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        else
                ptlrpc_set_add_req(rqset, req);

static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
        oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
        oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
        oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
        return osc_punch_base(exp, oinfo,
                              oinfo->oi_cb_up, oinfo, rqset);
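
/* Ask the OST to flush the [start, end] range of the object out to stable
 * storage, passing the range in the overloaded size/blocks fields. */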
static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end,
        struct ptlrpc_request *req;
        struct ost_body *body;

        CDEBUG(D_INFO, "oa NULL\n");

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        osc_set_capa_size(req, &RMF_CAPA1, capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
                ptlrpc_request_free(req);

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        lustre_set_wire_obdo(&body->oa, oa);
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        osc_pack_capa(req, body, capa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(oa, &body->oa);

        ptlrpc_req_finished(req);

/* Find and cancel locally locks matched by @mode in the resource whose name
 * is derived from @oa. Found locks are added to the @cancels list. Returns
 * the number of locks added to the @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   ldlm_mode_t mode, int lock_flags)
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;

        osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        cfs_atomic_dec(&cli->cl_destroy_in_flight);
        cfs_waitq_signal(&cli->cl_destroy_waitq);
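
/* Non-blocking slot check: returns nonzero if another destroy RPC may be
 * sent (at most cl_max_rpcs_in_flight in flight). On failure the increment
 * is undone, and a waiter is woken if a destroy completed in the window
 * between the two atomic operations. */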
static int osc_can_send_destroy(struct client_obd *cli)
        if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */

        if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);

/* Destroy requests can always be async on the client, and we don't even
 * really care about the return code, since the client cannot do anything
 * at all about a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then send the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export, void *capa)
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body *body;
        CFS_LIST_HEAD(cancels);

        CDEBUG(D_INFO, "oa NULL\n");

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
                ldlm_lock_list_put(&cancels, l_bl_ast, count);

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                ptlrpc_request_free(req);

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        lustre_set_wire_obdo(&body->oa, oa);

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* don't throttle destroy RPCs for the MDT */
        if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * below max_rpcs_in_flight.
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);

        /* Do not wait for response */
        ptlrpcd_add_req(req, PSCOPE_OTHER);
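
/* Report the client's dirty-page and grant accounting to the OST inside
 * @oa, so the server can adjust how much grant it gives us. */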
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
        obd_flag bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
        } else if (cfs_atomic_read(&obd_dirty_pages) -
                   cfs_atomic_read(&obd_dirty_transit_pages) >
                   obd_max_dirty_pages + 1) {
                /* The cfs_atomic_read()s allowing the cfs_atomic_inc()s are
                 * not covered by a lock, thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("dirty %d - %d > system dirty_max %d\n",
                       cfs_atomic_read(&obd_dirty_pages),
                       cfs_atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);

        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);

static void osc_update_next_shrink(struct client_obd *cli)
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
               cli->cl_next_shrink_grant);

/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
        LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
        cfs_atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
        osc_update_next_shrink(cli);

/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;

        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        cfs_atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (pga->flag & OBD_BRW_NOCACHE) {
                pga->flag &= ~OBD_BRW_NOCACHE;
                cfs_atomic_dec(&obd_dirty_transit_pages);
                cli->cl_dirty_transit -= CFS_PAGE_SIZE;

                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong. Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);

static unsigned long rpcs_in_flight(struct client_obd *cli)
        return cli->cl_r_in_flight + cli->cl_w_in_flight;

/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
        struct osc_cache_waiter *ocw;

        cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                    (cfs_atomic_read(&obd_dirty_pages) + 1 >
                     obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);

                /* if there is still dirty cache but no grant, wait for pending
                 * RPCs that may yet return us some grant before doing sync
                 * writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);

                ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
                cfs_list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);

                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);

                cfs_waitq_signal(&ocw->ocw_waitq);

static void __osc_update_grant(struct client_obd *cli, obd_size grant)
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);

static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        __osc_update_grant(cli, oa->o_grant);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        osc_update_grant(cli, body);

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC. This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
        long target = (cli->cl_max_rpcs_in_flight + 1) *
                      cli->cl_max_pages_per_rpc;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target)
                target = cli->cl_max_pages_per_rpc;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target);
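
/* Illustration (hypothetical values, not from this code): with
 * cl_max_rpcs_in_flight = 8 and cl_max_pages_per_rpc = 256, the first
 * shrink targets (8 + 1) * 256 units of grant; once avail_grant is at or
 * below that, later shrinks target a single RPC's worth (256). */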
int osc_shrink_grant_to_target(struct client_obd *cli, long target)
        struct ost_body *body;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target < cli->cl_max_pages_per_rpc)
                target = cli->cl_max_pages_per_rpc;

        if (target >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        osc_announce_cached(cli, &body->oa, 0);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target;
        cli->cl_avail_grant = target;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
                __osc_update_grant(cli, body->oa.o_grant);

#define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
static int osc_should_shrink_grant(struct client_obd *client)
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)

        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > GRANT_SHRINK_LIMIT)

                        osc_update_next_shrink(client);

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
        struct client_obd *client;

        cfs_list_for_each_entry(client, &item->ti_obd_list,
                                cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);

static int osc_add_shrink_grant(struct client_obd *client)
        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
                CERROR("add grant client %s error %d\n",
                       client->cl_import->imp_obd->obd_name, rc);

        CDEBUG(D_CACHE, "add grant client %s\n",
               client->cl_import->imp_obd->obd_name);
        osc_update_next_shrink(client);

static int osc_del_shrink_grant(struct client_obd *client)
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it's the new avail_grant amount, and cl_dirty will
         * drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * The race is tolerable here: if we're evicted, but imp_state has
         * already left EVICTED state, then cl_dirty must be 0 already.
         */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0, the OSS is probably not running"
                      " with the patch from bug 20278 (%ld)\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
                /* workaround for 1.6 servers which do not have
                 * the patch from bug 20278 */
                cli->cl_avail_grant = ocd->ocd_grant;

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_import->imp_obd->obd_name,
               cli->cl_avail_grant, cli->cl_lost_grant);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            cfs_list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) +
                              (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);

                nob_read -= pga[i]->count;

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
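
/* Validate the per-niobuf return codes in a BRW_WRITE reply and verify
 * that the bulk moved exactly the number of bytes we requested. */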
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if (remote_rcs[i] < 0)
                        return remote_rcs[i];

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                               i, remote_rcs[i], req);

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
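
/* Two brw_pages may be merged into a single niobuf when they are
 * byte-contiguous in the file and carry compatible flags. */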
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT|
                                  OBD_BRW_NOCACHE|OBD_BRW_SYNC|OBD_BRW_ASYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if ((p1->flag & mask) != (p2->flag & mask))
                        CERROR("is it ok to have flags 0x%x and 0x%x in the "
                               "same brw?\n", p1->flag, p2->flag);

        return (p1->off + p1->count == p2->off);
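
/* Checksum @nob bytes spread across the page array; under fault injection
 * this also corrupts the data (reads) or the checksum (writes). */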
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
        LASSERT(pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",

                nob -= pga[i]->count;

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data, so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
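
/* Build an OST_READ/OST_WRITE request: allocate the request and its bulk
 * descriptor, merge contiguous pages into niobufs, attach grant and
 * checksum information, and stash the async args for the interpreter. */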
static int osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve)
        struct ptlrpc_request *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body *body;
        struct obd_ioobj *ioobj;
        struct niobuf_remote *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule *pill;
        struct brw_page *pg_prev;

        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
                ptlrpc_request_free(req);
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);

                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);

        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];

                LASSERT(pg->count > 0);
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __KERNEL__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len = pg->count;
                        niobuf->flags = pg->flag;
                }

        LASSERTF((void *)(niobuf - niocount) ==
                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;

                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,

                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",

                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);

                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;

                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;

        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);

        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;

        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        ptlrpc_req_finished(req);
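
/* Diagnose a write checksum mismatch by recomputing the checksum locally;
 * returns 0 if the checksums actually agree, nonzero otherwise, logging a
 * guess at the likely cause (protocol problem, client-side modification
 * such as mmap IO, or corruption in transit). */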
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);

        /* If this is an mmapped file, it can be changed at any time */
        if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags & OBD_FL_MMAP)

        if (oa->o_valid & OBD_MD_FLFLAGS)
                cksum_type = cksum_type_unpack(oa->o_flags);
        else
                cksum_type = OBD_CKSUM_CRC32;

        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
                           " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                           oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);

/* Note rc enters this function as number of bytes transferred */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        const lnet_process_id_t *peer =
                &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = aa->aa_cli;
        struct ost_body *body;
        __u32 client_cksum = 0;

        if (rc < 0 && rc != -EDQUOT) {
                DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
                DEBUG_REQ(D_INFO, req, "Can't unpack body\n");

#ifdef HAVE_QUOTA_SUPPORT
        /* set/clear over quota flag for a uid/gid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
                unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };

                CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
                       body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
                lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,

        osc_update_grant(cli, body);

        if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
                client_cksum = aa->aa_oa->o_cksum; /* save for later */

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                        CERROR("Unexpected +ve rc %d\n", rc);

                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))

                if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
                    check_write_checksum(&body->oa, peer, client_cksum,
                                         body->oa.o_cksum, aa->aa_requested_nob,
                                         aa->aa_page_count, aa->aa_ppga,
                                         cksum_type_unpack(aa->aa_oa->o_flags)))

                rc = check_write_rcs(req, aa->aa_requested_nob, aa->aa_nio_count,
                                     aa->aa_page_count, aa->aa_ppga);

        /* The rest of this function executes only for OST_READs */

        /* if unwrap_bulk failed, return -EAGAIN to retry */
        rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
                GOTO(out, rc = -EAGAIN);

        if (rc > aa->aa_requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc,
                       aa->aa_requested_nob);

        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR("Unexpected rc %d (%d transferred)\n",
                       rc, req->rq_bulk->bd_nob_transferred);

        if (rc < aa->aa_requested_nob)
                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
        if (body->oa.o_valid & OBD_MD_FLCKSUM) {
                static int cksum_counter;
                __u32 server_cksum = body->oa.o_cksum;
                cksum_type_t cksum_type;

                if (body->oa.o_valid & OBD_MD_FLFLAGS)
                        cksum_type = cksum_type_unpack(body->oa.o_flags);
                else
                        cksum_type = OBD_CKSUM_CRC32;

                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
                                                 aa->aa_ppga, OST_READ,

                if (peer->nid == req->rq_bulk->bd_sender) {

                        router = libcfs_nid2str(req->rq_bulk->bd_sender);

                if (server_cksum == ~0 && rc > 0) {
                        CERROR("Protocol error: server %s set the 'checksum' "
                               "bit, but didn't send a checksum. Not fatal, "
                               "but please notify on http://bugzilla.lustre.org/\n",
                               libcfs_nid2str(peer->nid));
                } else if (server_cksum != client_cksum) {
                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
                                           "%s%s%s inode "DFID" object "
                                           LPU64"/"LPU64" extent "
                                           "["LPU64"-"LPU64"]\n",
                                           req->rq_import->imp_obd->obd_name,
                                           libcfs_nid2str(peer->nid),
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_parent_seq : (__u64)0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_parent_oid : 0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_parent_ver : 0,
                                           body->oa.o_valid & OBD_MD_FLGROUP ?
                                                body->oa.o_seq : (__u64)0,
                                           aa->aa_ppga[0]->off,
                                           aa->aa_ppga[aa->aa_page_count-1]->off +
                                           aa->aa_ppga[aa->aa_page_count-1]->count -
                        CERROR("client %x, server %x, cksum_type %x\n",
                               client_cksum, server_cksum, cksum_type);

                        aa->aa_oa->o_cksum = client_cksum;

                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);

        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Checksum %u requested from %s but not sent\n",
                               cksum_missed, libcfs_nid2str(peer->nid));

        lustre_get_wire_obdo(aa->aa_oa, &body->oa);
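
/* Synchronous bulk read/write: build the request, queue it and wait,
 * rebuilding and resending after recoverable errors with a growing
 * backoff, up to the resend limit. */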
static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                            struct lov_stripe_md *lsm,
                            obd_count page_count, struct brw_page **pga,
                            struct obd_capa *ocapa)
        struct ptlrpc_request *req;
        struct l_wait_info lwi;

        cfs_waitq_init(&waitq);

        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                  page_count, pga, &req, ocapa, 0);

        rc = ptlrpc_queue_wait(req);

        if (rc == -ETIMEDOUT && req->rq_resend) {
                DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
                ptlrpc_req_finished(req);

        rc = osc_brw_fini_request(req, rc);

        ptlrpc_req_finished(req);
        if (osc_recoverable_error(rc)) {
                if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
                        CERROR("too many resend retries, returning error\n");

                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
                l_wait_event(waitq, 0, &lwi);
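
/* Rebuild a BRW request after a recoverable error and resubmit it on the
 * original request set, handing the pga and oap lists over to the new
 * request. */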
int osc_brw_redo_request(struct ptlrpc_request *request,
                         struct osc_brw_async_args *aa)
        struct ptlrpc_request *new_req;
        struct ptlrpc_request_set *set = request->rq_set;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;

        if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
                CERROR("too many resend retries, returning error\n");

        DEBUG_REQ(D_ERROR, request, "redo for recoverable error");

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                          OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, aa->aa_ocapa, 0);

        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);

        cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
                                ptlrpc_req_finished(new_req);

        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;

        new_aa = ptlrpc_req_async_args(new_req);

        CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
        cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);

        cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);

        new_aa->aa_ocapa = aa->aa_ocapa;
        aa->aa_ocapa = NULL;

        /* using ptlrpc_set_add_req() is safe here because the interpret
         * functions run in check_set context. The only way another thread
         * can reach this request is via an -EINTR wakeup, and that path is
         * protected by cl_loi_list_lock. */
        ptlrpc_set_add_req(set, new_req);

        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        DEBUG_REQ(D_INFO, new_req, "new request");

/*
 * Ugh, we want disk allocation on the target to happen in offset order. We'll
 * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
 * fine for our small page arrays and doesn't require allocation. It's an
 * insertion sort that swaps elements that are strides apart, shrinking the
 * stride down until it's '1' and the array is sorted.
 */
static void sort_brw_pages(struct brw_page **array, int num)
        struct brw_page *tmp;

        for (stride = 1; stride < num; stride = (stride * 3) + 1)

                for (i = stride; i < num; i++) {
                        while (j >= stride && array[j - stride]->off > tmp->off) {
                                array[j] = array[j - stride];
        } while (stride > 1);
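
/* Illustration (not from this code, and assuming the usual (stride - 1) / 3
 * shrink step that pairs with the growth loop above): for num = 40 the
 * stride grows 1, 4, 13, 40 (Knuth's 3x+1 series), and the sort then passes
 * with strides 13, 4 and finally 1, at which point the array is sorted. */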

static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
        LASSERT(pages > 0);
        offset = pg[i]->off & ~CFS_PAGE_MASK;

        if (pages == 0) /* that's all */

        if (offset + pg[i]->count < CFS_PAGE_SIZE)
                return count; /* doesn't end on page boundary */

        offset = pg[i]->off & ~CFS_PAGE_MASK;
        if (offset != 0) /* doesn't start on page boundary */
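
/* Build an array of pointers into the caller's brw_page array so the pages
 * can be sorted and split into RPC-sized chunks without copying them. */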
static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
        struct brw_page **ppga;

        OBD_ALLOC(ppga, sizeof(*ppga) * count);

        for (i = 0; i < count; i++)

static void osc_release_ppga(struct brw_page **ppga, obd_count count)
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);

static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
                   obd_count page_count, struct brw_page *pga,
                   struct obd_trans_info *oti)
        struct obdo *saved_oa = NULL;
        struct brw_page **ppga, **orig;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct client_obd *cli;
        int rc, page_count_orig;

        LASSERT((imp != NULL) && (imp->imp_obd != NULL));
        cli = &imp->imp_obd->u.cli;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */
                if (imp->imp_invalid)

        /* test_brw with a failed create can trip this, maybe others. */
        LASSERT(cli->cl_max_pages_per_rpc);

        orig = ppga = osc_build_ppga(pga, page_count);

        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                obd_count pages_per_brw;

                if (page_count > cli->cl_max_pages_per_rpc)
                        pages_per_brw = cli->cl_max_pages_per_rpc;
                else
                        pages_per_brw = page_count;

                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                if (saved_oa != NULL) {
                        /* restore previously saved oa */
                        *oinfo->oi_oa = *saved_oa;
                } else if (page_count > pages_per_brw) {
                        /* save a copy of oa (brw will clobber it) */
                        OBDO_ALLOC(saved_oa);
                        if (saved_oa == NULL)
                                GOTO(out, rc = -ENOMEM);
                        *saved_oa = *oinfo->oi_oa;

                rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                      pages_per_brw, ppga, oinfo->oi_capa);

                page_count -= pages_per_brw;
                ppga += pages_per_brw;

        osc_release_ppga(orig, page_count_orig);

        if (saved_oa != NULL)
                OBDO_FREE(saved_oa);

/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting. Writeback completes or truncate happens before
 * writing starts. Must be called with the loi lock held. */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
        osc_release_write_grant(cli, &oap->oap_brw_page, sent);

/* This maintains the lists of pending pages to read/write for a given object
 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
 * to quickly find objects that are ready to send an RPC. */
static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
        if (lop->lop_num_pending == 0)

        /* if we have an invalid import we want to drain the queued pages
         * by forcing them through rpcs that immediately fail and complete
         * the pages. recovery relies on this to empty the queued pages
         * before canceling the locks and evicting the llite pages */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)

        /* stream rpcs in queue order as long as there is an urgent page
         * queued. this is our cheap solution for good batching in the case
         * where writepage marks some random page in the middle of the file
         * as urgent because of, say, memory pressure */
        if (!cfs_list_empty(&lop->lop_urgent)) {
                CDEBUG(D_CACHE, "urgent request forcing RPC\n");

        /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
        optimal = cli->cl_max_pages_per_rpc;
        if (cmd & OBD_BRW_WRITE) {
                /* trigger a write rpc stream as long as there are dirtiers
                 * waiting for space. as they're waiting, they're not going to
                 * create more pages to coalesce with what's waiting.. */
                if (!cfs_list_empty(&cli->cl_cache_waiters)) {
                        CDEBUG(D_CACHE, "cache waiters forcing RPC\n");

                /* +16 to avoid triggering rpcs that would want to include pages
                 * that are being queued but which can't be made ready until
                 * the queuer finishes with the page. this is a wart for
                 * llite::commit_write() */

        if (lop->lop_num_pending >= optimal)
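
/* A high-priority RPC is warranted when the first urgent page on the list
 * is flagged ASYNC_HP. */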
static int lop_makes_hprpc(struct loi_oap_pages *lop)
        struct osc_async_page *oap;

        if (cfs_list_empty(&lop->lop_urgent))

        oap = cfs_list_entry(lop->lop_urgent.next,
                             struct osc_async_page, oap_urgent_item);

        if (oap->oap_async_flags & ASYNC_HP) {
                CDEBUG(D_CACHE, "hp request forcing RPC\n");

static void on_list(cfs_list_t *item, cfs_list_t *list,
        if (cfs_list_empty(item) && should_be_on)
                cfs_list_add_tail(item, list);
        else if (!cfs_list_empty(item) && !should_be_on)
                cfs_list_del_init(item);

/* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
 * can find pages to build into rpcs quickly */
void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
        if (lop_makes_hprpc(&loi->loi_write_lop) ||
            lop_makes_hprpc(&loi->loi_read_lop)) {
                on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
                on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
        } else {
                on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
                on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
                        lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
                        lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));

        on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
                loi->loi_write_lop.lop_num_pending);

        on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
                loi->loi_read_lop.lop_num_pending);

static void lop_update_pending(struct client_obd *cli,
                               struct loi_oap_pages *lop, int cmd, int delta)
        lop->lop_num_pending += delta;
        if (cmd & OBD_BRW_WRITE)
                cli->cl_pending_w_pages += delta;
        else
                cli->cl_pending_r_pages += delta;

/*
 * this is called when a sync waiter receives an interruption. Its job is to
 * get the caller woken as soon as possible. If its page hasn't been put in an
 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
 * desiring interruption, which will forcefully complete the rpc once the rpc
 * has completed.
 */
int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;

        LASSERT(!oap->oap_interrupted);
        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. only one oap gets a request reference */
        if (oap->oap_request != NULL) {
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;

        /*
         * page completion may be called only if ->cpo_prep() method was
         * executed by osc_io_submit(), which also adds the page to the
         * pending list
         */
        if (!cfs_list_empty(&oap->oap_pending_item)) {
                cfs_list_del_init(&oap->oap_pending_item);
                cfs_list_del_init(&oap->oap_urgent_item);

                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);
                rc = oap->oap_caller_ops->ap_completion(env,
                                                        oap->oap_caller_data,
                                                        oap->oap_cmd, NULL, -EINTR);

/* this is trying to propagate async writeback errors back up to the
 * application. As an async write fails, we record the error code for later if
 * the app does an fsync. As long as errors persist we force future rpcs to be
 * sync so that the app can get a sync error and break the cycle of queueing
 * pages for which writeback will fail. */
static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
        ar->ar_force_sync = 1;
        ar->ar_min_xid = ptlrpc_sample_next_xid();

        if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
                ar->ar_force_sync = 0;
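
/* Queue @oap on its object's pending list (and on the urgent list when it
 * is flagged ASYNC_HP or ASYNC_URGENT), updating the pending-page counters. */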
void osc_oap_to_pending(struct osc_async_page *oap)
        struct loi_oap_pages *lop;

        if (oap->oap_cmd & OBD_BRW_WRITE)
                lop = &oap->oap_loi->loi_write_lop;
        else
                lop = &oap->oap_loi->loi_read_lop;

        if (oap->oap_async_flags & ASYNC_HP)
                cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
        else if (oap->oap_async_flags & ASYNC_URGENT)
                cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
        cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
        lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);

/* this must be called holding the loi list lock to give coverage to exit_cache,
 * async_flag maintenance, and oap_request */
static void osc_ap_completion(const struct lu_env *env,
                              struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
        if (oap->oap_request != NULL) {
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;

        cfs_spin_lock(&oap->oap_lock);
        oap->oap_async_flags = 0;
        cfs_spin_unlock(&oap->oap_lock);
        oap->oap_interrupted = 0;

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                osc_process_ar(&cli->cl_ar, xid, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);

        if (rc == 0 && oa != NULL) {
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;

        rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
                                                oap->oap_cmd, oa, rc);

        /* ll_ap_completion (from llite) drops PG_locked. so, a new
         * I/O on the page could start, but OSC calls it under lock
         * and thus we can add oap back to pending safely */

                /* upper layer wants to leave the page on pending queue */
                osc_oap_to_pending(oap);

        osc_exit_cache(cli, oap, sent);
2173 static int brw_interpret(const struct lu_env *env,
2174 struct ptlrpc_request *req, void *data, int rc)
2176 struct osc_brw_async_args *aa = data;
2177 struct client_obd *cli;
2181 rc = osc_brw_fini_request(req, rc);
2182 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2183 if (osc_recoverable_error(rc)) {
2184 /* Only retry once for mmapped files since the mmapped page
2185 * might be modified at any time. We have to retry at least
2186 * once in case the page really was corrupted on the
2187 * network and the corruption was not caused by mmap()
2188 * modifying the page. Bug11742 */
2189 if ((rc == -EAGAIN) && (aa->aa_resends > 0) &&
2190 aa->aa_oa->o_valid & OBD_MD_FLFLAGS &&
2191 aa->aa_oa->o_flags & OBD_FL_MMAP) {
2194 rc = osc_brw_redo_request(req, aa);
2201 capa_put(aa->aa_ocapa);
2202 aa->aa_ocapa = NULL;
2207 client_obd_list_lock(&cli->cl_loi_list_lock);
2209 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2210 * is called so we know whether to go to sync BRWs or wait for more
2211 * RPCs to complete */
2212 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2213 cli->cl_w_in_flight--;
2215 cli->cl_r_in_flight--;
2217 async = cfs_list_empty(&aa->aa_oaps);
2218 if (!async) { /* from osc_send_oap_rpc() */
2219 struct osc_async_page *oap, *tmp;
2220 /* the caller may re-use the oap after the completion call so
2221 * we need to clean it up a little */
2222 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
2224 cfs_list_del_init(&oap->oap_rpc_item);
2225 osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2227 OBDO_FREE(aa->aa_oa);
2228 } else { /* from async_internal() */
2230 for (i = 0; i < aa->aa_page_count; i++)
2231 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2233 osc_wake_cache_waiters(cli);
2234 osc_check_rpcs(env, cli);
2235 client_obd_list_unlock(&cli->cl_loi_list_lock);
2237 cl_req_completion(env, aa->aa_clerq, rc);
2238 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
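/*
 * Sketch (not compiled) of the async-args pattern brw_interpret() relies
 * on: per-RPC state lives in a struct small enough to be embedded in the
 * request itself, is filled in at build time, and comes back as "data"
 * when the interpreter runs. This mirrors what osc_build_req() and
 * osc_send_oap_rpc() below actually do.
 */
#if 0
        struct osc_brw_async_args *aa;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);  /* storage inside the request */
        /* ... fill aa->aa_oaps, aa->aa_clerq, ... */
        req->rq_interpret_reply = brw_interpret;
        ptlrpcd_add_req(req, PSCOPE_BRW); /* brw_interpret(env, req, aa, rc)
                                           * runs when the reply arrives */
#endif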
2243 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2244 struct client_obd *cli,
2245 cfs_list_t *rpc_list,
2246 int page_count, int cmd)
2248 struct ptlrpc_request *req;
2249 struct brw_page **pga = NULL;
2250 struct osc_brw_async_args *aa;
2251 struct obdo *oa = NULL;
2252 const struct obd_async_page_ops *ops = NULL;
2253 void *caller_data = NULL;
2254 struct osc_async_page *oap;
2255 struct osc_async_page *tmp;
2256 struct ost_body *body;
2257 struct cl_req *clerq = NULL;
2258 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2259 struct ldlm_lock *lock = NULL;
2260 struct cl_req_attr crattr;
2261 int i, rc, mpflag = 0;
2264 LASSERT(!cfs_list_empty(rpc_list));
2266 if (cmd & OBD_BRW_MEMALLOC)
2267 mpflag = cfs_memory_pressure_get_and_set();
2269 memset(&crattr, 0, sizeof crattr);
2270 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2272 GOTO(out, req = ERR_PTR(-ENOMEM));
2276 GOTO(out, req = ERR_PTR(-ENOMEM));
2279 cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2280 struct cl_page *page = osc_oap2cl_page(oap);
2282 ops = oap->oap_caller_ops;
2283 caller_data = oap->oap_caller_data;
2285 clerq = cl_req_alloc(env, page, crt,
2286 1 /* only 1-object rpcs for
2287 * now */);
2288 if (IS_ERR(clerq))
2289 GOTO(out, req = (void *)clerq);
2290 lock = oap->oap_ldlm_lock;
2292 pga[i] = &oap->oap_brw_page;
2293 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2294 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2295 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2297 cl_req_page_add(env, clerq, page);
2300 /* always get the data for the obdo for the rpc */
2301 LASSERT(ops != NULL);
2303 crattr.cra_capa = NULL;
2304 cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2306 oa->o_handle = lock->l_remote_handle;
2307 oa->o_valid |= OBD_MD_FLHANDLE;
2310 rc = cl_req_prep(env, clerq);
2312 CERROR("cl_req_prep failed: %d\n", rc);
2313 GOTO(out, req = ERR_PTR(rc));
2316 sort_brw_pages(pga, page_count);
2317 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2318 pga, &req, crattr.cra_capa, 1);
2320 CERROR("prep_req failed: %d\n", rc);
2321 GOTO(out, req = ERR_PTR(rc));
2324 if (cmd & OBD_BRW_MEMALLOC)
2325 req->rq_memalloc = 1;
2327 /* Need to update the timestamps after the request is built in case
2328 * we race with setattr (locally or in queue at the OST). If the OST
2329 * gets the later setattr before the earlier BRW (as determined by
2330 * the request xid), the OST will not use BRW timestamps. Sadly,
2331 * there is no obvious way to do this in a single call. bug 10150 */
2332 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2333 cl_req_attr_set(env, clerq, &crattr,
2334 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2336 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2337 aa = ptlrpc_req_async_args(req);
2338 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2339 cfs_list_splice(rpc_list, &aa->aa_oaps);
2340 CFS_INIT_LIST_HEAD(rpc_list);
2341 aa->aa_clerq = clerq;
2343 if (cmd & OBD_BRW_MEMALLOC)
2344 cfs_memory_pressure_restore(mpflag);
2346 capa_put(crattr.cra_capa);
2351 OBD_FREE(pga, sizeof(*pga) * page_count);
2352 /* this should happen rarely and is pretty bad; it makes the
2353 * pending list stop following the dirty order */
2354 client_obd_list_lock(&cli->cl_loi_list_lock);
2355 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2356 cfs_list_del_init(&oap->oap_rpc_item);
2358 /* queued sync pages can be torn down while they
2359 * are between the pending list and the rpc */
2360 if (oap->oap_interrupted) {
2361 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2362 osc_ap_completion(env, cli, NULL, oap, 0,
2366 osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2368 if (clerq && !IS_ERR(clerq))
2369 cl_req_completion(env, clerq, PTR_ERR(req));
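/*
 * Sketch (not compiled) of the ERR_PTR convention used by the error path
 * above: osc_build_req() reports failure through its request-pointer
 * return value, and callers decode it with IS_ERR()/PTR_ERR(), exactly
 * as osc_send_oap_rpc() does below.
 */
#if 0
        struct ptlrpc_request *req;

        req = osc_build_req(env, cli, &rpc_list, page_count, cmd);
        if (IS_ERR(req))
                RETURN(PTR_ERR(req)); /* e.g. -ENOMEM from OBD_ALLOC */
#endif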
2374 /**
2375 * prepare pages for ASYNC io and put pages into the send queue.
2377 * \param cmd OBD_BRW_* macros
2378 * \param lop pending pages
2380 * \return zero if no page was added to the send queue.
2381 * \return 1 if pages were successfully added to the send queue.
2382 * \return negative on errors.
2383 */
2384 static int
2385 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2386 struct lov_oinfo *loi,
2387 int cmd, struct loi_oap_pages *lop)
2389 struct ptlrpc_request *req;
2390 obd_count page_count = 0;
2391 struct osc_async_page *oap = NULL, *tmp;
2392 struct osc_brw_async_args *aa;
2393 const struct obd_async_page_ops *ops;
2394 CFS_LIST_HEAD(rpc_list);
2395 CFS_LIST_HEAD(tmp_list);
2396 unsigned int ending_offset;
2397 unsigned starting_offset = 0;
2398 int srvlock = 0, mem_tight = 0;
2399 struct cl_object *clob = NULL;
2402 /* ASYNC_HP pages first. At present, when the lock covering the pages
2403 * is to be canceled, the pages covered by the lock will be sent out
2404 * with ASYNC_HP. We have to send them out as soon as possible. */
2405 cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2406 if (oap->oap_async_flags & ASYNC_HP)
2407 cfs_list_move(&oap->oap_pending_item, &tmp_list);
2409 cfs_list_move_tail(&oap->oap_pending_item, &tmp_list);
2410 if (++page_count >= cli->cl_max_pages_per_rpc)
2414 cfs_list_splice(&tmp_list, &lop->lop_pending);
2417 /* first we find the pages we're allowed to work with */
2418 cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2420 ops = oap->oap_caller_ops;
2422 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2423 "magic 0x%x\n", oap, oap->oap_magic);
2426 /* pin object in memory, so that completion call-backs
2427 * can be safely called under client_obd_list lock. */
2428 clob = osc_oap2cl_page(oap)->cp_obj;
2429 cl_object_get(clob);
2432 if (page_count != 0 &&
2433 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2434 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2435 " oap %p, page %p, srvlock %u\n",
2436 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2440 /* If there is a gap at the start of this page, it can't merge
2441 * with any previous page, so we'll hand the network a
2442 * "fragmented" page array that it can't transfer in 1 RDMA */
2443 if (page_count != 0 && oap->oap_page_off != 0)
2446 /* in llite being 'ready' equates to the page being locked
2447 * until completion unlocks it. commit_write submits a page
2448 * as not ready because its unlock will happen unconditionally
2449 * as the call returns. if we race with commit_write giving
2450 * us that page we don't want to create a hole in the page
2451 * stream, so we stop and leave the rpc to be fired by
2452 * another dirtier or kupdated interval (the not ready page
2453 * will still be on the dirty list). we could call in
2454 * at the end of ll_file_write to process the queue again. */
2455 if (!(oap->oap_async_flags & ASYNC_READY)) {
2456 int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2459 CDEBUG(D_INODE, "oap %p page %p returned %d "
2460 "instead of ready\n", oap,
2464 /* llite is telling us that the page is still
2465 * in commit_write and that we should try
2466 * and put it in an rpc again later. we
2467 * break out of the loop so we don't create
2468 * a hole in the sequence of pages in the rpc
2473 /* the io isn't needed. tell the checks
2474 * below to complete the rpc with EINTR */
2475 cfs_spin_lock(&oap->oap_lock);
2476 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2477 cfs_spin_unlock(&oap->oap_lock);
2478 oap->oap_count = -EINTR;
2481 cfs_spin_lock(&oap->oap_lock);
2482 oap->oap_async_flags |= ASYNC_READY;
2483 cfs_spin_unlock(&oap->oap_lock);
2486 LASSERTF(0, "oap %p page %p returned %d "
2487 "from make_ready\n", oap,
2495 * Page submitted for IO has to be locked. Either by
2496 * ->ap_make_ready() or by higher layers.
2498 #if defined(__KERNEL__) && defined(__linux__)
2500 struct cl_page *page;
2502 page = osc_oap2cl_page(oap);
2504 if (page->cp_type == CPT_CACHEABLE &&
2505 !(PageLocked(oap->oap_page) &&
2506 (CheckWriteback(oap->oap_page, cmd)))) {
2507 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2509 (long)oap->oap_page->flags,
2510 oap->oap_async_flags);
2516 /* take the page out of our book-keeping */
2517 cfs_list_del_init(&oap->oap_pending_item);
2518 lop_update_pending(cli, lop, cmd, -1);
2519 cfs_list_del_init(&oap->oap_urgent_item);
2521 if (page_count == 0)
2522 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2523 (PTLRPC_MAX_BRW_SIZE - 1);
2525 /* ask the caller for the size of the io as the rpc leaves. */
2526 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2528 ops->ap_refresh_count(env, oap->oap_caller_data,
2530 LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2532 if (oap->oap_count <= 0) {
2533 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2535 osc_ap_completion(env, cli, NULL,
2536 oap, 0, oap->oap_count);
2540 /* now put the page back in our accounting */
2541 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2542 if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
2544 if (page_count == 0)
2545 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2546 if (++page_count >= cli->cl_max_pages_per_rpc)
2549 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2550 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2551 * have the same alignment as the initial writes that allocated
2552 * extents on the server. */
2553 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2554 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2555 if (ending_offset == 0)
2558 /* If there is a gap at the end of this page, it can't merge
2559 * with any subsequent pages, so we'll hand the network a
2560 * "fragmented" page array that it can't transfer in 1 RDMA */
2561 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2565 osc_wake_cache_waiters(cli);
2567 loi_list_maint(cli, loi);
2569 client_obd_list_unlock(&cli->cl_loi_list_lock);
2572 cl_object_put(env, clob);
2574 if (page_count == 0) {
2575 client_obd_list_lock(&cli->cl_loi_list_lock);
2579 req = osc_build_req(env, cli, &rpc_list, page_count,
2580 mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
2582 LASSERT(cfs_list_empty(&rpc_list));
2583 loi_list_maint(cli, loi);
2584 RETURN(PTR_ERR(req));
2587 aa = ptlrpc_req_async_args(req);
2589 if (cmd == OBD_BRW_READ) {
2590 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2591 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2592 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2593 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2595 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2596 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2597 cli->cl_w_in_flight);
2598 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2599 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2601 ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2603 client_obd_list_lock(&cli->cl_loi_list_lock);
2605 if (cmd == OBD_BRW_READ)
2606 cli->cl_r_in_flight++;
2608 cli->cl_w_in_flight++;
2610 /* queued sync pages can be torn down while they
2611 * are between the pending list and the rpc */
2613 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2614 /* only one oap gets a request reference */
2617 if (oap->oap_interrupted && !req->rq_intr) {
2618 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2620 ptlrpc_mark_interrupted(req);
2624 tmp->oap_request = ptlrpc_request_addref(req);
2626 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2627 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2629 req->rq_interpret_reply = brw_interpret;
2630 ptlrpcd_add_req(req, PSCOPE_BRW);
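/*
 * Sketch (not compiled) of the boundary arithmetic used above. With
 * PTLRPC_MAX_BRW_SIZE a power of two, masking an object offset with
 * (PTLRPC_MAX_BRW_SIZE - 1) gives the offset within the current RPC
 * window, so a zero result means the page run ends exactly on a window
 * boundary and the RPC should be cut there. The offsets below are
 * hypothetical and assume a 1MB (0x100000) window:
 */
#if 0
        unsigned int end;

        end = (0x0ff000 + 0x1000) & (0x100000 - 1); /* 0: aligned, cut the
                                                     * RPC here */
        end = 0x100800 & (0x100000 - 1);            /* 0x800: mid-window,
                                                     * keep adding pages */
#endif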
2634 #define LOI_DEBUG(LOI, STR, args...) \
2635 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2636 !cfs_list_empty(&(LOI)->loi_ready_item) || \
2637 !cfs_list_empty(&(LOI)->loi_hp_ready_item), \
2638 (LOI)->loi_write_lop.lop_num_pending, \
2639 !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2640 (LOI)->loi_read_lop.lop_num_pending, \
2641 !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2642 args)
2644 /* This is called by osc_check_rpcs() to find which objects have pages that
2645 * we could be sending. These lists are maintained by lop_makes_rpc(). */
2646 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2650 /* First return objects that have blocked locks so that they
2651 * will be flushed quickly and other clients can get the lock,
2652 * then objects which have pages ready to be stuffed into RPCs */
2653 if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
2654 RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
2655 struct lov_oinfo, loi_hp_ready_item));
2656 if (!cfs_list_empty(&cli->cl_loi_ready_list))
2657 RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
2658 struct lov_oinfo, loi_ready_item));
2660 /* then if we have cache waiters, return all objects with queued
2661 * writes. This is especially important when many small files
2662 * have filled up the cache and not been fired into rpcs because
2663 * they don't pass the nr_pending/object threshold */
2664 if (!cfs_list_empty(&cli->cl_cache_waiters) &&
2665 !cfs_list_empty(&cli->cl_loi_write_list))
2666 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2667 struct lov_oinfo, loi_write_item));
2669 /* then return all queued objects when we have an invalid import
2670 * so that they get flushed */
2671 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2672 if (!cfs_list_empty(&cli->cl_loi_write_list))
2673 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2676 if (!cfs_list_empty(&cli->cl_loi_read_list))
2677 RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
2678 struct lov_oinfo, loi_read_item));
2683 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2685 struct osc_async_page *oap;
2688 if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
2689 oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
2690 struct osc_async_page, oap_urgent_item);
2691 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2694 if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
2695 oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
2696 struct osc_async_page, oap_urgent_item);
2697 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2700 return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
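/*
 * Worked example (hypothetical numbers) of the +hprpc term above: with
 * cl_max_rpcs_in_flight == 8 and 8 RPCs already in flight, an ordinary
 * object sees 8 >= 8 + 0 and backs off, while an object whose next
 * urgent page is ASYNC_HP sees 8 >= 8 + 1 evaluate false and is allowed
 * one extra RPC, so lock-cancel writeback is never starved by ordinary
 * traffic.
 */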
2703 /* called with the loi list lock held */
2704 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2706 struct lov_oinfo *loi;
2707 int rc = 0, race_counter = 0;
2710 while ((loi = osc_next_loi(cli)) != NULL) {
2711 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2713 if (osc_max_rpc_in_flight(cli, loi))
2716 /* attempt some read/write balancing by alternating between
2717 * reads and writes in an object. The makes_rpc checks here
2718 * would be redundant if we were getting read/write work items
2719 * instead of objects. we don't want send_oap_rpc to drain a
2720 * partial read pending queue when we're told to do write io on
2721 * this object while there are cache waiters */
2722 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2723 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2724 &loi->loi_write_lop);
2726 CERROR("Write request failed with %d\n", rc);
2728 /* osc_send_oap_rpc failed, mostly because of
2729 * memory pressure.
2731 * It can't break here, because if:
2732 * - a page was submitted by osc_io_submit, so
2733 * the page is locked;
2734 * - no request in flight
2735 * - no subsequent request
2736 * The system will be in live-lock state,
2737 * because there is no chance to call
2738 * osc_io_unplug() and osc_check_rpcs() any
2739 * more. pdflush can't help in this case,
2740 * because it might be blocked at grabbing
2741 * the page lock as we mentioned.
2743 * Anyway, continue to drain pages. */
2752 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2753 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2754 &loi->loi_read_lop);
2756 CERROR("Read request failed with %d\n", rc);
2764 /* attempt some inter-object balancing by issuing rpcs
2765 * for each object in turn */
2766 if (!cfs_list_empty(&loi->loi_hp_ready_item))
2767 cfs_list_del_init(&loi->loi_hp_ready_item);
2768 if (!cfs_list_empty(&loi->loi_ready_item))
2769 cfs_list_del_init(&loi->loi_ready_item);
2770 if (!cfs_list_empty(&loi->loi_write_item))
2771 cfs_list_del_init(&loi->loi_write_item);
2772 if (!cfs_list_empty(&loi->loi_read_item))
2773 cfs_list_del_init(&loi->loi_read_item);
2775 loi_list_maint(cli, loi);
2777 /* send_oap_rpc fails with 0 when make_ready tells it to
2778 * back off. llite's make_ready does this when it tries
2779 * to lock a page queued for write that is already locked.
2780 * we want to try sending rpcs from many objects, but we
2781 * don't want to spin failing with 0. */
2782 if (race_counter == 10)
2788 /* we're trying to queue a page in the osc so we're subject to the
2789 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2790 * If the osc's queued pages are already at that limit, then we want to sleep
2791 * until there is space in the osc's queue for us. We also may be waiting for
2792 * write credits from the OST if there are RPCs in flight that may return some
2793 * before we fall back to sync writes.
2795 * We need this to know whether our allocation was granted in the presence of signals */
2796 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2800 client_obd_list_lock(&cli->cl_loi_list_lock);
2801 rc = cfs_list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2802 client_obd_list_unlock(&cli->cl_loi_list_lock);
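/*
 * Sketch (not compiled) of the wait pattern ocw_granted() serves: the
 * waiter queues itself on cl_cache_waiters, kicks existing RPCs, and
 * sleeps until either its entry is removed (grant arrived) or no RPCs
 * remain in flight (no more grant is coming). This mirrors the body of
 * osc_enter_cache() below.
 */
#if 0
        struct osc_cache_waiter ocw;
        struct l_wait_info lwi = { 0 };

        cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
        cfs_waitq_init(&ocw.ocw_waitq);
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
        client_obd_list_lock(&cli->cl_loi_list_lock);
#endif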
2806 /**
2807 * Non-blocking version of osc_enter_cache() that consumes grant only when it
2808 * is available.
2809 */
2810 int osc_enter_cache_try(const struct lu_env *env,
2811 struct client_obd *cli, struct lov_oinfo *loi,
2812 struct osc_async_page *oap, int transient)
2816 has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2818 osc_consume_write_grant(cli, &oap->oap_brw_page);
2820 cli->cl_dirty_transit += CFS_PAGE_SIZE;
2821 cfs_atomic_inc(&obd_dirty_transit_pages);
2822 oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2828 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2829 * grant or cache space. */
2830 static int osc_enter_cache(const struct lu_env *env,
2831 struct client_obd *cli, struct lov_oinfo *loi,
2832 struct osc_async_page *oap)
2834 struct osc_cache_waiter ocw;
2835 struct l_wait_info lwi = { 0 };
2839 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2840 "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
2841 cli->cl_dirty_max, obd_max_dirty_pages,
2842 cli->cl_lost_grant, cli->cl_avail_grant);
2844 /* force the caller to try sync io. this can jump the list
2845 * of queued writes and create a discontiguous rpc stream */
2846 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2847 loi->loi_ar.ar_force_sync)
2850 /* Hopefully normal case - cache space and write credits available */
2851 if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2852 cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2853 osc_enter_cache_try(env, cli, loi, oap, 0))
2856 /* It is safe to block as a cache waiter as long as there is grant
2857 * space available or the hope of additional grant being returned
2858 * when an in-flight write completes. Using the writeback cache
2859 * if possible is preferable to sending the data synchronously
2860 * because write pages can then be merged into large requests.
2861 * The addition of this cache waiter causes pending write
2862 * pages to be sent immediately. */
2863 if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
2864 cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2865 cfs_waitq_init(&ocw.ocw_waitq);
2869 loi_list_maint(cli, loi);
2870 osc_check_rpcs(env, cli);
2871 client_obd_list_unlock(&cli->cl_loi_list_lock);
2873 CDEBUG(D_CACHE, "sleeping for cache space\n");
2874 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2876 client_obd_list_lock(&cli->cl_loi_list_lock);
2877 if (!cfs_list_empty(&ocw.ocw_entry)) {
2878 cfs_list_del(&ocw.ocw_entry);
2888 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2889 struct lov_oinfo *loi, cfs_page_t *page,
2890 obd_off offset, const struct obd_async_page_ops *ops,
2891 void *data, void **res, int nocache,
2892 struct lustre_handle *lockh)
2894 struct osc_async_page *oap;
2899 return cfs_size_round(sizeof(*oap));
2902 oap->oap_magic = OAP_MAGIC;
2903 oap->oap_cli = &exp->exp_obd->u.cli;
2906 oap->oap_caller_ops = ops;
2907 oap->oap_caller_data = data;
2909 oap->oap_page = page;
2910 oap->oap_obj_off = offset;
2911 if (!client_is_remote(exp) &&
2912 cfs_capable(CFS_CAP_SYS_RESOURCE))
2913 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2915 LASSERT(!(offset & ~CFS_PAGE_MASK));
2917 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2918 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2919 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2920 CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2922 cfs_spin_lock_init(&oap->oap_lock);
2923 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
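/*
 * Sketch (not compiled) of the cookie protocol around
 * osc_prep_async_page(). The early size-query return above suggests the
 * caller first asks how much room an oap needs (this sketch assumes a
 * NULL page triggers that path), embeds it, and later hands the oap back
 * as an opaque cookie that oap_from_cookie() validates via oap_magic.
 */
#if 0
        int size = osc_prep_async_page(exp, NULL, NULL, NULL, 0, NULL,
                                       NULL, NULL, 0, NULL);
        /* caller reserves "size" bytes for the oap, calls again with real
         * arguments, and later recovers it from the opaque cookie: */
        struct osc_async_page *oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));
#endif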
2927 struct osc_async_page *oap_from_cookie(void *cookie)
2929 struct osc_async_page *oap = cookie;
2930 if (oap->oap_magic != OAP_MAGIC)
2931 return ERR_PTR(-EINVAL);
2935 int osc_queue_async_io(const struct lu_env *env,
2936 struct obd_export *exp, struct lov_stripe_md *lsm,
2937 struct lov_oinfo *loi, void *cookie,
2938 int cmd, obd_off off, int count,
2939 obd_flag brw_flags, enum async_flags async_flags)
2941 struct client_obd *cli = &exp->exp_obd->u.cli;
2942 struct osc_async_page *oap;
2946 oap = oap_from_cookie(cookie);
2947 if (IS_ERR(oap))
2948 RETURN(PTR_ERR(oap));
2950 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2953 if (!cfs_list_empty(&oap->oap_pending_item) ||
2954 !cfs_list_empty(&oap->oap_urgent_item) ||
2955 !cfs_list_empty(&oap->oap_rpc_item))
2958 /* check if the file's owner/group is over quota */
2959 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2960 struct cl_object *obj;
2961 struct cl_attr attr; /* XXX put attr into thread info */
2962 unsigned int qid[MAXQUOTAS];
2964 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
2966 cl_object_attr_lock(obj);
2967 rc = cl_object_attr_get(env, obj, &attr);
2968 cl_object_attr_unlock(obj);
2970 qid[USRQUOTA] = attr.cat_uid;
2971 qid[GRPQUOTA] = attr.cat_gid;
2972 if (rc == 0 &&
2973 lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
2974 rc = -EDQUOT;
2980 loi = lsm->lsm_oinfo[0];
2982 client_obd_list_lock(&cli->cl_loi_list_lock);
2984 LASSERT(off + count <= CFS_PAGE_SIZE);
2986 oap->oap_page_off = off;
2987 oap->oap_count = count;
2988 oap->oap_brw_flags = brw_flags;
2989 /* Give a hint to OST that requests are coming from kswapd - bug19529 */
2990 if (cfs_memory_pressure_get())
2991 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2992 cfs_spin_lock(&oap->oap_lock);
2993 oap->oap_async_flags = async_flags;
2994 cfs_spin_unlock(&oap->oap_lock);
2996 if (cmd & OBD_BRW_WRITE) {
2997 rc = osc_enter_cache(env, cli, loi, oap);
2999 client_obd_list_unlock(&cli->cl_loi_list_lock);
3004 osc_oap_to_pending(oap);
3005 loi_list_maint(cli, loi);
3007 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
3010 osc_check_rpcs(env, cli);
3011 client_obd_list_unlock(&cli->cl_loi_list_lock);
3016 /* aka (~was & now & flag), but this is more clear :) */
3017 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
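/*
 * Worked example of SETTING(): it is true only for a flag that is newly
 * appearing in "now" (a rising edge). E.g. with
 * was = ASYNC_READY and now = ASYNC_READY | ASYNC_URGENT:
 *
 *   SETTING(was, now, ASYNC_READY)  == 0   (was already set)
 *   SETTING(was, now, ASYNC_URGENT) == 1   (newly set)
 */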
3019 int osc_set_async_flags_base(struct client_obd *cli,
3020 struct lov_oinfo *loi, struct osc_async_page *oap,
3021 obd_flag async_flags)
3023 struct loi_oap_pages *lop;
3027 LASSERT(!cfs_list_empty(&oap->oap_pending_item));
3029 if (oap->oap_cmd & OBD_BRW_WRITE) {
3030 lop = &loi->loi_write_lop;
3032 lop = &loi->loi_read_lop;
3035 if ((oap->oap_async_flags & async_flags) == async_flags)
3038 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3039 flags |= ASYNC_READY;
3041 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3042 cfs_list_empty(&oap->oap_rpc_item)) {
3043 if (oap->oap_async_flags & ASYNC_HP)
3044 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
3046 cfs_list_add_tail(&oap->oap_urgent_item,
3048 flags |= ASYNC_URGENT;
3049 loi_list_maint(cli, loi);
3051 cfs_spin_lock(&oap->oap_lock);
3052 oap->oap_async_flags |= flags;
3053 cfs_spin_unlock(&oap->oap_lock);
3055 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3056 oap->oap_async_flags);
3060 int osc_teardown_async_page(struct obd_export *exp,
3061 struct lov_stripe_md *lsm,
3062 struct lov_oinfo *loi, void *cookie)
3064 struct client_obd *cli = &exp->exp_obd->u.cli;
3065 struct loi_oap_pages *lop;
3066 struct osc_async_page *oap;
3070 oap = oap_from_cookie(cookie);
3071 if (IS_ERR(oap))
3072 RETURN(PTR_ERR(oap));
3075 loi = lsm->lsm_oinfo[0];
3077 if (oap->oap_cmd & OBD_BRW_WRITE) {
3078 lop = &loi->loi_write_lop;
3080 lop = &loi->loi_read_lop;
3083 client_obd_list_lock(&cli->cl_loi_list_lock);
3085 if (!cfs_list_empty(&oap->oap_rpc_item))
3086 GOTO(out, rc = -EBUSY);
3088 osc_exit_cache(cli, oap, 0);
3089 osc_wake_cache_waiters(cli);
3091 if (!cfs_list_empty(&oap->oap_urgent_item)) {
3092 cfs_list_del_init(&oap->oap_urgent_item);
3093 cfs_spin_lock(&oap->oap_lock);
3094 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3095 cfs_spin_unlock(&oap->oap_lock);
3097 if (!cfs_list_empty(&oap->oap_pending_item)) {
3098 cfs_list_del_init(&oap->oap_pending_item);
3099 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3101 loi_list_maint(cli, loi);
3102 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3104 client_obd_list_unlock(&cli->cl_loi_list_lock);
3108 static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
3109 struct ldlm_enqueue_info *einfo,
3112 void *data = einfo->ei_cbdata;
3114 LASSERT(lock != NULL);
3115 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3116 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3117 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3118 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3120 lock_res_and_lock(lock);
3121 cfs_spin_lock(&osc_ast_guard);
3122 LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
3123 lock->l_ast_data = data;
3124 cfs_spin_unlock(&osc_ast_guard);
3125 unlock_res_and_lock(lock);
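/*
 * Sketch (not compiled) of the lock ordering the function above follows
 * when publishing l_ast_data: the ldlm resource/lock locks are taken
 * first, then the global osc_ast_guard spinlock, and both are released
 * in reverse order. Any other writer of l_ast_data must use the same
 * order to avoid deadlock.
 */
#if 0
        lock_res_and_lock(lock);       /* per-resource + per-lock locks */
        cfs_spin_lock(&osc_ast_guard); /* global l_ast_data guard */
        lock->l_ast_data = data;
        cfs_spin_unlock(&osc_ast_guard);
        unlock_res_and_lock(lock);
#endif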
3128 static void osc_set_data_with_check(struct lustre_handle *lockh,
3129 struct ldlm_enqueue_info *einfo,
3132 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3134 if (lock != NULL) {
3135 osc_set_lock_data_with_check(lock, einfo, flags);
3136 LDLM_LOCK_PUT(lock);
3137 } else
3138 CERROR("lockh %p, data %p - client evicted?\n",
3139 lockh, einfo->ei_cbdata);
3142 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3143 ldlm_iterator_t replace, void *data)
3145 struct ldlm_res_id res_id;
3146 struct obd_device *obd = class_exp2obd(exp);
3148 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3149 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3153 /* find any ldlm lock of the inode in osc
3154 * return 0 if no lock is found
3155 * 1 if a lock is found
3156 * < 0 on error */
3157 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3158 ldlm_iterator_t replace, void *data)
3160 struct ldlm_res_id res_id;
3161 struct obd_device *obd = class_exp2obd(exp);
3164 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3165 rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3166 if (rc == LDLM_ITER_STOP)