1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_OSC
42 #include <libcfs/libcfs.h>
45 # include <liblustre.h>
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include "osc_internal.h"
/* Quota hooks: a file-local interface pointer that starts out NULL, and the
 * OSC quota interface object that is defined in another translation unit. */
66 static quota_interface_t *quota_interface = NULL;
67 extern quota_interface_t osc_quota_interface;
/* Forward declarations; the bodies are not part of the code shown here. */
69 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
70 static int brw_interpret(const struct lu_env *env,
71 struct ptlrpc_request *req, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
74 /* Pack OSC object metadata for disk storage (LE byte order). */
/* Pack @lsm into the on-disk lov_mds_md at *lmmp.
 * The visible code sizes the record as sizeof(**lmmp), has one path that
 * frees *lmmp and one that allocates it, asserts that lsm_object_id is
 * non-zero and applies LASSERT_SEQ_IS_MDT to the sequence.
 * NOTE(review): the conditions selecting the free/alloc paths and the
 * return values are not visible here -- confirm against the full body. */
75 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
76 struct lov_stripe_md *lsm)
81 lmm_size = sizeof(**lmmp);
86 OBD_FREE(*lmmp, lmm_size);
92 OBD_ALLOC(*lmmp, lmm_size);
98 LASSERT(lsm->lsm_object_id);
99 LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
/* Both identifiers are stored in little-endian byte order. */
100 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
101 (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
107 /* Unpack OSC object metadata from disk storage (LE byte order). */
/* Build (or release) the in-memory stripe descriptor *lsmp from the on-disk
 * record @lmm of @lmm_bytes bytes.
 * Visible steps: reject a record shorter than sizeof(*lmm) or with a zero
 * object id; free *lsmp and its single lov_oinfo when *lsmp is set and @lmm
 * is NULL; otherwise allocate a one-stripe descriptor plus its lov_oinfo,
 * copy id/seq from little-endian, and set lsm_maxbytes. */
108 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
109 struct lov_mds_md *lmm, int lmm_bytes)
/* NOTE(review): lmm_bytes is int and sizeof yields size_t, so the comparison
 * is done unsigned; a negative lmm_bytes would not be caught by this test. */
115 if (lmm_bytes < sizeof (*lmm)) {
116 CERROR("lov_mds_md too small: %d, need %d\n",
117 lmm_bytes, (int)sizeof(*lmm));
120 /* XXX LOV_MAGIC etc check? */
122 if (lmm->lmm_object_id == 0) {
123 CERROR("lov_mds_md: zero lmm_object_id\n");
/* An OSC object always has exactly one stripe. */
128 lsm_size = lov_stripe_md_size(1);
/* Release request: a descriptor exists and no on-disk record was supplied. */
132 if (*lsmp != NULL && lmm == NULL) {
133 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
134 OBD_FREE(*lsmp, lsm_size);
140 OBD_ALLOC(*lsmp, lsm_size);
143 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
/* Undo the outer allocation if the lov_oinfo allocation failed. */
144 if ((*lsmp)->lsm_oinfo[0] == NULL) {
145 OBD_FREE(*lsmp, lsm_size);
148 loi_init((*lsmp)->lsm_oinfo[0]);
152 /* XXX zero *lsmp? */
153 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
154 (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
155 LASSERT((*lsmp)->lsm_object_id);
156 LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
159 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/* Attach a capability to an outgoing request.
 * @capa is an opaque pointer that is treated as a struct obd_capa.  The
 * RMF_CAPA1 buffer of @req is looked up and OBD_MD_FLOSSCAPA is set in
 * body->oa.o_valid.
 * NOTE(review): the NULL check on @capa and the copy into the request
 * buffer are not visible here -- confirm. */
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165 struct ost_body *body, void *capa)
167 struct obd_capa *oc = (struct obd_capa *)capa;
168 struct lustre_capa *c;
173 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
176 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177 DEBUG_CAPA(D_SEC, c, "pack");
/* Fill the OST body of @req from @oinfo: the obdo in oinfo->oi_oa is
 * converted to wire format and the capability in oinfo->oi_capa is packed
 * through osc_pack_capa(). */
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181 struct obd_info *oinfo)
183 struct ost_body *body;
185 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
188 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
189 osc_pack_capa(req, body, oinfo->oi_capa);
/* Adjust the client-side size of capability field @field in @req.
 * The visible branch sets the size to 0.
 * NOTE(review): presumably this is the no-capability case and the size is
 * left untouched otherwise; the third parameter and the condition are not
 * visible here -- confirm. */
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193 const struct req_msg_field *field,
197 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
199 /* it is already calculated as sizeof struct obd_capa */
/* Reply callback for an asynchronous OST_GETATTR.
 * One visible path converts the reply body into aa->aa_oi->oi_oa and forces
 * the block size to PTLRPC_MAX_BRW_SIZE; the other logs an unpack failure
 * and clears o_valid.  The oi_cb_up upcall of the obd_info is then called
 * with rc and its result stored back into rc.
 * NOTE(review): the conditions choosing between the paths are not visible
 * here -- confirm. */
203 static int osc_getattr_interpret(const struct lu_env *env,
204 struct ptlrpc_request *req,
205 struct osc_async_args *aa, int rc)
207 struct ost_body *body;
213 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
215 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
216 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
218 /* This should really be sent by the OST */
219 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
220 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
222 CDEBUG(D_INFO, "can't unpack ost_body\n");
224 aa->aa_oi->oi_oa->o_valid = 0;
/* Hand the outcome to the caller-supplied completion upcall. */
227 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an OST_GETATTR for @oinfo on request set @set without waiting.
 * The request is allocated from the export import, sized for the capability,
 * packed, given osc_getattr_interpret() as reply callback and added to @set.
 * A ptlrpc_request_free() path exists for a request that was allocated but
 * not sent (its guarding condition is not visible here). */
231 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
232 struct ptlrpc_request_set *set)
234 struct ptlrpc_request *req;
235 struct osc_async_args *aa;
239 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
243 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
244 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
246 ptlrpc_request_free(req);
250 osc_pack_req_body(req, oinfo);
252 ptlrpc_request_set_replen(req);
253 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
/* The callback arguments live inside the request; check at build time that
 * they fit. */
255 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
256 aa = ptlrpc_req_async_args(req);
259 ptlrpc_set_add_req(set, req);
/* Synchronous OST_GETATTR: build the request, wait for the reply with
 * ptlrpc_queue_wait(), copy the returned attributes into oinfo->oi_oa and
 * force o_blksize to PTLRPC_MAX_BRW_SIZE.  A -EPROTO exit through the out
 * label is visible (presumably for a missing reply body -- confirm).  The
 * request reference is dropped with ptlrpc_req_finished(). */
263 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
265 struct ptlrpc_request *req;
266 struct ost_body *body;
270 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
274 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
275 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
277 ptlrpc_request_free(req);
281 osc_pack_req_body(req, oinfo);
283 ptlrpc_request_set_replen(req);
285 rc = ptlrpc_queue_wait(req);
289 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
291 GOTO(out, rc = -EPROTO);
293 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
294 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
296 /* This should really be sent by the OST */
297 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
298 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
302 ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR for the object described by oinfo->oi_oa.
 * The caller must have set OBD_MD_FLGROUP in o_valid (asserted).  After the
 * reply arrives the returned obdo is copied back into oinfo->oi_oa; a
 * -EPROTO exit through the out label is visible.  @oti is not referenced in
 * the visible lines. */
306 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
307 struct obd_trans_info *oti)
309 struct ptlrpc_request *req;
310 struct ost_body *body;
314 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
316 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
320 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
321 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
323 ptlrpc_request_free(req);
327 osc_pack_req_body(req, oinfo);
329 ptlrpc_request_set_replen(req);
331 rc = ptlrpc_queue_wait(req);
335 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
337 GOTO(out, rc = -EPROTO);
339 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
343 ptlrpc_req_finished(req);
/* Reply callback shared by the asynchronous setattr and punch paths (both
 * install it as rq_interpret_reply).  The reply obdo is copied into
 * sa->sa_oa, a -EPROTO exit through the out label is visible, and the
 * stored upcall is called with sa_cookie and rc. */
347 static int osc_setattr_interpret(const struct lu_env *env,
348 struct ptlrpc_request *req,
349 struct osc_setattr_args *sa, int rc)
351 struct ost_body *body;
357 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
359 GOTO(out, rc = -EPROTO);
361 lustre_get_wire_obdo(sa->sa_oa, &body->oa);
363 rc = sa->sa_upcall(sa->sa_cookie, rc);
/* Send an OST_SETATTR without blocking.
 * @upcall/@cookie are stored in the request and used by
 * osc_setattr_interpret() when the reply arrives.  @rqset selects delivery:
 * one visible path hands the request straight to ptlrpcd with no reply
 * callback, another installs the callback and then uses ptlrpcd when
 * @rqset is PTLRPCD_SET or ptlrpc_set_add_req() otherwise.
 * NOTE(review): the condition selecting the fire-and-forget path is not
 * visible here -- confirm. */
367 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
368 struct obd_trans_info *oti,
369 obd_enqueue_update_f upcall, void *cookie,
370 struct ptlrpc_request_set *rqset)
372 struct ptlrpc_request *req;
373 struct osc_setattr_args *sa;
377 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
381 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
382 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
384 ptlrpc_request_free(req);
/* Carry the first log cookie from @oti in the obdo when the caller asked
 * for it through OBD_MD_FLCOOKIE. */
388 if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
389 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
391 osc_pack_req_body(req, oinfo);
393 ptlrpc_request_set_replen(req);
395 /* do mds to ost setattr asynchronously */
397 /* Do not wait for response. */
398 ptlrpcd_add_req(req, PSCOPE_OTHER);
400 req->rq_interpret_reply =
401 (ptlrpc_interpterer_t)osc_setattr_interpret;
403 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
404 sa = ptlrpc_req_async_args(req);
405 sa->sa_oa = oinfo->oi_oa;
406 sa->sa_upcall = upcall;
407 sa->sa_cookie = cookie;
409 if (rqset == PTLRPCD_SET)
410 ptlrpcd_add_req(req, PSCOPE_OTHER);
412 ptlrpc_set_add_req(rqset, req);
/* Thin wrapper over osc_setattr_async_base() that uses oinfo->oi_cb_up as
 * the completion upcall and @oinfo itself as the upcall cookie. */
418 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
419 struct obd_trans_info *oti,
420 struct ptlrpc_request_set *rqset)
422 return osc_setattr_async_base(exp, oinfo, oti,
423 oinfo->oi_cb_up, oinfo, rqset);
/* Synchronous OST_CREATE for the object described by @oa.
 * Visible flow: obtain a stripe descriptor with obd_alloc_memmd(), build
 * and send the request, copy the reply obdo back into @oa, force o_blksize
 * to PTLRPC_MAX_BRW_SIZE, record the new object id/seq in the descriptor,
 * and store the reply transno (and, with OBD_MD_FLCOOKIE, the log cookie)
 * in @oti.  A delorphan request (o_flags == OBD_FL_DELORPHAN) is marked
 * no-resend and no-delay.  Exit paths finish the request and free the
 * descriptor.
 * NOTE(review): how @ea is consumed or returned and which exits free the
 * descriptor are not visible here -- confirm. */
426 int osc_real_create(struct obd_export *exp, struct obdo *oa,
427 struct lov_stripe_md **ea, struct obd_trans_info *oti)
429 struct ptlrpc_request *req;
430 struct ost_body *body;
431 struct lov_stripe_md *lsm;
440 rc = obd_alloc_memmd(exp, &lsm);
445 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
447 GOTO(out, rc = -ENOMEM);
449 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
451 ptlrpc_request_free(req);
455 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
457 lustre_set_wire_obdo(&body->oa, oa);
459 ptlrpc_request_set_replen(req);
461 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
462 oa->o_flags == OBD_FL_DELORPHAN) {
464 "delorphan from OST integration");
465 /* Don't resend the delorphan req */
466 req->rq_no_resend = req->rq_no_delay = 1;
469 rc = ptlrpc_queue_wait(req);
473 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
475 GOTO(out_req, rc = -EPROTO);
477 lustre_get_wire_obdo(oa, &body->oa);
479 /* This should really be sent by the OST */
480 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
481 oa->o_valid |= OBD_MD_FLBLKSZ;
483 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
484 * have valid lsm_oinfo data structs, so don't go touching that.
485 * This needs to be fixed in a big way.
487 lsm->lsm_object_id = oa->o_id;
488 lsm->lsm_object_seq = oa->o_seq;
492 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
494 if (oa->o_valid & OBD_MD_FLCOOKIE) {
495 if (!oti->oti_logcookies)
496 oti_alloc_cookies(oti, 1);
497 *oti->oti_logcookies = oa->o_lcookie;
501 CDEBUG(D_HA, "transno: "LPD64"\n",
502 lustre_msg_get_transno(req->rq_repmsg));
504 ptlrpc_req_finished(req);
507 obd_free_memmd(exp, &lsm);
/* Send an OST_PUNCH for oinfo->oi_oa without blocking.
 * The request is routed to OST_IO_PORTAL and its timeout recomputed, the
 * obdo and capability are packed, and osc_setattr_interpret() is installed
 * as reply callback with @upcall/@cookie.  Delivery goes through ptlrpcd
 * when @rqset is PTLRPCD_SET, otherwise through ptlrpc_set_add_req(). */
511 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
512 obd_enqueue_update_f upcall, void *cookie,
513 struct ptlrpc_request_set *rqset)
515 struct ptlrpc_request *req;
516 struct osc_setattr_args *sa;
517 struct ost_body *body;
521 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
525 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
526 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
528 ptlrpc_request_free(req);
531 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
532 ptlrpc_at_set_req_timeout(req);
534 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
536 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
537 osc_pack_capa(req, body, oinfo->oi_capa);
539 ptlrpc_request_set_replen(req);
542 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
543 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
544 sa = ptlrpc_req_async_args(req);
545 sa->sa_oa = oinfo->oi_oa;
546 sa->sa_upcall = upcall;
547 sa->sa_cookie = cookie;
548 if (rqset == PTLRPCD_SET)
549 ptlrpcd_add_req(req, PSCOPE_OTHER);
551 ptlrpc_set_add_req(rqset, req);
/* Punch the extent held in oinfo->oi_policy.l_extent.
 * The extent start and end are carried in the o_size and o_blocks fields of
 * the obdo (both flagged valid) before delegating to osc_punch_base() with
 * oinfo->oi_cb_up as upcall and @oinfo as cookie.  @oti is unused in the
 * visible lines. */
556 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
557 struct obd_trans_info *oti,
558 struct ptlrpc_request_set *rqset)
560 oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
561 oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
562 oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
563 return osc_punch_base(exp, oinfo,
564 oinfo->oi_cb_up, oinfo, rqset);
/* Synchronous OST_SYNC for the byte range @start..@end of object @oa.
 * The range travels in the o_size/o_blocks fields of the request body (the
 * caller's @oa is not modified for that purpose); after the reply the
 * returned obdo is copied into @oa.  A debug message for a NULL @oa and a
 * -EPROTO exit through the out label are visible.  @md is not referenced in
 * the visible lines. */
567 static int osc_sync(struct obd_export *exp, struct obdo *oa,
568 struct lov_stripe_md *md, obd_size start, obd_size end,
571 struct ptlrpc_request *req;
572 struct ost_body *body;
577 CDEBUG(D_INFO, "oa NULL\n");
581 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
585 osc_set_capa_size(req, &RMF_CAPA1, capa);
586 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
588 ptlrpc_request_free(req);
592 /* overload the size and blocks fields in the oa with start/end */
593 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
595 lustre_set_wire_obdo(&body->oa, oa);
596 body->oa.o_size = start;
597 body->oa.o_blocks = end;
598 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
599 osc_pack_capa(req, body, capa);
601 ptlrpc_request_set_replen(req);
603 rc = ptlrpc_queue_wait(req);
607 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
609 GOTO(out, rc = -EPROTO);
611 lustre_get_wire_obdo(oa, &body->oa);
615 ptlrpc_req_finished(req);
619 /* Find and cancel locally locks matched by @mode in the resource found by
620 * @objid. Found locks are added into @cancel list. Returns the amount of
621 * locks added to @cancels list. */
/* Implementation notes: the resource name is derived from oa->o_id and
 * oa->o_seq, the resource is looked up in the namespace of the export
 * device, and the count comes from ldlm_cancel_resource_local().  The
 * reference taken by the lookup is dropped before returning. */
622 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
624 ldlm_mode_t mode, int lock_flags)
626 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
627 struct ldlm_res_id res_id;
628 struct ldlm_resource *res;
632 osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
633 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
637 LDLM_RESOURCE_ADDREF(res);
638 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
639 lock_flags, 0, NULL);
640 LDLM_RESOURCE_DELREF(res);
641 ldlm_resource_putref(res);
/* Reply callback for a throttled OST_DESTROY: decrements the in-flight
 * destroy counter of the client and signals cl_destroy_waitq so that a
 * sender blocked in osc_destroy() can re-check the limit. */
645 static int osc_destroy_interpret(const struct lu_env *env,
646 struct ptlrpc_request *req, void *data,
649 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
651 cfs_atomic_dec(&cli->cl_destroy_in_flight);
652 cfs_waitq_signal(&cli->cl_destroy_waitq);
/* Try to reserve a slot for one more destroy RPC.
 * The counter is incremented optimistically; if the new value is within
 * cl_max_rpcs_in_flight the slot is kept.  Otherwise the increment is
 * undone, and if the counter is then below the limit a waiter is signalled
 * because another thread changed the counter in between.
 * NOTE(review): the return values are not visible here; the caller treats
 * a non-zero result as permission to send. */
656 static int osc_can_send_destroy(struct client_obd *cli)
658 if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
659 cli->cl_max_rpcs_in_flight) {
660 /* The destroy request can be sent */
663 if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
664 cli->cl_max_rpcs_in_flight) {
666 * The counter has been modified between the two atomic
669 cfs_waitq_signal(&cli->cl_destroy_waitq);
674 /* Destroy requests can be async always on the client, and we don't even really
675 * care about the return code since the client cannot do anything at all about
677 * When the MDS is unlinking a filename, it saves the file objects into a
678 * recovery llog, and these object records are cancelled when the OST reports
679 * they were destroyed and sync'd to disk (i.e. transaction committed).
680 * If the client dies, or the OST is down when the object should be destroyed,
681 * the records are not cancelled, and when the OST reconnects to the MDS next,
682 * it will retrieve the llog unlink logs and then sends the log cancellation
683 * cookies to the MDS after committing destroy transactions. */
/* Implementation notes: unused PW locks on the object are collected first
 * with LDLM_FL_DISCARD_DATA so they can be cancelled with the destroy
 * request (ldlm_prep_elc_req).  If request allocation fails the collected
 * locks are released again.  Unless the import was connected with
 * OBD_CONNECT_MDS, the number of concurrent destroys is limited through
 * osc_can_send_destroy() and the sender sleeps on cl_destroy_waitq until a
 * slot is free.  The request is finally handed to ptlrpcd. */
684 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
685 struct lov_stripe_md *ea, struct obd_trans_info *oti,
686 struct obd_export *md_export, void *capa)
688 struct client_obd *cli = &exp->exp_obd->u.cli;
689 struct ptlrpc_request *req;
690 struct ost_body *body;
691 CFS_LIST_HEAD(cancels);
696 CDEBUG(D_INFO, "oa NULL\n");
700 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
701 LDLM_FL_DISCARD_DATA);
703 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
705 ldlm_lock_list_put(&cancels, l_bl_ast, count);
709 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
710 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
713 ptlrpc_request_free(req);
717 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
718 ptlrpc_at_set_req_timeout(req);
/* Carry the first log cookie from @oti in the obdo when requested. */
720 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
721 oa->o_lcookie = *oti->oti_logcookies;
722 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
724 lustre_set_wire_obdo(&body->oa, oa);
726 osc_pack_capa(req, body, (struct obd_capa *)capa);
727 ptlrpc_request_set_replen(req);
729 /* don't throttle destroy RPCs for the MDT */
730 if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
731 req->rq_interpret_reply = osc_destroy_interpret;
732 if (!osc_can_send_destroy(cli)) {
733 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
737 * Wait until the number of on-going destroy RPCs drops
738 * under max_rpc_in_flight
740 l_wait_event_exclusive(cli->cl_destroy_waitq,
741 osc_can_send_destroy(cli), &lwi);
745 /* Do not wait for response */
746 ptlrpcd_add_req(req, PSCOPE_OTHER);
/* Fill the cache/grant accounting fields of @oa from the client state.
 * Under cl_loi_list_lock: o_dirty is set from cl_dirty; three sanity checks
 * log an error for inconsistent dirty accounting; otherwise o_undirty is
 * the larger of cl_dirty_max and the bytes needed for a full set of
 * in-flight RPCs plus one.  o_grant and o_dropped are taken from
 * cl_avail_grant and cl_lost_grant, and cl_lost_grant is reset to 0.
 * The caller must not have set OBD_MD_FLBLOCKS or OBD_MD_FLGRANT in
 * o_valid (asserted).
 * NOTE(review): the o_valid update and the o_undirty value on the error
 * branches are not visible here -- confirm. */
750 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
753 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
755 LASSERT(!(oa->o_valid & bits));
758 client_obd_list_lock(&cli->cl_loi_list_lock);
759 oa->o_dirty = cli->cl_dirty;
760 if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
761 CERROR("dirty %lu - %lu > dirty_max %lu\n",
762 cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
764 } else if (cfs_atomic_read(&obd_dirty_pages) -
765 cfs_atomic_read(&obd_dirty_transit_pages) >
766 obd_max_dirty_pages + 1){
767 /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
768 * not covered by a lock thus they may safely race and trip
769 * this CERROR() unless we add in a small fudge factor (+1). */
770 CERROR("dirty %d - %d > system dirty_max %d\n",
771 cfs_atomic_read(&obd_dirty_pages),
772 cfs_atomic_read(&obd_dirty_transit_pages),
773 obd_max_dirty_pages);
775 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
776 CERROR("dirty %lu - dirty_max %lu too big???\n",
777 cli->cl_dirty, cli->cl_dirty_max);
/* Bytes for one more than the maximum number of concurrent full RPCs. */
780 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
781 (cli->cl_max_rpcs_in_flight + 1);
782 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
784 oa->o_grant = cli->cl_avail_grant;
785 oa->o_dropped = cli->cl_lost_grant;
/* The lost grant has now been reported, so start counting afresh. */
786 cli->cl_lost_grant = 0;
787 client_obd_list_unlock(&cli->cl_loi_list_lock);
788 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
789 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Re-arm the grant-shrink deadline: cl_next_shrink_grant becomes the current
 * time shifted by cl_grant_shrink_interval. */
793 static void osc_update_next_shrink(struct client_obd *cli)
795 cli->cl_next_shrink_grant =
796 cfs_time_shift(cli->cl_grant_shrink_interval);
797 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
798 cli->cl_next_shrink_grant);
801 /* caller must hold loi_list_lock */
/* Charge one page of grant to @pga: the global dirty page counter and
 * cl_dirty grow, cl_avail_grant shrinks by CFS_PAGE_SIZE, and the page is
 * flagged OBD_BRW_FROM_GRANT (it must not carry that flag already).  The
 * grant must not go negative (asserted).  Using grant also pushes back the
 * next grant-shrink deadline. */
802 static void osc_consume_write_grant(struct client_obd *cli,
803 struct brw_page *pga)
805 LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
806 LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
807 cfs_atomic_inc(&obd_dirty_pages);
808 cli->cl_dirty += CFS_PAGE_SIZE;
809 cli->cl_avail_grant -= CFS_PAGE_SIZE;
810 pga->flag |= OBD_BRW_FROM_GRANT;
811 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
812 CFS_PAGE_SIZE, pga, pga->pg);
813 LASSERT(cli->cl_avail_grant >= 0);
814 osc_update_next_shrink(cli);
817 /* the companion to osc_consume_write_grant, called when a brw has completed.
818 * must be called with the loi lock held. */
/* Implementation notes: pages without OBD_BRW_FROM_GRANT take a separate
 * early path (its body is not visible here).  Otherwise the dirty
 * accounting done by osc_consume_write_grant() is reversed, including the
 * transit counters for OBD_BRW_NOCACHE pages.  One visible branch records a
 * whole page as lost grant; another handles a short write where the server
 * block size differs from the page size and records only the unused part.
 * NOTE(review): the first branch is presumably the !sent case -- confirm. */
819 static void osc_release_write_grant(struct client_obd *cli,
820 struct brw_page *pga, int sent)
/* Fall back to 4096 when the cached statfs block size is zero. */
822 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
825 LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
826 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
831 pga->flag &= ~OBD_BRW_FROM_GRANT;
832 cfs_atomic_dec(&obd_dirty_pages);
833 cli->cl_dirty -= CFS_PAGE_SIZE;
834 if (pga->flag & OBD_BRW_NOCACHE) {
835 pga->flag &= ~OBD_BRW_NOCACHE;
836 cfs_atomic_dec(&obd_dirty_transit_pages);
837 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
840 cli->cl_lost_grant += CFS_PAGE_SIZE;
841 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
842 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
843 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
844 /* For short writes we shouldn't count parts of pages that
845 * span a whole block on the OST side, or our accounting goes
846 * wrong. Should match the code in filter_grant_check. */
/* offset: position of the data inside its page; count: bytes written,
 * extended back to the start of the first block touched; end: how far the
 * data reaches into its last block. */
847 int offset = pga->off & ~CFS_PAGE_MASK;
848 int count = pga->count + (offset & (blocksize - 1));
849 int end = (offset + pga->count) & (blocksize - 1);
851 count += blocksize - end;
853 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
854 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
855 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
856 cli->cl_avail_grant, cli->cl_dirty);
/* Total number of BRW RPCs in flight: read count plus write count. */
862 static unsigned long rpcs_in_flight(struct client_obd *cli)
864 return cli->cl_r_in_flight + cli->cl_w_in_flight;
/* Complete a pending sync_fs wait: once cl_loi_sync_fs_list is empty and a
 * wait has been started, call the stored upcall with result 0 and clear the
 * started flag.
 * NOTE(review): the return value is not visible here -- confirm. */
867 int osc_wake_sync_fs(struct client_obd *cli)
870 if (cfs_list_empty(&cli->cl_loi_sync_fs_list) &&
871 cli->cl_sf_wait.started) {
872 cli->cl_sf_wait.sfw_upcall(cli->cl_sf_wait.sfw_oi, 0);
873 cli->cl_sf_wait.started = 0;
878 /* caller must hold loi_list_lock */
/* Walk cl_cache_waiters and wake the threads waiting for cache space.
 * For each waiter two visible checks come first: no dirty room left
 * (per-client or system wide), and writes still in flight while less than
 * one page of grant is available.  Otherwise the waiter is unlinked and
 * either told to fall back (ocw_rc = -EDQUOT) when grant is short, or
 * charged one page of grant, and then signalled.
 * NOTE(review): what the two early checks do afterwards (leave the loop or
 * continue) is not visible here -- confirm. */
879 void osc_wake_cache_waiters(struct client_obd *cli)
882 struct osc_cache_waiter *ocw;
885 cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
886 /* if we can't dirty more, we must wait until some is written */
887 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
888 (cfs_atomic_read(&obd_dirty_pages) + 1 >
889 obd_max_dirty_pages)) {
890 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
891 "osc max %ld, sys max %d\n", cli->cl_dirty,
892 cli->cl_dirty_max, obd_max_dirty_pages);
896 /* if still dirty cache but no grant wait for pending RPCs that
897 * may yet return us some grant before doing sync writes */
898 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
899 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
900 cli->cl_w_in_flight);
904 ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
905 cfs_list_del_init(&ocw->ocw_entry);
906 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
907 /* no more RPCs in flight to return grant, do sync IO */
908 ocw->ocw_rc = -EDQUOT;
909 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
911 osc_consume_write_grant(cli,
912 &ocw->ocw_oap->oap_brw_page);
915 cfs_waitq_signal(&ocw->ocw_waitq);
/* Add @grant to cl_avail_grant while holding cl_loi_list_lock. */
921 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
923 client_obd_list_lock(&cli->cl_loi_list_lock);
924 cli->cl_avail_grant += grant;
925 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Credit the grant carried in a reply body, but only when the server marked
 * the field valid with OBD_MD_FLGRANT. */
928 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
930 if (body->oa.o_valid & OBD_MD_FLGRANT) {
931 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
932 __osc_update_grant(cli, body->oa.o_grant);
/* Forward declaration: osc_shrink_grant_to_target() calls this routine and
 * no definition is visible before that call. */
936 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
937 void *key, obd_count vallen, void *val,
938 struct ptlrpc_request_set *set);
/* Reply callback for a grant-shrink request.
 * One visible path gives the grant recorded in the saved obdo back to the
 * client; the other reads the reply body and credits whatever grant the
 * server returned.
 * NOTE(review): presumably the first path is the failure case (rc != 0);
 * the condition is not visible here -- confirm. */
940 static int osc_shrink_grant_interpret(const struct lu_env *env,
941 struct ptlrpc_request *req,
944 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
945 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
946 struct ost_body *body;
949 __osc_update_grant(cli, oa->o_grant);
953 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
955 osc_update_grant(cli, body);
/* Give up a quarter of the available grant locally: the amount is stored in
 * oa->o_grant and removed from cl_avail_grant under cl_loi_list_lock.  The
 * obdo is flagged OBD_FL_SHRINK_GRANT (marking o_flags valid first when
 * needed) and the next shrink deadline is re-armed. */
961 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
963 client_obd_list_lock(&cli->cl_loi_list_lock);
964 oa->o_grant = cli->cl_avail_grant / 4;
965 cli->cl_avail_grant -= oa->o_grant;
966 client_obd_list_unlock(&cli->cl_loi_list_lock);
967 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
968 oa->o_valid |= OBD_MD_FLFLAGS;
971 oa->o_flags |= OBD_FL_SHRINK_GRANT;
972 osc_update_next_shrink(cli);
975 /* Shrink the current grant, either from some large amount to enough for a
976 * full set of in-flight RPCs, or if we have already shrunk to that limit
977 * then to enough for a single RPC. This avoids keeping more grant than
978 * needed, and avoids shrinking the grant piecemeal. */
979 static int osc_shrink_grant(struct client_obd *cli)
981 long target = (cli->cl_max_rpcs_in_flight + 1) *
982 cli->cl_max_pages_per_rpc;
984 client_obd_list_lock(&cli->cl_loi_list_lock);
985 if (cli->cl_avail_grant <= target)
986 target = cli->cl_max_pages_per_rpc;
987 client_obd_list_unlock(&cli->cl_loi_list_lock);
989 return osc_shrink_grant_to_target(cli, target);
/* Reduce cl_avail_grant to @target and report the surplus to the server.
 * Visible flow: clamp @target from below, leave early when the grant is
 * already at or under the target, allocate an ost_body, fill the cache
 * accounting with osc_announce_cached(), move the surplus into
 * body->oa.o_grant under the lock, flag OBD_FL_SHRINK_GRANT, re-arm the
 * shrink timer and send the body through osc_set_info_async() with
 * KEY_GRANT_SHRINK.  A path that returns the surplus to the client with
 * __osc_update_grant() is visible after the send (presumably on error --
 * confirm).
 * NOTE(review): the clamp compares @target with cl_max_pages_per_rpc, a
 * page count, whereas cl_avail_grant is adjusted in CFS_PAGE_SIZE byte
 * units elsewhere in this file -- check the units. */
992 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
995 struct ost_body *body;
998 client_obd_list_lock(&cli->cl_loi_list_lock);
999 /* Don't shrink if we are already above or below the desired limit
1000 * We don't want to shrink below a single RPC, as that will negatively
1001 * impact block allocation and long-term performance. */
1002 if (target < cli->cl_max_pages_per_rpc)
1003 target = cli->cl_max_pages_per_rpc;
1005 if (target >= cli->cl_avail_grant) {
1006 client_obd_list_unlock(&cli->cl_loi_list_lock);
1009 client_obd_list_unlock(&cli->cl_loi_list_lock);
1011 OBD_ALLOC_PTR(body);
1015 osc_announce_cached(cli, &body->oa, 0);
/* The lock was dropped above, so the surplus is recomputed from the current
 * cl_avail_grant here. */
1017 client_obd_list_lock(&cli->cl_loi_list_lock);
1018 body->oa.o_grant = cli->cl_avail_grant - target;
1019 cli->cl_avail_grant = target;
1020 client_obd_list_unlock(&cli->cl_loi_list_lock);
1021 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
1022 body->oa.o_valid |= OBD_MD_FLFLAGS;
1023 body->oa.o_flags = 0;
1025 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
1026 osc_update_next_shrink(cli);
1028 rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
1029 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
1030 sizeof(*body), body, NULL);
1032 __osc_update_grant(cli, body->oa.o_grant);
/* Grant above this many bytes is considered worth shrinking. */
1037 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
/* Decide whether @client should shrink its grant now.
 * Requires OBD_CONNECT_GRANT_SHRINK in the negotiated connect flags.  When
 * the shrink deadline is due (with 5 ticks of slack) the visible test is
 * that the import is in LUSTRE_IMP_FULL state and the available grant
 * exceeds GRANT_SHRINK_LIMIT; a path that only re-arms the deadline is
 * also visible.
 * NOTE(review): the return values are not visible here -- confirm. */
1038 static int osc_should_shrink_grant(struct client_obd *client)
1040 cfs_time_t time = cfs_time_current();
1041 cfs_time_t next_shrink = client->cl_next_shrink_grant;
1043 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
1044 OBD_CONNECT_GRANT_SHRINK) == 0)
1047 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1048 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1049 client->cl_avail_grant > GRANT_SHRINK_LIMIT)
1052 osc_update_next_shrink(client);
/* Timeout callback: for every client linked on item->ti_obd_list, shrink
 * its grant when osc_should_shrink_grant() says so.  @data is not used in
 * the visible lines. */
1057 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1059 struct client_obd *client;
1061 cfs_list_for_each_entry(client, &item->ti_obd_list,
1062 cl_grant_shrink_list) {
1063 if (osc_should_shrink_grant(client))
1064 osc_shrink_grant(client);
/* Register @client for periodic grant shrinking: a timeout client is added
 * with the shrink interval of the client and osc_grant_shrink_grant_cb() as
 * callback.  A failure is logged with the device name; on the other visible
 * path the registration is logged and the first deadline armed. */
1069 static int osc_add_shrink_grant(struct client_obd *client)
1073 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1075 osc_grant_shrink_grant_cb, NULL,
1076 &client->cl_grant_shrink_list);
1078 CERROR("add grant client %s error %d\n",
1079 client->cl_import->imp_obd->obd_name, rc);
1082 CDEBUG(D_CACHE, "add grant client %s \n",
1083 client->cl_import->imp_obd->obd_name);
1084 osc_update_next_shrink(client);
/* Remove @client from the grant-shrink timeout list and return the result
 * of ptlrpc_del_timeout_client(). */
1088 static int osc_del_shrink_grant(struct client_obd *client)
1090 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/* Initialise cl_avail_grant from the connect data @ocd.
 * Under cl_loi_list_lock: an import in LUSTRE_IMP_EVICTED state takes
 * ocd_grant as is, otherwise cl_dirty is subtracted from it.  A negative
 * result is reported with a warning and replaced by ocd_grant.  After the
 * lock is dropped the values are logged, and the client is registered for
 * grant shrinking when the server advertises OBD_CONNECT_GRANT_SHRINK and
 * the client is not yet on a shrink list. */
1094 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1097 * ocd_grant is the total grant amount we're expect to hold: if we've
1098 * been evicted, it's the new avail_grant amount, cl_dirty will drop
1099 * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1101 * race is tolerable here: if we're evicted, but imp_state already
1102 * left EVICTED state, then cl_dirty must be 0 already.
1104 client_obd_list_lock(&cli->cl_loi_list_lock);
1105 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1106 cli->cl_avail_grant = ocd->ocd_grant;
1108 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1110 if (cli->cl_avail_grant < 0) {
1111 CWARN("%s: available grant < 0, the OSS is probably not running"
1112 " with patch from bug20278 (%ld) \n",
1113 cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
1114 /* workaround for 1.6 servers which do not have
1115 * the patch from bug20278 */
1116 cli->cl_avail_grant = ocd->ocd_grant;
1119 client_obd_list_unlock(&cli->cl_loi_list_lock);
1121 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1122 cli->cl_import->imp_obd->obd_name,
1123 cli->cl_avail_grant, cli->cl_lost_grant);
1125 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1126 cfs_list_empty(&cli->cl_grant_shrink_list))
1127 osc_add_shrink_grant(cli);
1130 /* We assume that the reason this OSC got a short read is because it read
1131 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1132 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1133 * this stripe never got written at or beyond this stripe offset yet. */
/* Zero-fill the part of a read that the server did not return.
 * @nob_read   bytes actually read
 * @page_count number of entries in @pga
 * @pga        pages of the request, in order
 * The first loop skips pages that were read completely; a page in which the
 * data ends has its tail cleared.  The second loop clears every remaining
 * page in full.  Pages are mapped with cfs_kmap() only for the memset.
 * NOTE(review): the index and counter updates between the visible lines are
 * not shown here -- confirm. */
1134 static void handle_short_read(int nob_read, obd_count page_count,
1135 struct brw_page **pga)
1140 /* skip bytes read OK */
1141 while (nob_read > 0) {
1142 LASSERT (page_count > 0);
1144 if (pga[i]->count > nob_read) {
1145 /* EOF inside this page */
1146 ptr = cfs_kmap(pga[i]->pg) +
1147 (pga[i]->off & ~CFS_PAGE_MASK);
1148 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1149 cfs_kunmap(pga[i]->pg);
1155 nob_read -= pga[i]->count;
1160 /* zero remaining pages */
1161 while (page_count-- > 0) {
1162 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1163 memset(ptr, 0, pga[i]->count);
1164 cfs_kunmap(pga[i]->pg);
/* Validate the reply of a bulk write.
 * The per-niobuf return codes are fetched from the RMF_RCS reply field.  A
 * missing or short vector is logged, the first negative code is returned
 * as is, and a non-zero positive code is logged as invalid.  The number of
 * bytes transferred by the bulk must equal @requested_nob.
 * NOTE(review): the return values of the logged error paths and of the
 * success path are not visible here -- confirm. */
1169 static int check_write_rcs(struct ptlrpc_request *req,
1170 int requested_nob, int niocount,
1171 obd_count page_count, struct brw_page **pga)
1176 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1177 sizeof(*remote_rcs) *
1179 if (remote_rcs == NULL) {
1180 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1184 /* return error if any niobuf was in error */
1185 for (i = 0; i < niocount; i++) {
1186 if (remote_rcs[i] < 0)
1187 return(remote_rcs[i]);
1189 if (remote_rcs[i] != 0) {
1190 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1191 i, remote_rcs[i], req);
1196 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1197 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1198 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Tell whether @p2 can share a remote niobuf with @p1.
 * The final test is contiguity: @p2 must start exactly where @p1 ends.
 * When the flags differ, a difference outside the grant/nocache/sync/async
 * bits is reported with CERROR.
 * NOTE(review): the result returned for differing flags is not visible
 * here -- confirm. */
1205 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1207 if (p1->flag != p2->flag) {
1208 unsigned mask = ~(OBD_BRW_FROM_GRANT|
1209 OBD_BRW_NOCACHE|OBD_BRW_SYNC|OBD_BRW_ASYNC);
1211 /* warn if we try to combine flags that we don't know to be
1212 * safe to combine */
1213 if ((p1->flag & mask) != (p2->flag & mask))
1214 CERROR("is it ok to have flags 0x%x and 0x%x in the "
1215 "same brw?\n", p1->flag, p2->flag);
1219 return (p1->off + p1->count == p2->off);
/* Compute the checksum of the first @nob bytes of the pages in @pga.
 * @pg_count   number of pages (must be positive)
 * @opc        OST_READ or OST_WRITE; only selects fault injection
 * @cksum_type checksum algorithm
 * Each page is mapped, at most the remaining @nob bytes of it are fed to
 * compute_checksum(), and it is unmapped again.  Two fault-injection hooks
 * exist: for reads the first page is corrupted before checksumming, for
 * writes only the resulting checksum is altered (that statement is not
 * visible here). */
1222 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1223 struct brw_page **pga, int opc,
1224 cksum_type_t cksum_type)
1229 LASSERT (pg_count > 0);
1230 cksum = init_checksum(cksum_type);
1231 while (nob > 0 && pg_count > 0) {
1232 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1233 int off = pga[i]->off & ~CFS_PAGE_MASK;
/* Never checksum past the end of the requested byte count. */
1234 int count = pga[i]->count > nob ? nob : pga[i]->count;
1236 /* corrupt the data before we compute the checksum, to
1237 * simulate an OST->client data error */
1238 if (i == 0 && opc == OST_READ &&
1239 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1240 memcpy(ptr + off, "bad1", min(4, nob));
1241 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1242 cfs_kunmap(pga[i]->pg);
1243 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1246 nob -= pga[i]->count;
1250 /* For sending we only compute the wrong checksum instead
1251 * of corrupting the data so it is still correct on a redo */
1252 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/*
 * Build (but do not send) one OST bulk read or write request covering
 * @page_count pages.
 *
 * cmd        OBD_BRW_WRITE set selects the write path, which allocates the
 *            request from the import's request pool; otherwise a read
 *            request is allocated.
 * cli        client obd whose import, checksum settings and grant state
 *            are used.
 * oa         object attributes copied into the request body; its checksum
 *            fields are also updated here for later verification.
 * lsm        not referenced in the lines visible here.
 * page_count number of entries in @pga; asserted to be > 0.
 * pga        pages to transfer; asserted to be in strictly increasing
 *            offset order, each one confined to a single page.
 * reqp       NOTE(review): presumably receives the new request -- the
 *            assignment is not visible in this listing; confirm.
 * ocapa      capability packed into the request; when @reserve is also
 *            set, a reference is taken and kept in the async args.
 * (last)     NOTE(review): the final parameter's declaration is not
 *            visible; callers in this file pass a resend indicator.
 *
 * Adjacent pages accepted by can_merge_pages() share one remote niobuf.
 * The byte count, niobuf count and page count are saved in the request's
 * async args, where osc_brw_fini_request() reads them back.
 */
1258 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1259 struct lov_stripe_md *lsm, obd_count page_count,
1260 struct brw_page **pga,
1261 struct ptlrpc_request **reqp,
1262 struct obd_capa *ocapa, int reserve,
1265 struct ptlrpc_request *req;
1266 struct ptlrpc_bulk_desc *desc;
1267 struct ost_body *body;
1268 struct obd_ioobj *ioobj;
1269 struct niobuf_remote *niobuf;
1270 int niocount, i, requested_nob, opc, rc;
1271 struct osc_brw_async_args *aa;
1272 struct req_capsule *pill;
1273 struct brw_page *pg_prev;
/* fault-injection hooks that make the function fail before any work */
1276 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1277 RETURN(-ENOMEM); /* Recoverable */
1278 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1279 RETURN(-EINVAL); /* Fatal */
1281 if ((cmd & OBD_BRW_WRITE) != 0) {
/* writes take their request from the import's request pool */
1283 req = ptlrpc_request_alloc_pool(cli->cl_import,
1284 cli->cl_import->imp_rq_pool,
1285 &RQF_OST_BRW_WRITE);
1288 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* work out how many remote niobufs are needed; the count starts at 1 and
 * pages that cannot be merged with their predecessor need a new one
 * (NOTE(review): the increment itself is not visible in this listing) */
1293 for (niocount = i = 1; i < page_count; i++) {
1294 if (!can_merge_pages(pga[i - 1], pga[i]))
/* size the variable-length request fields before packing */
1298 pill = &req->rq_pill;
1299 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1301 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1302 niocount * sizeof(*niobuf));
1303 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1305 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1307 ptlrpc_request_free(req);
1310 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1311 ptlrpc_at_set_req_timeout(req);
/* bulk descriptor: the client is the GET source for writes and the PUT
 * sink for reads */
1313 if (opc == OST_WRITE)
1314 desc = ptlrpc_prep_bulk_imp(req, page_count,
1315 BULK_GET_SOURCE, OST_BULK_PORTAL);
1317 desc = ptlrpc_prep_bulk_imp(req, page_count,
1318 BULK_PUT_SINK, OST_BULK_PORTAL);
1321 GOTO(out, rc = -ENOMEM);
1322 /* NB request now owns desc and will free it when it gets freed */
1324 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1325 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1326 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1327 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1329 lustre_set_wire_obdo(&body->oa, oa);
1331 obdo_to_ioobj(oa, ioobj);
1332 ioobj->ioo_bufcnt = niocount;
1333 osc_pack_capa(req, body, ocapa);
1334 LASSERT (page_count > 0);
/* add every page to the bulk descriptor, total up the byte count and fill
 * in the remote niobufs */
1336 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1337 struct brw_page *pg = pga[i];
1339 LASSERT(pg->count > 0);
/* a brw_page must not cross a page boundary */
1340 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1341 "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1342 pg->off, pg->count);
/* pages must arrive sorted by strictly increasing offset.
 * NOTE(review): two variants of this assertion follow; presumably a
 * preprocessor conditional that is not visible here selects one */
1344 LASSERTF(i == 0 || pg->off > pg_prev->off,
1345 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1346 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1348 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1349 pg_prev->pg, page_private(pg_prev->pg),
1350 pg_prev->pg->index, pg_prev->off);
1352 LASSERTF(i == 0 || pg->off > pg_prev->off,
1353 "i %d p_c %u\n", i, page_count);
/* OBD_BRW_SRVLOCK must be uniform across the whole request */
1355 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1356 (pg->flag & OBD_BRW_SRVLOCK));
1358 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1360 requested_nob += pg->count;
/* a mergeable page only extends the length of a niobuf; any other page
 * starts a new one with its own offset, length and flags */
1362 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1364 niobuf->len += pg->count;
1366 niobuf->offset = pg->off;
1367 niobuf->len = pg->count;
1368 niobuf->flags = pg->flag;
/* cross-check: the cursor must have advanced by exactly niocount entries
 * from the start of the niobuf array */
1373 LASSERTF((void *)(niobuf - niocount) ==
1374 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1375 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1376 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
/* only writes pass their byte count to osc_announce_cached() */
1378 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
/* make o_flags valid before setting OBD_FL_RECOV_RESEND in it.
 * NOTE(review): the condition guarding these lines is not visible */
1380 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1381 body->oa.o_valid |= OBD_MD_FLFLAGS;
1382 body->oa.o_flags = 0;
1384 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1387 if (osc_should_shrink_grant(cli))
1388 osc_shrink_grant_local(cli, &body->oa);
1390 /* size[REQ_REC_OFF] still sizeof (*body) */
1391 if (opc == OST_WRITE) {
/* checksum the outgoing data here unless the sptlrpc flavor already has
 * a bulk component */
1392 if (unlikely(cli->cl_checksum) &&
1393 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1394 /* store cl_cksum_type in a local variable since
1395 * it can be changed via lprocfs */
1396 cksum_type_t cksum_type = cli->cl_cksum_type;
1398 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1399 oa->o_flags &= OBD_FL_LOCAL_MASK;
1400 body->oa.o_flags = 0;
1402 body->oa.o_flags |= cksum_type_pack(cksum_type);
1403 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1404 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1408 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1410 /* save this in 'oa', too, for later checking */
1411 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1412 oa->o_flags |= cksum_type_pack(cksum_type);
1414 /* clear out the checksum flag, in case this is a
1415 * resend but cl_checksum is no longer set. b=11238 */
1416 oa->o_valid &= ~OBD_MD_FLCKSUM;
1418 oa->o_cksum = body->oa.o_cksum;
1419 /* 1 RC per niobuf */
1420 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1421 sizeof(__u32) * niocount);
/* reads: set the checksum flag and type in the request body; the reply
 * checksum is verified in osc_brw_fini_request() */
1423 if (unlikely(cli->cl_checksum) &&
1424 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1425 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1426 body->oa.o_flags = 0;
1427 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1428 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1431 ptlrpc_request_set_replen(req);
/* the async args are stored inside the request; the compile-time assert
 * checks that they fit */
1433 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1434 aa = ptlrpc_req_async_args(req);
1436 aa->aa_requested_nob = requested_nob;
1437 aa->aa_nio_count = niocount;
1438 aa->aa_page_count = page_count;
1442 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
/* keep a capability reference only when the caller asked for one */
1443 if (ocapa && reserve)
1444 aa->aa_ocapa = capa_get(ocapa);
/* NOTE(review): presumably the error-exit path reached via GOTO(out, ...);
 * the label itself is not visible in this listing */
1450 ptlrpc_req_finished(req);
/*
 * Compare the checksum the server reports for a write (@server_cksum)
 * with the one the client computed when sending (@client_cksum).
 *
 * A match is only logged at D_PAGE level.  A mismatch on an object flagged
 * OBD_FL_MMAP is special-cased (the page can change at any time).  For any
 * other mismatch the pages are checksummed again, using the type unpacked
 * from oa->o_flags (or OBD_CKSUM_CRC32 when no flags are valid), a
 * diagnostic message is chosen from the three values, and the event is
 * reported on the console and via CERROR.
 *
 * @oa is the obdo whose flags and ids are reported, @peer the server
 * address used in the message, @nob/@page_count/@pga describe the data and
 * @client_cksum_type is the type the client originally used.
 * NOTE(review): none of the return statements are visible in this listing;
 * the caller tests the result as a boolean.
 */
1454 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1455 __u32 client_cksum, __u32 server_cksum, int nob,
1456 obd_count page_count, struct brw_page **pga,
1457 cksum_type_t client_cksum_type)
1461 cksum_type_t cksum_type;
1463 if (server_cksum == client_cksum) {
1464 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1468 /* If this is mmaped file - it can be changed at any time */
1469 if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags & OBD_FL_MMAP)
/* checksum type named in @oa; CRC32 when the flags are not valid */
1472 if (oa->o_valid & OBD_MD_FLFLAGS)
1473 cksum_type = cksum_type_unpack(oa->o_flags);
1475 cksum_type = OBD_CKSUM_CRC32;
/* checksum the pages again as they are now */
1477 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
/* pick the most likely explanation from the three checksums */
1480 if (cksum_type != client_cksum_type)
1481 msg = "the server did not use the checksum type specified in "
1482 "the original request - likely a protocol problem";
1483 else if (new_cksum == server_cksum)
1484 msg = "changed on the client after we checksummed it - "
1485 "likely false positive due to mmap IO (bug 11742)";
1486 else if (new_cksum == client_cksum)
1487 msg = "changed in transit before arrival at OST";
1489 msg = "changed in transit AND doesn't match the original - "
1490 "likely false positive due to mmap IO (bug 11742)";
1492 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1493 " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1494 msg, libcfs_nid2str(peer->nid),
1495 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1496 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1497 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1499 oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1501 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1502 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1503 "client csum now %x\n", client_cksum, client_cksum_type,
1504 server_cksum, cksum_type, new_cksum);
/*
 * Post-process the reply of a bulk read/write request.
 *
 * Visible steps: log failures other than -EDQUOT, unpack the reply body,
 * update quota flags (HAVE_QUOTA_SUPPORT builds, OST_WRITE replies only)
 * and grant, then branch on the request opcode.
 *  - OST_WRITE: unwrap the bulk data, compare the checksum saved in
 *    aa->aa_oa against the server's via check_write_checksum(), and
 *    validate the per-niobuf return codes with check_write_rcs().
 *  - OST_READ: unwrap the bulk data, sanity-check the byte count against
 *    what was requested and what was transferred, pass short reads to
 *    handle_short_read(), and verify the server checksum if one was sent.
 * At the end the reply obdo is copied back into aa->aa_oa.
 * NOTE(review): most return/goto statements are not visible in this
 * listing, so the exact error codes are not documented here.
 */
1508 /* Note rc enters this function as number of bytes transferred */
1509 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1511 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1512 const lnet_process_id_t *peer =
1513 &req->rq_import->imp_connection->c_peer;
1514 struct client_obd *cli = aa->aa_cli;
1515 struct ost_body *body;
1516 __u32 client_cksum = 0;
/* -EDQUOT is deliberately excluded from this failure branch */
1519 if (rc < 0 && rc != -EDQUOT) {
1520 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1524 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1525 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1527 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1531 #ifdef HAVE_QUOTA_SUPPORT
1532 /* set/clear over quota flag for a uid/gid */
1533 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1534 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1535 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1537 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1538 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1540 lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
1545 osc_update_grant(cli, body);
/* pick up the checksum that osc_brw_prep_request() stored in aa_oa */
1550 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1551 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1553 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1555 CERROR("Unexpected +ve rc %d\n", rc);
1558 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1560 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
/* verify only when a non-zero client checksum was recorded */
1563 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1564 check_write_checksum(&body->oa, peer, client_cksum,
1565 body->oa.o_cksum, aa->aa_requested_nob,
1566 aa->aa_page_count, aa->aa_ppga,
1567 cksum_type_unpack(aa->aa_oa->o_flags)))
1570 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1571 aa->aa_page_count, aa->aa_ppga);
1575 /* The rest of this function executes only for OST_READs */
1577 /* if unwrap_bulk failed, return -EAGAIN to retry */
1578 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1580 GOTO(out, rc = -EAGAIN);
/* the server must not report more bytes than were asked for */
1582 if (rc > aa->aa_requested_nob) {
1583 CERROR("Unexpected rc %d (%d requested)\n", rc,
1584 aa->aa_requested_nob);
/* the reported count must match what the bulk layer actually moved */
1588 if (rc != req->rq_bulk->bd_nob_transferred) {
1589 CERROR ("Unexpected rc %d (%d transferred)\n",
1590 rc, req->rq_bulk->bd_nob_transferred);
1594 if (rc < aa->aa_requested_nob)
1595 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1597 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1598 static int cksum_counter;
1599 __u32 server_cksum = body->oa.o_cksum;
1602 cksum_type_t cksum_type;
/* checksum type named in the reply; CRC32 when no flags are valid */
1604 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1605 cksum_type = cksum_type_unpack(body->oa.o_flags);
1607 cksum_type = OBD_CKSUM_CRC32;
/* checksum the bytes actually received */
1608 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1609 aa->aa_ppga, OST_READ,
/* when the bulk sender differs from the peer, the sender's nid is turned
 * into a string for the error message below */
1612 if (peer->nid == req->rq_bulk->bd_sender) {
1616 router = libcfs_nid2str(req->rq_bulk->bd_sender);
/* an all-ones checksum on a non-empty read is reported as a protocol error */
1619 if (server_cksum == ~0 && rc > 0) {
1620 CERROR("Protocol error: server %s set the 'checksum' "
1621 "bit, but didn't send a checksum. Not fatal, "
1622 "but please notify on http://bugzilla.lustre.org/\n",
1623 libcfs_nid2str(peer->nid));
1624 } else if (server_cksum != client_cksum) {
1625 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1626 "%s%s%s inode "DFID" object "
1627 LPU64"/"LPU64" extent "
1628 "["LPU64"-"LPU64"]\n",
1629 req->rq_import->imp_obd->obd_name,
1630 libcfs_nid2str(peer->nid),
1632 body->oa.o_valid & OBD_MD_FLFID ?
1633 body->oa.o_parent_seq : (__u64)0,
1634 body->oa.o_valid & OBD_MD_FLFID ?
1635 body->oa.o_parent_oid : 0,
1636 body->oa.o_valid & OBD_MD_FLFID ?
1637 body->oa.o_parent_ver : 0,
1639 body->oa.o_valid & OBD_MD_FLGROUP ?
1640 body->oa.o_seq : (__u64)0,
1641 aa->aa_ppga[0]->off,
1642 aa->aa_ppga[aa->aa_page_count-1]->off +
1643 aa->aa_ppga[aa->aa_page_count-1]->count -
1645 CERROR("client %x, server %x, cksum_type %x\n",
1646 client_cksum, server_cksum, cksum_type);
1648 aa->aa_oa->o_cksum = client_cksum;
1652 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* a checksum was recorded on the client side but the reply carries none */
1655 } else if (unlikely(client_cksum)) {
1656 static int cksum_missed;
/* x & -x == x holds only for 0 and powers of two, which thins out the log */
1659 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1660 CERROR("Checksum %u requested from %s but not sent\n",
1661 cksum_missed, libcfs_nid2str(peer->nid));
/* hand the attributes from the reply back to the caller's obdo */
1667 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
/*
 * Synchronous bulk transfer of one batch of pages: build the request with
 * osc_brw_prep_request(), wait for it with ptlrpc_queue_wait(), and
 * post-process the reply with osc_brw_fini_request().
 *
 * A -ETIMEDOUT result on a request marked rq_resend drops the request and
 * logs "BULK TIMEOUT".  When the final result is a recoverable error
 * (osc_recoverable_error()), osc_should_resend() decides whether another
 * attempt is allowed; before retrying the caller sleeps for `resends`
 * seconds.
 * NOTE(review): the goto/label lines, the resends counter update and the
 * return are not visible in this listing; confirm the retry flow.
 */
1672 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1673 struct lov_stripe_md *lsm,
1674 obd_count page_count, struct brw_page **pga,
1675 struct obd_capa *ocapa)
1677 struct ptlrpc_request *req;
1681 struct l_wait_info lwi;
/* private wait queue, used only for the timed sleep between retries */
1685 cfs_waitq_init(&waitq);
/* no capability reference is reserved (reserve == 0) on this path */
1688 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1689 page_count, pga, &req, ocapa, 0, resends);
1693 rc = ptlrpc_queue_wait(req);
1695 if (rc == -ETIMEDOUT && req->rq_resend) {
1696 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1697 ptlrpc_req_finished(req);
1701 rc = osc_brw_fini_request(req, rc);
1703 ptlrpc_req_finished(req);
1704 if (osc_recoverable_error(rc)) {
1706 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1707 CERROR("too many resend retries, returning error\n");
/* nothing ever wakes this queue (the condition is constant 0), so this is
 * a plain timed, interruptible sleep of `resends` seconds */
1711 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1712 l_wait_event(waitq, 0, &lwi);
/*
 * Re-issue an asynchronous bulk request that failed with a recoverable
 * error.
 *
 * A new request is built from the pages, obdo and capability recorded in
 * @aa.  Under cl_loi_list_lock the attempt is abandoned if any page that
 * holds a reference on the old request has been interrupted.  Otherwise
 * the new request inherits the interpret callback and async args of
 * @request, has its send time pushed back by aa_resends seconds, takes
 * over the oap list and the capability reference, and is added to the
 * request set of the original.
 * NOTE(review): the return statements are not visible in this listing.
 */
1720 int osc_brw_redo_request(struct ptlrpc_request *request,
1721 struct osc_brw_async_args *aa)
1723 struct ptlrpc_request *new_req;
1724 struct ptlrpc_request_set *set = request->rq_set;
1725 struct osc_brw_async_args *new_aa;
1726 struct osc_async_page *oap;
1730 if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1731 CERROR("too many resent retries, returning error\n");
1735 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
/* the direction is recovered from the opcode of the original request */
1737 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1738 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1739 aa->aa_cli, aa->aa_oa,
1740 NULL /* lsm unused by osc currently */,
1741 aa->aa_page_count, aa->aa_ppga,
1742 &new_req, aa->aa_ocapa, 0, 1);
1746 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
/* give up if a page tied to the old request was interrupted meanwhile */
1748 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1749 if (oap->oap_request != NULL) {
1750 LASSERTF(request == oap->oap_request,
1751 "request %p != oap_request %p\n",
1752 request, oap->oap_request);
1753 if (oap->oap_interrupted) {
1754 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1755 ptlrpc_req_finished(new_req);
1760 /* New request takes over pga and oaps from old request.
1761 * Note that copying a list_head doesn't work, need to move it... */
1763 new_req->rq_interpret_reply = request->rq_interpret_reply;
1764 new_req->rq_async_args = request->rq_async_args;
/* delay the resend by one second per previous attempt */
1765 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1767 new_aa = ptlrpc_req_async_args(new_req);
/* the struct copy above duplicated the list head; re-initialise it and
 * splice the entries across properly */
1769 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1770 cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1771 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
/* move the per-page request reference from the old request to the new */
1773 cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1774 if (oap->oap_request) {
1775 ptlrpc_req_finished(oap->oap_request);
1776 oap->oap_request = ptlrpc_request_addref(new_req);
/* the capability reference moves to the new request as well */
1780 new_aa->aa_ocapa = aa->aa_ocapa;
1781 aa->aa_ocapa = NULL;
1783 /* use ptlrpc_set_add_req is safe because interpret functions work
1784 * in check_set context. only one way exist with access to request
1785 * from different thread got -EINTR - this way protected with
1786 * cl_loi_list_lock */
1787 ptlrpc_set_add_req(set, new_req);
1789 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1791 DEBUG_REQ(D_INFO, new_req, "new request");
1796 * ugh, we want disk allocation on the target to happen in offset order. we'll
1797 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1798 * fine for our small page arrays and doesn't require allocation. it's an
1799 * insertion sort that swaps elements that are strides apart, shrinking the
1800 * stride down until it's '1' and the array is sorted.
/*
 * Sort the @num page pointers in @array by ascending ->off, in place.
 *
 * The starting stride is grown with stride = stride * 3 + 1 until it
 * reaches @num.  Each pass then performs a strided insertion: entries
 * whose offset is larger than that of the element being placed are shifted
 * up by one stride.  Passes repeat while the stride is greater than 1.
 * NOTE(review): the statement that shrinks the stride between passes and
 * the final store of the inserted element are not visible in this listing.
 */
1802 static void sort_brw_pages(struct brw_page **array, int num)
1805 struct brw_page *tmp;
1809 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1814 for (i = stride ; i < num ; i++) {
/* shift larger-offset entries up by one stride to open a slot for tmp */
1817 while (j >= stride && array[j - stride]->off > tmp->off) {
1818 array[j] = array[j - stride];
1823 } while (stride > 1);
/*
 * Count how many leading entries of @pg (at most @pages, which must be
 * > 0) can be sent together.  Visible rules: counting stops when the pages
 * run out, or when a page does not end on a page boundary.  The next
 * page's in-page offset is also tested for being non-zero.
 * NOTE(review): the counter updates and the action taken for a page that
 * does not start on a page boundary are not visible in this listing;
 * presumably counting stops there too -- confirm.
 */
1826 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1832 LASSERT (pages > 0);
/* in-page byte offset of the current entry */
1833 offset = pg[i]->off & ~CFS_PAGE_MASK;
1837 if (pages == 0) /* that's all */
1840 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1841 return count; /* doesn't end on page boundary */
1844 offset = pg[i]->off & ~CFS_PAGE_MASK;
1845 if (offset != 0) /* doesn't start on page boundary */
/*
 * Allocate an array of @count brw_page pointers with OBD_ALLOC and walk
 * the @count entries of @pga to fill it.
 * NOTE(review): the loop body, the allocation-failure check and the return
 * statement are not visible in this listing; presumably entry i points at
 * pga[i] and the array is returned.  osc_release_ppga() frees an array of
 * the same size.
 */
1852 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1854 struct brw_page **ppga;
1857 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1861 for (i = 0; i < count; i++)
/*
 * Free a brw_page pointer array holding @count entries.  @ppga must not be
 * NULL (asserted).  Only the pointer array is freed; the brw_page objects
 * it points to are not touched here.
 */
1866 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1868 LASSERT(ppga != NULL);
/* @count must match the count used at allocation: the size is recomputed */
1869 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * Synchronous bulk I/O entry point for an array of @page_count pages.
 *
 * With OBD_BRW_CHECK set in @cmd only the import state is examined.
 * Otherwise a pointer array over @pga is built and sorted by offset, and
 * the pages are sent in batches through osc_brw_internal().  A batch is
 * limited to cl_max_pages_per_rpc pages and further trimmed by
 * max_unfragmented_pages().  Because each transfer overwrites the obdo,
 * a copy of oinfo->oi_oa is saved before the first of several batches and
 * restored before every later one.
 *
 * @oti is not referenced in the lines visible here.
 * NOTE(review): the return statements and the error check after
 * osc_brw_internal() are not visible in this listing.
 */
1872 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1873 obd_count page_count, struct brw_page *pga,
1874 struct obd_trans_info *oti)
1876 struct obdo *saved_oa = NULL;
1877 struct brw_page **ppga, **orig;
1878 struct obd_import *imp = class_exp2cliimp(exp);
1879 struct client_obd *cli;
1880 int rc, page_count_orig;
1883 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1884 cli = &imp->imp_obd->u.cli;
1886 if (cmd & OBD_BRW_CHECK) {
1887 /* The caller just wants to know if there's a chance that this
1888 * I/O can succeed */
1890 if (imp->imp_invalid)
1895 /* test_brw with a failed create can trip this, maybe others. */
1896 LASSERT(cli->cl_max_pages_per_rpc);
/* `orig` and page_count_orig keep the start and size of the array for the
 * final free, since ppga and page_count are advanced by the loop */
1900 orig = ppga = osc_build_ppga(pga, page_count);
1903 page_count_orig = page_count;
1905 sort_brw_pages(ppga, page_count);
1906 while (page_count) {
1907 obd_count pages_per_brw;
/* cap the batch at the per-RPC page limit */
1909 if (page_count > cli->cl_max_pages_per_rpc)
1910 pages_per_brw = cli->cl_max_pages_per_rpc;
1912 pages_per_brw = page_count;
1914 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1916 if (saved_oa != NULL) {
1917 /* restore previously saved oa */
1918 *oinfo->oi_oa = *saved_oa;
/* a copy is only needed when more batches will follow this one */
1919 } else if (page_count > pages_per_brw) {
1920 /* save a copy of oa (brw will clobber it) */
1921 OBDO_ALLOC(saved_oa);
1922 if (saved_oa == NULL)
1923 GOTO(out, rc = -ENOMEM);
1924 *saved_oa = *oinfo->oi_oa;
1927 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1928 pages_per_brw, ppga, oinfo->oi_capa);
/* advance past the pages that were just transferred */
1933 page_count -= pages_per_brw;
1934 ppga += pages_per_brw;
1938 osc_release_ppga(orig, page_count_orig);
1940 if (saved_oa != NULL)
1941 OBDO_FREE(saved_oa);
1946 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1947 * the dirty accounting. Writeback completes or truncate happens before
1948 * writing starts. Must be called with the loi lock held. */
/*
 * The visible body forwards to osc_release_write_grant() for the oap's
 * embedded brw page, passing `sent` through unchanged.  (`sent` is the
 * final parameter; its declaration line is not visible in this listing.)
 */
1949 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1952 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
/*
 * Tell whether the first page on @lop's urgent list carries ASYNC_SYNCFS,
 * in which case a debug message notes that a syncfs request is forcing an
 * RPC.  Only the head of the urgent list is examined.
 * NOTE(review): the return statements are not visible in this listing;
 * callers use the result as a boolean.
 */
1955 static int lop_makes_syncfs_rpc(struct loi_oap_pages *lop)
1957 struct osc_async_page *oap;
1960 if (cfs_list_empty(&lop->lop_urgent))
/* look at the first entry of the urgent list only */
1963 oap = cfs_list_entry(lop->lop_urgent.next,
1964 struct osc_async_page, oap_urgent_item);
1966 if (oap->oap_async_flags & ASYNC_SYNCFS) {
1967 CDEBUG(D_CACHE, "syncfs request forcing RPC\n");
1974 /* This maintains the lists of pending pages to read/write for a given object
1975 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1976 * to quickly find objects that are ready to send an RPC. */
/*
 * Conditions tested, in order: there are pending pages at all; the import
 * is missing or invalid; the urgent list is non-empty; for writes, there
 * are cache waiters; finally the pending count is compared with the
 * "optimal" RPC size, which starts at cl_max_pages_per_rpc.
 * NOTE(review): the return value of each branch and the "+16" adjustment
 * mentioned below are not visible in this listing.
 */
1977 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1983 if (lop->lop_num_pending == 0)
1986 /* if we have an invalid import we want to drain the queued pages
1987 * by forcing them through rpcs that immediately fail and complete
1988 * the pages. recovery relies on this to empty the queued pages
1989 * before canceling the locks and evicting down the llite pages */
1990 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1993 /* stream rpcs in queue order as long as there is an urgent page
1994 * queued. this is our cheap solution for good batching in the case
1995 * where writepage marks some random page in the middle of the file
1996 * as urgent because of, say, memory pressure */
1997 if (!cfs_list_empty(&lop->lop_urgent)) {
1998 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
2001 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
2002 optimal = cli->cl_max_pages_per_rpc;
2003 if (cmd & OBD_BRW_WRITE) {
2004 /* trigger a write rpc stream as long as there are dirtiers
2005 * waiting for space. as they're waiting, they're not going to
2006 * create more pages to coalesce with what's waiting.. */
2007 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
2008 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
2011 /* +16 to avoid triggering rpcs that would want to include pages
2012 * that are being queued but which can't be made ready until
2013 * the queuer finishes with the page. this is a wart for
2014 * llite::commit_write() */
2017 if (lop->lop_num_pending >= optimal)
/*
 * Tell whether the first page on @lop's urgent list carries ASYNC_HP, in
 * which case a debug message notes that a high-priority request is
 * forcing an RPC.  Only the head of the urgent list is examined.
 * NOTE(review): the return statements are not visible in this listing;
 * loi_list_maint() uses the result as a boolean.
 */
2023 static int lop_makes_hprpc(struct loi_oap_pages *lop)
2025 struct osc_async_page *oap;
2028 if (cfs_list_empty(&lop->lop_urgent))
/* look at the first entry of the urgent list only */
2031 oap = cfs_list_entry(lop->lop_urgent.next,
2032 struct osc_async_page, oap_urgent_item);
2034 if (oap->oap_async_flags & ASYNC_HP) {
2035 CDEBUG(D_CACHE, "hp request forcing RPC\n");
/*
 * Make the list membership of @item match `should_be_on`: an unlinked
 * item is appended to the tail of @list when it should be on it, and a
 * linked item is removed and re-initialised when it should not.  Nothing
 * happens when the item is already in the requested state.
 * (`should_be_on` is the third parameter; its declaration line is not
 * visible in this listing.)
 */
2042 static void on_list(cfs_list_t *item, cfs_list_t *list,
2045 if (cfs_list_empty(item) && should_be_on)
2046 cfs_list_add_tail(item, list);
2047 else if (!cfs_list_empty(item) && !should_be_on)
2048 cfs_list_del_init(item);
2051 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2052 * can find pages to build into rpcs quickly */
/*
 * If either the write or the read lop has a high-priority page at the head
 * of its urgent list, the object goes on the hp-ready list and comes off
 * the ordinary ready list.  Otherwise the sync-fs list membership follows
 * the write lop (when its head page is a syncfs page), the object comes
 * off the hp-ready list, and ready-list membership is derived from
 * lop_makes_rpc() on the write and read lops.  In all cases the write and
 * read lists track whether the corresponding lop has pending pages.
 * NOTE(review): the else/brace lines and the cmd arguments passed to
 * lop_makes_rpc() are not visible in this listing.
 */
2053 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
2055 if (lop_makes_hprpc(&loi->loi_write_lop) ||
2056 lop_makes_hprpc(&loi->loi_read_lop)) {
2058 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2059 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2061 if (lop_makes_syncfs_rpc(&loi->loi_write_lop)) {
2062 on_list(&loi->loi_sync_fs_item,
2063 &cli->cl_loi_sync_fs_list,
2064 loi->loi_write_lop.lop_num_pending);
2066 on_list(&loi->loi_hp_ready_item,
2067 &cli->cl_loi_hp_ready_list, 0);
2068 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2069 lop_makes_rpc(cli, &loi->loi_write_lop,
2071 lop_makes_rpc(cli, &loi->loi_read_lop,
/* the pending count doubles as the boolean: non-zero keeps the item listed */
2076 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2077 loi->loi_write_lop.lop_num_pending);
2079 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2080 loi->loi_read_lop.lop_num_pending);
/*
 * Adjust the pending-page counters by @delta (which may be negative):
 * always the per-object counter in @lop, plus one of the per-client
 * totals in @cli -- cl_pending_w_pages when @cmd has OBD_BRW_WRITE set.
 * NOTE(review): the line selecting the read counter (presumably an
 * `else`) is not visible in this listing.
 */
2083 static void lop_update_pending(struct client_obd *cli,
2084 struct loi_oap_pages *lop, int cmd, int delta)
2086 lop->lop_num_pending += delta;
2087 if (cmd & OBD_BRW_WRITE)
2088 cli->cl_pending_w_pages += delta;
2090 cli->cl_pending_r_pages += delta;
2094 * this is called when a sync waiter receives an interruption. Its job is to
2095 * get the caller woken as soon as possible. If its page hasn't been put in an
2096 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
2097 * desiring interruption which will forcefully complete the rpc once the rpc
/*
 * Mark @oap as interrupted (it must not already be so).
 *
 * If the page already holds a request reference, that request is marked
 * interrupted, ptlrpcd is woken for it, and the page's reference is
 * dropped.  If the page is still on a pending list, it is taken off the
 * pending and urgent lists, the pending counters are decremented, list
 * membership is refreshed with loi_list_maint(), and the caller's
 * ap_completion callback is invoked with -EINTR.
 * NOTE(review): the assignment of `loi`, the branch structure between the
 * two cases and the return are not visible in this listing.
 */
2100 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2102 struct loi_oap_pages *lop;
2103 struct lov_oinfo *loi;
2107 LASSERT(!oap->oap_interrupted);
2108 oap->oap_interrupted = 1;
2110 /* ok, it's been put in an rpc. only one oap gets a request reference */
2111 if (oap->oap_request != NULL) {
2112 ptlrpc_mark_interrupted(oap->oap_request);
2113 ptlrpcd_wake(oap->oap_request);
2114 ptlrpc_req_finished(oap->oap_request);
2115 oap->oap_request = NULL;
2119 * page completion may be called only if ->cpo_prep() method was
2120 * executed by osc_io_submit(), that also adds page the to pending list
2122 if (!cfs_list_empty(&oap->oap_pending_item)) {
2123 cfs_list_del_init(&oap->oap_pending_item);
2124 cfs_list_del_init(&oap->oap_urgent_item);
/* pick the lop that matches the direction of this page */
2127 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2128 &loi->loi_write_lop : &loi->loi_read_lop;
2129 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2130 loi_list_maint(oap->oap_cli, oap->oap_loi);
2131 rc = oap->oap_caller_ops->ap_completion(env,
2132 oap->oap_caller_data,
2133 oap->oap_cmd, NULL, -EINTR);
2139 /* this is trying to propagate async writeback errors back up to the
2140 * application. As an async write fails we record the error code for later if
2141 * the app does an fsync. As long as errors persist we force future rpcs to be
2142 * sync so that the app can get a sync error and break the cycle of queueing
2143 * pages for which writeback will fail. */
/*
 * Visible effects: one path sets ar_force_sync and records the next xid to
 * be handed out as ar_min_xid; the other clears ar_force_sync once a
 * request whose @xid is at least ar_min_xid is processed.
 * NOTE(review): the third parameter and the condition selecting the first
 * path are not visible in this listing -- presumably a non-zero rc.
 */
2144 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2151 ar->ar_force_sync = 1;
2152 ar->ar_min_xid = ptlrpc_sample_next_xid();
2157 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2158 ar->ar_force_sync = 0;
/*
 * Queue @oap on an urgent list of @lop according to its flags.
 *
 * A page already linked through oap_urgent_item is not added again; a
 * warning is logged instead.  A request carrying ASYNC_SYNCFS in
 * @async_flags is appended to a list whose name is not visible in this
 * listing.  Otherwise a page whose own flags contain ASYNC_HP goes to the
 * head of lop_urgent, and a page with ASYNC_URGENT (in its own flags or
 * in @async_flags) goes to the tail.
 * NOTE(review): the return statements are not visible in this listing.
 */
2161 static int osc_add_to_lop_urgent(struct loi_oap_pages *lop,
2162 struct osc_async_page *oap,
2163 obd_flag async_flags)
2166 /* If true, then already present in lop urgent */
2167 if (!cfs_list_empty(&oap->oap_urgent_item)) {
2168 CWARN("Request to add duplicate oap_urgent for flag = %d\n",
2169 oap->oap_async_flags);
2173 /* item from sync_fs, to avoid duplicates check the existing flags */
2174 if (async_flags & ASYNC_SYNCFS) {
2175 cfs_list_add_tail(&oap->oap_urgent_item,
/* high-priority pages jump the queue; plain urgent ones wait their turn */
2180 if (oap->oap_async_flags & ASYNC_HP)
2181 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2182 else if (oap->oap_async_flags & ASYNC_URGENT ||
2183 async_flags & ASYNC_URGENT)
2184 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
/*
 * Put @oap on the pending list of its object: the write lop when oap_cmd
 * has OBD_BRW_WRITE set, the read lop otherwise (NOTE(review): the `else`
 * line is not visible in this listing).  The page is also offered to the
 * urgent list based on its own flags, and the pending counters are
 * incremented.
 */
2189 void osc_oap_to_pending(struct osc_async_page *oap)
2191 struct loi_oap_pages *lop;
2193 if (oap->oap_cmd & OBD_BRW_WRITE)
2194 lop = &oap->oap_loi->loi_write_lop;
2196 lop = &oap->oap_loi->loi_read_lop;
/* no extra flags are passed, so only the oap's own flags are considered */
2198 osc_add_to_lop_urgent(lop, oap, 0);
2199 cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2200 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2203 /* this must be called holding the loi list lock to give coverage to exit_cache,
2204 * async_flag maintenance, and oap_request */
/*
 * Finish processing of one page after its RPC completed or failed.
 *
 * Drops the page's request reference (remembering the xid), clears the
 * async flags under oap_lock and the interrupted mark, feeds @rc into the
 * per-client and per-object async error records for writes, copies
 * blocks/mtime/atime/ctime from @oa into the object's lvb on success, and
 * calls the upper layer's ap_completion callback.  Depending on the
 * callback the page is either re-queued with osc_oap_to_pending() or taken
 * out of the cache accounting with osc_exit_cache().
 * NOTE(review): the condition choosing between those two outcomes is not
 * visible in this listing.
 */
2205 static void osc_ap_completion(const struct lu_env *env,
2206 struct client_obd *cli, struct obdo *oa,
2207 struct osc_async_page *oap, int sent, int rc)
2212 if (oap->oap_request != NULL) {
2213 xid = ptlrpc_req_xid(oap->oap_request);
2214 ptlrpc_req_finished(oap->oap_request);
2215 oap->oap_request = NULL;
2218 cfs_spin_lock(&oap->oap_lock);
2219 oap->oap_async_flags = 0;
2220 cfs_spin_unlock(&oap->oap_lock);
2221 oap->oap_interrupted = 0;
/* only writes feed the async error tracking */
2223 if (oap->oap_cmd & OBD_BRW_WRITE) {
2224 osc_process_ar(&cli->cl_ar, xid, rc);
2225 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
/* on success, refresh the cached attributes that the reply marked valid */
2228 if (rc == 0 && oa != NULL) {
2229 if (oa->o_valid & OBD_MD_FLBLOCKS)
2230 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2231 if (oa->o_valid & OBD_MD_FLMTIME)
2232 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2233 if (oa->o_valid & OBD_MD_FLATIME)
2234 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2235 if (oa->o_valid & OBD_MD_FLCTIME)
2236 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2239 rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2240 oap->oap_cmd, oa, rc);
2242 /* ll_ap_completion (from llite) drops PG_locked. so, a new
2243 * I/O on the page could start, but OSC calls it under lock
2244 * and thus we can add oap back to pending safely */
2246 /* upper layer wants to leave the page on pending queue */
2247 osc_oap_to_pending(oap);
2249 osc_exit_cache(cli, oap, sent);
/*
 * Reply-interpret callback for asynchronous bulk requests; @data is the
 * request's osc_brw_async_args.
 *
 * The reply is post-processed with osc_brw_fini_request().  A recoverable
 * error leads to osc_brw_redo_request(), except that an mmap-flagged
 * object that already had a resend and fails with -EAGAIN is special-cased
 * (see the comment below).  Afterwards the capability reference is
 * dropped, and under cl_loi_list_lock the in-flight counter for the
 * request direction is decremented, every page is completed (or, when no
 * oap list exists, the write grant of each page is released), waiters are
 * woken and osc_check_rpcs() is run.  Finally the cl_req is completed and
 * the page pointer array is freed.
 * NOTE(review): the assignment of `cli`, the early-return after a
 * successful redo and the final return are not visible in this listing.
 */
2253 static int brw_interpret(const struct lu_env *env,
2254 struct ptlrpc_request *req, void *data, int rc)
2256 struct osc_brw_async_args *aa = data;
2257 struct client_obd *cli;
2261 rc = osc_brw_fini_request(req, rc);
2262 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2263 if (osc_recoverable_error(rc)) {
2264 /* Only retry once for mmaped files since the mmaped page
2265 * might be modified at anytime. We have to retry at least
2266 * once in case there WAS really a corruption of the page
2267 * on the network, that was not caused by mmap() modifying
2268 * the page. Bug11742 */
2269 if ((rc == -EAGAIN) && (aa->aa_resends > 0) &&
2270 aa->aa_oa->o_valid & OBD_MD_FLFLAGS &&
2271 aa->aa_oa->o_flags & OBD_FL_MMAP) {
2274 rc = osc_brw_redo_request(req, aa);
/* release the capability reference held in the async args */
2281 capa_put(aa->aa_ocapa);
2282 aa->aa_ocapa = NULL;
2287 client_obd_list_lock(&cli->cl_loi_list_lock);
2289 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2290 * is called so we know whether to go to sync BRWs or wait for more
2291 * RPCs to complete */
2292 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2293 cli->cl_w_in_flight--;
2295 cli->cl_r_in_flight--;
/* note: `async` is set when the oap list is EMPTY */
2297 async = cfs_list_empty(&aa->aa_oaps);
2298 if (!async) { /* from osc_send_oap_rpc() */
2299 struct osc_async_page *oap, *tmp;
2300 /* the caller may re-use the oap after the completion call so
2301 * we need to clean it up a little */
2302 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
2304 cfs_list_del_init(&oap->oap_rpc_item);
2305 osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2307 OBDO_FREE(aa->aa_oa);
2308 } else { /* from async_internal() */
2310 for (i = 0; i < aa->aa_page_count; i++)
2311 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2313 osc_wake_cache_waiters(cli);
2314 osc_wake_sync_fs(cli);
2315 osc_check_rpcs(env, cli);
2316 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* done outside the list lock */
2318 cl_req_completion(env, aa->aa_clerq, rc);
2319 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
/*
 * Turn the pages queued on @rpc_list (@page_count of them, must be
 * non-empty) into one bulk request for @cli.
 *
 * A page pointer array is allocated and filled from the oaps, a cl_req is
 * allocated and every page added to it, the obdo attributes are set
 * through cl_req_attr_set(), the pages are sorted by offset and the
 * request is built with osc_brw_prep_request() (reserving a capability
 * reference).  Timestamps are refreshed once more after the request
 * exists, and the oap list is moved into the request's async args.
 *
 * With OBD_BRW_MEMALLOC in @cmd the memory-pressure flag is set for the
 * duration of the call and the request is marked rq_memalloc.
 *
 * On failure an ERR_PTR-encoded error is produced, the pointer array is
 * freed and every page on @rpc_list is completed with an error under
 * cl_loi_list_lock.
 * NOTE(review): the obdo allocation, several condition lines, the label
 * and the return are not visible in this listing.
 */
2324 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2325 struct client_obd *cli,
2326 cfs_list_t *rpc_list,
2327 int page_count, int cmd)
2329 struct ptlrpc_request *req;
2330 struct brw_page **pga = NULL;
2331 struct osc_brw_async_args *aa;
2332 struct obdo *oa = NULL;
2333 const struct obd_async_page_ops *ops = NULL;
2334 void *caller_data = NULL;
2335 struct osc_async_page *oap;
2336 struct osc_async_page *tmp;
2337 struct ost_body *body;
2338 struct cl_req *clerq = NULL;
2339 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2340 struct ldlm_lock *lock = NULL;
2341 struct cl_req_attr crattr;
2342 int i, rc, mpflag = 0;
2345 LASSERT(!cfs_list_empty(rpc_list));
/* remember the previous memory-pressure state so it can be restored */
2347 if (cmd & OBD_BRW_MEMALLOC)
2348 mpflag = cfs_memory_pressure_get_and_set();
2350 memset(&crattr, 0, sizeof crattr);
2351 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2353 GOTO(out, req = ERR_PTR(-ENOMEM));
2357 GOTO(out, req = ERR_PTR(-ENOMEM));
/* gather the pages of every oap on the list */
2360 cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2361 struct cl_page *page = osc_oap2cl_page(oap);
2363 ops = oap->oap_caller_ops;
2364 caller_data = oap->oap_caller_data;
2366 clerq = cl_req_alloc(env, page, crt,
2367 1 /* only 1-object rpcs for
2370 GOTO(out, req = (void *)clerq);
2371 lock = oap->oap_ldlm_lock;
2373 pga[i] = &oap->oap_brw_page;
2374 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2375 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2376 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2378 cl_req_page_add(env, clerq, page);
2381 /* always get the data for the obdo for the rpc */
2382 LASSERT(ops != NULL);
2384 crattr.cra_capa = NULL;
2385 cl_req_attr_set(env, clerq, &crattr, ~0ULL);
/* pass the remote handle of the covering lock along in the obdo */
2387 oa->o_handle = lock->l_remote_handle;
2388 oa->o_valid |= OBD_MD_FLHANDLE;
2391 rc = cl_req_prep(env, clerq);
2393 CERROR("cl_req_prep failed: %d\n", rc);
2394 GOTO(out, req = ERR_PTR(rc));
/* osc_brw_prep_request() asserts that the pages are in offset order */
2397 sort_brw_pages(pga, page_count);
2398 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2399 pga, &req, crattr.cra_capa, 1, 0);
2401 CERROR("prep_req failed: %d\n", rc);
2402 GOTO(out, req = ERR_PTR(rc));
2405 if (cmd & OBD_BRW_MEMALLOC)
2406 req->rq_memalloc = 1;
2408 /* Need to update the timestamps after the request is built in case
2409 * we race with setattr (locally or in queue at OST). If OST gets
2410 * later setattr before earlier BRW (as determined by the request xid),
2411 * the OST will not use BRW timestamps. Sadly, there is no obvious
2412 * way to do this in a single call. bug 10150 */
2413 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2414 cl_req_attr_set(env, clerq, &crattr,
2415 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2417 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2418 aa = ptlrpc_req_async_args(req);
/* the request takes over the oap list; the caller's list is left empty */
2419 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2420 cfs_list_splice(rpc_list, &aa->aa_oaps);
2421 CFS_INIT_LIST_HEAD(rpc_list);
2422 aa->aa_clerq = clerq;
2424 if (cmd & OBD_BRW_MEMALLOC)
2425 cfs_memory_pressure_restore(mpflag);
2427 capa_put(crattr.cra_capa);
/* failure cleanup follows (NOTE(review): the guarding condition is not
 * visible in this listing) */
2432 OBD_FREE(pga, sizeof(*pga) * page_count);
2433 /* this should happen rarely and is pretty bad, it makes the
2434 * pending list not follow the dirty order */
2435 client_obd_list_lock(&cli->cl_loi_list_lock);
2436 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2437 cfs_list_del_init(&oap->oap_rpc_item);
2439 /* queued sync pages can be torn down while the pages
2440 * were between the pending list and the rpc */
2441 if (oap->oap_interrupted) {
2442 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2443 osc_ap_completion(env, cli, NULL, oap, 0,
2447 osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2449 if (clerq && !IS_ERR(clerq))
2450 cl_req_completion(env, clerq, PTR_ERR(req));
2456 * prepare pages for ASYNC io and put pages in send queue.
2458 * \param cmd OBD_BRW_* macros
2459 * \param lop pending pages
2461 * \return zero if no page added to send queue.
2462 * \return 1 if pages successfully added to send queue.
2463 * \return negative on errors.
/*
 * osc_send_oap_rpc(): collect queued pages from @lop into one bulk RPC for
 * @cmd and queue it on ptlrpcd.
 *
 * NOTE(review): this listing omits a number of source lines (braces,
 * else/break statements, some call arguments and return statements), so the
 * notes below are limited to what the visible lines establish.
 *
 * Visible steps:
 *  - oaps found on lop->lop_urgent are moved to a temporary list with
 *    cfs_list_move() or cfs_list_move_tail() (ASYNC_HP is the test shown),
 *    and that list is then spliced into lop->lop_pending;
 *  - lop->lop_pending is walked; each usable oap is taken off the pending and
 *    urgent lists, has its count refreshed through ap_refresh_count() unless
 *    ASYNC_COUNT_STABLE is set, and is appended to rpc_list.  An oap whose
 *    count is <= 0 is completed on the spot with osc_ap_completion();
 *  - cache and sync_fs waiters are woken, loi_list_maint() is called and
 *    cl_loi_list_lock is dropped before osc_build_req() builds the request;
 *  - histograms are tallied, the lock is re-taken, the read or write
 *    in-flight counter is incremented, the request is marked interrupted if
 *    any of its oaps was interrupted, brw_interpret is installed as the reply
 *    interpreter and the request is passed to ptlrpcd_add_req().
 *
 * The lock/unlock pairing suggests the function is entered and left with
 * cli->cl_loi_list_lock held -- TODO confirm against the caller.
 */
2466 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2467 struct lov_oinfo *loi,
2468 int cmd, struct loi_oap_pages *lop)
2470 struct ptlrpc_request *req;
2471 obd_count page_count = 0;
2472 struct osc_async_page *oap = NULL, *tmp;
2473 struct osc_brw_async_args *aa;
2474 const struct obd_async_page_ops *ops;
2475 CFS_LIST_HEAD(rpc_list);
2476 CFS_LIST_HEAD(tmp_list);
2477 unsigned int ending_offset;
2478 unsigned starting_offset = 0;
2479 int srvlock = 0, mem_tight = 0;
2480 struct cl_object *clob = NULL;
2483 /* ASYNC_HP pages first. At present, when the lock the pages is
2484 * to be canceled, the pages covered by the lock will be sent out
2485 * with ASYNC_HP. We have to send out them as soon as possible. */
2486 cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2487 if (oap->oap_async_flags & ASYNC_HP)
2488 cfs_list_move(&oap->oap_pending_item, &tmp_list);
2490 cfs_list_move_tail(&oap->oap_pending_item, &tmp_list);
2491 if (++page_count >= cli->cl_max_pages_per_rpc)
2495 cfs_list_splice(&tmp_list, &lop->lop_pending);
2498 /* first we find the pages we're allowed to work with */
2499 cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2501 ops = oap->oap_caller_ops;
2503 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2504 "magic 0x%x\n", oap, oap->oap_magic);
2507 /* pin object in memory, so that completion call-backs
2508 * can be safely called under client_obd_list lock. */
2509 clob = osc_oap2cl_page(oap)->cp_obj;
2510 cl_object_get(clob);
2513 if (page_count != 0 &&
2514 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2515 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2516 " oap %p, page %p, srvlock %u\n",
2517 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2521 /* If there is a gap at the start of this page, it can't merge
2522 * with any previous page, so we'll hand the network a
2523 * "fragmented" page array that it can't transfer in 1 RDMA */
2524 if (page_count != 0 && oap->oap_page_off != 0)
2527 /* in llite being 'ready' equates to the page being locked
2528 * until completion unlocks it. commit_write submits a page
2529 * as not ready because its unlock will happen unconditionally
2530 * as the call returns. if we race with commit_write giving
2531 * us that page we don't want to create a hole in the page
2532 * stream, so we stop and leave the rpc to be fired by
2533 * another dirtier or kupdated interval (the not ready page
2534 * will still be on the dirty list). we could call in
2535 * at the end of ll_file_write to process the queue again. */
2536 if (!(oap->oap_async_flags & ASYNC_READY)) {
2537 int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2540 CDEBUG(D_INODE, "oap %p page %p returned %d "
2541 "instead of ready\n", oap,
2545 /* llite is telling us that the page is still
2546 * in commit_write and that we should try
2547 * and put it in an rpc again later. we
2548 * break out of the loop so we don't create
2549 * a hole in the sequence of pages in the rpc
2554 /* the io isn't needed.. tell the checks
2555 * below to complete the rpc with EINTR */
2556 cfs_spin_lock(&oap->oap_lock);
2557 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2558 cfs_spin_unlock(&oap->oap_lock);
2559 oap->oap_count = -EINTR;
2562 cfs_spin_lock(&oap->oap_lock);
2563 oap->oap_async_flags |= ASYNC_READY;
2564 cfs_spin_unlock(&oap->oap_lock);
2567 LASSERTF(0, "oap %p page %p returned %d "
2568 "from make_ready\n", oap,
2576 * Page submitted for IO has to be locked. Either by
2577 * ->ap_make_ready() or by higher layers.
2579 #if defined(__KERNEL__) && defined(__linux__)
2581 struct cl_page *page;
2583 page = osc_oap2cl_page(oap);
2585 if (page->cp_type == CPT_CACHEABLE &&
2586 !(PageLocked(oap->oap_page) &&
2587 (CheckWriteback(oap->oap_page, cmd)))) {
2588 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2590 (long)oap->oap_page->flags,
2591 oap->oap_async_flags);
2597 /* take the page out of our book-keeping */
2598 cfs_list_del_init(&oap->oap_pending_item);
2599 lop_update_pending(cli, lop, cmd, -1);
2600 cfs_list_del_init(&oap->oap_urgent_item);
/* Remember where the first page of the RPC starts within a
 * PTLRPC_MAX_BRW_SIZE window; in the visible lines this value is only
 * consumed by the offset histograms further down. */
2602 if (page_count == 0)
2603 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2604 (PTLRPC_MAX_BRW_SIZE - 1);
2606 /* ask the caller for the size of the io as the rpc leaves. */
2607 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2609 ops->ap_refresh_count(env, oap->oap_caller_data,
2611 LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2613 if (oap->oap_count <= 0) {
2614 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2616 osc_ap_completion(env, cli, NULL,
2617 oap, 0, oap->oap_count);
2621 /* now put the page back in our accounting */
2622 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2623 if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
2625 if (page_count == 0)
2626 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2627 if (++page_count >= cli->cl_max_pages_per_rpc)
2630 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2631 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2632 * have the same alignment as the initial writes that allocated
2633 * extents on the server. */
2634 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2635 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2636 if (ending_offset == 0)
2639 /* If there is a gap at the end of this page, it can't merge
2640 * with any subsequent pages, so we'll hand the network a
2641 * "fragmented" page array that it can't transfer in 1 RDMA */
2642 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2646 osc_wake_cache_waiters(cli);
2647 osc_wake_sync_fs(cli);
2648 loi_list_maint(cli, loi);
2650 client_obd_list_unlock(&cli->cl_loi_list_lock);
2653 cl_object_put(env, clob);
/* Nothing was collected: the list lock is re-taken before leaving
 * (the return statement itself is not visible in this listing). */
2655 if (page_count == 0) {
2656 client_obd_list_lock(&cli->cl_loi_list_lock);
2660 req = osc_build_req(env, cli, &rpc_list, page_count,
2661 mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
2663 LASSERT(cfs_list_empty(&rpc_list));
2664 loi_list_maint(cli, loi);
2665 RETURN(PTR_ERR(req));
2668 aa = ptlrpc_req_async_args(req);
2670 if (cmd == OBD_BRW_READ) {
2671 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2672 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2673 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2674 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2676 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2677 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2678 cli->cl_w_in_flight);
2679 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2680 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2682 ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2684 client_obd_list_lock(&cli->cl_loi_list_lock);
2686 if (cmd == OBD_BRW_READ)
2687 cli->cl_r_in_flight++;
2689 cli->cl_w_in_flight++;
2691 /* queued sync pages can be torn down while the pages
2692 * were between the pending list and the rpc */
2694 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2695 /* only one oap gets a request reference */
2698 if (oap->oap_interrupted && !req->rq_intr) {
2699 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2701 ptlrpc_mark_interrupted(req);
/* tmp is expected to designate the single oap chosen to hold the request
 * reference; its assignment is not visible in this listing. */
2705 tmp->oap_request = ptlrpc_request_addref(req);
2707 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2708 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2710 req->rq_interpret_reply = brw_interpret;
2711 ptlrpcd_add_req(req, PSCOPE_BRW);
/*
 * LOI_DEBUG(): D_INODE debug line describing a lov_oinfo.  It prints whether
 * the object is on a ready list (loi_ready_item or loi_hp_ready_item is
 * non-empty), then for writes and for reads the lop_num_pending count and
 * whether lop_urgent is non-empty, followed by the caller's STR format.
 * NOTE(review): the last continuation line of the macro (the trailing
 * arguments) is not visible in this listing.
 */
2715 #define LOI_DEBUG(LOI, STR, args...) \
2716 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2717 !cfs_list_empty(&(LOI)->loi_ready_item) || \
2718 !cfs_list_empty(&(LOI)->loi_hp_ready_item), \
2719 (LOI)->loi_write_lop.lop_num_pending, \
2720 !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2721 (LOI)->loi_read_lop.lop_num_pending, \
2722 !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2725 /* This is called by osc_check_rpcs() to find which objects have pages that
2726 * we could be sending. These lists are maintained by lop_makes_rpc(). */
/*
 * Selection order shown by the visible checks; the first entry of the first
 * non-empty list is returned:
 *   1. cl_loi_hp_ready_list
 *   2. cl_loi_ready_list
 *   3. cl_loi_sync_fs_list
 *   4. cl_loi_write_list, but only while cl_cache_waiters is non-empty
 *   5. when the import is NULL or marked invalid: cl_loi_write_list, then
 *      cl_loi_read_list
 * NOTE(review): the fall-through return is not visible in this listing;
 * osc_check_rpcs() loops until this function yields NULL.
 */
2727 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2731 /* First return objects that have blocked locks so that they
2732 * will be flushed quickly and other clients can get the lock,
2733 * then objects which have pages ready to be stuffed into RPCs */
2734 if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
2735 RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
2736 struct lov_oinfo, loi_hp_ready_item));
2737 if (!cfs_list_empty(&cli->cl_loi_ready_list))
2738 RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
2739 struct lov_oinfo, loi_ready_item));
2740 if (!cfs_list_empty(&cli->cl_loi_sync_fs_list))
2741 RETURN(cfs_list_entry(cli->cl_loi_sync_fs_list.next,
2742 struct lov_oinfo, loi_sync_fs_item));
2744 /* then if we have cache waiters, return all objects with queued
2745 * writes. This is especially important when many small files
2746 * have filled up the cache and not been fired into rpcs because
2747 * they don't pass the nr_pending/object threshhold */
2748 if (!cfs_list_empty(&cli->cl_cache_waiters) &&
2749 !cfs_list_empty(&cli->cl_loi_write_list))
2750 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2751 struct lov_oinfo, loi_write_item));
2753 /* then return all queued objects when we have an invalid import
2754 * so that they get flushed */
2755 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2756 if (!cfs_list_empty(&cli->cl_loi_write_list))
2757 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2760 if (!cfs_list_empty(&cli->cl_loi_read_list))
2761 RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
2762 struct lov_oinfo, loi_read_item));
/*
 * Tells osc_check_rpcs() whether @cli already has as many RPCs in flight as
 * it is allowed.  The limit is cl_max_rpcs_in_flight plus hprpc, where hprpc
 * becomes 1 when the first oap on @loi's write urgent list -- or, failing
 * that, on its read urgent list -- carries ASYNC_HP.
 * Returns non-zero when rpcs_in_flight(cli) has reached that limit.
 * NOTE(review): the declaration of hprpc is not visible in this listing;
 * presumably it starts at 0.
 */
2767 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2769 struct osc_async_page *oap;
2772 if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
2773 oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
2774 struct osc_async_page, oap_urgent_item);
2775 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2778 if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
2779 oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
2780 struct osc_async_page, oap_urgent_item);
2781 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2784 return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2787 /* called with the loi list lock held */
/*
 * Drains queued pages into RPCs.  For every object handed back by
 * osc_next_loi():
 *  - osc_max_rpc_in_flight() is consulted first (the statement it guards is
 *    not visible in this listing);
 *  - if lop_makes_rpc() says the write queue warrants an RPC,
 *    osc_send_oap_rpc() is called for OBD_BRW_WRITE, and likewise for the
 *    read queue with OBD_BRW_READ; failures are reported with CERROR;
 *  - the object is unlinked from the hp-ready, ready, write, read and sync_fs
 *    lists and loi_list_maint() is called so the lists are recomputed.
 * race_counter bounds how often the loop tolerates osc_send_oap_rpc()
 * returning 0; NOTE(review): the lines that update it and the action taken
 * when it reaches 10 are not visible here.
 */
2788 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2790 struct lov_oinfo *loi;
2791 int rc = 0, race_counter = 0;
2794 while ((loi = osc_next_loi(cli)) != NULL) {
2795 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2797 if (osc_max_rpc_in_flight(cli, loi))
2800 /* attempt some read/write balancing by alternating between
2801 * reads and writes in an object. The makes_rpc checks here
2802 * would be redundant if we were getting read/write work items
2803 * instead of objects. we don't want send_oap_rpc to drain a
2804 * partial read pending queue when we're given this object to
2805 * do io on writes while there are cache waiters */
2806 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2807 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2808 &loi->loi_write_lop);
2810 CERROR("Write request failed with %d\n", rc);
2812 /* osc_send_oap_rpc failed, mostly because of
2815 * It can't break here, because if:
2816 * - a page was submitted by osc_io_submit, so
2818 * - no request in flight
2819 * - no subsequent request
2820 * The system will be in live-lock state,
2821 * because there is no chance to call
2822 * osc_io_unplug() and osc_check_rpcs() any
2823 * more. pdflush can't help in this case,
2824 * because it might be blocked at grabbing
2825 * the page lock as we mentioned.
2827 * Anyway, continue to drain pages. */
2836 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2837 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2838 &loi->loi_read_lop);
2840 CERROR("Read request failed with %d\n", rc);
2848 /* attempt some inter-object balancing by issuing rpcs
2849 * for each object in turn */
2850 if (!cfs_list_empty(&loi->loi_hp_ready_item))
2851 cfs_list_del_init(&loi->loi_hp_ready_item);
2852 if (!cfs_list_empty(&loi->loi_ready_item))
2853 cfs_list_del_init(&loi->loi_ready_item);
2854 if (!cfs_list_empty(&loi->loi_write_item))
2855 cfs_list_del_init(&loi->loi_write_item);
2856 if (!cfs_list_empty(&loi->loi_read_item))
2857 cfs_list_del_init(&loi->loi_read_item);
2858 if (!cfs_list_empty(&loi->loi_sync_fs_item))
2859 cfs_list_del_init(&loi->loi_sync_fs_item);
2861 loi_list_maint(cli, loi);
2863 /* send_oap_rpc fails with 0 when make_ready tells it to
2864 * back off. llite's make_ready does this when it tries
2865 * to lock a page queued for write that is already locked.
2866 * we want to try sending rpcs from many objects, but we
2867 * don't want to spin failing with 0. */
2868 if (race_counter == 10)
2874 /* we're trying to queue a page in the osc so we're subject to the
2875 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2876 * If the osc's queued pages are already at that limit, then we want to sleep
2877 * until there is space in the osc's queue for us. We also may be waiting for
2878 * write credits from the OST if there are RPCs in flight that may return some
2879 * before we fall back to sync writes.
2881 * We need this to know our allocation was granted in the presence of signals */
/*
 * Wake-up condition used by osc_enter_cache() while it sleeps for cache
 * space.  Evaluated under cli->cl_loi_list_lock: the result is true when @ocw
 * is no longer linked on a list (ocw_entry is empty) or when no RPCs are in
 * flight.
 * NOTE(review): the declaration of rc and the return statement are not
 * visible in this listing.
 */
2882 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2886 client_obd_list_lock(&cli->cl_loi_list_lock);
2887 rc = cfs_list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2888 client_obd_list_unlock(&cli->cl_loi_list_lock);
2893 * Non-blocking version of osc_enter_cache() that consumes grant only when it
/*
 * Try to account one page of write grant to @oap without sleeping.
 * has_grant is true when cl_avail_grant covers at least CFS_PAGE_SIZE.  The
 * visible body consumes write grant for the page through
 * osc_consume_write_grant(), adds a page to cl_dirty_transit and to the
 * global obd_dirty_transit_pages counter, and sets OBD_BRW_NOCACHE on the
 * page.
 * NOTE(review): the conditions guarding these statements (presumably
 * has_grant and @transient) and the return statement are not visible in this
 * listing -- confirm against the full source.
 */
2896 int osc_enter_cache_try(const struct lu_env *env,
2897 struct client_obd *cli, struct lov_oinfo *loi,
2898 struct osc_async_page *oap, int transient)
2902 has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2904 osc_consume_write_grant(cli, &oap->oap_brw_page);
2906 cli->cl_dirty_transit += CFS_PAGE_SIZE;
2907 cfs_atomic_inc(&obd_dirty_transit_pages);
2908 oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2914 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2915 * grant or cache space. */
/*
 * Reserve cache space and grant for @oap before it is queued for write.
 * Visible decision sequence:
 *  1. if cl_dirty_max is below one page, or force_sync is set on the client
 *     or on the object, bail out so the caller tries sync I/O;
 *  2. if the per-client and global dirty limits leave room and
 *     osc_enter_cache_try() succeeds, the page is admitted;
 *  3. otherwise, while writes are in flight or a page of grant is available,
 *     the caller is queued on cl_cache_waiters, pending RPCs are kicked with
 *     osc_check_rpcs(), the list lock is dropped and the thread sleeps until
 *     ocw_granted() is true; after re-taking the lock the waiter entry is
 *     removed if it is still linked.
 * NOTE(review): the return statements and the initialisation of the
 * remaining ocw fields are not visible in this listing.
 */
2916 static int osc_enter_cache(const struct lu_env *env,
2917 struct client_obd *cli, struct lov_oinfo *loi,
2918 struct osc_async_page *oap)
2920 struct osc_cache_waiter ocw;
2921 struct l_wait_info lwi = { 0 };
2925 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2926 "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
2927 cli->cl_dirty_max, obd_max_dirty_pages,
2928 cli->cl_lost_grant, cli->cl_avail_grant);
2930 /* force the caller to try sync io. this can jump the list
2931 * of queued writes and create a discontiguous rpc stream */
2932 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2933 loi->loi_ar.ar_force_sync)
2936 /* Hopefully normal case - cache space and write credits available */
2937 if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2938 cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2939 osc_enter_cache_try(env, cli, loi, oap, 0))
2942 /* It is safe to block as a cache waiter as long as there is grant
2943 * space available or the hope of additional grant being returned
2944 * when an in flight write completes. Using the write back cache
2945 * if possible is preferable to sending the data synchronously
2946 * because write pages can then be merged in to large requests.
2947 * The addition of this cache waiter will causing pending write
2948 * pages to be sent immediately. */
2949 if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
2950 cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2951 cfs_waitq_init(&ocw.ocw_waitq);
2955 loi_list_maint(cli, loi);
2956 osc_check_rpcs(env, cli);
2957 client_obd_list_unlock(&cli->cl_loi_list_lock);
2959 CDEBUG(D_CACHE, "sleeping for cache space\n");
2960 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2962 client_obd_list_lock(&cli->cl_loi_list_lock);
2963 if (!cfs_list_empty(&ocw.ocw_entry)) {
2964 cfs_list_del(&ocw.ocw_entry);
/*
 * Initialise the osc_async_page that tracks @page at object offset @offset.
 * The visible lines set the magic, the owning client_obd (taken from @exp),
 * the caller's ops and data, the page and the offset, initialise the four
 * list heads and the spinlock, and assert that @offset has no bits set
 * outside CFS_PAGE_MASK.  OBD_BRW_NOQUOTA is placed in oap_brw_flags when
 * the export is not a remote client and the caller has
 * CFS_CAP_SYS_RESOURCE.
 * One path returns cfs_size_round(sizeof(*oap)).
 * NOTE(review): the condition guarding that path, the way oap is derived
 * from @res, and the final return are not visible in this listing.
 */
2974 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2975 struct lov_oinfo *loi, cfs_page_t *page,
2976 obd_off offset, const struct obd_async_page_ops *ops,
2977 void *data, void **res, int nocache,
2978 struct lustre_handle *lockh)
2980 struct osc_async_page *oap;
2985 return cfs_size_round(sizeof(*oap));
2988 oap->oap_magic = OAP_MAGIC;
2989 oap->oap_cli = &exp->exp_obd->u.cli;
2992 oap->oap_caller_ops = ops;
2993 oap->oap_caller_data = data;
2995 oap->oap_page = page;
2996 oap->oap_obj_off = offset;
2997 if (!client_is_remote(exp) &&
2998 cfs_capable(CFS_CAP_SYS_RESOURCE))
2999 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
3001 LASSERT(!(offset & ~CFS_PAGE_MASK));
3003 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
3004 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
3005 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
3006 CFS_INIT_LIST_HEAD(&oap->oap_page_list);
3008 cfs_spin_lock_init(&oap->oap_lock);
3009 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
/*
 * Interpret an opaque @cookie as an osc_async_page.  A cookie whose oap_magic
 * is not OAP_MAGIC yields ERR_PTR(-EINVAL).
 * NOTE(review): the success return is not visible in this listing;
 * callers in this file use the result as the oap itself.
 */
3013 struct osc_async_page *oap_from_cookie(void *cookie)
3015 struct osc_async_page *oap = cookie;
3016 if (oap->oap_magic != OAP_MAGIC)
3017 return ERR_PTR(-EINVAL);
/*
 * Queue the page behind @cookie for asynchronous read or write.
 * Visible steps:
 *  - @cookie is validated with oap_from_cookie();
 *  - a NULL or invalid import, or an oap that is already linked on a
 *    pending, urgent or rpc list, is rejected;
 *  - for writes without OBD_BRW_NOQUOTA in @cmd, the uid/gid of the top
 *    object are read under the attr lock and checked with lquota_chkdq();
 *  - under cl_loi_list_lock the page offset, count, brw flags and async
 *    flags are stored in the oap (OBD_BRW_MEMALLOC is added when
 *    cfs_memory_pressure_get() is true); writes must pass osc_enter_cache();
 *    the oap is then put on the pending list with osc_oap_to_pending(),
 *    loi_list_maint() refreshes the object's lists and osc_check_rpcs() is
 *    given a chance to send.
 * NOTE(review): the error codes returned by the rejected paths, and the
 * guard around the lsm->lsm_oinfo[0] fallback for @loi, are not visible in
 * this listing.
 */
3021 int osc_queue_async_io(const struct lu_env *env,
3022 struct obd_export *exp, struct lov_stripe_md *lsm,
3023 struct lov_oinfo *loi, void *cookie,
3024 int cmd, obd_off off, int count,
3025 obd_flag brw_flags, enum async_flags async_flags)
3027 struct client_obd *cli = &exp->exp_obd->u.cli;
3028 struct osc_async_page *oap;
3032 oap = oap_from_cookie(cookie);
3034 RETURN(PTR_ERR(oap));
3036 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
3039 if (!cfs_list_empty(&oap->oap_pending_item) ||
3040 !cfs_list_empty(&oap->oap_urgent_item) ||
3041 !cfs_list_empty(&oap->oap_rpc_item))
3044 /* check if the file's owner/group is over quota */
3045 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
3046 struct cl_object *obj;
3047 struct cl_attr attr; /* XXX put attr into thread info */
3048 unsigned int qid[MAXQUOTAS];
3050 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
3052 cl_object_attr_lock(obj);
3053 rc = cl_object_attr_get(env, obj, &attr);
3054 cl_object_attr_unlock(obj);
3056 qid[USRQUOTA] = attr.cat_uid;
3057 qid[GRPQUOTA] = attr.cat_gid;
3059 lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
3066 loi = lsm->lsm_oinfo[0];
3068 client_obd_list_lock(&cli->cl_loi_list_lock);
3070 LASSERT(off + count <= CFS_PAGE_SIZE);
3072 oap->oap_page_off = off;
3073 oap->oap_count = count;
3074 oap->oap_brw_flags = brw_flags;
3075 /* Give a hint to OST that requests are coming from kswapd - bug19529 */
3076 if (cfs_memory_pressure_get())
3077 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
3078 cfs_spin_lock(&oap->oap_lock);
3079 oap->oap_async_flags = async_flags;
3080 cfs_spin_unlock(&oap->oap_lock);
3082 if (cmd & OBD_BRW_WRITE) {
3083 rc = osc_enter_cache(env, cli, loi, oap);
3085 client_obd_list_unlock(&cli->cl_loi_list_lock);
3090 osc_oap_to_pending(oap);
3091 loi_list_maint(cli, loi);
3093 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
3096 osc_check_rpcs(env, cli);
3097 client_obd_list_unlock(&cli->cl_loi_list_lock);
/*
 * SETTING(was, now, flag): true when none of the bits in 'flag' are set in
 * 'was' and at least one of them is set in 'now', i.e. the flag is being
 * newly turned on.  For a single-bit flag this is (~was & now & flag) != 0.
 * Every argument is parenthesized so that compound expressions (for example
 * a mask built with '|') expand with the intended precedence.
 */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
/*
 * Add @async_flags to an oap that is already on a pending list (asserted).
 * The read or write lop of @loi is chosen from oap->oap_cmd.  Visible cases:
 *  - all requested flags are already set: nothing further is needed (the
 *    statement taken on this path is not visible in this listing);
 *  - ASYNC_SYNCFS is newly requested and the oap is on neither the rpc nor
 *    the urgent list: it is added to the lop's urgent list, the flag is
 *    stored under oap_lock and loi_list_maint() is called;
 *  - ASYNC_READY newly requested: recorded in the local flags;
 *  - ASYNC_URGENT newly requested and the oap is not on an rpc list: it is
 *    added to the lop's urgent list and loi_list_maint() is called.
 * The accumulated flags are finally OR-ed into oap_async_flags under
 * oap_lock.
 * NOTE(review): the declaration of the local 'flags' and the return
 * statements are not visible in this listing.
 */
3105 int osc_set_async_flags_base(struct client_obd *cli,
3106 struct lov_oinfo *loi, struct osc_async_page *oap,
3107 obd_flag async_flags)
3109 struct loi_oap_pages *lop;
3113 LASSERT(!cfs_list_empty(&oap->oap_pending_item));
3115 if (oap->oap_cmd & OBD_BRW_WRITE) {
3116 lop = &loi->loi_write_lop;
3118 lop = &loi->loi_read_lop;
3121 if ((oap->oap_async_flags & async_flags) == async_flags)
3124 /* XXX: This introduces a tiny insignificant race for the case if this
3125 * loi already had other urgent items.
3127 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_SYNCFS) &&
3128 cfs_list_empty(&oap->oap_rpc_item) &&
3129 cfs_list_empty(&oap->oap_urgent_item)) {
3130 osc_add_to_lop_urgent(lop, oap, ASYNC_SYNCFS);
3131 flags |= ASYNC_SYNCFS;
3132 cfs_spin_lock(&oap->oap_lock);
3133 oap->oap_async_flags |= flags;
3134 cfs_spin_unlock(&oap->oap_lock);
3135 loi_list_maint(cli, loi);
3139 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3140 flags |= ASYNC_READY;
3142 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3143 cfs_list_empty(&oap->oap_rpc_item)) {
3144 osc_add_to_lop_urgent(lop, oap, ASYNC_URGENT);
3145 flags |= ASYNC_URGENT;
3146 loi_list_maint(cli, loi);
3148 cfs_spin_lock(&oap->oap_lock);
3149 oap->oap_async_flags |= flags;
3150 cfs_spin_unlock(&oap->oap_lock);
3152 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3153 oap->oap_async_flags);
/*
 * Remove the page behind @cookie from the OSC queues.
 * The oap is validated with oap_from_cookie() and the read or write lop is
 * chosen from oap->oap_cmd.  Under cl_loi_list_lock:
 *  - an oap that is still linked on an rpc list cannot be torn down and the
 *    call fails with -EBUSY;
 *  - otherwise its cache accounting is released with osc_exit_cache() and
 *    cache waiters are woken;
 *  - it is unlinked from the urgent list (clearing the urgent-related async
 *    flags under oap_lock) and from the pending list (decrementing the
 *    pending count through lop_update_pending());
 *  - loi_list_maint() refreshes the object's list membership.
 * NOTE(review): the guard around the lsm->lsm_oinfo[0] fallback, the 'out'
 * label and the return statement are not visible in this listing.
 */
3157 int osc_teardown_async_page(struct obd_export *exp,
3158 struct lov_stripe_md *lsm,
3159 struct lov_oinfo *loi, void *cookie)
3161 struct client_obd *cli = &exp->exp_obd->u.cli;
3162 struct loi_oap_pages *lop;
3163 struct osc_async_page *oap;
3167 oap = oap_from_cookie(cookie);
3169 RETURN(PTR_ERR(oap));
3172 loi = lsm->lsm_oinfo[0];
3174 if (oap->oap_cmd & OBD_BRW_WRITE) {
3175 lop = &loi->loi_write_lop;
3177 lop = &loi->loi_read_lop;
3180 client_obd_list_lock(&cli->cl_loi_list_lock);
/* A page that is part of an RPC being built or sent must not be removed. */
3182 if (!cfs_list_empty(&oap->oap_rpc_item))
3183 GOTO(out, rc = -EBUSY);
3185 osc_exit_cache(cli, oap, 0);
3186 osc_wake_cache_waiters(cli);
3188 if (!cfs_list_empty(&oap->oap_urgent_item)) {
3189 cfs_list_del_init(&oap->oap_urgent_item);
3190 cfs_spin_lock(&oap->oap_lock);
3191 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP |
3193 cfs_spin_unlock(&oap->oap_lock);
3195 if (!cfs_list_empty(&oap->oap_pending_item)) {
3196 cfs_list_del_init(&oap->oap_pending_item);
3197 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3199 loi_list_maint(cli, loi);
3200 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3202 client_obd_list_unlock(&cli->cl_loi_list_lock);
/*
 * Attach einfo->ei_cbdata to @lock as its l_ast_data.
 * First asserts that the lock's blocking, completion and glimpse callbacks
 * and its resource type match those recorded in @einfo.  Then, holding both
 * the resource/lock lock and osc_ast_guard, asserts that l_ast_data is either
 * unset or already equal to the new value, and stores it.
 * NOTE(review): the last parameter line of the signature is not visible in
 * this listing.
 */
3206 static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
3207 struct ldlm_enqueue_info *einfo,
3210 void *data = einfo->ei_cbdata;
3212 LASSERT(lock != NULL);
3213 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3214 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3215 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3216 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3218 lock_res_and_lock(lock);
3219 cfs_spin_lock(&osc_ast_guard);
3220 LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
3221 lock->l_ast_data = data;
3222 cfs_spin_unlock(&osc_ast_guard);
3223 unlock_res_and_lock(lock);
/*
 * Handle-based variant of osc_set_lock_data_with_check(): resolves @lockh
 * with ldlm_handle2lock(), sets the lock's data and drops the reference with
 * LDLM_LOCK_PUT().  When the handle cannot be used, an error mentioning a
 * possible client eviction is logged instead.
 * NOTE(review): the test that selects between the two paths is not visible
 * in this listing.
 */
3226 static void osc_set_data_with_check(struct lustre_handle *lockh,
3227 struct ldlm_enqueue_info *einfo,
3230 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3233 osc_set_lock_data_with_check(lock, einfo, flags);
3234 LDLM_LOCK_PUT(lock);
3236 CERROR("lockh %p, data %p - client evicted?\n",
3237 lockh, einfo->ei_cbdata);
/*
 * Run the @replace iterator, with @data as its argument, over the locks of
 * the resource named by @lsm's object id and sequence in this OSC's
 * namespace.  The iterator's result is not used by the visible lines.
 * NOTE(review): the return statement is not visible in this listing.
 */
3240 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3241 ldlm_iterator_t replace, void *data)
3243 struct ldlm_res_id res_id;
3244 struct obd_device *obd = class_exp2obd(exp);
3246 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3247 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3251 /* find any ldlm lock of the inode in osc
/*
 * Same iteration as osc_change_cbdata(), but the iterator's result is
 * inspected: LDLM_ITER_STOP and LDLM_ITER_CONTINUE are each translated to a
 * return value.
 * NOTE(review): the values returned for the two cases, and for any other
 * result, are not visible in this listing.
 */
3255 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3256 ldlm_iterator_t replace, void *data)
3258 struct ldlm_res_id res_id;
3259 struct obd_device *obd = class_exp2obd(exp);
3262 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3263 rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3264 if (rc == LDLM_ITER_STOP)
3266 if (rc == LDLM_ITER_CONTINUE)
/*
 * OSC-level completion of a lock enqueue.
 *  - When the enqueue result is ELDLM_LOCK_ABORTED, the DLM reply is fetched
 *    from @req and a non-zero lock_policy_res1 replaces rc.
 *  - When the enqueue succeeded (rc == 0), or it was an intent enqueue that
 *    came back ELDLM_LOCK_ABORTED, LDLM_FL_LVB_READY is set in *flags and the
 *    LVB contents are logged.
 *  - @upcall is then invoked with @cookie and rc, and its result becomes rc.
 * NOTE(review): the trailing parameters of the signature, any guard around
 * the reply inspection, and the return statement are not visible in this
 * listing.
 */
3271 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3272 obd_enqueue_update_f upcall, void *cookie,
3275 int intent = *flags & LDLM_FL_HAS_INTENT;
3279 /* The request was created before ldlm_cli_enqueue call. */
3280 if (rc == ELDLM_LOCK_ABORTED) {
3281 struct ldlm_reply *rep;
3282 rep = req_capsule_server_get(&req->rq_pill,
3285 LASSERT(rep != NULL);
3286 if (rep->lock_policy_res1)
3287 rc = rep->lock_policy_res1;
3291 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3292 *flags |= LDLM_FL_LVB_READY;
3293 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3294 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3297 /* Call the update callback. */
3298 rc = (*upcall)(cookie, rc);
/*
 * Reply interpreter for asynchronous lock enqueues; osc_enqueue_base()
 * installs it as rq_interpret_reply.
 * The lock handle and mode are copied out of @aa first, because @aa may be
 * freed once the upcall has run.  An extra reference is taken on the lock
 * with ldlm_lock_addref() for the duration of the call; the enqueue is then
 * completed with ldlm_cli_enqueue_fini() followed by osc_enqueue_fini().  If
 * the handle is still in use and the result is ELDLM_OK, the reference taken
 * by ldlm_cli_enqueue() is released.  Finally the extra reference and the
 * reference from ldlm_handle2lock() are dropped.
 * NOTE(review): the declaration of 'mode' and the return statement are not
 * visible in this listing.
 */
3302 static int osc_enqueue_interpret(const struct lu_env *env,
3303 struct ptlrpc_request *req,
3304 struct osc_enqueue_args *aa, int rc)
3306 struct ldlm_lock *lock;
3307 struct lustre_handle handle;
3310 /* Make a local copy of a lock handle and a mode, because aa->oa_*
3311 * might be freed anytime after lock upcall has been called. */
3312 lustre_handle_copy(&handle, aa->oa_lockh);
3313 mode = aa->oa_ei->ei_mode;
3315 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3317 lock = ldlm_handle2lock(&handle);
3319 /* Take an additional reference so that a blocking AST that
3320 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3321 * to arrive after an upcall has been executed by
3322 * osc_enqueue_fini(). */
3323 ldlm_lock_addref(&handle, mode);
3325 /* Let CP AST to grant the lock first. */
3326 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
3328 /* Complete obtaining the lock procedure. */
3329 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3330 mode, aa->oa_flags, aa->oa_lvb,
3331 sizeof(*aa->oa_lvb), &handle, rc);
3332 /* Complete osc stuff. */
3333 rc = osc_enqueue_fini(req, aa->oa_lvb,
3334 aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
3336 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3338 /* Release the lock for async request. */
3339 if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3341 * Releases a reference taken by ldlm_cli_enqueue(), if it is
3342 * not already released by
3343 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3345 ldlm_lock_decref(&handle, mode);
3347 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3348 aa->oa_lockh, req, aa);
3349 ldlm_lock_decref(&handle, mode);
3350 LDLM_LOCK_PUT(lock);
/*
 * Fold the result of an extent-lock enqueue into @loi.
 *  - rc == ELDLM_OK: @lvb is copied into loi->loi_lvb and a candidate KMS is
 *    computed: the LVB size, replaced by (lock extent end + 1) when the size
 *    is greater than the lock's extent end.  loi_kms_set() is called only
 *    when the candidate is >= the current loi_kms; otherwise the KMS is left
 *    alone.  The lock is then made matchable with ldlm_lock_allow_match() and
 *    the reference obtained from ldlm_handle2lock() is dropped.
 *  - rc == ELDLM_LOCK_ABORTED with LDLM_FL_HAS_INTENT in @flags (reported as
 *    "glimpsed" in the debug message): only loi->loi_lvb is updated.
 * @mode is not referenced by the visible lines.
 * NOTE(review): the declaration of 'tmp' is not visible in this listing.
 */
3354 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3355 struct lov_oinfo *loi, int flags,
3356 struct ost_lvb *lvb, __u32 mode, int rc)
3358 if (rc == ELDLM_OK) {
3359 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3362 LASSERT(lock != NULL);
3363 loi->loi_lvb = *lvb;
3364 tmp = loi->loi_lvb.lvb_size;
3365 /* Extend KMS up to the end of this lock and no further
3366 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3367 if (tmp > lock->l_policy_data.l_extent.end)
3368 tmp = lock->l_policy_data.l_extent.end + 1;
3369 if (tmp >= loi->loi_kms) {
3370 LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3371 ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3372 loi_kms_set(loi, tmp);
3374 LDLM_DEBUG(lock, "lock acquired, setting rss="
3375 LPU64"; leaving kms="LPU64", end="LPU64,
3376 loi->loi_lvb.lvb_size, loi->loi_kms,
3377 lock->l_policy_data.l_extent.end);
3379 ldlm_lock_allow_match(lock);
3380 LDLM_LOCK_PUT(lock);
3381 } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3382 loi->loi_lvb = *lvb;
3383 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3384 " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3388 EXPORT_SYMBOL(osc_update_enqueue);
/* Sentinel request-set pointer: osc_enqueue_base() compares its rqset
 * argument against this value and, on a match, hands the request to
 * ptlrpcd_add_req() instead of adding it to a real set.  The value
 * (void *)1 is only ever compared, never dereferenced, in the visible code. */
3390 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3392 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3393 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3394 * other synchronous requests, however keeping some locks and trying to obtain
3395 * others may take a considerable amount of time in a case of ost failure; and
3396 * when other sync requests do not get released lock from a client, the client
3397 * is excluded from the cluster -- such scenarios make life difficult, so
3398 * release locks just after they are obtained. */
/*
 * Obtain an extent lock on @res_id, reusing a cached lock when possible.
 * Visible steps:
 *  - the requested extent in @policy is widened to page boundaries;
 *  - ldlm_lock_match() is asked for an existing lock with LDLM_FL_LVB_READY.
 *    If one is found and its l_ast_data is unset or equals
 *    einfo->ei_cbdata, it is adopted: the lock data is set, @upcall is
 *    called with ELDLM_OK and the references are adjusted (see the inline
 *    comments).  A match with foreign l_ast_data is released again;
 *  - otherwise a request of format RQF_LDLM_ENQUEUE_LVB may be allocated and
 *    prepared, LDLM_FL_BLOCK_GRANTED is cleared from *flags and
 *    ldlm_cli_enqueue() is called;
 *  - for asynchronous enqueues the async args are filled in,
 *    osc_enqueue_interpret() is installed as the reply interpreter and the
 *    request goes either to ptlrpcd (when @rqset is PTLRPCD_SET) or into
 *    @rqset; otherwise osc_enqueue_fini() completes the enqueue inline.
 * NOTE(review): several guards (use of @kms_valid, the intent test around the
 * request allocation, error checks) and all return statements are not
 * visible in this listing.
 */
3399 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3400 int *flags, ldlm_policy_data_t *policy,
3401 struct ost_lvb *lvb, int kms_valid,
3402 obd_enqueue_update_f upcall, void *cookie,
3403 struct ldlm_enqueue_info *einfo,
3404 struct lustre_handle *lockh,
3405 struct ptlrpc_request_set *rqset, int async)
3407 struct obd_device *obd = exp->exp_obd;
3408 struct ptlrpc_request *req = NULL;
3409 int intent = *flags & LDLM_FL_HAS_INTENT;
3414 /* Filesystem lock extents are extended to page boundaries so that
3415 * dealing with the page cache is a little smoother. */
3416 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3417 policy->l_extent.end |= ~CFS_PAGE_MASK;
3420 * kms is not valid when either object is completely fresh (so that no
3421 * locks are cached), or object was evicted. In the latter case cached
3422 * lock cannot be used, because it would prime inode state with
3423 * potentially stale LVB.
3428 /* Next, search for already existing extent locks that will cover us */
3429 /* If we're trying to read, we also search for an existing PW lock. The
3430 * VFS and page cache already protect us locally, so lots of readers/
3431 * writers can share a single PW lock.
3433 * There are problems with conversion deadlocks, so instead of
3434 * converting a read lock to a write lock, we'll just enqueue a new
3437 * At some point we should cancel the read lock instead of making them
3438 * send us a blocking callback, but there are problems with canceling
3439 * locks out from other users right now, too. */
3440 mode = einfo->ei_mode;
3441 if (einfo->ei_mode == LCK_PR)
3443 mode = ldlm_lock_match(obd->obd_namespace,
3444 *flags | LDLM_FL_LVB_READY, res_id,
3445 einfo->ei_type, policy, mode, lockh, 0);
3447 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3449 if (matched->l_ast_data == NULL ||
3450 matched->l_ast_data == einfo->ei_cbdata) {
3451 /* addref the lock only if not async requests and PW
3452 * lock is matched whereas we asked for PR. */
3453 if (!rqset && einfo->ei_mode != mode)
3454 ldlm_lock_addref(lockh, LCK_PR);
3455 osc_set_lock_data_with_check(matched, einfo, *flags);
3457 /* I would like to be able to ASSERT here that
3458 * rss <= kms, but I can't, for reasons which
3459 * are explained in lov_enqueue() */
3462 /* We already have a lock, and it's referenced */
3463 (*upcall)(cookie, ELDLM_OK);
3465 /* For async requests, decref the lock. */
3466 if (einfo->ei_mode != mode)
3467 ldlm_lock_decref(lockh, LCK_PW);
3469 ldlm_lock_decref(lockh, einfo->ei_mode);
3470 LDLM_LOCK_PUT(matched);
3473 ldlm_lock_decref(lockh, mode);
3474 LDLM_LOCK_PUT(matched);
3479 CFS_LIST_HEAD(cancels);
3480 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3481 &RQF_LDLM_ENQUEUE_LVB);
3485 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3487 ptlrpc_request_free(req);
3491 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3493 ptlrpc_request_set_replen(req);
3496 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3497 *flags &= ~LDLM_FL_BLOCK_GRANTED;
3499 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3500 sizeof(*lvb), lockh, async);
3503 struct osc_enqueue_args *aa;
3504 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3505 aa = ptlrpc_req_async_args(req);
3508 aa->oa_flags = flags;
3509 aa->oa_upcall = upcall;
3510 aa->oa_cookie = cookie;
3512 aa->oa_lockh = lockh;
3514 req->rq_interpret_reply =
3515 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3516 if (rqset == PTLRPCD_SET)
3517 ptlrpcd_add_req(req, PSCOPE_OTHER);
3519 ptlrpc_set_add_req(rqset, req);
3520 } else if (intent) {
3521 ptlrpc_req_finished(req);
3526 rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3528 ptlrpc_req_finished(req);
/* obd_ops enqueue entry point (installed as .o_enqueue in osc_obd_ops).
 *
 * Builds the DLM resource name from the object id/seq found in
 * oinfo->oi_md and forwards the request to osc_enqueue_base(), handing it
 * the flags, policy and lock handle stored in oinfo, the LVB and kms_valid
 * flag of the first stripe (lsm_oinfo[0]), and oinfo->oi_cb_up together
 * with oinfo itself (presumably the completion upcall and its cookie; the
 * callee's parameter list is not visible here -- confirm).
 *
 * The final argument, "rqset != NULL", is true exactly when the caller
 * supplied a request set. */
3533 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3534 struct ldlm_enqueue_info *einfo,
3535 struct ptlrpc_request_set *rqset)
3537 struct ldlm_res_id res_id;
3541 osc_build_res_name(oinfo->oi_md->lsm_object_id,
3542 oinfo->oi_md->lsm_object_seq, &res_id);
3544 rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3545 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3546 oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3547 oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3548 rqset, rqset != NULL);
/* Search the export's DLM namespace for an existing lock on res_id that
 * covers the extent described by policy.
 *
 * Side effects visible here:
 *  - policy->l_extent is widened in place to page boundaries before the
 *    lookup, so the caller's policy is modified;
 *  - after ldlm_lock_match(), osc_set_data_with_check() is applied to the
 *    handle with the caller's data pointer;
 *  - unless LDLM_FL_TEST_LOCK is set, a difference between the requested
 *    mode and the mode returned by the match swaps the LCK_PW reference
 *    for an LCK_PR one.
 *
 * *flags is only read (copied to lflags) in the lines shown. */
3552 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3553 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3554 int *flags, void *data, struct lustre_handle *lockh,
3557 struct obd_device *obd = exp->exp_obd;
3558 int lflags = *flags;
/* Fault-injection hook for OBD_FAIL_OSC_MATCH. */
3562 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3565 /* Filesystem lock extents are extended to page boundaries so that
3566 * dealing with the page cache is a little smoother */
3567 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3568 policy->l_extent.end |= ~CFS_PAGE_MASK;
3570 /* Next, search for already existing extent locks that will cover us */
3571 /* If we're trying to read, we also search for an existing PW lock. The
3572 * VFS and page cache already protect us locally, so lots of readers/
3573 * writers can share a single PW lock. */
3577 rc = ldlm_lock_match(obd->obd_namespace, lflags,
3578 res_id, type, policy, rc, lockh, unref);
3581 osc_set_data_with_check(lockh, data, lflags);
/* The matched mode (rc) differs from the requested one: take an LCK_PR
 * reference and drop the LCK_PW one. */
3582 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3583 ldlm_lock_addref(lockh, LCK_PR);
3584 ldlm_lock_decref(lockh, LCK_PW);
/* Release one reference of the given mode on the lock behind lockh.
 * The (unlikely) LCK_GROUP mode goes through
 * ldlm_lock_decref_and_cancel(); the other call visible here is a plain
 * ldlm_lock_decref(). */
3591 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3595 if (unlikely(mode == LCK_GROUP))
3596 ldlm_lock_decref_and_cancel(lockh, mode);
3598 ldlm_lock_decref(lockh, mode);
/* obd_ops cancel entry point (installed as .o_cancel in osc_obd_ops).
 * Returns the result of osc_cancel_base(lockh, mode); exp and md are not
 * referenced in the lines shown. */
3603 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3604 __u32 mode, struct lustre_handle *lockh)
3607 RETURN(osc_cancel_base(lockh, mode));
/* Cancel unused DLM locks in this OBD's namespace through
 * ldlm_cli_cancel_unused().
 *
 * resp is initialised to NULL and can be pointed at the resource name
 * built from lsm's object id/seq, so the resource passed down is either
 * that name or NULL. flags and opaque are forwarded unchanged. */
3610 static int osc_cancel_unused(struct obd_export *exp,
3611 struct lov_stripe_md *lsm,
3612 ldlm_cancel_flags_t flags,
3615 struct obd_device *obd = class_exp2obd(exp);
3616 struct ldlm_res_id res_id, *resp = NULL;
3619 resp = osc_build_res_name(lsm->lsm_object_id,
3620 lsm->lsm_object_seq, &res_id);
3623 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
/* Reply interpreter for OST_STATFS requests; osc_statfs_async() installs
 * it as req->rq_interpret_reply.
 *
 * From the server's obd_statfs reply it refreshes three bits in
 * cli->cl_oscc.oscc_flags while holding oscc_lock:
 *   OSCC_FLAG_DEGRADED - mirrors OS_STATE_DEGRADED in os_state,
 *   OSCC_FLAG_RDONLY   - mirrors OS_STATE_READONLY in os_state,
 *   OSCC_FLAG_NOSPC    - set/cleared with hysteresis from os_ffree and
 *                        os_bavail (see the long comment below).
 * It then copies the reply into aa->aa_oi->oi_osfs and hands rc to the
 * aa->aa_oi->oi_cb_up callback, whose result becomes the new rc.
 *
 * env is not referenced in the lines shown. */
3626 static int osc_statfs_interpret(const struct lu_env *env,
3627 struct ptlrpc_request *req,
3628 struct osc_async_args *aa, int rc)
3630 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3631 struct obd_statfs *msfs;
3636 /* The request has in fact never been sent
3637 * due to issues at a higher level (LOV).
3638 * Exit immediately since the caller is
3639 * aware of the problem and takes care
3640 * of the clean up */
/* -ENOTCONN / -EAGAIN get special treatment only for callers that asked
 * for OBD_STATFS_NODELAY. */
3643 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3644 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3650 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3652 GOTO(out, rc = -EPROTO);
3655 /* Reinitialize the RDONLY and DEGRADED flags at the client
3656 * on each statfs, so they don't stay set permanently. */
3657 cfs_spin_lock(&cli->cl_oscc.oscc_lock);
3659 if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3660 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3661 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3662 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3664 if (unlikely(msfs->os_state & OS_STATE_READONLY))
3665 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3666 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3667 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3669 /* Add a bit of hysteresis so this flag isn't continually flapping,
3670 * and ensure that new files don't get extremely fragmented due to
3671 * only a small amount of available space in the filesystem.
3672 * We want to set the NOSPC flag when there is less than ~0.1% free
3673 * and clear it when there is at least ~0.2% free space, so:
3674 * avail < ~0.1% max max = avail + used
3675 * 1025 * avail < avail + used used = blocks - free
3676 * 1024 * avail < used
3677 * 1024 * avail < blocks - free
3678 * avail < ((blocks - free) >> 10)
3680 * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
3681 * lose that amount of space so in those cases we report no space left
3682 * if there is less than 1 GB left. */
/* NOTE(review): "used" is capped at the raw value 1 << 30 and then compared
 * directly with os_bavail. If os_bavail is counted in blocks rather than
 * bytes, the cap is 2^30 blocks, not the 1 GB described above -- confirm
 * the units of os_bavail. */
3683 used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
/* Set NOSPC when fewer than 32 free objects or less than "used" available;
 * clear it only above 64 free objects and twice "used" available. */
3684 if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3685 ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3686 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3687 else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3688 (msfs->os_ffree > 64) && (msfs->os_bavail > (used << 1))))
3689 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_NOSPC;
3691 cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
3693 *aa->aa_oi->oi_osfs = *msfs;
3695 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Asynchronous statfs: allocate and pack an OST_STATFS request on the
 * client import, direct it to OST_CREATE_PORTAL, install
 * osc_statfs_interpret() as the reply handler and add the request to
 * rqset.
 *
 * With OBD_STATFS_NODELAY in oinfo->oi_flags the request is marked
 * rq_no_resend / rq_no_delay. max_age is not sent to the server (see the
 * comment inside the function).
 *
 * NOTE(review): obd->u.cli.cl_import is used here directly, whereas
 * osc_statfs() reads it under cl_sem and takes an import reference --
 * confirm that callers of the async variant make this safe. */
3699 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3700 __u64 max_age, struct ptlrpc_request_set *rqset)
3702 struct ptlrpc_request *req;
3703 struct osc_async_args *aa;
3707 /* We could possibly pass max_age in the request (as an absolute
3708 * timestamp or a "seconds.usec ago") so the target can avoid doing
3709 * extra calls into the filesystem if that isn't necessary (e.g.
3710 * during mount that would help a bit). Having relative timestamps
3711 * is not so great if request processing is slow, while absolute
3712 * timestamps are not ideal because they need time synchronization. */
3713 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3717 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3719 ptlrpc_request_free(req);
3722 ptlrpc_request_set_replen(req);
3723 req->rq_request_portal = OST_CREATE_PORTAL;
3724 ptlrpc_at_set_req_timeout(req);
3726 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3727 /* procfs requests must not block waiting on statfs, to avoid deadlock */
3728 req->rq_no_resend = 1;
3729 req->rq_no_delay = 1;
3732 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
/* Compile-time check that the per-request async-args area can hold *aa. */
3733 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3734 aa = ptlrpc_req_async_args(req);
3737 ptlrpc_set_add_req(rqset, req);
/* Synchronous statfs (installed as .o_statfs in osc_obd_ops).
 *
 * Takes a reference on the client import while holding cl_sem for
 * reading, allocates and packs an OST_STATFS request on it, sends it to
 * OST_CREATE_PORTAL and waits for the reply with ptlrpc_queue_wait().
 * The -EPROTO exit follows the lookup of the RMF_OBD_STATFS reply buffer.
 *
 * With OBD_STATFS_NODELAY in flags the request is marked rq_no_resend /
 * rq_no_delay. max_age is not sent to the server (see the comment inside
 * the function). */
3741 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3742 __u64 max_age, __u32 flags)
3744 struct obd_statfs *msfs;
3745 struct ptlrpc_request *req;
3746 struct obd_import *imp = NULL;
3750 /* The request might also come from lprocfs, so we need to
3751 * synchronize this with client_disconnect_export (Bug 15684) */
3752 cfs_down_read(&obd->u.cli.cl_sem);
3753 if (obd->u.cli.cl_import)
3754 imp = class_import_get(obd->u.cli.cl_import);
3755 cfs_up_read(&obd->u.cli.cl_sem);
3759 /* We could possibly pass max_age in the request (as an absolute
3760 * timestamp or a "seconds.usec ago") so the target can avoid doing
3761 * extra calls into the filesystem if that isn't necessary (e.g.
3762 * during mount that would help a bit). Having relative timestamps
3763 * is not so great if request processing is slow, while absolute
3764 * timestamps are not ideal because they need time synchronization. */
3765 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
/* The import reference taken above is dropped once the request has been
 * allocated against it. */
3767 class_import_put(imp);
3772 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3774 ptlrpc_request_free(req);
3777 ptlrpc_request_set_replen(req);
3778 req->rq_request_portal = OST_CREATE_PORTAL;
3779 ptlrpc_at_set_req_timeout(req);
3781 if (flags & OBD_STATFS_NODELAY) {
3782 /* procfs requests must not block waiting on statfs, to avoid deadlock */
3783 req->rq_no_resend = 1;
3784 req->rq_no_delay = 1;
3787 rc = ptlrpc_queue_wait(req);
3791 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3793 GOTO(out, rc = -EPROTO);
3800 ptlrpc_req_finished(req);
3804 /* Retrieve object striping information.
3806 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3807 * the maximum number of OST indices which will fit in the user buffer.
3808 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
/* Report the striping of a single-object (OSC-level) file to user space.
 *
 * Reads the common v1-sized header from the user buffer lump, accepts
 * only LOV_USER_MAGIC_V1 / LOV_USER_MAGIC_V3, fills in the object id/seq
 * from lsm with a stripe count of 1, and copies lum_size bytes back to
 * lump. When the user asked for at least one stripe slot, a kernel buffer
 * sized by lov_mds_md_size() is allocated and the first lmm_objects entry
 * receives the object id; that buffer is released with OBD_FREE().
 *
 * NOTE(review): lum.lmm_stripe_count comes from user space and is passed
 * to lov_mds_md_size()/OBD_ALLOC() without an upper bound in the lines
 * shown -- confirm it is validated somewhere. */
3810 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3812 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3813 struct lov_user_md_v3 lum, *lumk;
3814 struct lov_user_ost_data_v1 *lmm_objects;
3815 int rc = 0, lum_size;
3821 /* we only need the header part from user space to get lmm_magic and
3822 * lmm_stripe_count, (the header part is common to v1 and v3) */
3823 lum_size = sizeof(struct lov_user_md_v1);
3824 if (cfs_copy_from_user(&lum, lump, lum_size))
3827 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3828 (lum.lmm_magic != LOV_USER_MAGIC_V3))
3831 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3832 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3833 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3834 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3836 /* we can use lov_mds_md_size() to compute lum_size
3837 * because lov_user_md_vX and lov_mds_md_vX have the same size */
3838 if (lum.lmm_stripe_count > 0) {
3839 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3840 OBD_ALLOC(lumk, lum_size);
/* The objects array sits at a different offset in the v1 and v3 layouts,
 * hence the cast for v1. */
3844 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3845 lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3847 lmm_objects = &(lumk->lmm_objects[0]);
3848 lmm_objects->l_object_id = lsm->lsm_object_id;
3850 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3854 lumk->lmm_object_id = lsm->lsm_object_id;
3855 lumk->lmm_object_seq = lsm->lsm_object_seq;
3856 lumk->lmm_stripe_count = 1;
3858 if (cfs_copy_to_user(lump, lumk, lum_size))
3862 OBD_FREE(lumk, lum_size);
/* ioctl dispatcher for the OSC (installed as .o_iocontrol in osc_obd_ops).
 *
 * A module reference is taken with cfs_try_module_get() up front and
 * released with cfs_module_put() at the end. Commands visible here:
 *   OBD_IOC_LOV_GET_CONFIG  - fill a lov_desc describing one target and
 *                             return this OBD's uuid to user space;
 *   LL_IOC_LOV_SETSTRIPE    - obd_alloc_memmd(exp, karg);
 *   LL_IOC_LOV_GETSTRIPE    - osc_getstripe(karg, uarg);
 *   OBD_IOC_CLIENT_RECOVER  - ptlrpc_recover_import() on the client import;
 *   IOC_OSC_SET_ACTIVE      - ptlrpc_set_import_active() on the import;
 *   OBD_IOC_POLL_QUOTACHECK - lquota_poll_check();
 *   OBD_IOC_PING_TARGET     - ptlrpc_obd_ping(obd).
 * The remaining path logs "unrecognised ioctl" and yields -ENOTTY. */
3868 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3869 void *karg, void *uarg)
3871 struct obd_device *obd = exp->exp_obd;
3872 struct obd_ioctl_data *data = karg;
3876 if (!cfs_try_module_get(THIS_MODULE)) {
/* NOTE(review): unlike the other CERROR() calls in this file, this format
 * string has no trailing newline. */
3877 CERROR("Can't get module. Is it alive?");
3881 case OBD_IOC_LOV_GET_CONFIG: {
3883 struct lov_desc *desc;
3884 struct obd_uuid uuid;
/* Re-read the ioctl payload from user space into buf; every exit below has
 * to release it with obd_ioctl_freedata(). */
3888 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3889 GOTO(out, err = -EINVAL);
3891 data = (struct obd_ioctl_data *)buf;
/* Inline buffer 1 must be able to hold a lov_desc, buffer 2 an obd_uuid. */
3893 if (sizeof(*desc) > data->ioc_inllen1) {
3894 obd_ioctl_freedata(buf, len);
3895 GOTO(out, err = -EINVAL);
3898 if (data->ioc_inllen2 < sizeof(uuid)) {
3899 obd_ioctl_freedata(buf, len);
3900 GOTO(out, err = -EINVAL);
3903 desc = (struct lov_desc *)data->ioc_inlbuf1;
3904 desc->ld_tgt_count = 1;
3905 desc->ld_active_tgt_count = 1;
3906 desc->ld_default_stripe_count = 1;
3907 desc->ld_default_stripe_size = 0;
3908 desc->ld_default_stripe_offset = 0;
3909 desc->ld_pattern = 0;
3910 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3912 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3914 err = cfs_copy_to_user((void *)uarg, buf, len);
3917 obd_ioctl_freedata(buf, len);
3920 case LL_IOC_LOV_SETSTRIPE:
3921 err = obd_alloc_memmd(exp, karg);
3925 case LL_IOC_LOV_GETSTRIPE:
3926 err = osc_getstripe(karg, uarg);
3928 case OBD_IOC_CLIENT_RECOVER:
3929 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3934 case IOC_OSC_SET_ACTIVE:
3935 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3938 case OBD_IOC_POLL_QUOTACHECK:
3939 err = lquota_poll_check(quota_interface, exp,
3940 (struct if_quotacheck *)karg);
3942 case OBD_IOC_PING_TARGET:
3943 err = ptlrpc_obd_ping(obd);
3946 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3947 cmd, cfs_curproc_comm());
3948 GOTO(out, err = -ENOTTY);
3951 cfs_module_put(THIS_MODULE);
/* Key/value query interface (installed as .o_get_info in osc_obd_ops).
 *
 * Both vallen and val must be non-NULL. Keys handled in the lines shown:
 *   KEY_LOCK_TO_STRIPE - answered locally; *vallen becomes sizeof(__u32);
 *   KEY_LAST_ID        - synchronous OST_GET_INFO RPC (no delay, no
 *                        resend); the obd_id from the reply is stored
 *                        through val;
 *   KEY_FIEMAP         - synchronous OST_GET_INFO RPC that sends *vallen
 *                        bytes of val to the server and copies *vallen
 *                        bytes of the reply back into val.
 * Both RPC paths have an -EPROTO exit right after the reply-buffer lookup.
 *
 * NOTE(review): the KEY_LAST_ID path writes an obd_id through val without
 * a visible check of *vallen -- confirm callers always pass a buffer of at
 * least sizeof(obd_id). */
3955 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3956 void *key, __u32 *vallen, void *val,
3957 struct lov_stripe_md *lsm)
3960 if (!vallen || !val)
3963 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3964 __u32 *stripe = val;
3965 *vallen = sizeof(*stripe);
3968 } else if (KEY_IS(KEY_LAST_ID)) {
3969 struct ptlrpc_request *req;
3974 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3975 &RQF_OST_GET_INFO_LAST_ID);
/* The key buffer size must be set before the request is packed. */
3979 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3980 RCL_CLIENT, keylen);
3981 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3983 ptlrpc_request_free(req);
3987 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3988 memcpy(tmp, key, keylen);
3990 req->rq_no_delay = req->rq_no_resend = 1;
3991 ptlrpc_request_set_replen(req);
3992 rc = ptlrpc_queue_wait(req);
3996 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3998 GOTO(out, rc = -EPROTO);
4000 *((obd_id *)val) = *reply;
4002 ptlrpc_req_finished(req);
4004 } else if (KEY_IS(KEY_FIEMAP)) {
4005 struct ptlrpc_request *req;
4006 struct ll_user_fiemap *reply;
4010 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
4011 &RQF_OST_GET_INFO_FIEMAP);
/* The fiemap value buffer travels in both directions, so its size is set
 * for the client and the server side of the capsule. */
4015 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
4016 RCL_CLIENT, keylen);
4017 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
4018 RCL_CLIENT, *vallen);
4019 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
4020 RCL_SERVER, *vallen);
4022 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
4024 ptlrpc_request_free(req);
4028 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
4029 memcpy(tmp, key, keylen);
4030 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
4031 memcpy(tmp, val, *vallen);
4033 ptlrpc_request_set_replen(req);
4034 rc = ptlrpc_queue_wait(req);
4038 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
4040 GOTO(out1, rc = -EPROTO);
4042 memcpy(val, reply, *vallen);
4044 ptlrpc_req_finished(req);
/* Finish an MDS->OST connection on the given import: connect the
 * LLOG_MDS_OST_ORIG_CTXT llog context through llog_initiator_connect()
 * (dropping the context reference afterwards), then, under imp_lock, mark
 * the import with imp_server_timeout and imp_pingable. As the XXX comment
 * below records, whether a connect failure should skip the flag updates
 * is an open question. */
4052 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
4054 struct llog_ctxt *ctxt;
4058 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
4060 rc = llog_initiator_connect(ctxt);
4061 llog_ctxt_put(ctxt);
4063 /* XXX return an error? skip setting below flags? */
4066 cfs_spin_lock(&imp->imp_lock);
4067 imp->imp_server_timeout = 1;
4068 imp->imp_pingable = 1;
4069 cfs_spin_unlock(&imp->imp_lock);
4070 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
/* Reply interpreter that osc_set_info_async() installs for KEY_MDS_CONN
 * requests. The visible return path runs
 * osc_setinfo_mds_connect_import() on the request's import. */
4075 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
4076 struct ptlrpc_request *req,
4083 RETURN(osc_setinfo_mds_connect_import(req->rq_import));
/* Key/value setter (installed as .o_set_info_async in osc_obd_ops).
 *
 * Keys handled locally, without an RPC:
 *   KEY_NEXT_ID      - raise oscc_next_id to *val + 1 (never lowers it),
 *                      under oscc_lock;
 *   KEY_CHECKSUM     - set cl_checksum to 0/1 from an int value;
 *   KEY_SPTLRPC_CONF - sptlrpc_conf_client_adapt(obd);
 *   KEY_FLUSH_CTX    - sptlrpc_import_flush_my_ctx(imp).
 *
 * Every other key is packed into an OST_SET_INFO request carrying the key
 * and value verbatim. KEY_GRANT_SHRINK uses the RQF_OST_SET_GRANT_INFO
 * format, osc_shrink_grant_interpret() as reply handler and is handed to
 * ptlrpcd; KEY_MDS_CONN additionally records the sequence from val in
 * oscc_oa and uses osc_setinfo_mds_conn_interpret(). Requests other than
 * KEY_GRANT_SHRINK are added to the caller's set, which must then be
 * non-NULL. */
4086 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
4087 void *key, obd_count vallen, void *val,
4088 struct ptlrpc_request_set *set)
4090 struct ptlrpc_request *req;
4091 struct obd_device *obd = exp->exp_obd;
4092 struct obd_import *imp = class_exp2cliimp(exp);
/* Fault-injection hook: optional delay for OBD_FAIL_OSC_SHUTDOWN tests. */
4097 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
4099 if (KEY_IS(KEY_NEXT_ID)) {
4101 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
/* NOTE(review): the same vallen test appears twice in the lines shown; the
 * second one looks redundant -- confirm against the full source. */
4103 if (vallen != sizeof(obd_id))
4108 if (vallen != sizeof(obd_id))
4111 /* avoid race between allocate new object and set next id
4112 * from ll_sync thread */
4113 cfs_spin_lock(&oscc->oscc_lock);
4114 new_val = *((obd_id*)val) + 1;
4115 if (new_val > oscc->oscc_next_id)
4116 oscc->oscc_next_id = new_val;
4117 cfs_spin_unlock(&oscc->oscc_lock);
4118 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
4119 exp->exp_obd->obd_name,
4120 obd->u.cli.cl_oscc.oscc_next_id);
4125 if (KEY_IS(KEY_CHECKSUM)) {
4126 if (vallen != sizeof(int))
4128 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
4132 if (KEY_IS(KEY_SPTLRPC_CONF)) {
4133 sptlrpc_conf_client_adapt(obd);
4137 if (KEY_IS(KEY_FLUSH_CTX)) {
4138 sptlrpc_import_flush_my_ctx(imp);
/* Only KEY_GRANT_SHRINK may be issued without a request set. */
4142 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
4145 /* We pass all other commands directly to OST. Since nobody calls osc
4146 methods directly and everybody is supposed to go through LOV, we
4147 assume lov checked invalid values for us.
4148 The only recognised values so far are evict_by_nid and mds_conn.
4149 Even if something bad goes through, we'd get a -EINVAL from OST
4152 if (KEY_IS(KEY_GRANT_SHRINK))
4153 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
4155 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
4160 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
4161 RCL_CLIENT, keylen);
4162 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
4163 RCL_CLIENT, vallen);
4164 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
4166 ptlrpc_request_free(req);
4170 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
4171 memcpy(tmp, key, keylen);
4172 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
4173 memcpy(tmp, val, vallen);
4175 if (KEY_IS(KEY_MDS_CONN)) {
4176 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4178 oscc->oscc_oa.o_seq = (*(__u32 *)val);
4179 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
4180 LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
4181 req->rq_no_delay = req->rq_no_resend = 1;
4182 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4183 } else if (KEY_IS(KEY_GRANT_SHRINK)) {
4184 struct osc_grant_args *aa;
4187 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4188 aa = ptlrpc_req_async_args(req);
4191 ptlrpc_req_finished(req);
4194 *oa = ((struct ost_body *)val)->oa;
4196 req->rq_interpret_reply = osc_shrink_grant_interpret;
4199 ptlrpc_request_set_replen(req);
4200 if (!KEY_IS(KEY_GRANT_SHRINK)) {
4201 LASSERT(set != NULL);
4202 ptlrpc_set_add_req(set, req);
4203 ptlrpc_check_set(NULL, set);
4205 ptlrpcd_add_req(req, PSCOPE_OTHER);
/* llog operations that __osc_llog_init() passes to llog_setup() for
 * LLOG_SIZE_REPL_CTXT. Only lop_cancel is set here, using the old GNU
 * "field:" initializer spelling. */
4211 static struct llog_operations osc_size_repl_logops = {
4212 lop_cancel: llog_obd_repl_cancel
/* llog operations for LLOG_MDS_OST_ORIG_CTXT; osc_init() fills this in at
 * run time from llog_lvfs_ops and then overrides the setup, cleanup, add
 * and connect methods. */
4215 static struct llog_operations osc_mds_ost_orig_logops;
/* Set up the two llog contexts used by the OSC on obd->obd_olg:
 *   LLOG_MDS_OST_ORIG_CTXT - with the catalog log id from catid and
 *                            osc_mds_ost_orig_logops;
 *   LLOG_SIZE_REPL_CTXT    - with no log id and osc_size_repl_logops.
 * Each failure is reported with CERROR(); the error path after the second
 * llog_setup() looks up the first context again (presumably to undo it;
 * the lines doing so are not shown). A final pair of CERROR() calls dumps
 * the osc/target names, catid pointer, rc and log id.
 *
 * Note that the olg argument is not used in the lines shown; both
 * llog_setup() calls take &obd->obd_olg. */
4217 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4218 struct obd_device *tgt, struct llog_catid *catid)
4223 rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
4224 &catid->lci_logid, &osc_mds_ost_orig_logops);
4226 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4230 rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
4231 NULL, &osc_size_repl_logops);
4233 struct llog_ctxt *ctxt =
4234 llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4237 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4242 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
4243 obd->obd_name, tgt->obd_name, catid, rc);
4244 CERROR("logid "LPX64":0x%x\n",
4245 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
/* obd_ops llog initialisation (installed as .o_llog_init in osc_obd_ops).
 *
 * While holding olg->olg_cat_processing it reads the catalog id stored at
 * slot *index of the CATLIST file on disk_obd, initialises the llog
 * contexts with __osc_llog_init(), and writes the catalog id back with
 * llog_put_cat_list(). Each step has a CERROR("rc: ...") report.
 *
 * olg must be the OBD's own group (&obd->obd_olg); this is asserted. */
4250 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4251 struct obd_device *disk_obd, int *index)
4253 struct llog_catid catid;
4254 static char name[32] = CATLIST;
4258 LASSERT(olg == &obd->obd_olg);
4260 cfs_mutex_down(&olg->olg_cat_processing);
4261 rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
4263 CERROR("rc: %d\n", rc);
4267 CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
4268 obd->obd_name, *index, catid.lci_logid.lgl_oid,
4269 catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen);
4271 rc = __osc_llog_init(obd, olg, disk_obd, &catid);
4273 CERROR("rc: %d\n", rc);
4277 rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
4279 CERROR("rc: %d\n", rc);
4284 cfs_mutex_up(&olg->olg_cat_processing);
/* obd_ops llog teardown (installed as .o_llog_finish in osc_obd_ops).
 * Looks up the LLOG_MDS_OST_ORIG_CTXT and LLOG_SIZE_REPL_CTXT contexts
 * and runs llog_cleanup() on each, keeping the two results in rc and rc2.
 * The count argument is not referenced in the lines shown. */
4289 static int osc_llog_finish(struct obd_device *obd, int count)
4291 struct llog_ctxt *ctxt;
4292 int rc = 0, rc2 = 0;
4295 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4297 rc = llog_cleanup(ctxt);
4299 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4301 rc2 = llog_cleanup(ctxt);
/* obd_ops reconnect hook (installed as .o_reconnect in osc_obd_ops).
 *
 * When the connect data carries OBD_CONNECT_GRANT, the grant to request
 * from the server is recomputed under cl_loi_list_lock: the sum of
 * cl_avail_grant and cl_dirty, or, if that sum is zero, the byte size of
 * two maximum-sized RPCs (2 * cl_max_pages_per_rpc << CFS_PAGE_SHIFT).
 * cl_lost_grant is saved for the debug message and then reset to 0. */
4308 static int osc_reconnect(const struct lu_env *env,
4309 struct obd_export *exp, struct obd_device *obd,
4310 struct obd_uuid *cluuid,
4311 struct obd_connect_data *data,
4314 struct client_obd *cli = &obd->u.cli;
4316 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4319 client_obd_list_lock(&cli->cl_loi_list_lock);
/* "a ?: b" is the GNU shorthand for "a ? a : b". */
4320 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
4321 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4322 lost_grant = cli->cl_lost_grant;
4323 cli->cl_lost_grant = 0;
4324 client_obd_list_unlock(&cli->cl_loi_list_lock);
4326 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4327 "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
4328 cli->cl_avail_grant, cli->cl_dirty, lost_grant);
4329 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4330 " ocd_grant: %d\n", data->ocd_connect_flags,
4331 data->ocd_version, data->ocd_grant);
/* obd_ops disconnect hook (installed as .o_disconnect in osc_obd_ops).
 *
 * Order of operations visible here:
 *  1. look up LLOG_SIZE_REPL_CTXT; when this is the last connection
 *     (cl_conn_count == 1) pending llog records are flushed with
 *     llog_sync(), and the context reference is dropped again;
 *  2. disconnect the export with client_disconnect_export();
 *  3. only once cl_import is NULL, remove this client from the
 *     grant-shrink list with osc_del_shrink_grant(). The ordering is
 *     deliberate; the explanation embedded in the body refers to
 *     BUG18662. */
4337 static int osc_disconnect(struct obd_export *exp)
4339 struct obd_device *obd = class_exp2obd(exp);
4340 struct llog_ctxt *ctxt;
4343 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4345 if (obd->u.cli.cl_conn_count == 1) {
4346 /* Flush any remaining cancel messages out to the
4348 llog_sync(ctxt, exp);
4350 llog_ctxt_put(ctxt);
4352 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4356 rc = client_disconnect_export(exp);
4358 * Initially we put del_shrink_grant before disconnect_export, but it
4359 * causes the following problem if setup (connect) and cleanup
4360 * (disconnect) are tangled together.
4361 * connect p1 disconnect p2
4362 * ptlrpc_connect_import
4363 * ............... class_manual_cleanup
4366 * ptlrpc_connect_interrupt
4368 * add this client to shrink list
4370 * Bang! pinger trigger the shrink.
4371 * So the osc should be disconnected from the shrink list, after we
4372 * are sure the import has been destroyed. BUG18662
4374 if (obd->u.cli.cl_import == NULL)
4375 osc_del_shrink_grant(&obd->u.cli);
/* Import state-change callback (installed as .o_import_event in
 * osc_obd_ops). imp must belong to obd (asserted). Per event, the visible
 * code does the following:
 *   IMP_EVENT_DISCON     - for imports flagged imp_server_timeout, set
 *                          OSCC_FLAG_RECOVERING on the object creator;
 *                          zero cl_avail_grant and cl_lost_grant under
 *                          cl_loi_list_lock;
 *   IMP_EVENT_INACTIVE   - notify the observer with OBD_NOTIFY_INACTIVE;
 *   IMP_EVENT_INVALIDATE - obtain a cl environment, run osc_check_rpcs()
 *                          under cl_loi_list_lock, then clean the DLM
 *                          namespace with LDLM_FL_LOCAL_ONLY;
 *   IMP_EVENT_ACTIVE     - for imp_server_timeout imports clear
 *                          OSCC_FLAG_NOSPC; notify OBD_NOTIFY_ACTIVE;
 *   IMP_EVENT_OCD        - initialise grant from the connect data when
 *                          OBD_CONNECT_GRANT is set, switch the request
 *                          portal to OST_REQUEST_PORTAL when
 *                          OBD_CONNECT_REQPORTAL is set, and notify
 *                          OBD_NOTIFY_OCD.
 * Any other event is reported with CERROR(). */
4379 static int osc_import_event(struct obd_device *obd,
4380 struct obd_import *imp,
4381 enum obd_import_event event)
4383 struct client_obd *cli;
4387 LASSERT(imp->imp_obd == obd);
4390 case IMP_EVENT_DISCON: {
4391 /* Only do this on the MDS OSC's */
4392 if (imp->imp_server_timeout) {
4393 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4395 cfs_spin_lock(&oscc->oscc_lock);
4396 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4397 cfs_spin_unlock(&oscc->oscc_lock);
4400 client_obd_list_lock(&cli->cl_loi_list_lock);
4401 cli->cl_avail_grant = 0;
4402 cli->cl_lost_grant = 0;
4403 client_obd_list_unlock(&cli->cl_loi_list_lock);
4406 case IMP_EVENT_INACTIVE: {
4407 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4410 case IMP_EVENT_INVALIDATE: {
4411 struct ldlm_namespace *ns = obd->obd_namespace;
4415 env = cl_env_get(&refcheck);
4419 client_obd_list_lock(&cli->cl_loi_list_lock);
4420 /* all pages go to failing rpcs due to the invalid
4422 osc_check_rpcs(env, cli);
4423 client_obd_list_unlock(&cli->cl_loi_list_lock);
4425 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4426 cl_env_put(env, &refcheck);
4431 case IMP_EVENT_ACTIVE: {
4432 /* Only do this on the MDS OSC's */
4433 if (imp->imp_server_timeout) {
4434 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4436 cfs_spin_lock(&oscc->oscc_lock);
4437 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
4438 cfs_spin_unlock(&oscc->oscc_lock);
4440 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4443 case IMP_EVENT_OCD: {
4444 struct obd_connect_data *ocd = &imp->imp_connect_data;
4446 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4447 osc_init_grant(&obd->u.cli, ocd);
4450 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4451 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
4453 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4457 CERROR("Unknown import event %d\n", event);
4464 * Determine whether the lock can be canceled before replaying the lock
4465 * during recovery, see bug16774 for detailed information.
4467 * \retval zero the lock can't be canceled
4468 * \retval other ok to cancel
/* Predicate registered with ns_register_cancel() in osc_setup(). The
 * visible test selects extent locks (LDLM_EXTENT) granted in LCK_PR or
 * LCK_CR mode for which osc_dlm_lock_pageref() returns 0.
 * check_res_locked() is invoked on the lock's resource first. */
4470 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
4472 check_res_locked(lock->l_resource);
4475 * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
4477 * XXX as a future improvement, we can also cancel unused write lock
4478 * if it doesn't have dirty data and active mmaps.
4480 if (lock->l_resource->lr_type == LDLM_EXTENT &&
4481 (lock->l_granted_mode == LCK_PR ||
4482 lock->l_granted_mode == LCK_CR) &&
4483 (osc_dlm_lock_pageref(lock) == 0))
/* Device setup (installed as .o_setup in osc_obd_ops).
 *
 * Takes a ptlrpcd reference, runs the generic client_obd_setup(), and
 * then performs the OSC-specific part: default grant-shrink interval,
 * lprocfs registration, a pre-allocated request pool on the import sized
 * cl_max_rpcs_in_flight + 2, initialisation of the grant-shrink list and
 * cl_grant_sem, and registration of osc_cancel_for_recovery() on the
 * namespace. */
4489 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4495 rc = ptlrpcd_addref();
4499 rc = client_obd_setup(obd, lcfg);
4503 struct lprocfs_static_vars lvars = { 0 };
4504 struct client_obd *cli = &obd->u.cli;
4506 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4507 lprocfs_osc_init_vars(&lvars);
/* The extra proc entries are attached only when the base lprocfs setup
 * succeeded. */
4508 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4509 lproc_osc_attach_seqstat(obd);
4510 sptlrpc_lprocfs_cliobd_attach(obd);
4511 ptlrpc_lprocfs_register_obd(obd);
4515 /* We need to allocate a few requests more, because
4516 brw_interpret tries to create new requests before freeing
4517 previous ones. Ideally we want to have 2x max_rpcs_in_flight
4518 reserved, but I am afraid that might be too much wasted RAM
4519 in fact, so 2 is just my guess and still should work. */
4520 cli->cl_import->imp_rq_pool =
4521 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4523 ptlrpc_add_rqs_to_pool);
4525 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4526 cfs_sema_init(&cli->cl_grant_sem, 1);
4528 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
/* Staged pre-cleanup (installed as .o_precleanup in osc_obd_ops).
 *
 *   OBD_CLEANUP_EARLY   - deactivate the import and clear imp_pingable
 *                         under imp_lock;
 *   OBD_CLEANUP_EXPORTS - if an import is still attached (set up but
 *                         never connected), invalidate it, free its
 *                         request pool and destroy it under cl_sem held
 *                         for writing, then clear cl_import; finally run
 *                         obd_llog_finish(). */
4534 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4540 case OBD_CLEANUP_EARLY: {
4541 struct obd_import *imp;
4542 imp = obd->u.cli.cl_import;
4543 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4544 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4545 ptlrpc_deactivate_import(imp);
4546 cfs_spin_lock(&imp->imp_lock);
4547 imp->imp_pingable = 0;
4548 cfs_spin_unlock(&imp->imp_lock);
4551 case OBD_CLEANUP_EXPORTS: {
4552 /* If we set up but never connected, the
4553 client import will not have been cleaned. */
4554 if (obd->u.cli.cl_import) {
4555 struct obd_import *imp;
4556 cfs_down_write(&obd->u.cli.cl_sem);
4557 imp = obd->u.cli.cl_import;
4558 CDEBUG(D_CONFIG, "%s: client import never connected\n",
4560 ptlrpc_invalidate_import(imp);
4561 if (imp->imp_rq_pool) {
4562 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4563 imp->imp_rq_pool = NULL;
4565 class_destroy_import(imp);
4566 cfs_up_write(&obd->u.cli.cl_sem);
/* NOTE(review): cl_import is cleared only after cl_sem has been released,
 * while osc_statfs() reads cl_import under that semaphore -- confirm that
 * a reader cannot pick up the already-destroyed import in between. */
4567 obd->u.cli.cl_import = NULL;
4569 rc = obd_llog_finish(obd, 0);
4571 CERROR("failed to cleanup llogging subsystems\n");
/* Final device cleanup (installed as .o_cleanup in osc_obd_ops):
 * unregister the lprocfs entries, release the quota cache through
 * lquota_cleanup(), then run the generic client_obd_cleanup(). */
4578 int osc_cleanup(struct obd_device *obd)
4583 ptlrpc_lprocfs_unregister_obd(obd);
4584 lprocfs_obd_cleanup(obd);
4586 /* free memory of osc quota cache */
4587 lquota_cleanup(quota_interface, obd);
4589 rc = client_obd_cleanup(obd);
/* Apply a configuration record to the OSC. The lprocfs variable table is
 * loaded with lprocfs_osc_init_vars() and, depending on
 * lcfg->lcfg_command, the record is handed to class_process_proc_param()
 * with the PARAM_OSC prefix. */
4595 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4597 struct lprocfs_static_vars lvars = { 0 };
4600 lprocfs_osc_init_vars(&lvars);
4602 switch (lcfg->lcfg_command) {
4604 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
/* obd_ops sync_fs hook (installed as .o_sync_fs in osc_obd_ops).
 *
 * With cl_loi_list_lock held it records oinfo and its oi_cb_up callback
 * in cli->cl_sf_wait and marks the wait as started, flags the pending
 * write pages of every object on cl_loi_write_list with ASYNC_SYNCFS,
 * then calls osc_check_rpcs() and osc_wake_sync_fs(). A cl environment
 * is obtained for the duration; the visible early return hands back
 * PTR_ERR(env). */
4614 static int osc_sync_fs(struct obd_export *exp, struct obd_info *oinfo,
4617 struct obd_device *obd = class_exp2obd(exp);
4618 struct client_obd *cli;
4619 struct lov_oinfo *loi;
4620 struct lov_oinfo *tloi;
4621 struct osc_async_page *oap;
4622 struct osc_async_page *toap;
4623 struct loi_oap_pages *lop;
4629 env = cl_env_get(&refcheck);
4631 RETURN(PTR_ERR(env));
4634 client_obd_list_lock(&cli->cl_loi_list_lock);
4635 cli->cl_sf_wait.sfw_oi = oinfo;
4636 cli->cl_sf_wait.sfw_upcall = oinfo->oi_cb_up;
4637 cli->cl_sf_wait.started = 1;
4638 /* creating cl_loi_sync_fs list */
4639 cfs_list_for_each_entry_safe(loi, tloi, &cli->cl_loi_write_list,
4641 lop = &loi->loi_write_lop;
4642 cfs_list_for_each_entry_safe(oap, toap, &lop->lop_pending,
4644 osc_set_async_flags_base(cli, loi, oap, ASYNC_SYNCFS);
4646 osc_check_rpcs(env, cli);
4647 osc_wake_sync_fs(cli);
4648 client_obd_list_unlock(&cli->cl_loi_list_lock);
4649 cl_env_put(env, &refcheck);
/* obd_ops adapter (installed as .o_process_config in osc_obd_ops): buf is
 * passed to osc_process_config_base() as the configuration record; len is
 * ignored. */
4654 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4656 return osc_process_config_base(obd, buf);
/* Method table of the OSC obd type; osc_init() hands it to
 * class_register_type(). Connection management (add_conn, del_conn,
 * connect) reuses the generic client_* helpers, everything else points at
 * osc_* implementations. */
4659 struct obd_ops osc_obd_ops = {
4660 .o_owner = THIS_MODULE,
4661 .o_setup = osc_setup,
4662 .o_precleanup = osc_precleanup,
4663 .o_cleanup = osc_cleanup,
4664 .o_add_conn = client_import_add_conn,
4665 .o_del_conn = client_import_del_conn,
4666 .o_connect = client_connect_import,
4667 .o_reconnect = osc_reconnect,
4668 .o_disconnect = osc_disconnect,
4669 .o_statfs = osc_statfs,
4670 .o_statfs_async = osc_statfs_async,
4671 .o_packmd = osc_packmd,
4672 .o_unpackmd = osc_unpackmd,
4673 .o_precreate = osc_precreate,
4674 .o_create = osc_create,
4675 .o_create_async = osc_create_async,
4676 .o_destroy = osc_destroy,
4677 .o_getattr = osc_getattr,
4678 .o_getattr_async = osc_getattr_async,
4679 .o_setattr = osc_setattr,
4680 .o_setattr_async = osc_setattr_async,
4682 .o_punch = osc_punch,
4684 .o_enqueue = osc_enqueue,
4685 .o_change_cbdata = osc_change_cbdata,
4686 .o_find_cbdata = osc_find_cbdata,
4687 .o_cancel = osc_cancel,
4688 .o_cancel_unused = osc_cancel_unused,
4689 .o_iocontrol = osc_iocontrol,
4690 .o_get_info = osc_get_info,
4691 .o_set_info_async = osc_set_info_async,
4692 .o_import_event = osc_import_event,
4693 .o_llog_init = osc_llog_init,
4694 .o_llog_finish = osc_llog_finish,
4695 .o_process_config = osc_process_config,
4696 .o_sync_fs = osc_sync_fs,
4699 extern struct lu_kmem_descr osc_caches[];
4700 extern cfs_spinlock_t osc_ast_guard;
4701 extern cfs_lock_class_key_t osc_ast_guard_class;
/* Module initialisation.
 *
 * Steps visible here, in order: initialise the osc_caches slab
 * descriptors, load the lprocfs variable tables, request the "lquota"
 * module and bind the quota interface symbol, register the OSC obd type
 * with class_register_type(), initialise the osc_ast_guard spinlock and
 * its lockdep class, and build osc_mds_ost_orig_logops as a copy of
 * llog_lvfs_ops with the setup/cleanup/add/connect methods replaced.
 * The cleanup sequence after class_register_type() drops the quota symbol
 * reference (if one was obtained) and tears the caches down again. */
4703 int __init osc_init(void)
4705 struct lprocfs_static_vars lvars = { 0 };
4709 /* print an address of _any_ initialized kernel symbol from this
4710 * module, to allow debugging with gdb that doesn't support data
4711 * symbols from modules.*/
4712 CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4714 rc = lu_kmem_init(osc_caches);
4716 lprocfs_osc_init_vars(&lvars);
/* NOTE(review): quota_interface is tested for NULL further down, so the
 * symbol lookup can evidently fail; lquota_init() and
 * init_obd_quota_ops() are nevertheless called unconditionally --
 * presumably they tolerate a NULL interface, confirm. */
4718 cfs_request_module("lquota");
4719 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4720 lquota_init(quota_interface);
4721 init_obd_quota_ops(quota_interface, &osc_obd_ops);
4723 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4724 LUSTRE_OSC_NAME, &osc_device_type);
4726 if (quota_interface)
4727 PORTAL_SYMBOL_PUT(osc_quota_interface);
4728 lu_kmem_fini(osc_caches);
4732 cfs_spin_lock_init(&osc_ast_guard);
4733 cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4735 osc_mds_ost_orig_logops = llog_lvfs_ops;
4736 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4737 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4738 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4739 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
/* Module teardown: finalise the lu device type, shut down the quota
 * interface and drop its symbol reference (if one was obtained),
 * unregister the OSC obd type and release the osc_caches slabs. */
4745 static void /*__exit*/ osc_exit(void)
4747 lu_device_type_fini(&osc_device_type);
4749 lquota_exit(quota_interface);
4750 if (quota_interface)
4751 PORTAL_SYMBOL_PUT(osc_quota_interface);
4753 class_unregister_type(LUSTRE_OSC_NAME);
4754 lu_kmem_fini(osc_caches);
/* Module metadata, followed by the cfs_module() declaration that names
 * osc_init() and osc_exit() for the "osc" module (presumably as its
 * load/unload hooks). */
4757 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4758 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4759 MODULE_LICENSE("GPL");
4761 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);