1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_OSC
42 #include <libcfs/libcfs.h>
45 # include <liblustre.h>
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include "osc_internal.h"
/* Quota interface pointer private to this file; starts out unset (NULL). */
66 static quota_interface_t *quota_interface = NULL;
/* Quota interface object declared here, defined elsewhere. */
67 extern quota_interface_t osc_quota_interface;
/* Forward declarations; the definitions are not in this part of the file. */
69 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
70 static int brw_interpret(const struct lu_env *env,
71 struct ptlrpc_request *req, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
74 /* Pack OSC object metadata for disk storage (LE byte order).
 *
 * The packed form is one struct lov_mds_md (sizeof(**lmmp)) reached through
 * *lmmp.  The object id and sequence of lsm are written into it in
 * little-endian order. */
75 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
76 struct lov_stripe_md *lsm)
81 lmm_size = sizeof(**lmmp);
/* Release the buffer currently held in *lmmp. */
86 OBD_FREE(*lmmp, lmm_size);
/* Allocate the packed buffer into *lmmp. */
92 OBD_ALLOC(*lmmp, lmm_size);
/* The object id must be non-zero and the sequence must pass
 * LASSERT_SEQ_IS_MDT before anything is packed. */
98 LASSERT(lsm->lsm_object_id);
99 LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
/* Convert from CPU to little-endian byte order. */
100 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
101 (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
107 /* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * lmm is the packed metadata of lmm_bytes bytes.  *lsmp is the in-memory
 * stripe metadata; it is sized with lov_stripe_md_size(1) and only
 * lsm_oinfo[0] is used. */
108 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
109 struct lov_mds_md *lmm, int lmm_bytes)
/* A buffer shorter than one lov_mds_md is reported as an error. */
115 if (lmm_bytes < sizeof (*lmm)) {
116 CERROR("lov_mds_md too small: %d, need %d\n",
117 lmm_bytes, (int)sizeof(*lmm));
120 /* XXX LOV_MAGIC etc check? */
/* A zero object id is reported as an error. */
122 if (lmm->lmm_object_id == 0) {
123 CERROR("lov_mds_md: zero lmm_object_id\n");
128 lsm_size = lov_stripe_md_size(1);
/* An existing *lsmp with nothing to unpack: free the oinfo and the lsm. */
132 if (*lsmp != NULL && lmm == NULL) {
133 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
134 OBD_FREE(*lsmp, lsm_size);
/* Allocate the lsm first, then its single lov_oinfo. */
140 OBD_ALLOC(*lsmp, lsm_size);
143 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
/* Undo the first allocation when the second one fails. */
144 if ((*lsmp)->lsm_oinfo[0] == NULL) {
145 OBD_FREE(*lsmp, lsm_size);
148 loi_init((*lsmp)->lsm_oinfo[0]);
152 /* XXX zero *lsmp? */
/* Convert id and sequence from little-endian to CPU byte order. */
153 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
154 (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
155 LASSERT((*lsmp)->lsm_object_id);
156 LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
159 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/* Capability packing helper for OST requests.
 *
 * capa is interpreted as a struct obd_capa.  The RMF_CAPA1 field of the
 * request is looked up and OBD_MD_FLOSSCAPA is set in body->oa.o_valid so
 * the flag travels with the request body. */
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165 struct ost_body *body, void *capa)
167 struct obd_capa *oc = (struct obd_capa *)capa;
168 struct lustre_capa *c;
/* Destination buffer for the capability inside the request. */
173 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
176 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177 DEBUG_CAPA(D_SEC, c, "pack");
/* Fill the RMF_OST_BODY field of req from oinfo: oinfo->oi_oa is converted
 * to wire format with lustre_set_wire_obdo() and oinfo->oi_capa is packed
 * with osc_pack_capa(). */
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181 struct obd_info *oinfo)
183 struct ost_body *body;
185 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
188 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
189 osc_pack_capa(req, body, oinfo->oi_capa);
/* Adjust the client-side (RCL_CLIENT) size of a capability field in req.
 * One path shrinks the field to size 0; the other leaves it untouched.
 * NOTE(review): the condition selecting between the two paths is not on the
 * lines shown here; callers in this file pass a capability (or NULL) as the
 * third argument -- confirm against the full definition. */
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193 const struct req_msg_field *field,
197 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
199 /* it is already calculated as sizeof struct obd_capa */
/* Reply handler installed by osc_getattr_async().
 *
 * When the reply carries an ost_body, the wire obdo is copied into
 * aa->aa_oi->oi_oa and the block size is forced to PTLRPC_MAX_BRW_SIZE.
 * When the body cannot be unpacked, o_valid is cleared instead.  The result
 * code is then passed to the caller's oi_cb_up() callback, whose return
 * value replaces rc. */
203 static int osc_getattr_interpret(const struct lu_env *env,
204 struct ptlrpc_request *req,
205 struct osc_async_args *aa, int rc)
207 struct ost_body *body;
213 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
215 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
216 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
218 /* This should really be sent by the OST */
219 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
220 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
222 CDEBUG(D_INFO, "can't unpack ost_body\n");
/* Nothing in the obdo can be trusted on this path. */
224 aa->aa_oi->oi_oa->o_valid = 0;
227 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Build an OST_GETATTR request from oinfo and add it to the request set
 * without waiting for the reply.  osc_getattr_interpret() is installed as
 * the reply handler and uses the request's async-args area. */
231 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
232 struct ptlrpc_request_set *set)
234 struct ptlrpc_request *req;
235 struct osc_async_args *aa;
239 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
/* The capability field must be sized before the request is packed. */
243 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
244 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
/* Packing failed: release the unpacked request. */
246 ptlrpc_request_free(req);
250 osc_pack_req_body(req, oinfo);
252 ptlrpc_request_set_replen(req);
253 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
/* Compile-time check that the handler arguments fit in the request. */
255 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
256 aa = ptlrpc_req_async_args(req);
259 ptlrpc_set_add_req(set, req);
/* Synchronous getattr: send an OST_GETATTR request built from oinfo, wait
 * for the reply with ptlrpc_queue_wait() and copy the returned attributes
 * into oinfo->oi_oa.  The block size is forced to PTLRPC_MAX_BRW_SIZE.
 * -EPROTO is produced on the path that follows fetching the reply body
 * (presumably when the body is missing -- the test is not shown here). */
263 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
265 struct ptlrpc_request *req;
266 struct ost_body *body;
270 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
/* The capability field must be sized before the request is packed. */
274 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
275 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
/* Packing failed: release the unpacked request. */
277 ptlrpc_request_free(req);
281 osc_pack_req_body(req, oinfo);
283 ptlrpc_request_set_replen(req);
285 rc = ptlrpc_queue_wait(req);
289 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
291 GOTO(out, rc = -EPROTO);
293 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
294 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
296 /* This should really be sent by the OST */
297 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
298 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
/* Drop the reference on the request. */
302 ptlrpc_req_finished(req);
/* Synchronous setattr: send an OST_SETATTR request built from oinfo, wait
 * for the reply and copy the returned obdo back into oinfo->oi_oa.
 * The caller must have OBD_MD_FLGROUP set in oinfo->oi_oa->o_valid.
 * oti is not referenced on the lines shown. */
306 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
307 struct obd_trans_info *oti)
309 struct ptlrpc_request *req;
310 struct ost_body *body;
314 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
316 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
/* The capability field must be sized before the request is packed. */
320 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
321 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
/* Packing failed: release the unpacked request. */
323 ptlrpc_request_free(req);
327 osc_pack_req_body(req, oinfo);
329 ptlrpc_request_set_replen(req);
331 rc = ptlrpc_queue_wait(req);
335 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
337 GOTO(out, rc = -EPROTO);
339 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
/* Drop the reference on the request. */
343 ptlrpc_req_finished(req);
/* Reply handler shared by the asynchronous setattr and punch requests
 * (installed in osc_setattr_async_base() and osc_punch_base()).
 * Copies the reply obdo into sa->sa_oa and then invokes the stored upcall
 * with the stored cookie and the result code; the upcall's return value
 * replaces rc. */
347 static int osc_setattr_interpret(const struct lu_env *env,
348 struct ptlrpc_request *req,
349 struct osc_setattr_args *sa, int rc)
351 struct ost_body *body;
357 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
359 GOTO(out, rc = -EPROTO);
361 lustre_get_wire_obdo(sa->sa_oa, &body->oa);
363 rc = sa->sa_upcall(sa->sa_cookie, rc);
/* Build an OST_SETATTR request from oinfo and send it asynchronously.
 *
 * Two dispatch paths are visible: one hands the request straight to ptlrpcd
 * without a reply handler ("do not wait for response"); the other installs
 * osc_setattr_interpret() with upcall/cookie and queues the request either
 * on ptlrpcd (rqset == PTLRPCD_SET) or on the caller's rqset.
 * NOTE(review): the condition choosing between the two paths is not on the
 * lines shown here. */
367 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
368 struct obd_trans_info *oti,
369 obd_enqueue_update_f upcall, void *cookie,
370 struct ptlrpc_request_set *rqset)
372 struct ptlrpc_request *req;
373 struct osc_setattr_args *sa;
377 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
/* The capability field must be sized before the request is packed. */
381 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
382 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
/* Packing failed: release the unpacked request. */
384 ptlrpc_request_free(req);
/* Carry the first log cookie from oti in the obdo when one is flagged. */
388 if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
389 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
391 osc_pack_req_body(req, oinfo);
393 ptlrpc_request_set_replen(req);
395 /* do mds to ost setattr asynchronously */
397 /* Do not wait for response. */
398 ptlrpcd_add_req(req, PSCOPE_OTHER);
400 req->rq_interpret_reply =
401 (ptlrpc_interpterer_t)osc_setattr_interpret;
/* Compile-time check that the handler arguments fit in the request. */
403 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
404 sa = ptlrpc_req_async_args(req);
405 sa->sa_oa = oinfo->oi_oa;
406 sa->sa_upcall = upcall;
407 sa->sa_cookie = cookie;
409 if (rqset == PTLRPCD_SET)
410 ptlrpcd_add_req(req, PSCOPE_OTHER);
412 ptlrpc_set_add_req(rqset, req);
/* Asynchronous setattr entry point: forwards to osc_setattr_async_base()
 * using oinfo->oi_cb_up as the upcall and oinfo itself as its cookie. */
418 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
419 struct obd_trans_info *oti,
420 struct ptlrpc_request_set *rqset)
422 return osc_setattr_async_base(exp, oinfo, oti,
423 oinfo->oi_cb_up, oinfo, rqset);
/* Send an OST_CREATE request for oa and wait for the reply.
 *
 * The reply obdo is copied back into oa and its block size is forced to
 * PTLRPC_MAX_BRW_SIZE.  The resulting object id and sequence are stored in
 * the stripe metadata, and when oti is used the reply transno and (if
 * OBD_MD_FLCOOKIE is set) the log cookie are recorded in it.
 * A stripe md is obtained with obd_alloc_memmd() and released with
 * obd_free_memmd() on one of the exit paths.
 * NOTE(review): how ea relates to the local lsm is not visible on the
 * lines shown here. */
426 int osc_real_create(struct obd_export *exp, struct obdo *oa,
427 struct lov_stripe_md **ea, struct obd_trans_info *oti)
429 struct ptlrpc_request *req;
430 struct ost_body *body;
431 struct lov_stripe_md *lsm;
440 rc = obd_alloc_memmd(exp, &lsm);
445 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
447 GOTO(out, rc = -ENOMEM);
449 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
/* Packing failed: release the unpacked request. */
451 ptlrpc_request_free(req);
455 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
457 lustre_set_wire_obdo(&body->oa, oa);
459 ptlrpc_request_set_replen(req);
/* A request whose flags are exactly OBD_FL_DELORPHAN is sent with
 * resend and delay both disabled. */
461 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
462 oa->o_flags == OBD_FL_DELORPHAN) {
464 "delorphan from OST integration");
465 /* Don't resend the delorphan req */
466 req->rq_no_resend = req->rq_no_delay = 1;
469 rc = ptlrpc_queue_wait(req);
473 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
475 GOTO(out_req, rc = -EPROTO);
477 lustre_get_wire_obdo(oa, &body->oa);
479 /* This should really be sent by the OST */
480 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
481 oa->o_valid |= OBD_MD_FLBLKSZ;
483 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
484 * have valid lsm_oinfo data structs, so don't go touching that.
485 * This needs to be fixed in a big way.
487 lsm->lsm_object_id = oa->o_id;
488 lsm->lsm_object_seq = oa->o_seq;
492 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
494 if (oa->o_valid & OBD_MD_FLCOOKIE) {
495 if (!oti->oti_logcookies)
496 oti_alloc_cookies(oti, 1);
497 *oti->oti_logcookies = oa->o_lcookie;
501 CDEBUG(D_HA, "transno: "LPD64"\n",
502 lustre_msg_get_transno(req->rq_repmsg));
504 ptlrpc_req_finished(req);
507 obd_free_memmd(exp, &lsm);
/* Build an OST_PUNCH request from oinfo and send it asynchronously.
 *
 * The request goes to the OST I/O portal.  osc_setattr_interpret() is the
 * reply handler and later calls upcall(cookie, rc).  The request is queued
 * on ptlrpcd when rqset == PTLRPCD_SET, otherwise on the caller's rqset. */
511 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
512 obd_enqueue_update_f upcall, void *cookie,
513 struct ptlrpc_request_set *rqset)
515 struct ptlrpc_request *req;
516 struct osc_setattr_args *sa;
517 struct ost_body *body;
521 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
/* The capability field must be sized before the request is packed. */
525 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
526 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
/* Packing failed: release the unpacked request. */
528 ptlrpc_request_free(req);
531 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
/* Recompute the request timeout after changing the portal. */
532 ptlrpc_at_set_req_timeout(req);
534 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
536 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
537 osc_pack_capa(req, body, oinfo->oi_capa);
539 ptlrpc_request_set_replen(req);
542 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
/* Compile-time check that the handler arguments fit in the request. */
543 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
544 sa = ptlrpc_req_async_args(req);
545 sa->sa_oa = oinfo->oi_oa;
546 sa->sa_upcall = upcall;
547 sa->sa_cookie = cookie;
548 if (rqset == PTLRPCD_SET)
549 ptlrpcd_add_req(req, PSCOPE_OTHER);
551 ptlrpc_set_add_req(rqset, req);
/* Punch entry point.  The extent to punch is carried in the obdo: o_size
 * holds the extent start and o_blocks the extent end, both taken from
 * oinfo->oi_policy.l_extent.  The request itself is sent by
 * osc_punch_base() with oinfo->oi_cb_up as upcall and oinfo as cookie.
 * oti is not referenced on the lines shown. */
556 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
557 struct obd_trans_info *oti,
558 struct ptlrpc_request_set *rqset)
560 oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
561 oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
562 oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
563 return osc_punch_base(exp, oinfo,
564 oinfo->oi_cb_up, oinfo, rqset);
/* Synchronous OST_SYNC of the byte range [start, end] of the object
 * described by oa.  The range is carried in the o_size/o_blocks fields of
 * the request body, and the reply obdo is copied back into oa.
 * md is not referenced on the lines shown. */
567 static int osc_sync(struct obd_export *exp, struct obdo *oa,
568 struct lov_stripe_md *md, obd_size start, obd_size end,
571 struct ptlrpc_request *req;
572 struct ost_body *body;
577 CDEBUG(D_INFO, "oa NULL\n");
581 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
585 osc_set_capa_size(req, &RMF_CAPA1, capa);
586 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
588 ptlrpc_request_free(req);
592 /* overload the size and blocks fields in the oa with start/end */
593 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
595 lustre_set_wire_obdo(&body->oa, oa);
596 body->oa.o_size = start;
597 body->oa.o_blocks = end;
598 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
599 osc_pack_capa(req, body, capa);
601 ptlrpc_request_set_replen(req);
603 rc = ptlrpc_queue_wait(req);
607 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
609 GOTO(out, rc = -EPROTO);
611 lustre_get_wire_obdo(oa, &body->oa);
/* Drop the reference on the request. */
615 ptlrpc_req_finished(req);
619 /* Find and locally cancel the locks matching @mode on the resource whose
 * name is built from @oa (o_id and o_seq).  The locks found are added to
 * the @cancels list.  Returns the number of locks added to @cancels. */
622 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
624 ldlm_mode_t mode, int lock_flags)
626 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
627 struct ldlm_res_id res_id;
628 struct ldlm_resource *res;
632 osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
/* Look up the resource in the export's namespace. */
633 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
637 LDLM_RESOURCE_ADDREF(res);
638 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
639 lock_flags, 0, NULL);
640 LDLM_RESOURCE_DELREF(res);
/* Release the reference taken by ldlm_resource_get(). */
641 ldlm_resource_putref(res);
/* Reply handler for throttled destroy RPCs (installed in osc_destroy()):
 * drops the in-flight destroy counter of the client and signals
 * cl_destroy_waitq so a sender waiting for a free slot can retry. */
645 static int osc_destroy_interpret(const struct lu_env *env,
646 struct ptlrpc_request *req, void *data,
649 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
651 cfs_atomic_dec(&cli->cl_destroy_in_flight);
652 cfs_waitq_signal(&cli->cl_destroy_waitq);
/* Try to reserve a slot for one more destroy RPC.
 *
 * The in-flight counter is incremented optimistically; the destroy may be
 * sent when the new value does not exceed cl_max_rpcs_in_flight.  Otherwise
 * the increment is undone, and if the counter is then found below the limit
 * (another thread changed it in between) cl_destroy_waitq is signalled. */
656 static int osc_can_send_destroy(struct client_obd *cli)
658 if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
659 cli->cl_max_rpcs_in_flight) {
660 /* The destroy request can be sent */
663 if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
664 cli->cl_max_rpcs_in_flight) {
666 * The counter has been modified between the two atomic
669 cfs_waitq_signal(&cli->cl_destroy_waitq);
674 /* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * it.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions. */
684 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
685 struct lov_stripe_md *ea, struct obd_trans_info *oti,
686 struct obd_export *md_export, void *capa)
688 struct client_obd *cli = &exp->exp_obd->u.cli;
689 struct ptlrpc_request *req;
690 struct ost_body *body;
691 CFS_LIST_HEAD(cancels);
696 CDEBUG(D_INFO, "oa NULL\n");
/* Collect the local PW locks on the object so their cancels can go
 * out with the destroy request; their data is to be discarded. */
700 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
701 LDLM_FL_DISCARD_DATA);
703 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* Give the collected locks back on this path. */
705 ldlm_lock_list_put(&cancels, l_bl_ast, count);
709 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
710 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
713 ptlrpc_request_free(req);
717 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
718 ptlrpc_at_set_req_timeout(req);
/* Carry the first log cookie from oti in the obdo when one is flagged. */
720 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
721 oa->o_lcookie = *oti->oti_logcookies;
722 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
724 lustre_set_wire_obdo(&body->oa, oa);
726 osc_pack_capa(req, body, (struct obd_capa *)capa);
727 ptlrpc_request_set_replen(req);
729 /* don't throttle destroy RPCs for the MDT */
730 if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
/* The handler releases the slot reserved by osc_can_send_destroy(). */
731 req->rq_interpret_reply = osc_destroy_interpret;
732 if (!osc_can_send_destroy(cli)) {
733 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
737 * Wait until the number of on-going destroy RPCs drops
738 * under max_rpc_in_flight
740 l_wait_event_exclusive(cli->cl_destroy_waitq,
741 osc_can_send_destroy(cli), &lwi);
745 /* Do not wait for response */
746 ptlrpcd_add_req(req, PSCOPE_OTHER);
/* Fill the cache/grant accounting fields of oa (o_dirty, o_undirty, o_grant,
 * o_dropped) from the client state, under cl_loi_list_lock.
 *
 * oa must not already have OBD_MD_FLBLOCKS or OBD_MD_FLGRANT set.  Three
 * inconsistent states are reported with CERROR; otherwise o_undirty is the
 * larger of cl_dirty_max and the bytes needed for
 * (cl_max_rpcs_in_flight + 1) full RPCs.  cl_lost_grant is reported in
 * o_dropped and then reset to 0. */
750 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
753 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
755 LASSERT(!(oa->o_valid & bits));
758 client_obd_list_lock(&cli->cl_loi_list_lock);
759 oa->o_dirty = cli->cl_dirty;
/* Dirty bytes not in transit exceed this client's limit. */
760 if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
761 CERROR("dirty %lu - %lu > dirty_max %lu\n",
762 cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
764 } else if (cfs_atomic_read(&obd_dirty_pages) -
765 cfs_atomic_read(&obd_dirty_transit_pages) >
766 obd_max_dirty_pages + 1){
767 /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
768 * not covered by a lock thus they may safely race and trip
769 * this CERROR() unless we add in a small fudge factor (+1). */
770 CERROR("dirty %d - %d > system dirty_max %d\n",
771 cfs_atomic_read(&obd_dirty_pages),
772 cfs_atomic_read(&obd_dirty_transit_pages),
773 obd_max_dirty_pages);
/* The remaining dirty room would not fit in 31 bits. */
775 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
776 CERROR("dirty %lu - dirty_max %lu too big???\n",
777 cli->cl_dirty, cli->cl_dirty_max);
/* Bytes covered by one more RPC than may be in flight. */
780 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
781 (cli->cl_max_rpcs_in_flight + 1);
782 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
784 oa->o_grant = cli->cl_avail_grant;
785 oa->o_dropped = cli->cl_lost_grant;
/* Lost grant has now been reported; start counting afresh. */
786 cli->cl_lost_grant = 0;
787 client_obd_list_unlock(&cli->cl_loi_list_lock);
788 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
789 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Schedule the next grant shrink: cl_next_shrink_grant is set to
 * cfs_time_shift(cl_grant_shrink_interval) and the new value is logged. */
793 static void osc_update_next_shrink(struct client_obd *cli)
795 cli->cl_next_shrink_grant =
796 cfs_time_shift(cli->cl_grant_shrink_interval);
797 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
798 cli->cl_next_shrink_grant);
801 /* caller must hold loi_list_lock
 *
 * Charge one page of write grant to pga: the global and per-client dirty
 * counters grow by one page, cl_avail_grant shrinks by CFS_PAGE_SIZE bytes
 * and the page is flagged OBD_BRW_FROM_GRANT.  The page must not already
 * carry that flag, and the available grant must not go negative.  The next
 * grant shrink is pushed back as a side effect. */
802 static void osc_consume_write_grant(struct client_obd *cli,
803 struct brw_page *pga)
805 LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
806 LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
807 cfs_atomic_inc(&obd_dirty_pages);
808 cli->cl_dirty += CFS_PAGE_SIZE;
809 cli->cl_avail_grant -= CFS_PAGE_SIZE;
810 pga->flag |= OBD_BRW_FROM_GRANT;
811 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
812 CFS_PAGE_SIZE, pga, pga->pg);
813 LASSERT(cli->cl_avail_grant >= 0);
814 osc_update_next_shrink(cli);
817 /* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held.
 *
 * Clears OBD_BRW_FROM_GRANT on pga and undoes the dirty accounting (and the
 * in-transit accounting for OBD_BRW_NOCACHE pages).  Grant that cannot be
 * used again is added to cl_lost_grant: a whole page on one path, or, for a
 * short write when the OST block size differs from the page size, the part
 * of the page outside the blocks touched by the write. */
819 static void osc_release_write_grant(struct client_obd *cli,
820 struct brw_page *pga, int sent)
/* Fall back to 4096 when the OST block size is not known (zero). */
822 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
825 LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
/* Pages that never consumed grant have nothing to release. */
826 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
831 pga->flag &= ~OBD_BRW_FROM_GRANT;
832 cfs_atomic_dec(&obd_dirty_pages);
833 cli->cl_dirty -= CFS_PAGE_SIZE;
834 if (pga->flag & OBD_BRW_NOCACHE) {
835 pga->flag &= ~OBD_BRW_NOCACHE;
836 cfs_atomic_dec(&obd_dirty_transit_pages);
837 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
840 cli->cl_lost_grant += CFS_PAGE_SIZE;
841 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
842 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
843 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
844 /* For short writes we shouldn't count parts of pages that
845 * span a whole block on the OST side, or our accounting goes
846 * wrong. Should match the code in filter_grant_check. */
/* offset: position of the write inside its page;
 * count:  bytes written, extended back to the start of its block;
 * end:    position of the end of the write inside its block. */
847 int offset = pga->off & ~CFS_PAGE_MASK;
848 int count = pga->count + (offset & (blocksize - 1));
849 int end = (offset + pga->count) & (blocksize - 1);
/* Extend count forward to the end of the last block touched. */
851 count += blocksize - end;
853 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
854 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
855 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
856 cli->cl_avail_grant, cli->cl_dirty);
/* Total number of RPCs in flight for this client: reads plus writes. */
862 static unsigned long rpcs_in_flight(struct client_obd *cli)
864 return cli->cl_r_in_flight + cli->cl_w_in_flight;
/* Complete a pending sync_fs wait once nothing is left to sync: when
 * cl_loi_sync_fs_list is empty and a wait has been started, the stored
 * sfw_upcall is invoked with sfw_oi and rc, and the wait is marked as no
 * longer started. */
867 int osc_wake_sync_fs(struct client_obd *cli)
871 if (cfs_list_empty(&cli->cl_loi_sync_fs_list) &&
872 cli->cl_sf_wait.started) {
873 cli->cl_sf_wait.sfw_upcall(cli->cl_sf_wait.sfw_oi, rc);
874 cli->cl_sf_wait.started = 0;
875 CDEBUG(D_CACHE, "sync_fs_loi list is empty\n");
880 /* caller must hold loi_list_lock
 *
 * Walk the cl_cache_waiters list and wake waiters while there is room to
 * dirty another page.  Each waiter taken off the list either gets one page
 * of write grant consumed for its page, or, when less than a page of grant
 * is available, has ocw_rc set to -EDQUOT so that it falls back to sync IO. */
881 void osc_wake_cache_waiters(struct client_obd *cli)
884 struct osc_cache_waiter *ocw;
887 cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
888 /* if we can't dirty more, we must wait until some is written */
889 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
890 (cfs_atomic_read(&obd_dirty_pages) + 1 >
891 obd_max_dirty_pages)) {
892 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
893 "osc max %ld, sys max %d\n", cli->cl_dirty,
894 cli->cl_dirty_max, obd_max_dirty_pages);
898 /* if still dirty cache but no grant wait for pending RPCs that
899 * may yet return us some grant before doing sync writes */
900 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
901 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
902 cli->cl_w_in_flight);
/* This waiter will be woken: take it off the list first. */
906 ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
907 cfs_list_del_init(&ocw->ocw_entry);
908 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
909 /* no more RPCs in flight to return grant, do sync IO */
910 ocw->ocw_rc = -EDQUOT;
911 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
913 osc_consume_write_grant(cli,
914 &ocw->ocw_oap->oap_brw_page);
917 cfs_waitq_signal(&ocw->ocw_waitq);
/* Add grant to the client's available grant.  Takes cl_loi_list_lock
 * itself, so the caller must not already hold it. */
923 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
925 client_obd_list_lock(&cli->cl_loi_list_lock);
926 cli->cl_avail_grant += grant;
927 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Credit the grant carried by a reply body, if any: body->oa.o_grant is
 * only used when OBD_MD_FLGRANT is set in body->oa.o_valid. */
930 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
932 if (body->oa.o_valid & OBD_MD_FLGRANT) {
933 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
934 __osc_update_grant(cli, body->oa.o_grant);
/* Forward declaration: osc_shrink_grant_to_target() below calls this
 * before its definition, which is not in this part of the file. */
938 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
939 void *key, obd_count vallen, void *val,
940 struct ptlrpc_request_set *set);
/* Reply handler for a grant shrink request.  aa is a struct osc_grant_args
 * whose aa_oa holds the grant that was offered back.
 * Two outcomes are visible: the offered grant (oa->o_grant) is returned to
 * the client with __osc_update_grant(), and any grant carried by the reply
 * body is credited with osc_update_grant().
 * NOTE(review): the conditions guarding each step (presumably the rc of the
 * request) are not on the lines shown here. */
942 static int osc_shrink_grant_interpret(const struct lu_env *env,
943 struct ptlrpc_request *req,
946 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
947 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
948 struct ost_body *body;
951 __osc_update_grant(cli, oa->o_grant);
955 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
957 osc_update_grant(cli, body);
/* Give up a quarter of the available grant locally: the amount is moved
 * from cl_avail_grant into oa->o_grant under cl_loi_list_lock, oa is
 * flagged OBD_FL_SHRINK_GRANT, and the next shrink is rescheduled. */
963 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
965 client_obd_list_lock(&cli->cl_loi_list_lock);
966 oa->o_grant = cli->cl_avail_grant / 4;
967 cli->cl_avail_grant -= oa->o_grant;
968 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Make sure o_flags is marked valid before setting a flag in it. */
969 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
970 oa->o_valid |= OBD_MD_FLFLAGS;
973 oa->o_flags |= OBD_FL_SHRINK_GRANT;
974 osc_update_next_shrink(cli);
977 /* Shrink the current grant, either from some large amount to enough for a
978 * full set of in-flight RPCs, or if we have already shrunk to that limit
979 * then to enough for a single RPC. This avoids keeping more grant than
980 * needed, and avoids shrinking the grant piecemeal. */
981 static int osc_shrink_grant(struct client_obd *cli)
983 long target = (cli->cl_max_rpcs_in_flight + 1) *
984 cli->cl_max_pages_per_rpc;
986 client_obd_list_lock(&cli->cl_loi_list_lock);
987 if (cli->cl_avail_grant <= target)
988 target = cli->cl_max_pages_per_rpc;
989 client_obd_list_unlock(&cli->cl_loi_list_lock);
991 return osc_shrink_grant_to_target(cli, target);
/* Shrink the available grant down to target and tell the server.
 *
 * Nothing is done when target is not below cl_avail_grant.  Otherwise the
 * surplus (cl_avail_grant - target) is placed in body->oa.o_grant, flagged
 * OBD_FL_SHRINK_GRANT, and sent with osc_set_info_async() under
 * KEY_GRANT_SHRINK.  One path adds the surplus back to the client with
 * __osc_update_grant() (presumably when the send fails -- the test is not
 * shown here).
 * NOTE(review): cl_avail_grant is in bytes (see osc_consume_write_grant())
 * but the floor applied to target below is cl_max_pages_per_rpc, a page
 * count -- the units look mixed; confirm. */
994 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
997 struct ost_body *body;
1000 client_obd_list_lock(&cli->cl_loi_list_lock);
1001 /* Don't shrink if we are already above or below the desired limit
1002 * We don't want to shrink below a single RPC, as that will negatively
1003 * impact block allocation and long-term performance. */
1004 if (target < cli->cl_max_pages_per_rpc)
1005 target = cli->cl_max_pages_per_rpc;
1007 if (target >= cli->cl_avail_grant) {
1008 client_obd_list_unlock(&cli->cl_loi_list_lock);
1011 client_obd_list_unlock(&cli->cl_loi_list_lock);
1013 OBD_ALLOC_PTR(body);
/* Takes cl_loi_list_lock itself, hence called with the lock dropped. */
1017 osc_announce_cached(cli, &body->oa, 0);
/* Move the surplus out of the client's available grant. */
1019 client_obd_list_lock(&cli->cl_loi_list_lock);
1020 body->oa.o_grant = cli->cl_avail_grant - target;
1021 cli->cl_avail_grant = target;
1022 client_obd_list_unlock(&cli->cl_loi_list_lock);
1023 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
1024 body->oa.o_valid |= OBD_MD_FLFLAGS;
1025 body->oa.o_flags = 0;
1027 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
1028 osc_update_next_shrink(cli);
1030 rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
1031 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
1032 sizeof(*body), body, NULL);
1034 __osc_update_grant(cli, body->oa.o_grant);
/* Available grant must exceed this before a shrink is considered. */
1039 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
/* Decide whether the grant of client should be shrunk now.
 *
 * Requires OBD_CONNECT_GRANT_SHRINK in the connect flags.  Once the current
 * time is within 5 ticks of (or past) cl_next_shrink_grant, a shrink is due
 * if the import is in LUSTRE_IMP_FULL state and more than GRANT_SHRINK_LIMIT
 * is available; otherwise the next shrink time is pushed back. */
1040 static int osc_should_shrink_grant(struct client_obd *client)
1042 cfs_time_t time = cfs_time_current();
1043 cfs_time_t next_shrink = client->cl_next_shrink_grant;
1045 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
1046 OBD_CONNECT_GRANT_SHRINK) == 0)
1049 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1050 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1051 client->cl_avail_grant > GRANT_SHRINK_LIMIT)
1054 osc_update_next_shrink(client);
/* Timeout callback registered by osc_add_shrink_grant(): walks every
 * client on item->ti_obd_list and shrinks the grant of those for which
 * osc_should_shrink_grant() says a shrink is due.  data is not referenced
 * on the lines shown. */
1059 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1061 struct client_obd *client;
1063 cfs_list_for_each_entry(client, &item->ti_obd_list,
1064 cl_grant_shrink_list) {
1065 if (osc_should_shrink_grant(client))
1066 osc_shrink_grant(client);
/* Register client for periodic grant shrinking: adds it as a timeout
 * client with interval cl_grant_shrink_interval and callback
 * osc_grant_shrink_grant_cb(), linked through cl_grant_shrink_list.
 * A failure is logged with CERROR; on success the first shrink time is
 * scheduled. */
1071 static int osc_add_shrink_grant(struct client_obd *client)
1075 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1077 osc_grant_shrink_grant_cb, NULL,
1078 &client->cl_grant_shrink_list);
1080 CERROR("add grant client %s error %d\n",
1081 client->cl_import->imp_obd->obd_name, rc);
1084 CDEBUG(D_CACHE, "add grant client %s \n",
1085 client->cl_import->imp_obd->obd_name);
1086 osc_update_next_shrink(client);
/* Undo osc_add_shrink_grant(): remove client from the timeout clients via
 * its cl_grant_shrink_list link and return the result of that removal. */
1090 static int osc_del_shrink_grant(struct client_obd *client)
1092 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/* Initialise cl_avail_grant from the grant in the connect data.
 *
 * After an eviction the whole ocd_grant is available; otherwise the bytes
 * already dirty are subtracted from it.  A negative result is replaced by
 * ocd_grant with a warning.  When the server supports
 * OBD_CONNECT_GRANT_SHRINK and the client is not yet on a shrink list, it
 * is registered for periodic grant shrinking. */
1096 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1099 * ocd_grant is the total grant amount we're expect to hold: if we've
1100 * been evicted, it's the new avail_grant amount, cl_dirty will drop
1101 * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1103 * race is tolerable here: if we're evicted, but imp_state already
1104 * left EVICTED state, then cl_dirty must be 0 already.
1106 client_obd_list_lock(&cli->cl_loi_list_lock);
1107 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1108 cli->cl_avail_grant = ocd->ocd_grant;
1110 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1112 if (cli->cl_avail_grant < 0) {
1113 CWARN("%s: available grant < 0, the OSS is probably not running"
1114 " with patch from bug20278 (%ld) \n",
1115 cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
1116 /* workaround for 1.6 servers which do not have
1117 * the patch from bug20278 */
1118 cli->cl_avail_grant = ocd->ocd_grant;
1121 client_obd_list_unlock(&cli->cl_loi_list_lock);
1123 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1124 cli->cl_import->imp_obd->obd_name,
1125 cli->cl_avail_grant, cli->cl_lost_grant);
/* Register for grant shrinking only once per client. */
1127 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1128 cfs_list_empty(&cli->cl_grant_shrink_list))
1129 osc_add_shrink_grant(cli);
1132 /* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet.
 *
 * nob_read is the number of bytes actually read into the page_count pages
 * of pga.  Pages fully covered by nob_read are skipped, the page in which
 * the data ends is zeroed from that point on, and every page after it is
 * zeroed completely. */
1136 static void handle_short_read(int nob_read, obd_count page_count,
1137 struct brw_page **pga)
1142 /* skip bytes read OK */
1143 while (nob_read > 0) {
1144 LASSERT (page_count > 0);
1146 if (pga[i]->count > nob_read) {
1147 /* EOF inside this page */
/* Map the page and zero the tail that was not read. */
1148 ptr = cfs_kmap(pga[i]->pg) +
1149 (pga[i]->off & ~CFS_PAGE_MASK);
1150 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1151 cfs_kunmap(pga[i]->pg);
1157 nob_read -= pga[i]->count;
1162 /* zero remaining pages */
1163 while (page_count-- > 0) {
1164 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1165 memset(ptr, 0, pga[i]->count);
1166 cfs_kunmap(pga[i]->pg);
/* Validate the reply to a bulk write.
 *
 * The reply must carry a vector of per-niobuf result codes (RMF_RCS).  The
 * first negative code found is returned as the error; any other non-zero
 * code is treated as invalid.  Finally the number of bytes the bulk
 * transfer moved must equal requested_nob. */
1171 static int check_write_rcs(struct ptlrpc_request *req,
1172 int requested_nob, int niocount,
1173 obd_count page_count, struct brw_page **pga)
1178 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1179 sizeof(*remote_rcs) *
1181 if (remote_rcs == NULL) {
1182 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1186 /* return error if any niobuf was in error */
1187 for (i = 0; i < niocount; i++) {
1188 if (remote_rcs[i] < 0)
1189 return(remote_rcs[i]);
/* Only 0 or a negative errno is a valid per-niobuf result. */
1191 if (remote_rcs[i] != 0) {
1192 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1193 i, remote_rcs[i], req);
1198 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1199 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1200 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Tell whether p2 can share a remote niobuf with p1.
 *
 * When the flags of the two pages differ, a CERROR is logged if they differ
 * in any bit other than OBD_BRW_FROM_GRANT, OBD_BRW_NOCACHE, OBD_BRW_SYNC
 * and OBD_BRW_ASYNC.  The value returned on the last line is non-zero when
 * p2 starts exactly where p1 ends. */
1207 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1209 if (p1->flag != p2->flag) {
/* Bits outside this set are not known to be safe to mix. */
1210 unsigned mask = ~(OBD_BRW_FROM_GRANT|
1211 OBD_BRW_NOCACHE|OBD_BRW_SYNC|OBD_BRW_ASYNC);
1213 /* warn if we try to combine flags that we don't know to be
1214 * safe to combine */
1215 if ((p1->flag & mask) != (p2->flag & mask))
1216 CERROR("is it ok to have flags 0x%x and 0x%x in the "
1217 "same brw?\n", p1->flag, p2->flag);
1221 return (p1->off + p1->count == p2->off);
/* Compute the checksum of the first nob bytes held in the pg_count pages
 * of pga, using the algorithm selected by cksum_type.
 *
 * opc only matters for fault injection: for OST_READ the data of the first
 * page may be corrupted before checksumming, and for OST_WRITE a wrong
 * checksum may be produced while the data is left intact. */
1224 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1225 struct brw_page **pga, int opc,
1226 cksum_type_t cksum_type)
1231 LASSERT (pg_count > 0);
1232 cksum = init_checksum(cksum_type);
1233 while (nob > 0 && pg_count > 0) {
1234 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1235 int off = pga[i]->off & ~CFS_PAGE_MASK;
/* Never checksum past the nob bytes that remain. */
1236 int count = pga[i]->count > nob ? nob : pga[i]->count;
1238 /* corrupt the data before we compute the checksum, to
1239 * simulate an OST->client data error */
1240 if (i == 0 && opc == OST_READ &&
1241 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1242 memcpy(ptr + off, "bad1", min(4, nob));
1243 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1244 cfs_kunmap(pga[i]->pg);
1245 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1248 nob -= pga[i]->count;
1252 /* For sending we only compute the wrong checksum instead
1253 * of corrupting the data so it is still correct on a redo */
1254 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/* Build, but do not send, a bulk read or write RPC for page_count pages.
 *
 * \param cmd        OBD_BRW_WRITE selects the pooled write-request allocation;
 *                   otherwise a read request is allocated
 * \param cli        client obd supplying the import, checksum and grant state
 * \param oa         object attributes packed into the request body; its
 *                   checksum-related fields are updated as a side effect
 * \param lsm        not referenced in the visible code
 * \param page_count number of entries in pga (asserted to be > 0)
 * \param pga        pages in strictly increasing offset order (asserted)
 * \param reqp       NOTE(review): presumably receives the new request - the
 *                   assignment is not visible here
 * \param ocapa      capability packed into the request; tested for NULL
 *                   before a reference is taken
 * \param reserve    when non-zero (and ocapa is set) a reference on ocapa is
 *                   stored in the request's async args
 *
 * The request-size, page count and niobuf count are also recorded in the
 * async args for the reply handler.
 * NOTE(review): the return value is not visible; failure paths visible here
 * use -ENOMEM and -EINVAL. */
1260 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1261 struct lov_stripe_md *lsm, obd_count page_count,
1262 struct brw_page **pga,
1263 struct ptlrpc_request **reqp,
1264 struct obd_capa *ocapa, int reserve)
1266 struct ptlrpc_request *req;
1267 struct ptlrpc_bulk_desc *desc;
1268 struct ost_body *body;
1269 struct obd_ioobj *ioobj;
1270 struct niobuf_remote *niobuf;
1271 int niocount, i, requested_nob, opc, rc;
1272 struct osc_brw_async_args *aa;
1273 struct req_capsule *pill;
1274 struct brw_page *pg_prev;
/* fault-injection hooks exercising the recoverable and fatal error paths */
1277 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1278 RETURN(-ENOMEM); /* Recoverable */
1279 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1280 RETURN(-EINVAL); /* Fatal */
1282 if ((cmd & OBD_BRW_WRITE) != 0) {
1284 req = ptlrpc_request_alloc_pool(cli->cl_import,
1285 cli->cl_import->imp_rq_pool,
1286 &RQF_OST_BRW_WRITE);
1289 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* walk adjacent page pairs to size the niobuf array; NOTE(review):
 * presumably every pair that cannot be merged adds one niobuf */
1294 for (niocount = i = 1; i < page_count; i++) {
1295 if (!can_merge_pages(pga[i - 1], pga[i]))
1299 pill = &req->rq_pill;
1300 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1302 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1303 niocount * sizeof(*niobuf));
1304 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1306 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1308 ptlrpc_request_free(req);
1311 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1312 ptlrpc_at_set_req_timeout(req);
1314 if (opc == OST_WRITE)
1315 desc = ptlrpc_prep_bulk_imp(req, page_count,
1316 BULK_GET_SOURCE, OST_BULK_PORTAL);
1318 desc = ptlrpc_prep_bulk_imp(req, page_count,
1319 BULK_PUT_SINK, OST_BULK_PORTAL);
1322 GOTO(out, rc = -ENOMEM);
1323 /* NB request now owns desc and will free it when it gets freed */
1325 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1326 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1327 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1328 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1330 lustre_set_wire_obdo(&body->oa, oa);
1332 obdo_to_ioobj(oa, ioobj);
1333 ioobj->ioo_bufcnt = niocount;
1334 osc_pack_capa(req, body, ocapa);
1335 LASSERT (page_count > 0);
/* add every page to the bulk descriptor and fill in the niobufs; a page
 * that merges with its predecessor only extends the current niobuf */
1337 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1338 struct brw_page *pg = pga[i];
1340 LASSERT(pg->count > 0);
1341 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1342 "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1343 pg->off, pg->count);
1345 LASSERTF(i == 0 || pg->off > pg_prev->off,
1346 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1347 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1349 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1350 pg_prev->pg, page_private(pg_prev->pg),
1351 pg_prev->pg->index, pg_prev->off);
1353 LASSERTF(i == 0 || pg->off > pg_prev->off,
1354 "i %d p_c %u\n", i, page_count);
1356 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1357 (pg->flag & OBD_BRW_SRVLOCK));
1359 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1361 requested_nob += pg->count;
1363 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1365 niobuf->len += pg->count;
1367 niobuf->offset = pg->off;
1368 niobuf->len = pg->count;
1369 niobuf->flags = pg->flag;
/* the niobuf cursor must have advanced by exactly niocount entries */
1374 LASSERTF((void *)(niobuf - niocount) ==
1375 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1376 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1377 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1379 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1380 if (osc_should_shrink_grant(cli))
1381 osc_shrink_grant_local(cli, &body->oa);
1383 /* size[REQ_REC_OFF] still sizeof (*body) */
1384 if (opc == OST_WRITE) {
1385 if (unlikely(cli->cl_checksum) &&
1386 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1387 /* store cl_cksum_type in a local variable since
1388 * it can be changed via lprocfs */
1389 cksum_type_t cksum_type = cli->cl_cksum_type;
1391 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1392 oa->o_flags &= OBD_FL_LOCAL_MASK;
1393 body->oa.o_flags = 0;
1395 body->oa.o_flags |= cksum_type_pack(cksum_type);
1396 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1397 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1401 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1403 /* save this in 'oa', too, for later checking */
1404 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1405 oa->o_flags |= cksum_type_pack(cksum_type);
1407 /* clear out the checksum flag, in case this is a
1408 * resend but cl_checksum is no longer set. b=11238 */
1409 oa->o_valid &= ~OBD_MD_FLCKSUM;
1411 oa->o_cksum = body->oa.o_cksum;
1412 /* 1 RC per niobuf */
1413 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1414 sizeof(__u32) * niocount);
1416 if (unlikely(cli->cl_checksum) &&
1417 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1418 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1419 body->oa.o_flags = 0;
1420 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1421 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1424 ptlrpc_request_set_replen(req);
1426 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1427 aa = ptlrpc_req_async_args(req);
1429 aa->aa_requested_nob = requested_nob;
1430 aa->aa_nio_count = niocount;
1431 aa->aa_page_count = page_count;
1435 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1436 if (ocapa && reserve)
1437 aa->aa_ocapa = capa_get(ocapa);
1443 ptlrpc_req_finished(req);
/* Compare the server's write checksum with the one the client sent and
 * diagnose a mismatch.
 *
 * Matching checksums are only confirmed in the debug log.  Otherwise the
 * checksum is recomputed over the pages with the type unpacked from
 * oa->o_flags (OBD_CKSUM_CRC32 when OBD_MD_FLFLAGS is not valid) and the
 * values are compared to tell apart: a checksum type mismatch, data changed
 * on the client after checksumming, data changed in transit, or both.  The
 * verdict is reported on the console together with the peer NID, object and
 * extent.  Objects flagged OBD_FL_MMAP are special-cased before the
 * recomputation because their pages can change at any time.
 * NOTE(review): the return values are not visible here; the caller only
 * tests the result for non-zero - confirm. */
1447 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1448 __u32 client_cksum, __u32 server_cksum, int nob,
1449 obd_count page_count, struct brw_page **pga,
1450 cksum_type_t client_cksum_type)
1454 cksum_type_t cksum_type;
1456 if (server_cksum == client_cksum) {
1457 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1461 /* If this is mmaped file - it can be changed at any time */
1462 if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags & OBD_FL_MMAP)
1465 if (oa->o_valid & OBD_MD_FLFLAGS)
1466 cksum_type = cksum_type_unpack(oa->o_flags);
1468 cksum_type = OBD_CKSUM_CRC32;
/* checksum the pages again as they are now, to classify the mismatch */
1470 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1473 if (cksum_type != client_cksum_type)
1474 msg = "the server did not use the checksum type specified in "
1475 "the original request - likely a protocol problem";
1476 else if (new_cksum == server_cksum)
1477 msg = "changed on the client after we checksummed it - "
1478 "likely false positive due to mmap IO (bug 11742)";
1479 else if (new_cksum == client_cksum)
1480 msg = "changed in transit before arrival at OST";
1482 msg = "changed in transit AND doesn't match the original - "
1483 "likely false positive due to mmap IO (bug 11742)";
1485 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1486 " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1487 msg, libcfs_nid2str(peer->nid),
1488 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1489 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1490 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1492 oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1494 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1495 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1496 "client csum now %x\n", client_cksum, client_cksum_type,
1497 server_cksum, cksum_type, new_cksum);
1501 /* Note rc enters this function as number of bytes transferred */
/* Post-process the reply of a bulk read/write request.
 *
 * A negative rc other than -EDQUOT is logged as a failed request.  Otherwise
 * the ost_body is unpacked from the reply, quota flags (only with
 * HAVE_QUOTA_SUPPORT) and grant state are updated, and processing splits by
 * opcode:
 *  - OST_WRITE: a positive rc is reported as unexpected; the bulk is
 *    unwrapped, the write checksum is verified when the request carried one,
 *    and the per-niobuf return codes are checked with check_write_rcs().
 *  - OST_READ: the bulk is unwrapped, rc is validated against the requested
 *    and the transferred byte counts, short reads are passed to
 *    handle_short_read(), and a server checksum, when present, is compared
 *    with one computed over the received pages.
 * Near the end lustre_get_wire_obdo(aa->aa_oa, &body->oa) is called,
 * presumably copying the reply attributes into aa->aa_oa.
 * NOTE(review): the returned value and the error codes of most failure paths
 * are not visible here - confirm. */
1502 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1504 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1505 const lnet_process_id_t *peer =
1506 &req->rq_import->imp_connection->c_peer;
1507 struct client_obd *cli = aa->aa_cli;
1508 struct ost_body *body;
1509 __u32 client_cksum = 0;
1512 if (rc < 0 && rc != -EDQUOT) {
1513 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1517 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1518 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1520 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1524 #ifdef HAVE_QUOTA_SUPPORT
1525 /* set/clear over quota flag for a uid/gid */
1526 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1527 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1528 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1530 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1531 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1533 lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
1538 osc_update_grant(cli, body);
1543 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1544 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1546 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1548 CERROR("Unexpected +ve rc %d\n", rc);
1551 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1553 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1556 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1557 check_write_checksum(&body->oa, peer, client_cksum,
1558 body->oa.o_cksum, aa->aa_requested_nob,
1559 aa->aa_page_count, aa->aa_ppga,
1560 cksum_type_unpack(aa->aa_oa->o_flags)))
1563 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1564 aa->aa_page_count, aa->aa_ppga);
1568 /* The rest of this function executes only for OST_READs */
1570 /* if unwrap_bulk failed, return -EAGAIN to retry */
1571 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1573 GOTO(out, rc = -EAGAIN);
1575 if (rc > aa->aa_requested_nob) {
1576 CERROR("Unexpected rc %d (%d requested)\n", rc,
1577 aa->aa_requested_nob);
1581 if (rc != req->rq_bulk->bd_nob_transferred) {
1582 CERROR ("Unexpected rc %d (%d transferred)\n",
1583 rc, req->rq_bulk->bd_nob_transferred);
1587 if (rc < aa->aa_requested_nob)
1588 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1590 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1591 static int cksum_counter;
1592 __u32 server_cksum = body->oa.o_cksum;
1595 cksum_type_t cksum_type;
1597 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1598 cksum_type = cksum_type_unpack(body->oa.o_flags);
1600 cksum_type = OBD_CKSUM_CRC32;
1601 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1602 aa->aa_ppga, OST_READ,
1605 if (peer->nid == req->rq_bulk->bd_sender) {
1609 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1612 if (server_cksum == ~0 && rc > 0) {
1613 CERROR("Protocol error: server %s set the 'checksum' "
1614 "bit, but didn't send a checksum. Not fatal, "
1615 "but please notify on http://bugzilla.lustre.org/\n",
1616 libcfs_nid2str(peer->nid));
1617 } else if (server_cksum != client_cksum) {
1618 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1619 "%s%s%s inode "DFID" object "
1620 LPU64"/"LPU64" extent "
1621 "["LPU64"-"LPU64"]\n",
1622 req->rq_import->imp_obd->obd_name,
1623 libcfs_nid2str(peer->nid),
1625 body->oa.o_valid & OBD_MD_FLFID ?
1626 body->oa.o_parent_seq : (__u64)0,
1627 body->oa.o_valid & OBD_MD_FLFID ?
1628 body->oa.o_parent_oid : 0,
1629 body->oa.o_valid & OBD_MD_FLFID ?
1630 body->oa.o_parent_ver : 0,
1632 body->oa.o_valid & OBD_MD_FLGROUP ?
1633 body->oa.o_seq : (__u64)0,
1634 aa->aa_ppga[0]->off,
1635 aa->aa_ppga[aa->aa_page_count-1]->off +
1636 aa->aa_ppga[aa->aa_page_count-1]->count -
1638 CERROR("client %x, server %x, cksum_type %x\n",
1639 client_cksum, server_cksum, cksum_type);
1641 aa->aa_oa->o_cksum = client_cksum;
1645 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1648 } else if (unlikely(client_cksum)) {
1649 static int cksum_missed;
/* x & -x isolates the lowest set bit, so this holds when cksum_missed
 * is zero or a power of two */
1652 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1653 CERROR("Checksum %u requested from %s but not sent\n",
1654 cksum_missed, libcfs_nid2str(peer->nid));
1660 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
/* Synchronously issue one bulk RPC for the first page_count entries of pga.
 *
 * The request is built with osc_brw_prep_request() (no capa reservation),
 * sent with ptlrpc_queue_wait() and post-processed by
 * osc_brw_fini_request().  A bulk timeout on a request marked for resend is
 * logged and the request dropped.  On a recoverable error the code gives up
 * with an error message once osc_should_resend() refuses, and otherwise
 * sleeps for cfs_time_seconds(resends), presumably before another attempt.
 * NOTE(review): the retry jumps, the handling of the resends counter and the
 * return value are not visible here - confirm. */
1665 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1666 struct lov_stripe_md *lsm,
1667 obd_count page_count, struct brw_page **pga,
1668 struct obd_capa *ocapa)
1670 struct ptlrpc_request *req;
1674 struct l_wait_info lwi;
1678 cfs_waitq_init(&waitq);
1681 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1682 page_count, pga, &req, ocapa, 0);
1686 rc = ptlrpc_queue_wait(req);
1688 if (rc == -ETIMEDOUT && req->rq_resend) {
1689 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1690 ptlrpc_req_finished(req);
1694 rc = osc_brw_fini_request(req, rc);
1696 ptlrpc_req_finished(req);
1697 if (osc_recoverable_error(rc)) {
1699 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1700 CERROR("too many resend retries, returning error\n");
/* wait on a private wait queue with a condition of 0, i.e. until the
 * timeout (or an interruption) ends the sleep */
1704 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1705 l_wait_event(waitq, 0, &lwi);
/* Rebuild and requeue a bulk request that failed with a recoverable error.
 *
 * A fresh request is prepared from the old request's async args (same
 * opcode, oa, pages and capa, without reserving the capa again).  Under
 * cl_loi_list_lock the interpret callback, the async args, the oap list and
 * the capa reference are handed over to the new request, every oap holding a
 * reference on the old request is switched to the new one, and the new
 * request is added to the old request's set.  rq_sent is set to the current
 * time plus aa_resends seconds, presumably to delay the resend.
 * If osc_should_resend() refuses, an error is logged; if an oap turns out to
 * be interrupted, the lock is dropped and the new request released again.
 * NOTE(review): the return codes of these paths are not visible here. */
1713 int osc_brw_redo_request(struct ptlrpc_request *request,
1714 struct osc_brw_async_args *aa)
1716 struct ptlrpc_request *new_req;
1717 struct ptlrpc_request_set *set = request->rq_set;
1718 struct osc_brw_async_args *new_aa;
1719 struct osc_async_page *oap;
1723 if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1724 CERROR("too many resent retries, returning error\n");
1728 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1730 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1731 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1732 aa->aa_cli, aa->aa_oa,
1733 NULL /* lsm unused by osc currently */,
1734 aa->aa_page_count, aa->aa_ppga,
1735 &new_req, aa->aa_ocapa, 0);
1739 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1741 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1742 if (oap->oap_request != NULL) {
1743 LASSERTF(request == oap->oap_request,
1744 "request %p != oap_request %p\n",
1745 request, oap->oap_request);
1746 if (oap->oap_interrupted) {
1747 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1748 ptlrpc_req_finished(new_req);
1753 /* New request takes over pga and oaps from old request.
1754 * Note that copying a list_head doesn't work, need to move it... */
1756 new_req->rq_interpret_reply = request->rq_interpret_reply;
1757 new_req->rq_async_args = request->rq_async_args;
1758 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1760 new_aa = ptlrpc_req_async_args(new_req);
1762 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1763 cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1764 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
/* move the oaps' request references from the old request to the new one */
1766 cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1767 if (oap->oap_request) {
1768 ptlrpc_req_finished(oap->oap_request);
1769 oap->oap_request = ptlrpc_request_addref(new_req);
/* the capa reference moves to the new request as well */
1773 new_aa->aa_ocapa = aa->aa_ocapa;
1774 aa->aa_ocapa = NULL;
1776 /* use ptlrpc_set_add_req is safe because interpret functions work
1777 * in check_set context. only one way exist with access to request
1778 * from different thread got -EINTR - this way protected with
1779 * cl_loi_list_lock */
1780 ptlrpc_set_add_req(set, new_req);
1782 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1784 DEBUG_REQ(D_INFO, new_req, "new request");
1789 * ugh, we want disk allocation on the target to happen in offset order. we'll
1790 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1791 * fine for our small page arrays and doesn't require allocation. it's an
1792 * insertion sort that swaps elements that are strides apart, shrinking the
1793 * stride down until it's '1' and the array is sorted.
/* Sort the num brw_page pointers in array in place by ascending ->off.
 * The starting stride is grown through the 1, 4, 13, ... (3 * s + 1)
 * sequence while it is below num.  In each pass, elements whose offset is
 * larger than that of the entry being placed are shifted up by one stride;
 * passes repeat while stride > 1.
 * NOTE(review): the per-pass stride reduction is not visible here. */
1795 static void sort_brw_pages(struct brw_page **array, int num)
1798 struct brw_page *tmp;
1802 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1807 for (i = stride ; i < num ; i++) {
1810 while (j >= stride && array[j - stride]->off > tmp->off) {
1811 array[j] = array[j - stride];
1816 } while (stride > 1);
/* Count how many leading entries of pg (out of pages, which must be > 0)
 * form a run without gaps at page boundaries.  Per the visible checks the
 * run ends after a page that does not end on a page boundary and when the
 * following page does not start on one.
 * NOTE(review): the loop structure, the counters and the remaining return
 * statements are not visible here - confirm. */
1819 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1825 LASSERT (pages > 0);
1826 offset = pg[i]->off & ~CFS_PAGE_MASK;
1830 if (pages == 0) /* that's all */
1833 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1834 return count; /* doesn't end on page boundary */
1837 offset = pg[i]->off & ~CFS_PAGE_MASK;
1838 if (offset != 0) /* doesn't start on page boundary */
/* Allocate an array of count brw_page pointers for the pages in pga.
 * NOTE(review): presumably entry i is pointed at &pga[i] and NULL is
 * returned when the allocation fails - confirm.  The array is released with
 * osc_release_ppga() using the same count. */
1845 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1847 struct brw_page **ppga;
1850 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1854 for (i = 0; i < count; i++)
/* Free a brw_page pointer array of count entries.  ppga must not be NULL and
 * count has to match the size used at allocation, since OBD_FREE is given
 * the size explicitly.  Only the pointer array itself is freed here. */
1859 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1861 LASSERT(ppga != NULL);
1862 OBD_FREE(ppga, sizeof(*ppga) * count);
/* Synchronous bulk read/write of the page_count pages described by pga.
 *
 * With OBD_BRW_CHECK in cmd the caller only asks whether the I/O has a
 * chance to succeed; the visible check for that is imp->imp_invalid.
 * Otherwise a pointer array over pga is built and sorted by offset, then
 * consumed in chunks of at most cl_max_pages_per_rpc pages, each trimmed by
 * max_unfragmented_pages() and sent with osc_brw_internal().  Because a brw
 * clobbers the obdo, a copy of oinfo->oi_oa is saved when more than one
 * chunk is needed and restored before every later chunk.  The pointer array
 * and the saved obdo are freed on the way out.
 *
 * \param cmd        OBD_BRW_* flags
 * \param exp        export whose import and client obd are used
 * \param oinfo      supplies the obdo (oi_oa), stripe md and capa
 * \param page_count number of entries in pga
 * \param pga        pages to transfer
 * \param oti        not referenced in the visible code
 *
 * NOTE(review): the return value and the reaction to a failing chunk are
 * not visible here; an allocation failure for the saved obdo yields -ENOMEM. */
1865 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1866 obd_count page_count, struct brw_page *pga,
1867 struct obd_trans_info *oti)
1869 struct obdo *saved_oa = NULL;
1870 struct brw_page **ppga, **orig;
1871 struct obd_import *imp = class_exp2cliimp(exp);
1872 struct client_obd *cli;
1873 int rc, page_count_orig;
1876 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1877 cli = &imp->imp_obd->u.cli;
1879 if (cmd & OBD_BRW_CHECK) {
1880 /* The caller just wants to know if there's a chance that this
1881 * I/O can succeed */
1883 if (imp->imp_invalid)
1888 /* test_brw with a failed create can trip this, maybe others. */
1889 LASSERT(cli->cl_max_pages_per_rpc);
/* orig keeps the start of the array for the final release; ppga is
 * advanced as chunks are sent */
1893 orig = ppga = osc_build_ppga(pga, page_count);
1896 page_count_orig = page_count;
1898 sort_brw_pages(ppga, page_count);
1899 while (page_count) {
1900 obd_count pages_per_brw;
1902 if (page_count > cli->cl_max_pages_per_rpc)
1903 pages_per_brw = cli->cl_max_pages_per_rpc;
1905 pages_per_brw = page_count;
1907 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1909 if (saved_oa != NULL) {
1910 /* restore previously saved oa */
1911 *oinfo->oi_oa = *saved_oa;
1912 } else if (page_count > pages_per_brw) {
1913 /* save a copy of oa (brw will clobber it) */
1914 OBDO_ALLOC(saved_oa);
1915 if (saved_oa == NULL)
1916 GOTO(out, rc = -ENOMEM);
1917 *saved_oa = *oinfo->oi_oa;
1920 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1921 pages_per_brw, ppga, oinfo->oi_capa);
1926 page_count -= pages_per_brw;
1927 ppga += pages_per_brw;
1931 osc_release_ppga(orig, page_count_orig);
1933 if (saved_oa != NULL)
1934 OBDO_FREE(saved_oa);
1939 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1940 * the dirty accounting. Writeback completes or truncate happens before
1941 * writing starts. Must be called with the loi lock held. */
/* Hands the oap's brw page, together with the sent flag, to
 * osc_release_write_grant() on behalf of cli. */
1942 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1945 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
/* Check whether the first page on lop's urgent list carries ASYNC_SYNCFS,
 * i.e. whether a syncfs request forces an RPC.  An empty urgent list is
 * handled before the list head is examined.
 * NOTE(review): presumably returns non-zero when the flag is set and 0
 * otherwise - the return statements are not visible here. */
1948 static int lop_makes_syncfs_rpc(struct loi_oap_pages *lop)
1950 struct osc_async_page *oap;
1953 if (cfs_list_empty(&lop->lop_urgent))
1956 oap = cfs_list_entry(lop->lop_urgent.next,
1957 struct osc_async_page, oap_urgent_item);
1959 if (oap->oap_async_flags & ASYNC_SYNCFS) {
1960 CDEBUG(D_CACHE, "syncfs request forcing RPC\n");
1967 /* This maintains the lists of pending pages to read/write for a given object
1968 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1969 * to quickly find objects that are ready to send an RPC. */
/* Decide whether lop currently justifies sending an RPC.  The visible checks,
 * in order: no pending pages; a missing or invalid import; a non-empty
 * urgent list; cache waiters (writes only); and finally at least `optimal`
 * pending pages, where optimal starts out as cl_max_pages_per_rpc.
 * NOTE(review): the value returned by each check and the "+16" adjustment
 * mentioned below are not visible here - confirm. */
1970 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1976 if (lop->lop_num_pending == 0)
1979 /* if we have an invalid import we want to drain the queued pages
1980 * by forcing them through rpcs that immediately fail and complete
1981 * the pages. recovery relies on this to empty the queued pages
1982 * before canceling the locks and evicting down the llite pages */
1983 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1986 /* stream rpcs in queue order as long as there is an urgent page
1987 * queued. this is our cheap solution for good batching in the case
1988 * where writepage marks some random page in the middle of the file
1989 * as urgent because of, say, memory pressure */
1990 if (!cfs_list_empty(&lop->lop_urgent)) {
1991 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1994 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1995 optimal = cli->cl_max_pages_per_rpc;
1996 if (cmd & OBD_BRW_WRITE) {
1997 /* trigger a write rpc stream as long as there are dirtiers
1998 * waiting for space. as they're waiting, they're not going to
1999 * create more pages to coalesce with what's waiting.. */
2000 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
2001 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
2004 /* +16 to avoid triggering rpcs that would want to include pages
2005 * that are being queued but which can't be made ready until
2006 * the queuer finishes with the page. this is a wart for
2007 * llite::commit_write() */
2010 if (lop->lop_num_pending >= optimal)
/* Check whether the first page on lop's urgent list carries ASYNC_HP, i.e.
 * whether a high-priority request forces an RPC.  An empty urgent list is
 * handled before the list head is examined.
 * NOTE(review): presumably returns non-zero when the flag is set and 0
 * otherwise - the return statements are not visible here. */
2016 static int lop_makes_hprpc(struct loi_oap_pages *lop)
2018 struct osc_async_page *oap;
2021 if (cfs_list_empty(&lop->lop_urgent))
2024 oap = cfs_list_entry(lop->lop_urgent.next,
2025 struct osc_async_page, oap_urgent_item);
2027 if (oap->oap_async_flags & ASYNC_HP) {
2028 CDEBUG(D_CACHE, "hp request forcing RPC\n");
/* Bring item's membership on list in line with should_be_on: an unlinked
 * item that should be listed is appended at the tail, a linked item that
 * should not be listed is unlinked and re-initialised.  Nothing happens when
 * the current state already matches. */
2035 static void on_list(cfs_list_t *item, cfs_list_t *list,
2038 if (cfs_list_empty(item) && should_be_on)
2039 cfs_list_add_tail(item, list);
2040 else if (!cfs_list_empty(item) && !should_be_on)
2041 cfs_list_del_init(item);
2044 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2045 * can find pages to build into rpcs quickly */
/* Re-evaluate, via on_list(), which client lists loi belongs on:
 *  - a high-priority rpc on either lop puts loi on the hp-ready list and
 *    takes it off the ready list;
 *  - a syncfs rpc on the write lop makes sync-fs list membership follow the
 *    write lop's pending count;
 *  - the visible calls also take loi off the hp-ready list and make ready
 *    list membership follow lop_makes_rpc() for the write and read lops;
 *  - write-list and read-list membership follow the respective pending
 *    counts.
 * NOTE(review): the else/brace structure tying these cases together is not
 * visible here - confirm which calls are mutually exclusive. */
2046 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
2048 if (lop_makes_hprpc(&loi->loi_write_lop) ||
2049 lop_makes_hprpc(&loi->loi_read_lop)) {
2051 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2052 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2054 if (lop_makes_syncfs_rpc(&loi->loi_write_lop)) {
2055 on_list(&loi->loi_sync_fs_item,
2056 &cli->cl_loi_sync_fs_list,
2057 loi->loi_write_lop.lop_num_pending);
2059 on_list(&loi->loi_hp_ready_item,
2060 &cli->cl_loi_hp_ready_list, 0);
2061 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2062 lop_makes_rpc(cli, &loi->loi_write_lop,
2064 lop_makes_rpc(cli, &loi->loi_read_lop,
2069 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2070 loi->loi_write_lop.lop_num_pending);
2072 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2073 loi->loi_read_lop.lop_num_pending);
/* Apply delta (which may be negative) to lop's pending-page count and mirror
 * it in the client-wide counters: cl_pending_w_pages for OBD_BRW_WRITE and
 * cl_pending_r_pages for reads.
 * NOTE(review): an else between the two counter updates is assumed. */
2076 static void lop_update_pending(struct client_obd *cli,
2077 struct loi_oap_pages *lop, int cmd, int delta)
2079 lop->lop_num_pending += delta;
2080 if (cmd & OBD_BRW_WRITE)
2081 cli->cl_pending_w_pages += delta;
2083 cli->cl_pending_r_pages += delta;
2087 * this is called when a sync waiter receives an interruption. Its job is to
2088 * get the caller woken as soon as possible. If its page hasn't been put in an
2089 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
2090 * desiring interruption which will forcefully complete the rpc once the rpc
/* Mark oap as interrupted (it must not be marked already) and get its waiter
 * woken.  If the oap holds a request reference, the request is marked
 * interrupted, ptlrpcd is woken for it, and the reference is dropped.  If the
 * oap is still on the pending list it is removed from the pending and urgent
 * lists, the pending counters and list memberships are updated, and the
 * caller's ap_completion() is invoked with -EINTR.
 * NOTE(review): locking, the assignment of loi and the return value are not
 * visible here - confirm. */
2093 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2095 struct loi_oap_pages *lop;
2096 struct lov_oinfo *loi;
2100 LASSERT(!oap->oap_interrupted);
2101 oap->oap_interrupted = 1;
2103 /* ok, it's been put in an rpc. only one oap gets a request reference */
2104 if (oap->oap_request != NULL) {
2105 ptlrpc_mark_interrupted(oap->oap_request);
2106 ptlrpcd_wake(oap->oap_request);
2107 ptlrpc_req_finished(oap->oap_request);
2108 oap->oap_request = NULL;
2112 * page completion may be called only if ->cpo_prep() method was
2113 * executed by osc_io_submit(), that also adds page the to pending list
2115 if (!cfs_list_empty(&oap->oap_pending_item)) {
2116 cfs_list_del_init(&oap->oap_pending_item);
2117 cfs_list_del_init(&oap->oap_urgent_item);
2120 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2121 &loi->loi_write_lop : &loi->loi_read_lop;
2122 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2123 loi_list_maint(oap->oap_cli, oap->oap_loi);
2124 rc = oap->oap_caller_ops->ap_completion(env,
2125 oap->oap_caller_data,
2126 oap->oap_cmd, NULL, -EINTR);
2132 /* this is trying to propagate async writeback errors back up to the
2133 * application. As an async write fails we record the error code for later if
2134 * the app does an fsync. As long as errors persist we force future rpcs to be
2135 * sync so that the app can get a sync error and break the cycle of queueing
2136 * pages for which writeback will fail. */
/* Update the async error state in ar for a completed write with the given
 * xid.  Visible effects: ar_force_sync can be switched on together with a
 * freshly sampled ar_min_xid, and it is switched off again when force_sync
 * is set and xid is not below ar_min_xid.
 * NOTE(review): the conditions guarding the two steps are not visible here -
 * confirm. */
2137 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2144 ar->ar_force_sync = 1;
2145 ar->ar_min_xid = ptlrpc_sample_next_xid();
2150 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2151 ar->ar_force_sync = 0;
/* Queue oap on the pending list of its object's write or read lop, chosen by
 * OBD_BRW_WRITE in oap_cmd, and bump the pending counters by one.  Pages
 * flagged ASYNC_HP go to the head of the urgent list, pages flagged
 * ASYNC_URGENT to its tail; the pending list is always appended to.
 * NOTE(review): an else between the two lop assignments is assumed. */
2154 void osc_oap_to_pending(struct osc_async_page *oap)
2156 struct loi_oap_pages *lop;
2158 if (oap->oap_cmd & OBD_BRW_WRITE)
2159 lop = &oap->oap_loi->loi_write_lop;
2161 lop = &oap->oap_loi->loi_read_lop;
2163 if (oap->oap_async_flags & ASYNC_HP)
2164 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2165 else if (oap->oap_async_flags & ASYNC_URGENT)
2166 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2167 cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2168 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2171 /* this must be called holding the loi list lock to give coverage to exit_cache,
2172 * async_flag maintenance, and oap_request */
/* Finish one oap after its RPC completed or could not be built.
 *
 * Drops the oap's request reference (taking the xid from it first), clears
 * the async flags under oap_lock and the interrupted mark, feeds rc into the
 * client-wide and per-object async error state for writes, and on success
 * copies blocks/mtime/atime/ctime from oa (when valid) into the object's
 * lvb.  The caller's ap_completion() is then invoked; afterwards the oap is
 * either put back on the pending queue or leaves the cache accounting via
 * osc_exit_cache().
 * NOTE(review): the condition choosing between those two outcomes is not
 * visible here - confirm. */
2173 static void osc_ap_completion(const struct lu_env *env,
2174 struct client_obd *cli, struct obdo *oa,
2175 struct osc_async_page *oap, int sent, int rc)
2180 if (oap->oap_request != NULL) {
2181 xid = ptlrpc_req_xid(oap->oap_request);
2182 ptlrpc_req_finished(oap->oap_request);
2183 oap->oap_request = NULL;
2186 cfs_spin_lock(&oap->oap_lock);
2187 oap->oap_async_flags = 0;
2188 cfs_spin_unlock(&oap->oap_lock);
2189 oap->oap_interrupted = 0;
2191 if (oap->oap_cmd & OBD_BRW_WRITE) {
2192 osc_process_ar(&cli->cl_ar, xid, rc);
2193 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2196 if (rc == 0 && oa != NULL) {
2197 if (oa->o_valid & OBD_MD_FLBLOCKS)
2198 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2199 if (oa->o_valid & OBD_MD_FLMTIME)
2200 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2201 if (oa->o_valid & OBD_MD_FLATIME)
2202 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2203 if (oa->o_valid & OBD_MD_FLCTIME)
2204 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2207 rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2208 oap->oap_cmd, oa, rc);
2210 /* ll_ap_completion (from llite) drops PG_locked. so, a new
2211 * I/O on the page could start, but OSC calls it under lock
2212 * and thus we can add oap back to pending safely */
2214 /* upper layer wants to leave the page on pending queue */
2215 osc_oap_to_pending(oap);
2217 osc_exit_cache(cli, oap, sent);
/* Reply interpreter for bulk read/write requests; data is the request's
 * osc_brw_async_args.
 *
 * Runs osc_brw_fini_request() on the reply.  A recoverable error leads to
 * osc_brw_redo_request(), with a special case for an mmaped object
 * (OBD_FL_MMAP) that failed with -EAGAIN after at least one resend.  The capa
 * reference is dropped.  Under cl_loi_list_lock the in-flight counter for
 * the opcode is decremented and the pages are completed: through
 * osc_ap_completion() for every oap when the request carries an oap list
 * (the obdo is freed afterwards), otherwise by releasing the write grant of
 * each page.  Cache waiters and sync-fs waiters are woken and
 * osc_check_rpcs() is called before the lock is released; finally the cl_req
 * is completed and the page pointer array freed.
 * NOTE(review): which of these steps are skipped after a successful redo,
 * the assignment of cli, and the return value are not visible here. */
2221 static int brw_interpret(const struct lu_env *env,
2222 struct ptlrpc_request *req, void *data, int rc)
2224 struct osc_brw_async_args *aa = data;
2225 struct client_obd *cli;
2229 rc = osc_brw_fini_request(req, rc);
2230 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2231 if (osc_recoverable_error(rc)) {
2232 /* Only retry once for mmaped files since the mmaped page
2233 * might be modified at anytime. We have to retry at least
2234 * once in case there WAS really a corruption of the page
2235 * on the network, that was not caused by mmap() modifying
2236 * the page. Bug11742 */
2237 if ((rc == -EAGAIN) && (aa->aa_resends > 0) &&
2238 aa->aa_oa->o_valid & OBD_MD_FLFLAGS &&
2239 aa->aa_oa->o_flags & OBD_FL_MMAP) {
2242 rc = osc_brw_redo_request(req, aa);
2249 capa_put(aa->aa_ocapa);
2250 aa->aa_ocapa = NULL;
2255 client_obd_list_lock(&cli->cl_loi_list_lock);
2257 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2258 * is called so we know whether to go to sync BRWs or wait for more
2259 * RPCs to complete */
2260 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2261 cli->cl_w_in_flight--;
2263 cli->cl_r_in_flight--;
/* note: async is set when the oap list is EMPTY */
2265 async = cfs_list_empty(&aa->aa_oaps);
2266 if (!async) { /* from osc_send_oap_rpc() */
2267 struct osc_async_page *oap, *tmp;
2268 /* the caller may re-use the oap after the completion call so
2269 * we need to clean it up a little */
2270 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
2272 cfs_list_del_init(&oap->oap_rpc_item);
2273 osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2275 OBDO_FREE(aa->aa_oa);
2276 } else { /* from async_internal() */
2278 for (i = 0; i < aa->aa_page_count; i++)
2279 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2281 osc_wake_cache_waiters(cli);
2282 osc_wake_sync_fs(cli);
2283 osc_check_rpcs(env, cli);
2284 client_obd_list_unlock(&cli->cl_loi_list_lock);
2286 cl_req_completion(env, aa->aa_clerq, rc);
2287 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
/* Build a bulk RPC from the oaps queued on rpc_list.
 *
 * \param env        lu environment passed on to the cl_req calls
 * \param cli        client obd the request is built for
 * \param rpc_list   non-empty list of oaps linked by oap_rpc_item; on success
 *                   it is spliced into the request's async args and left
 *                   empty
 * \param page_count number of slots allocated in the page pointer array
 * \param cmd        OBD_BRW_* flags; OBD_BRW_WRITE selects CRT_WRITE and
 *                   OBD_BRW_MEMALLOC enables the memory-pressure handling
 *
 * Each oap's brw page is entered into the page array with its offset set to
 * oap_obj_off + oap_page_off and added to the cl_req.  The array is sorted
 * by offset and handed to osc_brw_prep_request() with capa reservation;
 * the time attributes are set once more after the request is built.
 *
 * NOTE(review): judging by the GOTO(out, req = ERR_PTR(...)) paths, the
 * result is the request or an ERR_PTR() - the return itself is not visible.
 * On the failure path the page array is freed, each oap is taken off
 * rpc_list and completed via osc_ap_completion() with cl_loi_list_lock held,
 * and a valid cl_req is completed with the error. */
2292 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2293 struct client_obd *cli,
2294 cfs_list_t *rpc_list,
2295 int page_count, int cmd)
2297 struct ptlrpc_request *req;
2298 struct brw_page **pga = NULL;
2299 struct osc_brw_async_args *aa;
2300 struct obdo *oa = NULL;
2301 const struct obd_async_page_ops *ops = NULL;
2302 void *caller_data = NULL;
2303 struct osc_async_page *oap;
2304 struct osc_async_page *tmp;
2305 struct ost_body *body;
2306 struct cl_req *clerq = NULL;
2307 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2308 struct ldlm_lock *lock = NULL;
2309 struct cl_req_attr crattr;
2310 int i, rc, mpflag = 0;
2313 LASSERT(!cfs_list_empty(rpc_list));
2315 if (cmd & OBD_BRW_MEMALLOC)
2316 mpflag = cfs_memory_pressure_get_and_set();
2318 memset(&crattr, 0, sizeof crattr);
2319 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2321 GOTO(out, req = ERR_PTR(-ENOMEM));
2325 GOTO(out, req = ERR_PTR(-ENOMEM));
2328 cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2329 struct cl_page *page = osc_oap2cl_page(oap);
2331 ops = oap->oap_caller_ops;
2332 caller_data = oap->oap_caller_data;
2334 clerq = cl_req_alloc(env, page, crt,
2335 1 /* only 1-object rpcs for
2338 GOTO(out, req = (void *)clerq);
2339 lock = oap->oap_ldlm_lock;
2341 pga[i] = &oap->oap_brw_page;
2342 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2343 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2344 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2346 cl_req_page_add(env, clerq, page);
2349 /* always get the data for the obdo for the rpc */
2350 LASSERT(ops != NULL);
2352 crattr.cra_capa = NULL;
2353 cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2355 oa->o_handle = lock->l_remote_handle;
2356 oa->o_valid |= OBD_MD_FLHANDLE;
2359 rc = cl_req_prep(env, clerq);
2361 CERROR("cl_req_prep failed: %d\n", rc);
2362 GOTO(out, req = ERR_PTR(rc));
2365 sort_brw_pages(pga, page_count);
2366 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2367 pga, &req, crattr.cra_capa, 1);
2369 CERROR("prep_req failed: %d\n", rc);
2370 GOTO(out, req = ERR_PTR(rc));
2373 if (cmd & OBD_BRW_MEMALLOC)
2374 req->rq_memalloc = 1;
2376 /* Need to update the timestamps after the request is built in case
2377 * we race with setattr (locally or in queue at OST). If OST gets
2378 * later setattr before earlier BRW (as determined by the request xid),
2379 * the OST will not use BRW timestamps. Sadly, there is no obvious
2380 * way to do this in a single call. bug 10150 */
2381 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2382 cl_req_attr_set(env, clerq, &crattr,
2383 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2385 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2386 aa = ptlrpc_req_async_args(req);
/* the request takes over the oaps; rpc_list is left empty */
2387 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2388 cfs_list_splice(rpc_list, &aa->aa_oaps);
2389 CFS_INIT_LIST_HEAD(rpc_list);
2390 aa->aa_clerq = clerq;
2392 if (cmd & OBD_BRW_MEMALLOC)
2393 cfs_memory_pressure_restore(mpflag);
2395 capa_put(crattr.cra_capa);
2400 OBD_FREE(pga, sizeof(*pga) * page_count);
2401 /* this should happen rarely and is pretty bad, it makes the
2402 * pending list not follow the dirty order */
2403 client_obd_list_lock(&cli->cl_loi_list_lock);
2404 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2405 cfs_list_del_init(&oap->oap_rpc_item);
2407 /* queued sync pages can be torn down while the pages
2408 * were between the pending list and the rpc */
2409 if (oap->oap_interrupted) {
2410 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2411 osc_ap_completion(env, cli, NULL, oap, 0,
2415 osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2417 if (clerq && !IS_ERR(clerq))
2418 cl_req_completion(env, clerq, PTR_ERR(req));
2424 * prepare pages for ASYNC io and put pages in send queue.
2426 * \param cmd OBD_BRW_* macros
2427 * \param lop pending pages
2429 * \return zero if no page added to send queue.
2430 * \return 1 if pages successfully added to send queue.
2431 * \return negative on errors.
2434 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2435 struct lov_oinfo *loi,
2436 int cmd, struct loi_oap_pages *lop)
2438 struct ptlrpc_request *req;
2439 obd_count page_count = 0;
2440 struct osc_async_page *oap = NULL, *tmp;
2441 struct osc_brw_async_args *aa;
2442 const struct obd_async_page_ops *ops;
2443 CFS_LIST_HEAD(rpc_list);
2444 CFS_LIST_HEAD(tmp_list);
2445 unsigned int ending_offset;
2446 unsigned starting_offset = 0;
2447 int srvlock = 0, mem_tight = 0;
2448 struct cl_object *clob = NULL;
2451 /* ASYNC_HP pages first. At present, when the lock the pages is
2452 * to be canceled, the pages covered by the lock will be sent out
2453 * with ASYNC_HP. We have to send out them as soon as possible. */
2454 cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2455 if (oap->oap_async_flags & ASYNC_HP)
2456 cfs_list_move(&oap->oap_pending_item, &tmp_list);
2458 cfs_list_move_tail(&oap->oap_pending_item, &tmp_list);
2459 if (++page_count >= cli->cl_max_pages_per_rpc)
2463 cfs_list_splice(&tmp_list, &lop->lop_pending);
2466 /* first we find the pages we're allowed to work with */
2467 cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2469 ops = oap->oap_caller_ops;
2471 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2472 "magic 0x%x\n", oap, oap->oap_magic);
2475 /* pin object in memory, so that completion call-backs
2476 * can be safely called under client_obd_list lock. */
2477 clob = osc_oap2cl_page(oap)->cp_obj;
2478 cl_object_get(clob);
2481 if (page_count != 0 &&
2482 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2483 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2484 " oap %p, page %p, srvlock %u\n",
2485 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2489 /* If there is a gap at the start of this page, it can't merge
2490 * with any previous page, so we'll hand the network a
2491 * "fragmented" page array that it can't transfer in 1 RDMA */
2492 if (page_count != 0 && oap->oap_page_off != 0)
2495 /* in llite being 'ready' equates to the page being locked
2496 * until completion unlocks it. commit_write submits a page
2497 * as not ready because its unlock will happen unconditionally
2498 * as the call returns. if we race with commit_write giving
2499 * us that page we don't want to create a hole in the page
2500 * stream, so we stop and leave the rpc to be fired by
2501 * another dirtier or kupdated interval (the not ready page
2502 * will still be on the dirty list). we could call in
2503 * at the end of ll_file_write to process the queue again. */
2504 if (!(oap->oap_async_flags & ASYNC_READY)) {
2505 int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2508 CDEBUG(D_INODE, "oap %p page %p returned %d "
2509 "instead of ready\n", oap,
2513 /* llite is telling us that the page is still
2514 * in commit_write and that we should try
2515 * and put it in an rpc again later. we
2516 * break out of the loop so we don't create
2517 * a hole in the sequence of pages in the rpc
2522 /* the io isn't needed.. tell the checks
2523 * below to complete the rpc with EINTR */
2524 cfs_spin_lock(&oap->oap_lock);
2525 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2526 cfs_spin_unlock(&oap->oap_lock);
2527 oap->oap_count = -EINTR;
2530 cfs_spin_lock(&oap->oap_lock);
2531 oap->oap_async_flags |= ASYNC_READY;
2532 cfs_spin_unlock(&oap->oap_lock);
2535 LASSERTF(0, "oap %p page %p returned %d "
2536 "from make_ready\n", oap,
2544 * Page submitted for IO has to be locked. Either by
2545 * ->ap_make_ready() or by higher layers.
2547 #if defined(__KERNEL__) && defined(__linux__)
2549 struct cl_page *page;
2551 page = osc_oap2cl_page(oap);
2553 if (page->cp_type == CPT_CACHEABLE &&
2554 !(PageLocked(oap->oap_page) &&
2555 (CheckWriteback(oap->oap_page, cmd)))) {
2556 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2558 (long)oap->oap_page->flags,
2559 oap->oap_async_flags);
2565 /* take the page out of our book-keeping */
2566 cfs_list_del_init(&oap->oap_pending_item);
2567 lop_update_pending(cli, lop, cmd, -1);
2568 cfs_list_del_init(&oap->oap_urgent_item);
2570 if (page_count == 0)
2571 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2572 (PTLRPC_MAX_BRW_SIZE - 1);
2574 /* ask the caller for the size of the io as the rpc leaves. */
2575 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2577 ops->ap_refresh_count(env, oap->oap_caller_data,
2579 LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2581 if (oap->oap_count <= 0) {
2582 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2584 osc_ap_completion(env, cli, NULL,
2585 oap, 0, oap->oap_count);
2589 /* now put the page back in our accounting */
2590 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2591 if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
2593 if (page_count == 0)
2594 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2595 if (++page_count >= cli->cl_max_pages_per_rpc)
2598 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2599 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2600 * have the same alignment as the initial writes that allocated
2601 * extents on the server. */
2602 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2603 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2604 if (ending_offset == 0)
2607 /* If there is a gap at the end of this page, it can't merge
2608 * with any subsequent pages, so we'll hand the network a
2609 * "fragmented" page array that it can't transfer in 1 RDMA */
2610 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2614 osc_wake_cache_waiters(cli);
2615 osc_wake_sync_fs(cli);
2616 loi_list_maint(cli, loi);
2618 client_obd_list_unlock(&cli->cl_loi_list_lock);
2621 cl_object_put(env, clob);
2623 if (page_count == 0) {
2624 client_obd_list_lock(&cli->cl_loi_list_lock);
2628 req = osc_build_req(env, cli, &rpc_list, page_count,
2629 mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
2631 LASSERT(cfs_list_empty(&rpc_list));
2632 loi_list_maint(cli, loi);
2633 RETURN(PTR_ERR(req));
2636 aa = ptlrpc_req_async_args(req);
2638 if (cmd == OBD_BRW_READ) {
2639 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2640 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2641 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2642 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2644 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2645 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2646 cli->cl_w_in_flight);
2647 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2648 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2650 ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2652 client_obd_list_lock(&cli->cl_loi_list_lock);
2654 if (cmd == OBD_BRW_READ)
2655 cli->cl_r_in_flight++;
2657 cli->cl_w_in_flight++;
2659 /* queued sync pages can be torn down while the pages
2660 * were between the pending list and the rpc */
2662 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2663 /* only one oap gets a request reference */
2666 if (oap->oap_interrupted && !req->rq_intr) {
2667 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2669 ptlrpc_mark_interrupted(req);
2673 tmp->oap_request = ptlrpc_request_addref(req);
2675 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2676 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2678 req->rq_interpret_reply = brw_interpret;
2679 ptlrpcd_add_req(req, PSCOPE_BRW);
/* Debug helper: logs (at D_INODE) whether LOI is on a ready list, the number
 * of pending write/read pages and whether each direction has urgent pages,
 * followed by the caller's own format STR and its arguments. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !cfs_list_empty(&(LOI)->loi_ready_item) ||                \
               !cfs_list_empty(&(LOI)->loi_hp_ready_item),               \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent),        \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent),         \
               args)
2693 /* This is called by osc_check_rpcs() to find which objects have pages that
2694 * we could be sending. These lists are maintained by lop_makes_rpc(). */
2695 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2699 /* First return objects that have blocked locks so that they
2700 * will be flushed quickly and other clients can get the lock,
2701 * then objects which have pages ready to be stuffed into RPCs */
2702 if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
2703 RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
2704 struct lov_oinfo, loi_hp_ready_item));
2705 if (!cfs_list_empty(&cli->cl_loi_ready_list))
2706 RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
2707 struct lov_oinfo, loi_ready_item));
2708 if (!cfs_list_empty(&cli->cl_loi_sync_fs_list))
2709 RETURN(cfs_list_entry(cli->cl_loi_sync_fs_list.next,
2710 struct lov_oinfo, loi_sync_fs_item));
2712 /* then if we have cache waiters, return all objects with queued
2713 * writes. This is especially important when many small files
2714 * have filled up the cache and not been fired into rpcs because
2715 * they don't pass the nr_pending/object threshhold */
2716 if (!cfs_list_empty(&cli->cl_cache_waiters) &&
2717 !cfs_list_empty(&cli->cl_loi_write_list))
2718 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2719 struct lov_oinfo, loi_write_item));
2721 /* then return all queued objects when we have an invalid import
2722 * so that they get flushed */
2723 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2724 if (!cfs_list_empty(&cli->cl_loi_write_list))
2725 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2728 if (!cfs_list_empty(&cli->cl_loi_read_list))
2729 RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
2730 struct lov_oinfo, loi_read_item));
2735 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2737 struct osc_async_page *oap;
2740 if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
2741 oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
2742 struct osc_async_page, oap_urgent_item);
2743 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2746 if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
2747 oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
2748 struct osc_async_page, oap_urgent_item);
2749 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2752 return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2755 /* called with the loi list lock held */
2756 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2758 struct lov_oinfo *loi;
2759 int rc = 0, race_counter = 0;
2762 while ((loi = osc_next_loi(cli)) != NULL) {
2763 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2765 if (osc_max_rpc_in_flight(cli, loi))
2768 /* attempt some read/write balancing by alternating between
2769 * reads and writes in an object. The makes_rpc checks here
2770 * would be redundant if we were getting read/write work items
2771 * instead of objects. we don't want send_oap_rpc to drain a
2772 * partial read pending queue when we're given this object to
2773 * do io on writes while there are cache waiters */
2774 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2775 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2776 &loi->loi_write_lop);
2778 CERROR("Write request failed with %d\n", rc);
2780 /* osc_send_oap_rpc failed, mostly because of
2783 * It can't break here, because if:
2784 * - a page was submitted by osc_io_submit, so
2786 * - no request in flight
2787 * - no subsequent request
2788 * The system will be in live-lock state,
2789 * because there is no chance to call
2790 * osc_io_unplug() and osc_check_rpcs() any
2791 * more. pdflush can't help in this case,
2792 * because it might be blocked at grabbing
2793 * the page lock as we mentioned.
2795 * Anyway, continue to drain pages. */
2804 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2805 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2806 &loi->loi_read_lop);
2808 CERROR("Read request failed with %d\n", rc);
2816 /* attempt some inter-object balancing by issuing rpcs
2817 * for each object in turn */
2818 if (!cfs_list_empty(&loi->loi_hp_ready_item))
2819 cfs_list_del_init(&loi->loi_hp_ready_item);
2820 if (!cfs_list_empty(&loi->loi_ready_item))
2821 cfs_list_del_init(&loi->loi_ready_item);
2822 if (!cfs_list_empty(&loi->loi_write_item))
2823 cfs_list_del_init(&loi->loi_write_item);
2824 if (!cfs_list_empty(&loi->loi_read_item))
2825 cfs_list_del_init(&loi->loi_read_item);
2826 if (!cfs_list_empty(&loi->loi_sync_fs_item))
2827 cfs_list_del_init(&loi->loi_sync_fs_item);
2829 loi_list_maint(cli, loi);
2831 /* send_oap_rpc fails with 0 when make_ready tells it to
2832 * back off. llite's make_ready does this when it tries
2833 * to lock a page queued for write that is already locked.
2834 * we want to try sending rpcs from many objects, but we
2835 * don't want to spin failing with 0. */
2836 if (race_counter == 10)
2842 /* we're trying to queue a page in the osc so we're subject to the
2843 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2844 * If the osc's queued pages are already at that limit, then we want to sleep
2845 * until there is space in the osc's queue for us. We also may be waiting for
2846 * write credits from the OST if there are RPCs in flight that may return some
2847 * before we fall back to sync writes.
2849 * We need this know our allocation was granted in the presence of signals */
2850 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2854 client_obd_list_lock(&cli->cl_loi_list_lock);
2855 rc = cfs_list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2856 client_obd_list_unlock(&cli->cl_loi_list_lock);
2861 * Non-blocking version of osc_enter_cache() that consumes grant only when it
2864 int osc_enter_cache_try(const struct lu_env *env,
2865 struct client_obd *cli, struct lov_oinfo *loi,
2866 struct osc_async_page *oap, int transient)
2870 has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2872 osc_consume_write_grant(cli, &oap->oap_brw_page);
2874 cli->cl_dirty_transit += CFS_PAGE_SIZE;
2875 cfs_atomic_inc(&obd_dirty_transit_pages);
2876 oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2882 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2883 * grant or cache space. */
2884 static int osc_enter_cache(const struct lu_env *env,
2885 struct client_obd *cli, struct lov_oinfo *loi,
2886 struct osc_async_page *oap)
2888 struct osc_cache_waiter ocw;
2889 struct l_wait_info lwi = { 0 };
2893 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2894 "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
2895 cli->cl_dirty_max, obd_max_dirty_pages,
2896 cli->cl_lost_grant, cli->cl_avail_grant);
2898 /* force the caller to try sync io. this can jump the list
2899 * of queued writes and create a discontiguous rpc stream */
2900 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2901 loi->loi_ar.ar_force_sync)
2904 /* Hopefully normal case - cache space and write credits available */
2905 if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2906 cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2907 osc_enter_cache_try(env, cli, loi, oap, 0))
2910 /* It is safe to block as a cache waiter as long as there is grant
2911 * space available or the hope of additional grant being returned
2912 * when an in flight write completes. Using the write back cache
2913 * if possible is preferable to sending the data synchronously
2914 * because write pages can then be merged in to large requests.
2915 * The addition of this cache waiter will causing pending write
2916 * pages to be sent immediately. */
2917 if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
2918 cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2919 cfs_waitq_init(&ocw.ocw_waitq);
2923 loi_list_maint(cli, loi);
2924 osc_check_rpcs(env, cli);
2925 client_obd_list_unlock(&cli->cl_loi_list_lock);
2927 CDEBUG(D_CACHE, "sleeping for cache space\n");
2928 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2930 client_obd_list_lock(&cli->cl_loi_list_lock);
2931 if (!cfs_list_empty(&ocw.ocw_entry)) {
2932 cfs_list_del(&ocw.ocw_entry);
2942 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2943 struct lov_oinfo *loi, cfs_page_t *page,
2944 obd_off offset, const struct obd_async_page_ops *ops,
2945 void *data, void **res, int nocache,
2946 struct lustre_handle *lockh)
2948 struct osc_async_page *oap;
2953 return cfs_size_round(sizeof(*oap));
2956 oap->oap_magic = OAP_MAGIC;
2957 oap->oap_cli = &exp->exp_obd->u.cli;
2960 oap->oap_caller_ops = ops;
2961 oap->oap_caller_data = data;
2963 oap->oap_page = page;
2964 oap->oap_obj_off = offset;
2965 if (!client_is_remote(exp) &&
2966 cfs_capable(CFS_CAP_SYS_RESOURCE))
2967 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2969 LASSERT(!(offset & ~CFS_PAGE_MASK));
2971 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2972 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2973 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2974 CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2976 cfs_spin_lock_init(&oap->oap_lock);
2977 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2981 struct osc_async_page *oap_from_cookie(void *cookie)
2983 struct osc_async_page *oap = cookie;
2984 if (oap->oap_magic != OAP_MAGIC)
2985 return ERR_PTR(-EINVAL);
2989 int osc_queue_async_io(const struct lu_env *env,
2990 struct obd_export *exp, struct lov_stripe_md *lsm,
2991 struct lov_oinfo *loi, void *cookie,
2992 int cmd, obd_off off, int count,
2993 obd_flag brw_flags, enum async_flags async_flags)
2995 struct client_obd *cli = &exp->exp_obd->u.cli;
2996 struct osc_async_page *oap;
3000 oap = oap_from_cookie(cookie);
3002 RETURN(PTR_ERR(oap));
3004 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
3007 if (!cfs_list_empty(&oap->oap_pending_item) ||
3008 !cfs_list_empty(&oap->oap_urgent_item) ||
3009 !cfs_list_empty(&oap->oap_rpc_item))
3012 /* check if the file's owner/group is over quota */
3013 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
3014 struct cl_object *obj;
3015 struct cl_attr attr; /* XXX put attr into thread info */
3016 unsigned int qid[MAXQUOTAS];
3018 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
3020 cl_object_attr_lock(obj);
3021 rc = cl_object_attr_get(env, obj, &attr);
3022 cl_object_attr_unlock(obj);
3024 qid[USRQUOTA] = attr.cat_uid;
3025 qid[GRPQUOTA] = attr.cat_gid;
3027 lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
3034 loi = lsm->lsm_oinfo[0];
3036 client_obd_list_lock(&cli->cl_loi_list_lock);
3038 LASSERT(off + count <= CFS_PAGE_SIZE);
3040 oap->oap_page_off = off;
3041 oap->oap_count = count;
3042 oap->oap_brw_flags = brw_flags;
3043 /* Give a hint to OST that requests are coming from kswapd - bug19529 */
3044 if (cfs_memory_pressure_get())
3045 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
3046 cfs_spin_lock(&oap->oap_lock);
3047 oap->oap_async_flags = async_flags;
3048 cfs_spin_unlock(&oap->oap_lock);
3050 if (cmd & OBD_BRW_WRITE) {
3051 rc = osc_enter_cache(env, cli, loi, oap);
3053 client_obd_list_unlock(&cli->cl_loi_list_lock);
3058 osc_oap_to_pending(oap);
3059 loi_list_maint(cli, loi);
3061 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
3064 osc_check_rpcs(env, cli);
3065 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* aka (~was & now & flag), but this is more clear :)
 * True when `flag` is not set in `was` but is set in `now`.  Every argument
 * is parenthesised so compound expressions (e.g. A | B) expand correctly. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
3073 int osc_set_async_flags_base(struct client_obd *cli,
3074 struct lov_oinfo *loi, struct osc_async_page *oap,
3075 obd_flag async_flags)
3077 struct loi_oap_pages *lop;
3081 LASSERT(!cfs_list_empty(&oap->oap_pending_item));
3083 if (oap->oap_cmd & OBD_BRW_WRITE) {
3084 lop = &loi->loi_write_lop;
3086 lop = &loi->loi_read_lop;
3089 if ((oap->oap_async_flags & async_flags) == async_flags)
3092 /* XXX: This introduces a tiny insignificant race for the case if this
3093 * loi already had other urgent items.
3095 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_SYNCFS) &&
3096 cfs_list_empty(&oap->oap_rpc_item) &&
3097 cfs_list_empty(&oap->oap_urgent_item)) {
3098 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
3099 flags |= ASYNC_SYNCFS;
3100 cfs_spin_lock(&oap->oap_lock);
3101 oap->oap_async_flags |= flags;
3102 cfs_spin_unlock(&oap->oap_lock);
3103 loi_list_maint(cli, loi);
3107 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3108 flags |= ASYNC_READY;
3110 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3111 cfs_list_empty(&oap->oap_rpc_item)) {
3112 if (oap->oap_async_flags & ASYNC_HP)
3113 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
3115 cfs_list_add_tail(&oap->oap_urgent_item,
3117 flags |= ASYNC_URGENT;
3118 loi_list_maint(cli, loi);
3120 cfs_spin_lock(&oap->oap_lock);
3121 oap->oap_async_flags |= flags;
3122 cfs_spin_unlock(&oap->oap_lock);
3124 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3125 oap->oap_async_flags);
3129 int osc_teardown_async_page(struct obd_export *exp,
3130 struct lov_stripe_md *lsm,
3131 struct lov_oinfo *loi, void *cookie)
3133 struct client_obd *cli = &exp->exp_obd->u.cli;
3134 struct loi_oap_pages *lop;
3135 struct osc_async_page *oap;
3139 oap = oap_from_cookie(cookie);
3141 RETURN(PTR_ERR(oap));
3144 loi = lsm->lsm_oinfo[0];
3146 if (oap->oap_cmd & OBD_BRW_WRITE) {
3147 lop = &loi->loi_write_lop;
3149 lop = &loi->loi_read_lop;
3152 client_obd_list_lock(&cli->cl_loi_list_lock);
3154 if (!cfs_list_empty(&oap->oap_rpc_item))
3155 GOTO(out, rc = -EBUSY);
3157 osc_exit_cache(cli, oap, 0);
3158 osc_wake_cache_waiters(cli);
3160 if (!cfs_list_empty(&oap->oap_urgent_item)) {
3161 cfs_list_del_init(&oap->oap_urgent_item);
3162 cfs_spin_lock(&oap->oap_lock);
3163 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP |
3165 cfs_spin_unlock(&oap->oap_lock);
3167 if (!cfs_list_empty(&oap->oap_pending_item)) {
3168 cfs_list_del_init(&oap->oap_pending_item);
3169 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3171 loi_list_maint(cli, loi);
3172 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3174 client_obd_list_unlock(&cli->cl_loi_list_lock);
3178 static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
3179 struct ldlm_enqueue_info *einfo,
3182 void *data = einfo->ei_cbdata;
3184 LASSERT(lock != NULL);
3185 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3186 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3187 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3188 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3190 lock_res_and_lock(lock);
3191 cfs_spin_lock(&osc_ast_guard);
3192 LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
3193 lock->l_ast_data = data;
3194 cfs_spin_unlock(&osc_ast_guard);
3195 unlock_res_and_lock(lock);
3198 static void osc_set_data_with_check(struct lustre_handle *lockh,
3199 struct ldlm_enqueue_info *einfo,
3202 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3205 osc_set_lock_data_with_check(lock, einfo, flags);
3206 LDLM_LOCK_PUT(lock);
3208 CERROR("lockh %p, data %p - client evicted?\n",
3209 lockh, einfo->ei_cbdata);
3212 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3213 ldlm_iterator_t replace, void *data)
3215 struct ldlm_res_id res_id;
3216 struct obd_device *obd = class_exp2obd(exp);
3218 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3219 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3223 /* find any ldlm lock of the inode in osc
3227 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3228 ldlm_iterator_t replace, void *data)
3230 struct ldlm_res_id res_id;
3231 struct obd_device *obd = class_exp2obd(exp);
3234 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3235 rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3236 if (rc == LDLM_ITER_STOP)
3238 if (rc == LDLM_ITER_CONTINUE)
3243 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3244 obd_enqueue_update_f upcall, void *cookie,
3247 int intent = *flags & LDLM_FL_HAS_INTENT;
3251 /* The request was created before ldlm_cli_enqueue call. */
3252 if (rc == ELDLM_LOCK_ABORTED) {
3253 struct ldlm_reply *rep;
3254 rep = req_capsule_server_get(&req->rq_pill,
3257 LASSERT(rep != NULL);
3258 if (rep->lock_policy_res1)
3259 rc = rep->lock_policy_res1;
3263 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3264 *flags |= LDLM_FL_LVB_READY;
3265 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3266 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3269 /* Call the update callback. */
3270 rc = (*upcall)(cookie, rc);
3274 static int osc_enqueue_interpret(const struct lu_env *env,
3275 struct ptlrpc_request *req,
3276 struct osc_enqueue_args *aa, int rc)
3278 struct ldlm_lock *lock;
3279 struct lustre_handle handle;
3282 /* Make a local copy of a lock handle and a mode, because aa->oa_*
3283 * might be freed anytime after lock upcall has been called. */
3284 lustre_handle_copy(&handle, aa->oa_lockh);
3285 mode = aa->oa_ei->ei_mode;
3287 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3289 lock = ldlm_handle2lock(&handle);
3291 /* Take an additional reference so that a blocking AST that
3292 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3293 * to arrive after an upcall has been executed by
3294 * osc_enqueue_fini(). */
3295 ldlm_lock_addref(&handle, mode);
3297 /* Let CP AST to grant the lock first. */
3298 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
3300 /* Complete obtaining the lock procedure. */
3301 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3302 mode, aa->oa_flags, aa->oa_lvb,
3303 sizeof(*aa->oa_lvb), &handle, rc);
3304 /* Complete osc stuff. */
3305 rc = osc_enqueue_fini(req, aa->oa_lvb,
3306 aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
3308 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3310 /* Release the lock for async request. */
3311 if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3313 * Releases a reference taken by ldlm_cli_enqueue(), if it is
3314 * not already released by
3315 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3317 ldlm_lock_decref(&handle, mode);
3319 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3320 aa->oa_lockh, req, aa);
3321 ldlm_lock_decref(&handle, mode);
3322 LDLM_LOCK_PUT(lock);
3326 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3327 struct lov_oinfo *loi, int flags,
3328 struct ost_lvb *lvb, __u32 mode, int rc)
3330 if (rc == ELDLM_OK) {
3331 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3334 LASSERT(lock != NULL);
3335 loi->loi_lvb = *lvb;
3336 tmp = loi->loi_lvb.lvb_size;
3337 /* Extend KMS up to the end of this lock and no further
3338 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3339 if (tmp > lock->l_policy_data.l_extent.end)
3340 tmp = lock->l_policy_data.l_extent.end + 1;
3341 if (tmp >= loi->loi_kms) {
3342 LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3343 ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3344 loi_kms_set(loi, tmp);
3346 LDLM_DEBUG(lock, "lock acquired, setting rss="
3347 LPU64"; leaving kms="LPU64", end="LPU64,
3348 loi->loi_lvb.lvb_size, loi->loi_kms,
3349 lock->l_policy_data.l_extent.end);
3351 ldlm_lock_allow_match(lock);
3352 LDLM_LOCK_PUT(lock);
3353 } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3354 loi->loi_lvb = *lvb;
3355 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3356 " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3360 EXPORT_SYMBOL(osc_update_enqueue);
3362 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3364 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3365 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3366 * other synchronous requests, however keeping some locks and trying to obtain
3367 * others may take a considerable amount of time in a case of ost failure; and
3368 * when other sync requests do not get released lock from a client, the client
3369 * is excluded from the cluster -- such scenarious make the life difficult, so
3370 * release locks just after they are obtained. */
3371 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3372 int *flags, ldlm_policy_data_t *policy,
3373 struct ost_lvb *lvb, int kms_valid,
3374 obd_enqueue_update_f upcall, void *cookie,
3375 struct ldlm_enqueue_info *einfo,
3376 struct lustre_handle *lockh,
3377 struct ptlrpc_request_set *rqset, int async)
3379 struct obd_device *obd = exp->exp_obd;
3380 struct ptlrpc_request *req = NULL;
3381 int intent = *flags & LDLM_FL_HAS_INTENT;
3386 /* Filesystem lock extents are extended to page boundaries so that
3387 * dealing with the page cache is a little smoother. */
3388 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3389 policy->l_extent.end |= ~CFS_PAGE_MASK;
3392 * kms is not valid when either object is completely fresh (so that no
3393 * locks are cached), or object was evicted. In the latter case cached
3394 * lock cannot be used, because it would prime inode state with
3395 * potentially stale LVB.
3400 /* Next, search for already existing extent locks that will cover us */
3401 /* If we're trying to read, we also search for an existing PW lock. The
3402 * VFS and page cache already protect us locally, so lots of readers/
3403 * writers can share a single PW lock.
3405 * There are problems with conversion deadlocks, so instead of
3406 * converting a read lock to a write lock, we'll just enqueue a new
3409 * At some point we should cancel the read lock instead of making them
3410 * send us a blocking callback, but there are problems with canceling
3411 * locks out from other users right now, too. */
3412 mode = einfo->ei_mode;
3413 if (einfo->ei_mode == LCK_PR)
3415 mode = ldlm_lock_match(obd->obd_namespace,
3416 *flags | LDLM_FL_LVB_READY, res_id,
3417 einfo->ei_type, policy, mode, lockh, 0);
3419 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3421 if (matched->l_ast_data == NULL ||
3422 matched->l_ast_data == einfo->ei_cbdata) {
3423 /* addref the lock only if not async requests and PW
3424 * lock is matched whereas we asked for PR. */
3425 if (!rqset && einfo->ei_mode != mode)
3426 ldlm_lock_addref(lockh, LCK_PR);
3427 osc_set_lock_data_with_check(matched, einfo, *flags);
3429 /* I would like to be able to ASSERT here that
3430 * rss <= kms, but I can't, for reasons which
3431 * are explained in lov_enqueue() */
3434 /* We already have a lock, and it's referenced */
3435 (*upcall)(cookie, ELDLM_OK);
3437 /* For async requests, decref the lock. */
3438 if (einfo->ei_mode != mode)
3439 ldlm_lock_decref(lockh, LCK_PW);
3441 ldlm_lock_decref(lockh, einfo->ei_mode);
3442 LDLM_LOCK_PUT(matched);
3445 ldlm_lock_decref(lockh, mode);
3446 LDLM_LOCK_PUT(matched);
3451 CFS_LIST_HEAD(cancels);
3452 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3453 &RQF_LDLM_ENQUEUE_LVB);
3457 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3459 ptlrpc_request_free(req);
3463 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3465 ptlrpc_request_set_replen(req);
3468 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3469 *flags &= ~LDLM_FL_BLOCK_GRANTED;
3471 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3472 sizeof(*lvb), lockh, async);
3475 struct osc_enqueue_args *aa;
3476 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3477 aa = ptlrpc_req_async_args(req);
3480 aa->oa_flags = flags;
3481 aa->oa_upcall = upcall;
3482 aa->oa_cookie = cookie;
3484 aa->oa_lockh = lockh;
3486 req->rq_interpret_reply =
3487 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3488 if (rqset == PTLRPCD_SET)
3489 ptlrpcd_add_req(req, PSCOPE_OTHER);
3491 ptlrpc_set_add_req(rqset, req);
3492 } else if (intent) {
3493 ptlrpc_req_finished(req);
3498 rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3500 ptlrpc_req_finished(req);
/* Enqueue entry point that takes an obd_info: builds the LDLM resource name
 * from the object id/seq stored in oinfo->oi_md and forwards the request to
 * osc_enqueue_base() using the flags, policy and lock handle carried by
 * @oinfo. */
3505 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3506 struct ldlm_enqueue_info *einfo,
3507 struct ptlrpc_request_set *rqset)
3509 struct ldlm_res_id res_id;
3513 osc_build_res_name(oinfo->oi_md->lsm_object_id,
3514 oinfo->oi_md->lsm_object_seq, &res_id);
/* Only lsm_oinfo[0] supplies the LVB and kms_valid.  oi_cb_up and @oinfo
 * are presumably the upcall and its cookie; the last argument is non-zero
 * exactly when a request set was supplied. */
3516 rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3517 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3518 oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3519 oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3520 rqset, rqset != NULL);
/* Search the namespace of @exp's device for an existing lock on @res_id that
 * covers the extent in @policy.  The extent in @policy is widened in place to
 * page boundaries before the lookup, and @data is attached to the lock with
 * osc_set_data_with_check() (presumably only when a lock was matched). */
3524 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3525 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3526 int *flags, void *data, struct lustre_handle *lockh,
3529 struct obd_device *obd = exp->exp_obd;
3530 int lflags = *flags;
3534 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3537 /* Filesystem lock extents are extended to page boundaries so that
3538 * dealing with the page cache is a little smoother */
3539 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3540 policy->l_extent.end |= ~CFS_PAGE_MASK;
3542 /* Next, search for already existing extent locks that will cover us */
3543 /* If we're trying to read, we also search for an existing PW lock. The
3544 * VFS and page cache already protect us locally, so lots of readers/
3545 * writers can share a single PW lock. */
/* rc is handed in as the mode argument and overwritten with the result of
 * the lookup (presumably the mode of the matched lock, 0 if none). */
3549 rc = ldlm_lock_match(obd->obd_namespace, lflags,
3550 res_id, type, policy, rc, lockh, unref);
3553 osc_set_data_with_check(lockh, data, lflags);
/* The lookup result differs from the requested mode and this was not a
 * mere LDLM_FL_TEST_LOCK probe: take a PR reference and drop the PW one. */
3554 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3555 ldlm_lock_addref(lockh, LCK_PR);
3556 ldlm_lock_decref(lockh, LCK_PW);
/* Release one reference of @mode on the lock behind @lockh.  LCK_GROUP locks
 * go through ldlm_lock_decref_and_cancel(); the plain ldlm_lock_decref()
 * below is presumably the path taken for every other mode. */
3563 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3567 if (unlikely(mode == LCK_GROUP))
3568 ldlm_lock_decref_and_cancel(lockh, mode);
3570 ldlm_lock_decref(lockh, mode);
/* Cancel method taking an export: @exp and @md are not used by the visible
 * code, the work is delegated to osc_cancel_base(). */
3575 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3576 __u32 mode, struct lustre_handle *lockh)
3579 RETURN(osc_cancel_base(lockh, mode));
/* Cancel unused locks in the namespace of @exp's device through
 * ldlm_cli_cancel_unused().  The resource filter @resp starts out NULL and
 * is only set when a resource name is built from @lsm's object id/seq. */
3582 static int osc_cancel_unused(struct obd_export *exp,
3583 struct lov_stripe_md *lsm,
3584 ldlm_cancel_flags_t flags,
3587 struct obd_device *obd = class_exp2obd(exp);
3588 struct ldlm_res_id res_id, *resp = NULL;
3591 resp = osc_build_res_name(lsm->lsm_object_id,
3592 lsm->lsm_object_seq, &res_id);
3595 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
/* Reply interpreter for the asynchronous OST_STATFS request (installed as
 * rq_interpret_reply by osc_statfs_async()).  Refreshes the DEGRADED, RDONLY
 * and NOSPC bits of the object creator's flags from the returned obd_statfs
 * under oscc_lock, copies the reply into aa->aa_oi->oi_osfs and invokes the
 * oi_cb_up upcall with the result code. */
3598 static int osc_statfs_interpret(const struct lu_env *env,
3599 struct ptlrpc_request *req,
3600 struct osc_async_args *aa, int rc)
3602 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3603 struct obd_statfs *msfs;
3608 /* The request has in fact never been sent
3609 * due to issues at a higher level (LOV).
3610 * Exit immediately since the caller is
3611 * aware of the problem and takes care
3612 * of the clean up */
/* connection-type failures get special treatment for NODELAY callers */
3615 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3616 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3622 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3624 GOTO(out, rc = -EPROTO);
3627 /* Reinitialize the RDONLY and DEGRADED flags at the client
3628 * on each statfs, so they don't stay set permanently. */
3629 cfs_spin_lock(&cli->cl_oscc.oscc_lock);
3631 if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3632 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3633 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3634 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3636 if (unlikely(msfs->os_state & OS_STATE_READONLY))
3637 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3638 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3639 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3641 /* Add a bit of hysteresis so this flag isn't continually flapping,
3642 * and ensure that new files don't get extremely fragmented due to
3643 * only a small amount of available space in the filesystem.
3644 * We want to set the NOSPC flag when there is less than ~0.1% free
3645 * and clear it when there is at least ~0.2% free space, so:
3646 * avail < ~0.1% max max = avail + used
3647 * 1025 * avail < avail + used used = blocks - free
3648 * 1024 * avail < used
3649 * 1024 * avail < blocks - free
3650 * avail < ((blocks - free) >> 10)
3652 * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
3653 * lose that amount of space so in those cases we report no space left
3654 * if there is less than 1 GB left. */
/* NOTE(review): the cap of 1 << 30 is compared against os_bavail, whose unit
 * is not visible here; confirm it really corresponds to the "1 GB" above. */
3655 used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
/* set NOSPC below 32 free inodes or below the threshold; clear it only
 * above 64 free inodes and twice the threshold */
3656 if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3657 ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3658 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3659 else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3660 (msfs->os_ffree > 64) && (msfs->os_bavail > (used << 1))))
3661 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_NOSPC;
3663 cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
3665 *aa->aa_oi->oi_osfs = *msfs;
3667 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Asynchronous statfs: packs an OST_STATFS request on the device's import,
 * routes it to OST_CREATE_PORTAL, installs osc_statfs_interpret() as the
 * reply handler and adds the request to @rqset.  @max_age is not used by the
 * visible code (see the comment below). */
3671 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3672 __u64 max_age, struct ptlrpc_request_set *rqset)
3674 struct ptlrpc_request *req;
3675 struct osc_async_args *aa;
3679 /* We could possibly pass max_age in the request (as an absolute
3680 * timestamp or a "seconds.usec ago") so the target can avoid doing
3681 * extra calls into the filesystem if that isn't necessary (e.g.
3682 * during mount that would help a bit). Having relative timestamps
3683 * is not so great if request processing is slow, while absolute
3684 * timestamps are not ideal because they need time synchronization. */
3685 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3689 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3691 ptlrpc_request_free(req);
3694 ptlrpc_request_set_replen(req);
3695 req->rq_request_portal = OST_CREATE_PORTAL;
3696 ptlrpc_at_set_req_timeout(req);
3698 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3699 /* procfs requests must not block waiting for statfs, to avoid deadlock */
3700 req->rq_no_resend = 1;
3701 req->rq_no_delay = 1;
3704 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
/* compile-time check that the per-request argument area is large enough */
3705 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3706 aa = ptlrpc_req_async_args(req);
3709 ptlrpc_set_add_req(rqset, req);
/* Synchronous statfs: takes its own reference on the client import while
 * holding cl_sem for reading, sends an OST_STATFS request through
 * OST_CREATE_PORTAL, waits for it with ptlrpc_queue_wait() and fetches the
 * obd_statfs reply buffer.  With OBD_STATFS_NODELAY in @flags the request is
 * marked no-resend/no-delay. */
3713 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3714 __u64 max_age, __u32 flags)
3716 struct obd_statfs *msfs;
3717 struct ptlrpc_request *req;
3718 struct obd_import *imp = NULL;
3722 /* The request might also come from lprocfs, so we need to
3723 * synchronize with client_disconnect_export (Bug15684) */
3724 cfs_down_read(&obd->u.cli.cl_sem);
3725 if (obd->u.cli.cl_import)
3726 imp = class_import_get(obd->u.cli.cl_import);
3727 cfs_up_read(&obd->u.cli.cl_sem);
3731 /* We could possibly pass max_age in the request (as an absolute
3732 * timestamp or a "seconds.usec ago") so the target can avoid doing
3733 * extra calls into the filesystem if that isn't necessary (e.g.
3734 * during mount that would help a bit). Having relative timestamps
3735 * is not so great if request processing is slow, while absolute
3736 * timestamps are not ideal because they need time synchronization. */
3737 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
/* the reference taken above is dropped right after the allocation */
3739 class_import_put(imp);
3744 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3746 ptlrpc_request_free(req);
3749 ptlrpc_request_set_replen(req);
3750 req->rq_request_portal = OST_CREATE_PORTAL;
3751 ptlrpc_at_set_req_timeout(req);
3753 if (flags & OBD_STATFS_NODELAY) {
3754 /* procfs requests must not block waiting for statfs, to avoid deadlock */
3755 req->rq_no_resend = 1;
3756 req->rq_no_delay = 1;
3759 rc = ptlrpc_queue_wait(req);
3763 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3765 GOTO(out, rc = -EPROTO);
3772 ptlrpc_req_finished(req);
3776 /* Retrieve object striping information.
 * The header of the user buffer is read to learn lmm_magic and
 * lmm_stripe_count, a reply describing a single stripe with the object
 * id/seq of @lsm is assembled, and lum_size bytes are copied back.
3778 * @lump is a pointer to an in-core struct with lmm_ost_count indicating
3779 * the maximum number of OST indices which will fit in the user buffer.
3780 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3782 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3784 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3785 struct lov_user_md_v3 lum, *lumk;
3786 struct lov_user_ost_data_v1 *lmm_objects;
3787 int rc = 0, lum_size;
3793 /* we only need the header part from user space to get lmm_magic and
3794 * lmm_stripe_count, (the header part is common to v1 and v3) */
3795 lum_size = sizeof(struct lov_user_md_v1);
3796 if (cfs_copy_from_user(&lum, lump, lum_size))
/* only the V1 and V3 user magics are accepted */
3799 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3800 (lum.lmm_magic != LOV_USER_MAGIC_V3))
3803 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3804 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3805 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3806 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3808 /* we can use lov_mds_md_size() to compute lum_size
3809 * because lov_user_md_vX and lov_mds_md_vX have the same size */
3810 if (lum.lmm_stripe_count > 0) {
3811 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3812 OBD_ALLOC(lumk, lum_size);
/* the object array sits at a different offset in the v1 and v3 layouts */
3816 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3817 lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3819 lmm_objects = &(lumk->lmm_objects[0]);
3820 lmm_objects->l_object_id = lsm->lsm_object_id;
/* NOTE(review): in the zero-stripe case only sizeof(lov_user_md_v1) bytes of
 * the on-stack lum were filled from user space, while lum_size for a V3
 * magic may be larger; if lum itself is what gets copied out, confirm that
 * no uninitialised stack bytes reach user space. */
3822 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3826 lumk->lmm_object_id = lsm->lsm_object_id;
3827 lumk->lmm_object_seq = lsm->lsm_object_seq;
3828 lumk->lmm_stripe_count = 1;
3830 if (cfs_copy_to_user(lump, lumk, lum_size))
3834 OBD_FREE(lumk, lum_size);
/* ioctl dispatcher for an OSC export.  A module reference is held for the
 * duration of the call (cfs_try_module_get / cfs_module_put).  Commands
 * visible here: OBD_IOC_LOV_GET_CONFIG, LL_IOC_LOV_SETSTRIPE,
 * LL_IOC_LOV_GETSTRIPE, OBD_IOC_CLIENT_RECOVER, IOC_OSC_SET_ACTIVE,
 * OBD_IOC_POLL_QUOTACHECK and OBD_IOC_PING_TARGET; anything else yields
 * -ENOTTY. */
3840 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3841 void *karg, void *uarg)
3843 struct obd_device *obd = exp->exp_obd;
3844 struct obd_ioctl_data *data = karg;
/* NOTE(review): the CERROR message below has no trailing newline, unlike
 * the other CERROR calls in this file. */
3848 if (!cfs_try_module_get(THIS_MODULE)) {
3849 CERROR("Can't get module. Is it alive?");
/* Report a synthetic single-target LOV descriptor: one target, one active
 * target, default stripe count 1, carrying this device's uuid. */
3853 case OBD_IOC_LOV_GET_CONFIG: {
3855 struct lov_desc *desc;
3856 struct obd_uuid uuid;
3860 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3861 GOTO(out, err = -EINVAL);
3863 data = (struct obd_ioctl_data *)buf;
/* both inline buffers must be large enough for what is written below */
3865 if (sizeof(*desc) > data->ioc_inllen1) {
3866 obd_ioctl_freedata(buf, len);
3867 GOTO(out, err = -EINVAL);
3870 if (data->ioc_inllen2 < sizeof(uuid)) {
3871 obd_ioctl_freedata(buf, len);
3872 GOTO(out, err = -EINVAL);
3875 desc = (struct lov_desc *)data->ioc_inlbuf1;
3876 desc->ld_tgt_count = 1;
3877 desc->ld_active_tgt_count = 1;
3878 desc->ld_default_stripe_count = 1;
3879 desc->ld_default_stripe_size = 0;
3880 desc->ld_default_stripe_offset = 0;
3881 desc->ld_pattern = 0;
3882 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3884 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3886 err = cfs_copy_to_user((void *)uarg, buf, len);
3889 obd_ioctl_freedata(buf, len);
3892 case LL_IOC_LOV_SETSTRIPE:
3893 err = obd_alloc_memmd(exp, karg);
3897 case LL_IOC_LOV_GETSTRIPE:
3898 err = osc_getstripe(karg, uarg);
3900 case OBD_IOC_CLIENT_RECOVER:
3901 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3906 case IOC_OSC_SET_ACTIVE:
3907 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3910 case OBD_IOC_POLL_QUOTACHECK:
3911 err = lquota_poll_check(quota_interface, exp,
3912 (struct if_quotacheck *)karg);
3914 case OBD_IOC_PING_TARGET:
3915 err = ptlrpc_obd_ping(obd);
3918 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3919 cmd, cfs_curproc_comm());
3920 GOTO(out, err = -ENOTTY);
3923 cfs_module_put(THIS_MODULE);
/* Key/value query on an OSC export.  Keys handled in the visible code:
 *  - KEY_LOCK_TO_STRIPE: answered locally, *vallen becomes sizeof(__u32);
 *  - KEY_LAST_ID: OST_GET_INFO RPC (no delay, no resend) whose obd_id reply
 *    is stored through @val;
 *  - KEY_FIEMAP: OST_GET_INFO RPC carrying both the key and the caller's
 *    @val buffer; *vallen bytes of the reply are copied back into @val.
 * A NULL @vallen or @val is rejected up front. */
3927 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3928 void *key, __u32 *vallen, void *val,
3929 struct lov_stripe_md *lsm)
3932 if (!vallen || !val)
3935 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3936 __u32 *stripe = val;
3937 *vallen = sizeof(*stripe);
3940 } else if (KEY_IS(KEY_LAST_ID)) {
3941 struct ptlrpc_request *req;
3946 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3947 &RQF_OST_GET_INFO_LAST_ID);
/* the key buffer is variable-sized, so size it before packing */
3951 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3952 RCL_CLIENT, keylen);
3953 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3955 ptlrpc_request_free(req);
3959 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3960 memcpy(tmp, key, keylen);
3962 req->rq_no_delay = req->rq_no_resend = 1;
3963 ptlrpc_request_set_replen(req);
3964 rc = ptlrpc_queue_wait(req);
3968 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3970 GOTO(out, rc = -EPROTO);
3972 *((obd_id *)val) = *reply;
3974 ptlrpc_req_finished(req);
3976 } else if (KEY_IS(KEY_FIEMAP)) {
3977 struct ptlrpc_request *req;
3978 struct ll_user_fiemap *reply;
3982 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3983 &RQF_OST_GET_INFO_FIEMAP);
/* the value buffer is sized identically for the request and the reply */
3987 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3988 RCL_CLIENT, keylen);
3989 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3990 RCL_CLIENT, *vallen);
3991 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3992 RCL_SERVER, *vallen);
3994 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3996 ptlrpc_request_free(req);
4000 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
4001 memcpy(tmp, key, keylen);
4002 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
4003 memcpy(tmp, val, *vallen);
4005 ptlrpc_request_set_replen(req);
4006 rc = ptlrpc_queue_wait(req);
4010 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
4012 GOTO(out1, rc = -EPROTO);
4014 memcpy(val, reply, *vallen);
4016 ptlrpc_req_finished(req);
/* Second half of the KEY_MDS_CONN handling: connects the
 * LLOG_MDS_OST_ORIG_CTXT llog initiator of the import's device, then marks
 * the import as imp_server_timeout and imp_pingable under imp_lock. */
4024 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
4026 struct llog_ctxt *ctxt;
4030 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
4032 rc = llog_initiator_connect(ctxt);
4033 llog_ctxt_put(ctxt);
4035 /* XXX return an error? skip setting below flags? */
4038 cfs_spin_lock(&imp->imp_lock);
4039 imp->imp_server_timeout = 1;
4040 imp->imp_pingable = 1;
4041 cfs_spin_unlock(&imp->imp_lock);
4042 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
/* Reply interpreter installed by osc_set_info_async() for KEY_MDS_CONN
 * requests; finishes the job via osc_setinfo_mds_connect_import() on the
 * request's import. */
4047 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
4048 struct ptlrpc_request *req,
4055 RETURN(osc_setinfo_mds_connect_import(req->rq_import));
/* Key/value setter on an OSC export.  KEY_NEXT_ID, KEY_CHECKSUM,
 * KEY_SPTLRPC_CONF and KEY_FLUSH_CTX are handled locally; every other key is
 * packed, together with its value, into an OST_SET_INFO request.
 * KEY_MDS_CONN additionally records the sequence from @val in the object
 * creator and installs osc_setinfo_mds_conn_interpret(); KEY_GRANT_SHRINK
 * installs osc_shrink_grant_interpret().  Requests other than
 * KEY_GRANT_SHRINK are added to @set, which must then be non-NULL. */
4058 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
4059 void *key, obd_count vallen, void *val,
4060 struct ptlrpc_request_set *set)
4062 struct ptlrpc_request *req;
4063 struct obd_device *obd = exp->exp_obd;
4064 struct obd_import *imp = class_exp2cliimp(exp);
4069 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
4071 if (KEY_IS(KEY_NEXT_ID)) {
4073 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4075 if (vallen != sizeof(obd_id))
/* NOTE(review): same condition as the check just above; looks redundant */
4080 if (vallen != sizeof(obd_id))
4083 /* avoid race between allocate new object and set next id
4084 * from ll_sync thread */
4085 cfs_spin_lock(&oscc->oscc_lock);
4086 new_val = *((obd_id*)val) + 1;
4087 if (new_val > oscc->oscc_next_id)
4088 oscc->oscc_next_id = new_val;
4089 cfs_spin_unlock(&oscc->oscc_lock);
4090 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
4091 exp->exp_obd->obd_name,
4092 obd->u.cli.cl_oscc.oscc_next_id);
4097 if (KEY_IS(KEY_CHECKSUM)) {
4098 if (vallen != sizeof(int))
4100 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
4104 if (KEY_IS(KEY_SPTLRPC_CONF)) {
4105 sptlrpc_conf_client_adapt(obd);
4109 if (KEY_IS(KEY_FLUSH_CTX)) {
4110 sptlrpc_import_flush_my_ctx(imp);
4114 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
4117 /* We pass all other commands directly to OST. Since nobody calls osc
4118 methods directly and everybody is supposed to go through LOV, we
4119 assume lov checked invalid values for us.
4120 The only recognised values so far are evict_by_nid and mds_conn.
4121 Even if something bad goes through, we'd get a -EINVAL from OST
4124 if (KEY_IS(KEY_GRANT_SHRINK))
4125 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
4127 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
4132 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
4133 RCL_CLIENT, keylen);
4134 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
4135 RCL_CLIENT, vallen);
4136 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
4138 ptlrpc_request_free(req);
4142 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
4143 memcpy(tmp, key, keylen);
4144 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
4145 memcpy(tmp, val, vallen);
4147 if (KEY_IS(KEY_MDS_CONN)) {
4148 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4150 oscc->oscc_oa.o_seq = (*(__u32 *)val);
4151 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
4152 LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
4153 req->rq_no_delay = req->rq_no_resend = 1;
4154 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4155 } else if (KEY_IS(KEY_GRANT_SHRINK)) {
4156 struct osc_grant_args *aa;
4159 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4160 aa = ptlrpc_req_async_args(req);
4163 ptlrpc_req_finished(req);
4166 *oa = ((struct ost_body *)val)->oa;
4168 req->rq_interpret_reply = osc_shrink_grant_interpret;
4171 ptlrpc_request_set_replen(req);
4172 if (!KEY_IS(KEY_GRANT_SHRINK)) {
4173 LASSERT(set != NULL);
4174 ptlrpc_set_add_req(set, req);
4175 ptlrpc_check_set(NULL, set);
4177 ptlrpcd_add_req(req, PSCOPE_OTHER);
/* Operations for the LLOG_SIZE_REPL_CTXT context (see __osc_llog_init());
 * only the cancel operation is provided. */
4183 static struct llog_operations osc_size_repl_logops = {
4184 lop_cancel: llog_obd_repl_cancel
/* Operations for the LLOG_MDS_OST_ORIG_CTXT context; filled in at module
 * load by osc_init() from llog_lvfs_ops plus origin-specific overrides. */
4187 static struct llog_operations osc_mds_ost_orig_logops;
/* Set up the two llog contexts of an OSC device: LLOG_MDS_OST_ORIG_CTXT
 * (bound to the catalog id in @catid) and LLOG_SIZE_REPL_CTXT.  Failures are
 * reported with CERROR, including the device names and the catalog logid.
 * NOTE(review): both llog_setup() calls use &obd->obd_olg rather than the
 * @olg parameter; osc_llog_init() asserts the two are the same. */
4189 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4190 struct obd_device *tgt, struct llog_catid *catid)
4195 rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
4196 &catid->lci_logid, &osc_mds_ost_orig_logops);
4198 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4202 rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
4203 NULL, &osc_size_repl_logops);
/* the first context is looked up again here, presumably so that it can be
 * torn down when the second setup failed */
4205 struct llog_ctxt *ctxt =
4206 llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4209 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4214 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
4215 obd->obd_name, tgt->obd_name, catid, rc);
4216 CERROR("logid "LPX64":0x%x\n",
4217 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
/* llog init method: with olg_cat_processing held, reads the catalog id at
 * slot *index of the CATLIST file on @disk_obd, initialises the llog
 * contexts through __osc_llog_init() and writes the catalog id back.
 * @olg must be the device's own obd_olg. */
4222 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4223 struct obd_device *disk_obd, int *index)
4225 struct llog_catid catid;
4226 static char name[32] = CATLIST;
4230 LASSERT(olg == &obd->obd_olg);
4232 cfs_mutex_down(&olg->olg_cat_processing);
4233 rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
4235 CERROR("rc: %d\n", rc);
4239 CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
4240 obd->obd_name, *index, catid.lci_logid.lgl_oid,
4241 catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen);
4243 rc = __osc_llog_init(obd, olg, disk_obd, &catid);
4245 CERROR("rc: %d\n", rc);
4249 rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
4251 CERROR("rc: %d\n", rc);
4256 cfs_mutex_up(&olg->olg_cat_processing);
/* llog finish method: cleans up the LLOG_MDS_OST_ORIG_CTXT and
 * LLOG_SIZE_REPL_CTXT contexts of @obd, keeping the two results in rc and
 * rc2.  @count is not used by the visible code. */
4261 static int osc_llog_finish(struct obd_device *obd, int count)
4263 struct llog_ctxt *ctxt;
4264 int rc = 0, rc2 = 0;
4267 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4269 rc = llog_cleanup(ctxt);
4271 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4273 rc2 = llog_cleanup(ctxt);
/* Reconnect method: when the connect data negotiates OBD_CONNECT_GRANT,
 * fill in the grant to ask the server for and reset the client's lost-grant
 * counter, all under cl_loi_list_lock. */
4280 static int osc_reconnect(const struct lu_env *env,
4281 struct obd_export *exp, struct obd_device *obd,
4282 struct obd_uuid *cluuid,
4283 struct obd_connect_data *data,
4286 struct client_obd *cli = &obd->u.cli;
4288 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4291 client_obd_list_lock(&cli->cl_loi_list_lock);
/* ask for what we currently hold (available + dirty); if that is zero,
 * fall back to the byte size of two maximum-sized RPCs */
4292 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
4293 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
/* remember the old value for the debug message, then clear it */
4294 lost_grant = cli->cl_lost_grant;
4295 cli->cl_lost_grant = 0;
4296 client_obd_list_unlock(&cli->cl_loi_list_lock);
4298 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4299 "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
4300 cli->cl_avail_grant, cli->cl_dirty, lost_grant);
4301 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4302 " ocd_grant: %d\n", data->ocd_connect_flags,
4303 data->ocd_version, data->ocd_grant);
/* Disconnect method: on the last connection (cl_conn_count == 1) the
 * LLOG_SIZE_REPL_CTXT log is synced first, then the export is disconnected
 * with client_disconnect_export().  The client is removed from the grant
 * shrink list only once cl_import has become NULL; the long comment in the
 * body explains the ordering. */
4309 static int osc_disconnect(struct obd_export *exp)
4311 struct obd_device *obd = class_exp2obd(exp);
4312 struct llog_ctxt *ctxt;
4315 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4317 if (obd->u.cli.cl_conn_count == 1) {
4318 /* Flush any remaining cancel messages out to the
4320 llog_sync(ctxt, exp);
4322 llog_ctxt_put(ctxt);
4324 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4328 rc = client_disconnect_export(exp);
4330 * Initially we put del_shrink_grant before disconnect_export, but it
4331 * causes the following problem if setup (connect) and cleanup
4332 * (disconnect) are tangled together.
4333 * connect p1 disconnect p2
4334 * ptlrpc_connect_import
4335 * ............... class_manual_cleanup
4338 * ptlrpc_connect_interrupt
4340 * add this client to shrink list
4342 * Bang! pinger trigger the shrink.
4343 * So the osc should be disconnected from the shrink list, after we
4344 * are sure the import has been destroyed. BUG18662
4346 if (obd->u.cli.cl_import == NULL)
4347 osc_del_shrink_grant(&obd->u.cli);
/* Import state-change callback for the import owned by @obd.
 *  - DISCON: flag the object creator as recovering (MDS OSCs only) and zero
 *    the available/lost grant counters;
 *  - INACTIVE / ACTIVE / OCD: notify the observer; ACTIVE also clears the
 *    creator's NOSPC flag on MDS OSCs, OCD also initialises grants and the
 *    request portal according to the negotiated connect flags;
 *  - INVALIDATE: push queued pages through osc_check_rpcs() and clean up
 *    the lock namespace locally.
 * Unknown events are reported with CERROR. */
4351 static int osc_import_event(struct obd_device *obd,
4352 struct obd_import *imp,
4353 enum obd_import_event event)
4355 struct client_obd *cli;
4359 LASSERT(imp->imp_obd == obd);
4362 case IMP_EVENT_DISCON: {
4363 /* Only do this on the MDS OSC's */
4364 if (imp->imp_server_timeout) {
4365 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4367 cfs_spin_lock(&oscc->oscc_lock);
4368 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4369 cfs_spin_unlock(&oscc->oscc_lock);
/* zero the available and lost grant counters under cl_loi_list_lock */
4372 client_obd_list_lock(&cli->cl_loi_list_lock);
4373 cli->cl_avail_grant = 0;
4374 cli->cl_lost_grant = 0;
4375 client_obd_list_unlock(&cli->cl_loi_list_lock);
4378 case IMP_EVENT_INACTIVE: {
4379 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4382 case IMP_EVENT_INVALIDATE: {
4383 struct ldlm_namespace *ns = obd->obd_namespace;
4387 env = cl_env_get(&refcheck);
4391 client_obd_list_lock(&cli->cl_loi_list_lock);
4392 /* all pages go to failing rpcs due to the invalid
4394 osc_check_rpcs(env, cli);
4395 client_obd_list_unlock(&cli->cl_loi_list_lock);
4397 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4398 cl_env_put(env, &refcheck);
4403 case IMP_EVENT_ACTIVE: {
4404 /* Only do this on the MDS OSC's */
4405 if (imp->imp_server_timeout) {
4406 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4408 cfs_spin_lock(&oscc->oscc_lock);
4409 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
4410 cfs_spin_unlock(&oscc->oscc_lock);
4412 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4415 case IMP_EVENT_OCD: {
4416 struct obd_connect_data *ocd = &imp->imp_connect_data;
/* apply what the server agreed to in its connect reply */
4418 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4419 osc_init_grant(&obd->u.cli, ocd);
4422 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4423 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
4425 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4429 CERROR("Unknown import event %d\n", event);
4436 * Determine whether the lock can be canceled before replaying the lock
4437 * during recovery, see bug16774 for detailed information.
4439 * \retval zero the lock can't be canceled
4440 * \retval other ok to cancel
4442 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
/* the resource of @lock is expected to be locked by the caller */
4444 check_res_locked(lock->l_resource);
4447 * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
4449 * XXX as a future improvement, we can also cancel unused write lock
4450 * if it doesn't have dirty data and active mmaps.
/* "unused" here means osc_dlm_lock_pageref() reports 0 for the lock */
4452 if (lock->l_resource->lr_type == LDLM_EXTENT &&
4453 (lock->l_granted_mode == LCK_PR ||
4454 lock->l_granted_mode == LCK_CR) &&
4455 (osc_dlm_lock_pageref(lock) == 0))
/* Device setup method: takes a ptlrpcd reference, runs the generic
 * client_obd_setup(), registers the procfs entries, creates the import's
 * request pool, initialises the grant-shrink list and semaphore, and
 * registers osc_cancel_for_recovery() with the lock namespace. */
4461 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4467 rc = ptlrpcd_addref();
4471 rc = client_obd_setup(obd, lcfg);
4475 struct lprocfs_static_vars lvars = { 0 };
4476 struct client_obd *cli = &obd->u.cli;
4478 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4479 lprocfs_osc_init_vars(&lvars);
/* the extra stats entries are attached only if the base procfs setup
 * succeeded */
4480 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4481 lproc_osc_attach_seqstat(obd);
4482 sptlrpc_lprocfs_cliobd_attach(obd);
4483 ptlrpc_lprocfs_register_obd(obd);
4487 /* We need to allocate a few requests more, because
4488 brw_interpret tries to create new requests before freeing
4489 previous ones. Ideally we want to have 2x max_rpcs_in_flight
4490 reserved, but I afraid that might be too much wasted RAM
4491 in fact, so 2 is just my guess and still should work. */
4492 cli->cl_import->imp_rq_pool =
4493 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4495 ptlrpc_add_rqs_to_pool);
4497 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4498 cfs_sema_init(&cli->cl_grant_sem, 1);
4500 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
4506 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4512 case OBD_CLEANUP_EARLY: {
4513 struct obd_import *imp;
4514 imp = obd->u.cli.cl_import;
4515 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4516 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4517 ptlrpc_deactivate_import(imp);
4518 cfs_spin_lock(&imp->imp_lock);
4519 imp->imp_pingable = 0;
4520 cfs_spin_unlock(&imp->imp_lock);
4523 case OBD_CLEANUP_EXPORTS: {
4524 /* If we set up but never connected, the
4525 client import will not have been cleaned. */
4526 if (obd->u.cli.cl_import) {
4527 struct obd_import *imp;
4528 cfs_down_write(&obd->u.cli.cl_sem);
4529 imp = obd->u.cli.cl_import;
4530 CDEBUG(D_CONFIG, "%s: client import never connected\n",
4532 ptlrpc_invalidate_import(imp);
4533 if (imp->imp_rq_pool) {
4534 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4535 imp->imp_rq_pool = NULL;
4537 class_destroy_import(imp);
4538 cfs_up_write(&obd->u.cli.cl_sem);
4539 obd->u.cli.cl_import = NULL;
4541 rc = obd_llog_finish(obd, 0);
4543 CERROR("failed to cleanup llogging subsystems\n");
/* Device cleanup method: unregisters the procfs entries, releases the OSC
 * quota cache and runs the generic client_obd_cleanup(). */
4550 int osc_cleanup(struct obd_device *obd)
4555 ptlrpc_lprocfs_unregister_obd(obd);
4556 lprocfs_obd_cleanup(obd);
4558 /* free memory of osc quota cache */
4559 lquota_cleanup(quota_interface, obd);
4561 rc = client_obd_cleanup(obd);
/* Apply a configuration record to an OSC device.  The command in
 * lcfg->lcfg_command is dispatched; the visible handler passes the record to
 * class_process_proc_param() with the PARAM_OSC prefix and the OSC procfs
 * variable table. */
4567 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4569 struct lprocfs_static_vars lvars = { 0 };
4572 lprocfs_osc_init_vars(&lvars);
4574 switch (lcfg->lcfg_command) {
4576 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
/* sync_fs method: records @oinfo and its upcall in cli->cl_sf_wait, flags
 * every pending write page of every object on cl_loi_write_list with
 * ASYNC_SYNCFS, then kicks the RPC engine (osc_check_rpcs) and calls
 * osc_wake_sync_fs(), all under cl_loi_list_lock. */
4586 static int osc_sync_fs(struct obd_device *obd, struct obd_info *oinfo,
4589 struct client_obd *cli;
4590 struct lov_oinfo *loi;
4591 struct lov_oinfo *tloi;
4592 struct osc_async_page *oap;
4593 struct osc_async_page *toap;
4594 struct loi_oap_pages *lop;
4600 env = cl_env_get(&refcheck);
4602 RETURN(PTR_ERR(env));
4605 client_obd_list_lock(&cli->cl_loi_list_lock);
4606 cli->cl_sf_wait.sfw_oi = oinfo;
4607 cli->cl_sf_wait.sfw_upcall = oinfo->oi_cb_up;
4608 cli->cl_sf_wait.started = 1;
4609 /* creating cl_loi_sync_fs list */
4610 cfs_list_for_each_entry_safe(loi, tloi, &cli->cl_loi_write_list,
4612 lop = &loi->loi_write_lop;
4613 cfs_list_for_each_entry_safe(oap, toap, &lop->lop_pending,
4615 osc_set_async_flags_base(cli, loi, oap, ASYNC_SYNCFS);
4618 osc_check_rpcs(env, cli);
4619 osc_wake_sync_fs(cli);
4620 client_obd_list_unlock(&cli->cl_loi_list_lock);
4621 cl_env_put(env, &refcheck);
/* process_config method: @len is ignored and @buf is handed to
 * osc_process_config_base() as the configuration record. */
4625 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4627 return osc_process_config_base(obd, buf);
/* Method table of the OSC device type, registered by osc_init() through
 * class_register_type().  Connection handling reuses the generic client_*
 * import helpers; the remaining entries are OSC-specific. */
4630 struct obd_ops osc_obd_ops = {
4631 .o_owner = THIS_MODULE,
4632 .o_setup = osc_setup,
4633 .o_precleanup = osc_precleanup,
4634 .o_cleanup = osc_cleanup,
4635 .o_add_conn = client_import_add_conn,
4636 .o_del_conn = client_import_del_conn,
4637 .o_connect = client_connect_import,
4638 .o_reconnect = osc_reconnect,
4639 .o_disconnect = osc_disconnect,
4640 .o_statfs = osc_statfs,
4641 .o_statfs_async = osc_statfs_async,
4642 .o_packmd = osc_packmd,
4643 .o_unpackmd = osc_unpackmd,
4644 .o_precreate = osc_precreate,
4645 .o_create = osc_create,
4646 .o_create_async = osc_create_async,
4647 .o_destroy = osc_destroy,
4648 .o_getattr = osc_getattr,
4649 .o_getattr_async = osc_getattr_async,
4650 .o_setattr = osc_setattr,
4651 .o_setattr_async = osc_setattr_async,
4653 .o_punch = osc_punch,
4655 .o_enqueue = osc_enqueue,
4656 .o_change_cbdata = osc_change_cbdata,
4657 .o_find_cbdata = osc_find_cbdata,
4658 .o_cancel = osc_cancel,
4659 .o_cancel_unused = osc_cancel_unused,
4660 .o_iocontrol = osc_iocontrol,
4661 .o_get_info = osc_get_info,
4662 .o_set_info_async = osc_set_info_async,
4663 .o_import_event = osc_import_event,
4664 .o_llog_init = osc_llog_init,
4665 .o_llog_finish = osc_llog_finish,
4666 .o_process_config = osc_process_config,
4667 .o_sync_fs = osc_sync_fs,
/* Objects defined elsewhere that osc_init()/osc_exit() set up and tear down:
 * the slab cache descriptors and the AST guard spinlock with its lockdep
 * class key. */
4670 extern struct lu_kmem_descr osc_caches[];
4671 extern cfs_spinlock_t osc_ast_guard;
4672 extern cfs_lock_class_key_t osc_ast_guard_class;
4674 int __init osc_init(void)
4676 struct lprocfs_static_vars lvars = { 0 };
4680 /* print an address of _any_ initialized kernel symbol from this
4681 * module, to allow debugging with gdb that doesn't support data
4682 * symbols from modules.*/
4683 CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4685 rc = lu_kmem_init(osc_caches);
4687 lprocfs_osc_init_vars(&lvars);
4689 cfs_request_module("lquota");
4690 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4691 lquota_init(quota_interface);
4692 init_obd_quota_ops(quota_interface, &osc_obd_ops);
4694 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4695 LUSTRE_OSC_NAME, &osc_device_type);
4697 if (quota_interface)
4698 PORTAL_SYMBOL_PUT(osc_quota_interface);
4699 lu_kmem_fini(osc_caches);
4703 cfs_spin_lock_init(&osc_ast_guard);
4704 cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4706 osc_mds_ost_orig_logops = llog_lvfs_ops;
4707 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4708 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4709 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4710 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
/* Module unload: finalises the OSC lu device type, shuts down and releases
 * the quota interface, unregisters the OSC obd type and destroys the slab
 * caches created by osc_init().
 * NOTE(review): lquota_exit() is called before the NULL check on
 * quota_interface; confirm it tolerates a NULL interface. */
4716 static void /*__exit*/ osc_exit(void)
4718 lu_device_type_fini(&osc_device_type);
4720 lquota_exit(quota_interface);
4721 if (quota_interface)
4722 PORTAL_SYMBOL_PUT(osc_quota_interface);
4724 class_unregister_type(LUSTRE_OSC_NAME);
4725 lu_kmem_fini(osc_caches);
/* Module metadata and registration of osc_init()/osc_exit() as the module's
 * entry and exit points. */
4728 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4729 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4730 MODULE_LICENSE("GPL");
4732 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);