4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Whamcloud, Inc.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_OSC
42 #include <libcfs/libcfs.h>
45 # include <liblustre.h>
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include "osc_internal.h"
/* Forward declarations for helpers defined later in this file. */
66 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
67 static int brw_interpret(const struct lu_env *env,
68 struct ptlrpc_request *req, void *data, int rc);
69 static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli,
71 int osc_cleanup(struct obd_device *obd);
73 /* Pack OSC object metadata for disk storage (LE byte order). */
74 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
75 struct lov_stripe_md *lsm)
80 lmm_size = sizeof(**lmmp);
/* Free path: release a previously packed *lmmp. */
85 OBD_FREE(*lmmp, lmm_size);
/* Allocate path: caller wants a fresh lov_mds_md buffer. */
91 OBD_ALLOC(*lmmp, lmm_size);
/* Object id must be non-zero and the sequence must be an MDT one. */
97 LASSERT(lsm->lsm_object_id);
98 LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
/* On-disk/wire format is little-endian; convert from host order. */
99 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
100 (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
106 /* Unpack OSC object metadata from disk storage (LE byte order). */
107 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
108 struct lov_mds_md *lmm, int lmm_bytes)
111 struct obd_import *imp = class_exp2cliimp(exp);
/* Reject a wire buffer smaller than the fixed lov_mds_md header. */
115 if (lmm_bytes < sizeof (*lmm)) {
116 CERROR("lov_mds_md too small: %d, need %d\n",
117 lmm_bytes, (int)sizeof(*lmm));
120 /* XXX LOV_MAGIC etc check? */
122 if (lmm->lmm_object_id == 0) {
123 CERROR("lov_mds_md: zero lmm_object_id\n");
/* Single stripe: the OSC always deals with exactly one object. */
128 lsm_size = lov_stripe_md_size(1);
/* Free path: lmm == NULL means release the previously unpacked lsm. */
132 if (*lsmp != NULL && lmm == NULL) {
133 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
134 OBD_FREE(*lsmp, lsm_size);
140 OBD_ALLOC(*lsmp, lsm_size);
143 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
144 if ((*lsmp)->lsm_oinfo[0] == NULL) {
/* Undo the outer allocation when the oinfo allocation fails. */
145 OBD_FREE(*lsmp, lsm_size);
148 loi_init((*lsmp)->lsm_oinfo[0]);
152 /* XXX zero *lsmp? */
/* Wire format is little-endian; convert to host order. */
153 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
154 (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
155 LASSERT((*lsmp)->lsm_object_id);
156 LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
/* Prefer the server-advertised max object size when the OST supports
 * OBD_CONNECT_MAXBYTES; otherwise fall back to the compile-time limit. */
160 (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
161 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
163 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/* Copy the OSS capability (if one was supplied) into the request capsule
 * and mark its presence in the obdo's valid flags. */
168 static inline void osc_pack_capa(struct ptlrpc_request *req,
169 struct ost_body *body, void *capa)
171 struct obd_capa *oc = (struct obd_capa *)capa;
172 struct lustre_capa *c;
177 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
180 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
181 DEBUG_CAPA(D_SEC, c, "pack");
/* Fill the request's OST_BODY field from @oinfo: pack the obdo into wire
 * format and append the capability, if any. */
184 static inline void osc_pack_req_body(struct ptlrpc_request *req,
185 struct obd_info *oinfo)
187 struct ost_body *body;
189 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
192 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
193 osc_pack_capa(req, body, oinfo->oi_capa);
/* Size the capability field in the request capsule: zero when no
 * capability is being sent, otherwise leave the default capsule size. */
196 static inline void osc_set_capa_size(struct ptlrpc_request *req,
197 const struct req_msg_field *field,
201 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
203 /* it is already calculated as sizeof struct obd_capa */
/* Reply callback for async getattr: unpack the returned obdo into the
 * caller's obd_info and invoke the oi_cb_up completion upcall. */
207 static int osc_getattr_interpret(const struct lu_env *env,
208 struct ptlrpc_request *req,
209 struct osc_async_args *aa, int rc)
211 struct ost_body *body;
217 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
219 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
220 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
222 /* This should really be sent by the OST */
223 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
224 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
226 CDEBUG(D_INFO, "can't unpack ost_body\n");
/* Reply was unusable: invalidate the obdo so callers don't trust it. */
228 aa->aa_oi->oi_oa->o_valid = 0;
231 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an asynchronous OST_GETATTR on @set; the reply is processed by
 * osc_getattr_interpret(), which calls oinfo->oi_cb_up. */
235 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
236 struct ptlrpc_request_set *set)
238 struct ptlrpc_request *req;
239 struct osc_async_args *aa;
243 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
247 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
248 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
/* Packing failed: release the request before returning the error. */
250 ptlrpc_request_free(req);
254 osc_pack_req_body(req, oinfo);
256 ptlrpc_request_set_replen(req);
257 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
/* Async args are stored inside the request; assert they fit. */
259 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
260 aa = ptlrpc_req_async_args(req);
263 ptlrpc_set_add_req(set, req);
/* Synchronous OST_GETATTR: build the request, wait for the reply, and
 * copy the returned attributes into oinfo->oi_oa. */
267 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
269 struct ptlrpc_request *req;
270 struct ost_body *body;
274 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
278 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
279 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
/* Packing failed: release the request before returning the error. */
281 ptlrpc_request_free(req);
285 osc_pack_req_body(req, oinfo);
287 ptlrpc_request_set_replen(req);
289 rc = ptlrpc_queue_wait(req);
293 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
/* Missing/short reply body is a protocol violation. */
295 GOTO(out, rc = -EPROTO);
297 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
298 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
300 /* This should really be sent by the OST */
301 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
302 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
306 ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR: send the attributes in oinfo->oi_oa to the
 * OST, wait for the reply, and read back the updated obdo. */
310 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
311 struct obd_trans_info *oti)
313 struct ptlrpc_request *req;
314 struct ost_body *body;
/* Group must be valid before talking to the OST. */
318 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
320 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
324 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
325 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
/* Packing failed: release the request before returning the error. */
327 ptlrpc_request_free(req);
331 osc_pack_req_body(req, oinfo);
333 ptlrpc_request_set_replen(req);
335 rc = ptlrpc_queue_wait(req);
339 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
/* Missing/short reply body is a protocol violation. */
341 GOTO(out, rc = -EPROTO);
343 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
347 ptlrpc_req_finished(req);
/* Reply callback shared by setattr/punch: unpack the returned obdo into
 * sa->sa_oa, then invoke the caller-supplied sa_upcall with sa_cookie. */
351 static int osc_setattr_interpret(const struct lu_env *env,
352 struct ptlrpc_request *req,
353 struct osc_setattr_args *sa, int rc)
355 struct ost_body *body;
361 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
/* Missing/short reply body is a protocol violation. */
363 GOTO(out, rc = -EPROTO);
365 lustre_get_wire_obdo(sa->sa_oa, &body->oa);
367 rc = sa->sa_upcall(sa->sa_cookie, rc);
/* Asynchronous OST_SETATTR. On completion @upcall is invoked with
 * @cookie via osc_setattr_interpret(). The request is dispatched either
 * through ptlrpcd or added to @rqset, depending on the caller. */
371 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
372 struct obd_trans_info *oti,
373 obd_enqueue_update_f upcall, void *cookie,
374 struct ptlrpc_request_set *rqset)
376 struct ptlrpc_request *req;
377 struct osc_setattr_args *sa;
381 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
385 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
386 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
/* Packing failed: release the request before returning the error. */
388 ptlrpc_request_free(req);
/* Carry the MDS llog cookie along with the setattr when present. */
392 if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
393 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
395 osc_pack_req_body(req, oinfo);
397 ptlrpc_request_set_replen(req);
399 /* do mds to ost setattr asynchronously */
401 /* Do not wait for response. */
402 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
404 req->rq_interpret_reply =
405 (ptlrpc_interpterer_t)osc_setattr_interpret;
/* Async args are stored inside the request; assert they fit. */
407 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
408 sa = ptlrpc_req_async_args(req);
409 sa->sa_oa = oinfo->oi_oa;
410 sa->sa_upcall = upcall;
411 sa->sa_cookie = cookie;
/* PTLRPCD_SET means "let ptlrpcd drive it", otherwise the caller's set. */
413 if (rqset == PTLRPCD_SET)
414 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
416 ptlrpc_set_add_req(rqset, req);
/* Thin wrapper: async setattr using oinfo's own oi_cb_up as the upcall. */
422 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
423 struct obd_trans_info *oti,
424 struct ptlrpc_request_set *rqset)
426 return osc_setattr_async_base(exp, oinfo, oti,
427 oinfo->oi_cb_up, oinfo, rqset);
/* Create an object on the OST (OST_CREATE) and fill in *ea with a new
 * lov_stripe_md describing it. Synchronous; also propagates the transno
 * and llog cookie back through @oti for MDS recovery. */
430 int osc_real_create(struct obd_export *exp, struct obdo *oa,
431 struct lov_stripe_md **ea, struct obd_trans_info *oti)
433 struct ptlrpc_request *req;
434 struct ost_body *body;
435 struct lov_stripe_md *lsm;
444 rc = obd_alloc_memmd(exp, &lsm);
449 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
451 GOTO(out, rc = -ENOMEM);
453 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
/* Packing failed: release the request before bailing out. */
455 ptlrpc_request_free(req);
459 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
461 lustre_set_wire_obdo(&body->oa, oa);
463 ptlrpc_request_set_replen(req);
/* DELORPHAN (orphan cleanup) requests must not be replayed. */
465 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
466 oa->o_flags == OBD_FL_DELORPHAN) {
468 "delorphan from OST integration");
469 /* Don't resend the delorphan req */
470 req->rq_no_resend = req->rq_no_delay = 1;
473 rc = ptlrpc_queue_wait(req);
477 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
/* Missing/short reply body is a protocol violation. */
479 GOTO(out_req, rc = -EPROTO);
481 lustre_get_wire_obdo(oa, &body->oa);
483 /* This should really be sent by the OST */
484 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
485 oa->o_valid |= OBD_MD_FLBLKSZ;
487 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
488 * have valid lsm_oinfo data structs, so don't go touching that.
489 * This needs to be fixed in a big way.
491 lsm->lsm_object_id = oa->o_id;
492 lsm->lsm_object_seq = oa->o_seq;
/* Hand the server-assigned transno back to the caller for recovery. */
496 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
498 if (oa->o_valid & OBD_MD_FLCOOKIE) {
499 if (!oti->oti_logcookies)
500 oti_alloc_cookies(oti, 1);
501 *oti->oti_logcookies = oa->o_lcookie;
505 CDEBUG(D_HA, "transno: "LPD64"\n",
506 lustre_msg_get_transno(req->rq_repmsg));
508 ptlrpc_req_finished(req);
/* Error path: give back the lsm we allocated above. */
511 obd_free_memmd(exp, &lsm);
/* Asynchronous OST_PUNCH (truncate a byte range on the OST). The reply
 * is handled by osc_setattr_interpret(), which invokes @upcall(@cookie). */
515 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
516 obd_enqueue_update_f upcall, void *cookie,
517 struct ptlrpc_request_set *rqset)
519 struct ptlrpc_request *req;
520 struct osc_setattr_args *sa;
521 struct ost_body *body;
525 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
529 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
530 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
/* Packing failed: release the request before returning the error. */
532 ptlrpc_request_free(req);
535 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
536 ptlrpc_at_set_req_timeout(req);
538 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
540 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
541 osc_pack_capa(req, body, oinfo->oi_capa);
543 ptlrpc_request_set_replen(req);
546 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
/* Async args are stored inside the request; assert they fit. */
547 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
548 sa = ptlrpc_req_async_args(req);
549 sa->sa_oa = oinfo->oi_oa;
550 sa->sa_upcall = upcall;
551 sa->sa_cookie = cookie;
/* PTLRPCD_SET means "let ptlrpcd drive it", otherwise the caller's set. */
552 if (rqset == PTLRPCD_SET)
553 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
555 ptlrpc_set_add_req(rqset, req);
/* Punch wrapper: encode the extent into the obdo (start in o_size, end
 * in o_blocks, as osc_punch_base expects) and dispatch the punch. */
560 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
561 struct obd_trans_info *oti,
562 struct ptlrpc_request_set *rqset)
564 oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
565 oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
566 oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
567 return osc_punch_base(exp, oinfo,
568 oinfo->oi_cb_up, oinfo, rqset);
/* Reply callback for OST_SYNC: copy the returned obdo to the caller's
 * obd_info and invoke the oi_cb_up completion upcall. */
571 static int osc_sync_interpret(const struct lu_env *env,
572 struct ptlrpc_request *req,
575 struct osc_async_args *aa = arg;
576 struct ost_body *body;
582 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
/* Missing/short reply body is a protocol violation. */
584 CERROR ("can't unpack ost_body\n");
585 GOTO(out, rc = -EPROTO);
588 *aa->aa_oi->oi_oa = body->oa;
590 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an asynchronous OST_SYNC for the byte range [start, end] on
 * @set; the reply is handled by osc_sync_interpret(). */
594 static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
595 obd_size start, obd_size end,
596 struct ptlrpc_request_set *set)
598 struct ptlrpc_request *req;
599 struct ost_body *body;
600 struct osc_async_args *aa;
605 CDEBUG(D_INFO, "oa NULL\n");
609 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
613 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
614 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
/* Packing failed: release the request before returning the error. */
616 ptlrpc_request_free(req);
620 /* overload the size and blocks fields in the oa with start/end */
621 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
623 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
624 body->oa.o_size = start;
625 body->oa.o_blocks = end;
626 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
627 osc_pack_capa(req, body, oinfo->oi_capa);
629 ptlrpc_request_set_replen(req);
630 req->rq_interpret_reply = osc_sync_interpret;
/* Async args are stored inside the request; assert they fit. */
632 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
633 aa = ptlrpc_req_async_args(req);
636 ptlrpc_set_add_req(set, req);
640 /* Find and cancel locally locks matched by @mode in the resource found by
641 * @objid. Found locks are added into @cancel list. Returns the amount of
642 * locks added to @cancels list. */
643 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
645 ldlm_mode_t mode, int lock_flags)
647 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
648 struct ldlm_res_id res_id;
649 struct ldlm_resource *res;
/* Build the LDLM resource name from the object id/seq, then look it up
 * without creating it (create flag 0). */
653 osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
654 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
/* Hold a debug ref across the local cancel scan. */
658 LDLM_RESOURCE_ADDREF(res);
659 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
660 lock_flags, 0, NULL);
661 LDLM_RESOURCE_DELREF(res);
662 ldlm_resource_putref(res);
/* Reply callback for OST_DESTROY: drop the in-flight destroy counter and
 * wake anyone throttled in osc_can_send_destroy(). */
666 static int osc_destroy_interpret(const struct lu_env *env,
667 struct ptlrpc_request *req, void *data,
670 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
672 cfs_atomic_dec(&cli->cl_destroy_in_flight);
673 cfs_waitq_signal(&cli->cl_destroy_waitq);
/* Throttle destroy RPCs: optimistically take a slot; if that pushes us
 * over cl_max_rpcs_in_flight, give the slot back. The inc/dec pair is
 * not atomic as a whole, so a racing decrement may need a wakeup. */
677 static int osc_can_send_destroy(struct client_obd *cli)
679 if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
680 cli->cl_max_rpcs_in_flight) {
681 /* The destroy request can be sent */
684 if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
685 cli->cl_max_rpcs_in_flight) {
687 * The counter has been modified between the two atomic
690 cfs_waitq_signal(&cli->cl_destroy_waitq);
695 /* Destroy requests can be async always on the client, and we don't even really
696 * care about the return code since the client cannot do anything at all about
698 * When the MDS is unlinking a filename, it saves the file objects into a
699 * recovery llog, and these object records are cancelled when the OST reports
700 * they were destroyed and sync'd to disk (i.e. transaction committed).
701 * If the client dies, or the OST is down when the object should be destroyed,
702 * the records are not cancelled, and when the OST reconnects to the MDS next,
703 * it will retrieve the llog unlink logs and then sends the log cancellation
704 * cookies to the MDS after committing destroy transactions. */
705 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
706 struct lov_stripe_md *ea, struct obd_trans_info *oti,
707 struct obd_export *md_export, void *capa)
709 struct client_obd *cli = &exp->exp_obd->u.cli;
710 struct ptlrpc_request *req;
711 struct ost_body *body;
712 CFS_LIST_HEAD(cancels);
717 CDEBUG(D_INFO, "oa NULL\n");
/* Cancel our own PW locks on the object first; the data is going away,
 * so discard it rather than flushing it back. */
721 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
722 LDLM_FL_DISCARD_DATA);
724 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* Allocation failed: release the gathered cancel list. */
726 ldlm_lock_list_put(&cancels, l_bl_ast, count);
730 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
/* Pack the request with early lock cancels (ELC) piggy-backed. */
731 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
734 ptlrpc_request_free(req);
738 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
739 ptlrpc_at_set_req_timeout(req);
/* Carry the MDS llog cookie so the OST can cancel the unlink record. */
741 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
742 oa->o_lcookie = *oti->oti_logcookies;
743 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
745 lustre_set_wire_obdo(&body->oa, oa);
747 osc_pack_capa(req, body, (struct obd_capa *)capa);
748 ptlrpc_request_set_replen(req);
750 /* don't throttle destroy RPCs for the MDT */
751 if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
752 req->rq_interpret_reply = osc_destroy_interpret;
753 if (!osc_can_send_destroy(cli)) {
754 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
758 * Wait until the number of on-going destroy RPCs drops
759 * under max_rpc_in_flight
761 l_wait_event_exclusive(cli->cl_destroy_waitq,
762 osc_can_send_destroy(cli), &lwi);
766 /* Do not wait for response */
767 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
/* Report our dirty-cache and grant state to the OST inside @oa, taken
 * under cl_loi_list_lock, with sanity CERRORs when the accounting looks
 * inconsistent. Also zeroes cl_lost_grant after reporting it. */
771 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
774 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
/* These fields are ours to fill; caller must not have set them. */
776 LASSERT(!(oa->o_valid & bits));
779 client_obd_list_lock(&cli->cl_loi_list_lock);
780 oa->o_dirty = cli->cl_dirty;
781 if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
782 CERROR("dirty %lu - %lu > dirty_max %lu\n",
783 cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
785 } else if (cfs_atomic_read(&obd_dirty_pages) -
786 cfs_atomic_read(&obd_dirty_transit_pages) >
787 obd_max_dirty_pages + 1){
788 /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
789 * not covered by a lock thus they may safely race and trip
790 * this CERROR() unless we add in a small fudge factor (+1). */
791 CERROR("dirty %d - %d > system dirty_max %d\n",
792 cfs_atomic_read(&obd_dirty_pages),
793 cfs_atomic_read(&obd_dirty_transit_pages),
794 obd_max_dirty_pages);
796 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
797 CERROR("dirty %lu - dirty_max %lu too big???\n",
798 cli->cl_dirty, cli->cl_dirty_max);
/* Undirty target: enough headroom for a full pipeline of RPCs. */
801 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
802 (cli->cl_max_rpcs_in_flight + 1);
803 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
805 oa->o_grant = cli->cl_avail_grant;
806 oa->o_dropped = cli->cl_lost_grant;
/* Lost grant has now been reported; reset the counter. */
807 cli->cl_lost_grant = 0;
808 client_obd_list_unlock(&cli->cl_loi_list_lock);
809 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
810 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Re-arm the grant-shrink deadline one interval into the future. */
814 static void osc_update_next_shrink(struct client_obd *cli)
816 cli->cl_next_shrink_grant =
817 cfs_time_shift(cli->cl_grant_shrink_interval);
818 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
819 cli->cl_next_shrink_grant);
822 /* caller must hold loi_list_lock */
823 static void osc_consume_write_grant(struct client_obd *cli,
824 struct brw_page *pga)
826 LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
/* A page may consume grant only once. */
827 LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
/* Account one page of dirty data against global and per-client limits,
 * and charge one page of grant for it. */
828 cfs_atomic_inc(&obd_dirty_pages);
829 cli->cl_dirty += CFS_PAGE_SIZE;
830 cli->cl_avail_grant -= CFS_PAGE_SIZE;
831 pga->flag |= OBD_BRW_FROM_GRANT;
832 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
833 CFS_PAGE_SIZE, pga, pga->pg);
834 LASSERT(cli->cl_avail_grant >= 0);
835 osc_update_next_shrink(cli);
838 /* the companion to osc_consume_write_grant, called when a brw has completed.
839 * must be called with the loi lock held. */
840 static void osc_release_write_grant(struct client_obd *cli,
841 struct brw_page *pga, int sent)
/* Fall back to 4096 when the OST did not report a block size. */
843 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
846 LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
/* Nothing to release for pages that never consumed grant. */
847 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
/* Undo the accounting done in osc_consume_write_grant(). */
852 pga->flag &= ~OBD_BRW_FROM_GRANT;
853 cfs_atomic_dec(&obd_dirty_pages);
854 cli->cl_dirty -= CFS_PAGE_SIZE;
855 if (pga->flag & OBD_BRW_NOCACHE) {
856 pga->flag &= ~OBD_BRW_NOCACHE;
857 cfs_atomic_dec(&obd_dirty_transit_pages);
858 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
861 /* Reclaim grant from truncated pages. This is used to solve
862 * write-truncate and grant all gone(to lost_grant) problem.
863 * For a vfs write this problem can be easily solved by a sync
864 * write, however, this is not an option for page_mkwrite()
865 * because grant has to be allocated before a page becomes
867 if (cli->cl_avail_grant < PTLRPC_MAX_BRW_SIZE)
868 cli->cl_avail_grant += CFS_PAGE_SIZE;
870 cli->cl_lost_grant += CFS_PAGE_SIZE;
871 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
872 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
873 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
874 /* For short writes we shouldn't count parts of pages that
875 * span a whole block on the OST side, or our accounting goes
876 * wrong. Should match the code in filter_grant_check. */
877 int offset = pga->off & ~CFS_PAGE_MASK;
878 int count = pga->count + (offset & (blocksize - 1));
879 int end = (offset + pga->count) & (blocksize - 1);
/* Round the tail up to a full OST block. */
881 count += blocksize - end;
883 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
884 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
885 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
886 cli->cl_avail_grant, cli->cl_dirty);
/* Total read + write BRW RPCs currently in flight for this client. */
892 static unsigned long rpcs_in_flight(struct client_obd *cli)
894 return cli->cl_r_in_flight + cli->cl_w_in_flight;
897 /* caller must hold loi_list_lock */
898 void osc_wake_cache_waiters(struct client_obd *cli)
901 struct osc_cache_waiter *ocw;
904 cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
905 /* if we can't dirty more, we must wait until some is written */
906 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
907 (cfs_atomic_read(&obd_dirty_pages) + 1 >
908 obd_max_dirty_pages)) {
909 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
910 "osc max %ld, sys max %d\n", cli->cl_dirty,
911 cli->cl_dirty_max, obd_max_dirty_pages);
915 /* if still dirty cache but no grant wait for pending RPCs that
916 * may yet return us some grant before doing sync writes */
917 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
918 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
919 cli->cl_w_in_flight);
/* This waiter can proceed: remove it from the wait list. */
923 ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
924 cfs_list_del_init(&ocw->ocw_entry);
925 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
926 /* no more RPCs in flight to return grant, do sync IO */
927 ocw->ocw_rc = -EDQUOT;
928 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
/* Grant available: charge it to the waiter's page before waking. */
930 osc_consume_write_grant(cli,
931 &ocw->ocw_oap->oap_brw_page);
934 CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld\n",
935 ocw, ocw->ocw_oap, cli->cl_avail_grant);
937 cfs_waitq_signal(&ocw->ocw_waitq);
/* Add @grant to the client's available grant under the loi list lock. */
943 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
945 client_obd_list_lock(&cli->cl_loi_list_lock);
946 cli->cl_avail_grant += grant;
947 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Absorb any extra grant the server piggy-backed on a reply body. */
950 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
952 if (body->oa.o_valid & OBD_MD_FLGRANT) {
953 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
954 __osc_update_grant(cli, body->oa.o_grant);
/* Forward declaration: needed by the grant-shrink path below. */
958 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
959 void *key, obd_count vallen, void *val,
960 struct ptlrpc_request_set *set);
/* Reply callback for a grant-shrink RPC: on failure, give the grant we
 * tried to return back to cl_avail_grant; on success, absorb whatever
 * grant the server replied with. */
962 static int osc_shrink_grant_interpret(const struct lu_env *env,
963 struct ptlrpc_request *req,
966 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
967 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
968 struct ost_body *body;
/* RPC failed: restore the grant locally so it is not lost. */
971 __osc_update_grant(cli, oa->o_grant);
975 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
977 osc_update_grant(cli, body);
/* Locally give back a quarter of the available grant: move it from
 * cl_avail_grant into oa->o_grant and flag the obdo as a shrink. */
983 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
985 client_obd_list_lock(&cli->cl_loi_list_lock);
986 oa->o_grant = cli->cl_avail_grant / 4;
987 cli->cl_avail_grant -= oa->o_grant;
988 client_obd_list_unlock(&cli->cl_loi_list_lock);
989 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
990 oa->o_valid |= OBD_MD_FLFLAGS;
993 oa->o_flags |= OBD_FL_SHRINK_GRANT;
994 osc_update_next_shrink(cli);
997 /* Shrink the current grant, either from some large amount to enough for a
998 * full set of in-flight RPCs, or if we have already shrunk to that limit
999 * then to enough for a single RPC. This avoids keeping more grant than
1000 * needed, and avoids shrinking the grant piecemeal. */
1001 static int osc_shrink_grant(struct client_obd *cli)
1003 long target = (cli->cl_max_rpcs_in_flight + 1) *
1004 cli->cl_max_pages_per_rpc;
1006 client_obd_list_lock(&cli->cl_loi_list_lock);
/* Already at/below the pipeline target: drop to a single-RPC target. */
1007 if (cli->cl_avail_grant <= target)
1008 target = cli->cl_max_pages_per_rpc;
1009 client_obd_list_unlock(&cli->cl_loi_list_lock);
1011 return osc_shrink_grant_to_target(cli, target);
/* Return grant above @target to the server via a KEY_GRANT_SHRINK
 * set_info RPC. On failure the grant is restored locally by the caller
 * of __osc_update_grant() below. */
1014 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
1017 struct ost_body *body;
1020 client_obd_list_lock(&cli->cl_loi_list_lock);
1021 /* Don't shrink if we are already above or below the desired limit
1022 * We don't want to shrink below a single RPC, as that will negatively
1023 * impact block allocation and long-term performance. */
1024 if (target < cli->cl_max_pages_per_rpc)
1025 target = cli->cl_max_pages_per_rpc;
/* Nothing to give back if we already hold no more than the target. */
1027 if (target >= cli->cl_avail_grant) {
1028 client_obd_list_unlock(&cli->cl_loi_list_lock);
1031 client_obd_list_unlock(&cli->cl_loi_list_lock);
1033 OBD_ALLOC_PTR(body);
1037 osc_announce_cached(cli, &body->oa, 0);
/* Move the excess out of cl_avail_grant and into the outgoing obdo. */
1039 client_obd_list_lock(&cli->cl_loi_list_lock);
1040 body->oa.o_grant = cli->cl_avail_grant - target;
1041 cli->cl_avail_grant = target;
1042 client_obd_list_unlock(&cli->cl_loi_list_lock);
1043 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
1044 body->oa.o_valid |= OBD_MD_FLFLAGS;
1045 body->oa.o_flags = 0;
1047 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
1048 osc_update_next_shrink(cli);
1050 rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
1051 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
1052 sizeof(*body), body, NULL);
/* RPC failed: restore the grant we tried to return. */
1054 __osc_update_grant(cli, body->oa.o_grant);
1059 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
/* Decide whether it is time to shrink this client's grant: the server
 * must support grant shrink, the shrink deadline must be (nearly) due,
 * the import must be FULL, and we must hold more than the floor. */
1060 static int osc_should_shrink_grant(struct client_obd *client)
1062 cfs_time_t time = cfs_time_current();
1063 cfs_time_t next_shrink = client->cl_next_shrink_grant;
1065 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
1066 OBD_CONNECT_GRANT_SHRINK) == 0)
/* Allow a 5-tick slack so we don't miss the deadline by jitter. */
1069 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1070 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1071 client->cl_avail_grant > GRANT_SHRINK_LIMIT)
1074 osc_update_next_shrink(client);
/* Periodic timeout callback: walk all clients registered for grant
 * shrinking and shrink those that are due. */
1079 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1081 struct client_obd *client;
1083 cfs_list_for_each_entry(client, &item->ti_obd_list,
1084 cl_grant_shrink_list) {
1085 if (osc_should_shrink_grant(client))
1086 osc_shrink_grant(client);
/* Register this client with the periodic grant-shrink timeout so
 * osc_grant_shrink_grant_cb() will consider it. */
1091 static int osc_add_shrink_grant(struct client_obd *client)
1095 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1097 osc_grant_shrink_grant_cb, NULL,
1098 &client->cl_grant_shrink_list);
1100 CERROR("add grant client %s error %d\n",
1101 client->cl_import->imp_obd->obd_name, rc);
1104 CDEBUG(D_CACHE, "add grant client %s \n",
1105 client->cl_import->imp_obd->obd_name);
1106 osc_update_next_shrink(client);
/* Unregister this client from the periodic grant-shrink timeout. */
1110 static int osc_del_shrink_grant(struct client_obd *client)
1112 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/* Initialize cl_avail_grant from the grant the server advertised at
 * connect time, accounting for dirty data already cached locally. */
1116 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1119 * ocd_grant is the total grant amount we're expect to hold: if we've
1120 * been evicted, it's the new avail_grant amount, cl_dirty will drop
1121 * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1123 * race is tolerable here: if we're evicted, but imp_state already
1124 * left EVICTED state, then cl_dirty must be 0 already.
1126 client_obd_list_lock(&cli->cl_loi_list_lock);
1127 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1128 cli->cl_avail_grant = ocd->ocd_grant;
1130 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
/* A negative result indicates an old (unpatched) server; take the
 * advertised grant at face value as a workaround. */
1132 if (cli->cl_avail_grant < 0) {
1133 CWARN("%s: available grant < 0, the OSS is probably not running"
1134 " with patch from bug20278 (%ld) \n",
1135 cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
1136 /* workaround for 1.6 servers which do not have
1137 * the patch from bug20278 */
1138 cli->cl_avail_grant = ocd->ocd_grant;
1141 client_obd_list_unlock(&cli->cl_loi_list_lock);
1143 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1144 cli->cl_import->imp_obd->obd_name,
1145 cli->cl_avail_grant, cli->cl_lost_grant);
/* Opt into periodic grant shrinking if the server supports it and we
 * are not already registered. */
1147 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1148 cfs_list_empty(&cli->cl_grant_shrink_list))
1149 osc_add_shrink_grant(cli);
1152 /* We assume that the reason this OSC got a short read is because it read
1153 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1154 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1155 * this stripe never got written at or beyond this stripe offset yet. */
1156 static void handle_short_read(int nob_read, obd_count page_count,
1157 struct brw_page **pga)
1162 /* skip bytes read OK */
1163 while (nob_read > 0) {
1164 LASSERT (page_count > 0);
1166 if (pga[i]->count > nob_read) {
1167 /* EOF inside this page */
/* Zero the tail of the partially-filled page. */
1168 ptr = cfs_kmap(pga[i]->pg) +
1169 (pga[i]->off & ~CFS_PAGE_MASK);
1170 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1171 cfs_kunmap(pga[i]->pg);
1177 nob_read -= pga[i]->count;
1182 /* zero remaining pages */
1183 while (page_count-- > 0) {
1184 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1185 memset(ptr, 0, pga[i]->count);
1186 cfs_kunmap(pga[i]->pg);
/* Validate the per-niobuf return codes of a BRW_WRITE reply and confirm
 * the bulk transferred exactly the number of bytes we requested. */
1191 static int check_write_rcs(struct ptlrpc_request *req,
1192 int requested_nob, int niocount,
1193 obd_count page_count, struct brw_page **pga)
1198 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1199 sizeof(*remote_rcs) *
1201 if (remote_rcs == NULL) {
1202 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1206 /* return error if any niobuf was in error */
1207 for (i = 0; i < niocount; i++) {
1208 if ((int)remote_rcs[i] < 0)
1209 return(remote_rcs[i]);
/* A positive rc is unexpected for writes; treat it as bogus. */
1211 if (remote_rcs[i] != 0) {
1212 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1213 i, remote_rcs[i], req);
/* The bulk byte count must match exactly what we asked for. */
1218 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1219 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1220 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Two brw_pages can share one niobuf when they are file-contiguous and
 * their flags differ only in bits known to be safe to combine. */
1227 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1229 if (p1->flag != p2->flag) {
1230 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1231 OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1233 /* warn if we try to combine flags that we don't know to be
1234 * safe to combine */
1235 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1236 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1237 "report this at http://bugs.whamcloud.com/\n",
1238 p1->flag, p2->flag);
/* Mergeable only when p2 starts exactly where p1 ends. */
1243 return (p1->off + p1->count == p2->off);
/* Compute the bulk-data checksum over @nob bytes spread across @pga,
 * using @cksum_type. Fault-injection hooks can corrupt read data or
 * the computed write checksum to exercise the resend paths. */
1246 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1247 struct brw_page **pga, int opc,
1248 cksum_type_t cksum_type)
1253 LASSERT (pg_count > 0);
1254 cksum = init_checksum(cksum_type);
1255 while (nob > 0 && pg_count > 0) {
1256 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1257 int off = pga[i]->off & ~CFS_PAGE_MASK;
/* The last page may contribute fewer than pga[i]->count bytes. */
1258 int count = pga[i]->count > nob ? nob : pga[i]->count;
1260 /* corrupt the data before we compute the checksum, to
1261 * simulate an OST->client data error */
1262 if (i == 0 && opc == OST_READ &&
1263 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1264 memcpy(ptr + off, "bad1", min(4, nob));
1265 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1266 cfs_kunmap(pga[i]->pg);
1267 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1270 nob -= pga[i]->count;
1274 /* For sending we only compute the wrong checksum instead
1275 * of corrupting the data so it is still correct on a redo */
1276 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1279 return fini_checksum(cksum, cksum_type);
/*
 * Build a complete OST_READ/OST_WRITE bulk RPC: allocate the request
 * (from the import's pool for writes), coalesce adjacent pages into remote
 * niobufs, attach the bulk descriptor, pack the obdo/ioobj/capa, and
 * optionally compute the client-side bulk checksum for writes.
 * The prepared request is returned via @reqp; callers own it afterwards.
 */
1282 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1283 struct lov_stripe_md *lsm, obd_count page_count,
1284 struct brw_page **pga,
1285 struct ptlrpc_request **reqp,
1286 struct obd_capa *ocapa, int reserve,
1289 struct ptlrpc_request *req;
1290 struct ptlrpc_bulk_desc *desc;
1291 struct ost_body *body;
1292 struct obd_ioobj *ioobj;
1293 struct niobuf_remote *niobuf;
1294 int niocount, i, requested_nob, opc, rc;
1295 struct osc_brw_async_args *aa;
1296 struct req_capsule *pill;
1297 struct brw_page *pg_prev;
/* Fault-injection entry points used by the test suite. */
1300 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1301 RETURN(-ENOMEM); /* Recoverable */
1302 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1303 RETURN(-EINVAL); /* Fatal */
/* Writes draw from the pre-allocated request pool so that dirty pages can
 * always be flushed even under memory pressure; reads allocate normally. */
1305 if ((cmd & OBD_BRW_WRITE) != 0) {
1307 req = ptlrpc_request_alloc_pool(cli->cl_import,
1308 cli->cl_import->imp_rq_pool,
1309 &RQF_OST_BRW_WRITE);
1312 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* Count remote niobufs: one per run of mergeable (contiguous, compatible
 * flag) pages. */
1317 for (niocount = i = 1; i < page_count; i++) {
1318 if (!can_merge_pages(pga[i - 1], pga[i]))
1322 pill = &req->rq_pill;
1323 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1325 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1326 niocount * sizeof(*niobuf));
1327 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1329 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1331 ptlrpc_request_free(req);
1334 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1335 ptlrpc_at_set_req_timeout(req);
/* Bulk direction: the server GETs from us on write, PUTs to us on read. */
1337 if (opc == OST_WRITE)
1338 desc = ptlrpc_prep_bulk_imp(req, page_count,
1339 BULK_GET_SOURCE, OST_BULK_PORTAL);
1341 desc = ptlrpc_prep_bulk_imp(req, page_count,
1342 BULK_PUT_SINK, OST_BULK_PORTAL);
1345 GOTO(out, rc = -ENOMEM);
1346 /* NB request now owns desc and will free it when it gets freed */
1348 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1349 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1350 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1351 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1353 lustre_set_wire_obdo(&body->oa, oa);
1355 obdo_to_ioobj(oa, ioobj);
1356 ioobj->ioo_bufcnt = niocount;
1357 osc_pack_capa(req, body, ocapa);
1358 LASSERT (page_count > 0);
/* Walk the (sorted) page array: add each page to the bulk descriptor and
 * either extend the current niobuf or start a new one. */
1360 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1361 struct brw_page *pg = pga[i];
1362 int poff = pg->off & ~CFS_PAGE_MASK;
1364 LASSERT(pg->count > 0);
1365 /* make sure there is no gap in the middle of page array */
1366 LASSERTF(page_count == 1 ||
1367 (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
1368 ergo(i > 0 && i < page_count - 1,
1369 poff == 0 && pg->count == CFS_PAGE_SIZE) &&
1370 ergo(i == page_count - 1, poff == 0)),
1371 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1372 i, page_count, pg, pg->off, pg->count);
/* Pages must be strictly ascending by file offset (sort_brw_pages ran). */
1374 LASSERTF(i == 0 || pg->off > pg_prev->off,
1375 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1376 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1378 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1379 pg_prev->pg, page_private(pg_prev->pg),
1380 pg_prev->pg->index, pg_prev->off);
1382 LASSERTF(i == 0 || pg->off > pg_prev->off,
1383 "i %d p_c %u\n", i, page_count);
/* SRVLOCK must be uniform across the whole RPC. */
1385 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1386 (pg->flag & OBD_BRW_SRVLOCK));
1388 ptlrpc_prep_bulk_page(desc, pg->pg, poff, pg->count);
1389 requested_nob += pg->count;
1391 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1393 niobuf->len += pg->count;
1395 niobuf->offset = pg->off;
1396 niobuf->len = pg->count;
1397 niobuf->flags = pg->flag;
/* We should have filled exactly the niocount buffers we sized above. */
1402 LASSERTF((void *)(niobuf - niocount) ==
1403 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1404 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1405 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1407 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
/* On resend, tell the server so it can suppress duplicate-apply issues. */
1409 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1410 body->oa.o_valid |= OBD_MD_FLFLAGS;
1411 body->oa.o_flags = 0;
1413 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1416 if (osc_should_shrink_grant(cli))
1417 osc_shrink_grant_local(cli, &body->oa);
1419 /* size[REQ_REC_OFF] still sizeof (*body) */
1420 if (opc == OST_WRITE) {
/* Checksum writes only when no sptlrpc bulk integrity already covers it. */
1421 if (cli->cl_checksum &&
1422 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1423 /* store cl_cksum_type in a local variable since
1424 * it can be changed via lprocfs */
1425 cksum_type_t cksum_type = cli->cl_cksum_type;
1427 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1428 oa->o_flags &= OBD_FL_LOCAL_MASK;
1429 body->oa.o_flags = 0;
1431 body->oa.o_flags |= cksum_type_pack(cksum_type);
1432 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1433 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1437 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1439 /* save this in 'oa', too, for later checking */
1440 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1441 oa->o_flags |= cksum_type_pack(cksum_type);
1443 /* clear out the checksum flag, in case this is a
1444 * resend but cl_checksum is no longer set. b=11238 */
1445 oa->o_valid &= ~OBD_MD_FLCKSUM;
1447 oa->o_cksum = body->oa.o_cksum;
1448 /* 1 RC per niobuf */
1449 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1450 sizeof(__u32) * niocount);
/* Read path: only request that the server checksum the reply bulk. */
1452 if (cli->cl_checksum &&
1453 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1454 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1455 body->oa.o_flags = 0;
1456 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1457 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1460 ptlrpc_request_set_replen(req);
/* Stash completion state in the request's async-args area for
 * osc_brw_fini_request()/brw_interpret(). */
1462 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1463 aa = ptlrpc_req_async_args(req);
1465 aa->aa_requested_nob = requested_nob;
1466 aa->aa_nio_count = niocount;
1467 aa->aa_page_count = page_count;
1471 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1472 if (ocapa && reserve)
1473 aa->aa_ocapa = capa_get(ocapa);
1479 ptlrpc_req_finished(req);
/*
 * Diagnose a write checksum mismatch between client and server.  Recomputes
 * the checksum over the (still-mapped) pages to distinguish: wrong server
 * algorithm, data modified after checksumming (mmap race), or corruption in
 * transit.  Logs a console error with the object/extent details.
 */
1483 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1484 __u32 client_cksum, __u32 server_cksum, int nob,
1485 obd_count page_count, struct brw_page **pga,
1486 cksum_type_t client_cksum_type)
1490 cksum_type_t cksum_type;
1492 if (server_cksum == client_cksum) {
1493 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* Recompute locally with the type the server says it used. */
1497 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1499 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1502 if (cksum_type != client_cksum_type)
1503 msg = "the server did not use the checksum type specified in "
1504 "the original request - likely a protocol problem";
1505 else if (new_cksum == server_cksum)
1506 msg = "changed on the client after we checksummed it - "
1507 "likely false positive due to mmap IO (bug 11742)";
1508 else if (new_cksum == client_cksum)
1509 msg = "changed in transit before arrival at OST";
1511 msg = "changed in transit AND doesn't match the original - "
1512 "likely false positive due to mmap IO (bug 11742)";
1514 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1515 " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1516 msg, libcfs_nid2str(peer->nid),
1517 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1518 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1519 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1521 oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1523 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1524 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1525 "client csum now %x\n", client_cksum, client_cksum_type,
1526 server_cksum, cksum_type, new_cksum);
1530 /* Note rc enters this function as number of bytes transferred */
/*
 * Common completion for a BRW RPC (sync and async paths).  Unpacks the
 * reply body, updates quota flags and grant, verifies write RCs and
 * checksums, and for reads handles short transfers and verifies the
 * server-supplied bulk checksum against a locally computed one.
 */
1531 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1533 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1534 const lnet_process_id_t *peer =
1535 &req->rq_import->imp_connection->c_peer;
1536 struct client_obd *cli = aa->aa_cli;
1537 struct ost_body *body;
1538 __u32 client_cksum = 0;
/* -EDQUOT replies still carry a valid body (quota flags) to process. */
1541 if (rc < 0 && rc != -EDQUOT) {
1542 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1546 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1547 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1549 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1553 /* set/clear over quota flag for a uid/gid */
1554 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1555 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1556 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1558 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1559 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1561 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1564 osc_update_grant(cli, body);
1569 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1570 client_cksum = aa->aa_oa->o_cksum; /* save for later */
/* ---- write completion: verify RCs and (optionally) our own checksum. */
1572 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1574 CERROR("Unexpected +ve rc %d\n", rc);
1577 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1579 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1582 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1583 check_write_checksum(&body->oa, peer, client_cksum,
1584 body->oa.o_cksum, aa->aa_requested_nob,
1585 aa->aa_page_count, aa->aa_ppga,
1586 cksum_type_unpack(aa->aa_oa->o_flags)))
1589 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1590 aa->aa_page_count, aa->aa_ppga);
1594 /* The rest of this function executes only for OST_READs */
1596 /* if unwrap_bulk failed, return -EAGAIN to retry */
1597 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1599 GOTO(out, rc = -EAGAIN);
/* rc is bytes read; it can be short, but never more than requested nor
 * different from what the bulk layer says arrived. */
1601 if (rc > aa->aa_requested_nob) {
1602 CERROR("Unexpected rc %d (%d requested)\n", rc,
1603 aa->aa_requested_nob);
1607 if (rc != req->rq_bulk->bd_nob_transferred) {
1608 CERROR ("Unexpected rc %d (%d transferred)\n",
1609 rc, req->rq_bulk->bd_nob_transferred);
1613 if (rc < aa->aa_requested_nob)
1614 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
/* Server sent a read checksum: recompute locally and compare. */
1616 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1617 static int cksum_counter;
1618 __u32 server_cksum = body->oa.o_cksum;
1621 cksum_type_t cksum_type;
1623 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1624 body->oa.o_flags : 0);
1625 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1626 aa->aa_ppga, OST_READ,
/* Identify any LNet router in the path for the error message. */
1629 if (peer->nid == req->rq_bulk->bd_sender) {
1633 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1636 if (server_cksum == ~0 && rc > 0) {
1637 CERROR("Protocol error: server %s set the 'checksum' "
1638 "bit, but didn't send a checksum. Not fatal, "
1639 "but please notify on http://bugs.whamcloud.com/\n",
1640 libcfs_nid2str(peer->nid));
1641 } else if (server_cksum != client_cksum) {
1642 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1643 "%s%s%s inode "DFID" object "
1644 LPU64"/"LPU64" extent "
1645 "["LPU64"-"LPU64"]\n",
1646 req->rq_import->imp_obd->obd_name,
1647 libcfs_nid2str(peer->nid),
1649 body->oa.o_valid & OBD_MD_FLFID ?
1650 body->oa.o_parent_seq : (__u64)0,
1651 body->oa.o_valid & OBD_MD_FLFID ?
1652 body->oa.o_parent_oid : 0,
1653 body->oa.o_valid & OBD_MD_FLFID ?
1654 body->oa.o_parent_ver : 0,
1656 body->oa.o_valid & OBD_MD_FLGROUP ?
1657 body->oa.o_seq : (__u64)0,
1658 aa->aa_ppga[0]->off,
1659 aa->aa_ppga[aa->aa_page_count-1]->off +
1660 aa->aa_ppga[aa->aa_page_count-1]->count -
1662 CERROR("client %x, server %x, cksum_type %x\n",
1663 client_cksum, server_cksum, cksum_type);
1665 aa->aa_oa->o_cksum = client_cksum;
1669 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1672 } else if (unlikely(client_cksum)) {
1673 static int cksum_missed;
/* Rate-limit: log only on power-of-two miss counts. */
1676 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1677 CERROR("Checksum %u requested from %s but not sent\n",
1678 cksum_missed, libcfs_nid2str(peer->nid));
/* Copy server-updated obdo fields back for the caller. */
1684 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
/*
 * Synchronous BRW: build the request, queue it, wait for completion, and
 * resend on recoverable errors (with a backoff of @resends seconds and an
 * import-generation check to detect eviction).  -EINPROGRESS from the
 * server is retried without counting against the resend limit.
 */
1689 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1690 struct lov_stripe_md *lsm,
1691 obd_count page_count, struct brw_page **pga,
1692 struct obd_capa *ocapa)
1694 struct ptlrpc_request *req;
1697 int generation, resends = 0;
1698 struct l_wait_info lwi;
1702 cfs_waitq_init(&waitq);
1703 generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1706 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1707 page_count, pga, &req, ocapa, 0, resends);
/* Pin the request to the generation observed above so eviction between
 * retries is detectable. */
1712 req->rq_generation_set = 1;
1713 req->rq_import_generation = generation;
1714 req->rq_sent = cfs_time_current_sec() + resends;
1717 rc = ptlrpc_queue_wait(req);
1719 if (rc == -ETIMEDOUT && req->rq_resend) {
1720 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1721 ptlrpc_req_finished(req);
1725 rc = osc_brw_fini_request(req, rc);
1727 ptlrpc_req_finished(req);
1728 /* When server return -EINPROGRESS, client should always retry
1729 * regardless of the number of times the bulk was resent already.*/
1730 if (osc_recoverable_error(rc)) {
1732 if (rc != -EINPROGRESS &&
1733 !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1734 CERROR("%s: too many resend retries for object: "
1735 ""LPU64":"LPU64", rc = %d.\n",
1736 exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
/* Import generation changed: we were evicted; give up the resend. */
1740 exp->exp_obd->u.cli.cl_import->imp_generation) {
1741 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1742 ""LPU64":"LPU64", rc = %d.\n",
1743 exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
/* Back off proportionally to the number of resends so far. */
1747 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1749 l_wait_event(waitq, 0, &lwi);
1754 if (rc == -EAGAIN || rc == -EINPROGRESS)
/*
 * Rebuild a failed async BRW as a fresh request and requeue it on the same
 * request set.  The new request inherits the old one's async args, pga and
 * oap list; each oap's request reference is swapped to the new request.
 * Holds cl_loi_list_lock across the handoff so interrupts see a consistent
 * oap<->request mapping.
 */
1759 int osc_brw_redo_request(struct ptlrpc_request *request,
1760 struct osc_brw_async_args *aa)
1762 struct ptlrpc_request *new_req;
1763 struct ptlrpc_request_set *set = request->rq_set;
1764 struct osc_brw_async_args *new_aa;
1765 struct osc_async_page *oap;
1769 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1771 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1772 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1773 aa->aa_cli, aa->aa_oa,
1774 NULL /* lsm unused by osc currently */,
1775 aa->aa_page_count, aa->aa_ppga,
1776 &new_req, aa->aa_ocapa, 0, 1);
1780 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
/* If any page was interrupted while we were off the lock, abandon the
 * redo: drop the new request and let completion run with -EINTR. */
1782 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1783 if (oap->oap_request != NULL) {
1784 LASSERTF(request == oap->oap_request,
1785 "request %p != oap_request %p\n",
1786 request, oap->oap_request);
1787 if (oap->oap_interrupted) {
1788 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1789 ptlrpc_req_finished(new_req);
1794 /* New request takes over pga and oaps from old request.
1795 * Note that copying a list_head doesn't work, need to move it... */
1797 new_req->rq_interpret_reply = request->rq_interpret_reply;
1798 new_req->rq_async_args = request->rq_async_args;
1799 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1800 new_req->rq_generation_set = 1;
1801 new_req->rq_import_generation = request->rq_import_generation;
1803 new_aa = ptlrpc_req_async_args(new_req);
1805 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1806 cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1807 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
/* Repoint every oap's request reference at the new request. */
1809 cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1810 if (oap->oap_request) {
1811 ptlrpc_req_finished(oap->oap_request);
1812 oap->oap_request = ptlrpc_request_addref(new_req);
/* Transfer capa ownership to the new request's args. */
1816 new_aa->aa_ocapa = aa->aa_ocapa;
1817 aa->aa_ocapa = NULL;
1819 /* use ptlrpc_set_add_req is safe because interpret functions work
1820 * in check_set context. only one way exist with access to request
1821 * from different thread got -EINTR - this way protected with
1822 * cl_loi_list_lock */
1823 ptlrpc_set_add_req(set, new_req);
1825 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1827 DEBUG_REQ(D_INFO, new_req, "new request");
1832 * ugh, we want disk allocation on the target to happen in offset order. we'll
1833 * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
1834 * fine for our small page arrays and doesn't require allocation. it's an
1835 * insertion sort that swaps elements that are strides apart, shrinking the
1836 * stride down until it's '1' and the array is sorted.
/* Shellsort @array of @num brw_pages into ascending file-offset order,
 * using the 3h+1 gap sequence. */
1838 static void sort_brw_pages(struct brw_page **array, int num)
1841 struct brw_page *tmp;
/* Grow the stride to the largest 3h+1 value below num. */
1845 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1850 for (i = stride ; i < num ; i++) {
/* Gapped insertion: shift larger elements up by one stride. */
1853 while (j >= stride && array[j - stride]->off > tmp->off) {
1854 array[j] = array[j - stride];
1859 } while (stride > 1);
/*
 * Return the largest prefix of @pg (up to @pages entries) that forms an
 * unfragmented run: only the first page may start mid-page and only the
 * last may end mid-page, so the server sees whole-page I/O in between.
 */
1862 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1868 LASSERT (pages > 0);
1869 offset = pg[i]->off & ~CFS_PAGE_MASK;
1873 if (pages == 0) /* that's all */
1876 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1877 return count; /* doesn't end on page boundary */
1880 offset = pg[i]->off & ~CFS_PAGE_MASK;
1881 if (offset != 0) /* doesn't start on page boundary */
/* Allocate and fill an array of pointers into the flat @pga array, so the
 * pointer array can be sorted/partitioned without moving the pages.
 * Freed by osc_release_ppga(). */
1888 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1890 struct brw_page **ppga;
1893 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1897 for (i = 0; i < count; i++)
/* Free a pointer array built by osc_build_ppga(); @count must match the
 * count it was allocated with. */
1902 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1904 LASSERT(ppga != NULL);
1905 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * Top-level synchronous BRW entry point: sorts the pages, splits the I/O
 * into chunks of at most cl_max_pages_per_rpc unfragmented pages, and
 * issues one osc_brw_internal() call per chunk.  Because each brw clobbers
 * the obdo, a saved copy is restored between chunks.
 */
1908 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1909 obd_count page_count, struct brw_page *pga,
1910 struct obd_trans_info *oti)
1912 struct obdo *saved_oa = NULL;
1913 struct brw_page **ppga, **orig;
1914 struct obd_import *imp = class_exp2cliimp(exp);
1915 struct client_obd *cli;
1916 int rc, page_count_orig;
1919 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1920 cli = &imp->imp_obd->u.cli;
1922 if (cmd & OBD_BRW_CHECK) {
1923 /* The caller just wants to know if there's a chance that this
1924 * I/O can succeed */
1926 if (imp->imp_invalid)
1931 /* test_brw with a failed create can trip this, maybe others. */
1932 LASSERT(cli->cl_max_pages_per_rpc);
1936 orig = ppga = osc_build_ppga(pga, page_count);
1939 page_count_orig = page_count;
1941 sort_brw_pages(ppga, page_count);
1942 while (page_count) {
1943 obd_count pages_per_brw;
1945 if (page_count > cli->cl_max_pages_per_rpc)
1946 pages_per_brw = cli->cl_max_pages_per_rpc;
1948 pages_per_brw = page_count;
/* Further shrink the chunk so it doesn't fragment server pages. */
1950 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1952 if (saved_oa != NULL) {
1953 /* restore previously saved oa */
1954 *oinfo->oi_oa = *saved_oa;
1955 } else if (page_count > pages_per_brw) {
1956 /* save a copy of oa (brw will clobber it) */
1957 OBDO_ALLOC(saved_oa);
1958 if (saved_oa == NULL)
1959 GOTO(out, rc = -ENOMEM);
1960 *saved_oa = *oinfo->oi_oa;
1963 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1964 pages_per_brw, ppga, oinfo->oi_capa);
/* Advance past the chunk just sent. */
1969 page_count -= pages_per_brw;
1970 ppga += pages_per_brw;
/* Free with the original base/count; ppga was advanced in the loop. */
1974 osc_release_ppga(orig, page_count_orig);
1976 if (saved_oa != NULL)
1977 OBDO_FREE(saved_oa);
1982 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1983 * the dirty accounting. Writeback completes or truncate happens before
1984 * writing starts. Must be called with the loi lock held. */
/* Drop the page's dirty-cache grant accounting; see the comment above. */
1985 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1988 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1992 /* This maintains the lists of pending pages to read/write for a given object
1993 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1994 * to quickly find objects that are ready to send an RPC. */
/*
 * Decide whether this object's pending-page list (@lop) justifies building
 * an RPC now: invalid import (drain), urgent pages, cache waiters on
 * write, or a full RPC's worth of pages.
 */
1995 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
2000 if (lop->lop_num_pending == 0)
2003 /* if we have an invalid import we want to drain the queued pages
2004 * by forcing them through rpcs that immediately fail and complete
2005 * the pages. recovery relies on this to empty the queued pages
2006 * before canceling the locks and evicting down the llite pages */
2007 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2010 /* stream rpcs in queue order as long as as there is an urgent page
2011 * queued. this is our cheap solution for good batching in the case
2012 * where writepage marks some random page in the middle of the file
2013 * as urgent because of, say, memory pressure */
2014 if (!cfs_list_empty(&lop->lop_urgent)) {
2015 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
2019 if (cmd & OBD_BRW_WRITE) {
2020 /* trigger a write rpc stream as long as there are dirtiers
2021 * waiting for space. as they're waiting, they're not going to
2022 * create more pages to coalesce with what's waiting.. */
2023 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
2024 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
/* Enough pages queued to fill a whole RPC. */
2028 if (lop->lop_num_pending >= cli->cl_max_pages_per_rpc)
/* Return whether the head of the urgent list is a high-priority (ASYNC_HP)
 * page, which forces an immediate high-priority RPC. */
2034 static int lop_makes_hprpc(struct loi_oap_pages *lop)
2036 struct osc_async_page *oap;
2039 if (cfs_list_empty(&lop->lop_urgent))
2042 oap = cfs_list_entry(lop->lop_urgent.next,
2043 struct osc_async_page, oap_urgent_item);
2045 if (oap->oap_async_flags & ASYNC_HP) {
2046 CDEBUG(D_CACHE, "hp request forcing RPC\n");
/* Idempotently add @item to @list or remove it, so that its membership
 * matches @should_be_on. */
2053 static void on_list(cfs_list_t *item, cfs_list_t *list,
2056 if (cfs_list_empty(item) && should_be_on)
2057 cfs_list_add_tail(item, list);
2058 else if (!cfs_list_empty(item) && !should_be_on)
2059 cfs_list_del_init(item);
2062 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2063 * can find pages to build into rpcs quickly */
2064 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
/* An object with HP pages goes only on the hp-ready list (never both). */
2066 if (lop_makes_hprpc(&loi->loi_write_lop) ||
2067 lop_makes_hprpc(&loi->loi_read_lop)) {
2069 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2070 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2072 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
2073 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2074 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
2075 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
/* Track objects with any pending writes/reads for scanning. */
2078 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2079 loi->loi_write_lop.lop_num_pending);
2081 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2082 loi->loi_read_lop.lop_num_pending);
/* Adjust the per-object pending count and the client-wide pending
 * read/write page counters by @delta (may be negative). */
2085 static void lop_update_pending(struct client_obd *cli,
2086 struct loi_oap_pages *lop, int cmd, int delta)
2088 lop->lop_num_pending += delta;
2089 if (cmd & OBD_BRW_WRITE)
2090 cli->cl_pending_w_pages += delta;
2092 cli->cl_pending_r_pages += delta;
2096 * this is called when a sync waiter receives an interruption. Its job is to
2097 * get the caller woken as soon as possible. If its page hasn't been put in an
2098 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
2099 * desiring interruption which will forcefully complete the rpc once the rpc
2102 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2104 struct loi_oap_pages *lop;
2105 struct lov_oinfo *loi;
2109 LASSERT(!oap->oap_interrupted);
2110 oap->oap_interrupted = 1;
2112 /* ok, it's been put in an rpc. only one oap gets a request reference */
2113 if (oap->oap_request != NULL) {
/* Mark the in-flight RPC interrupted and kick ptlrpcd to notice. */
2114 ptlrpc_mark_interrupted(oap->oap_request);
2115 ptlrpcd_wake(oap->oap_request);
2116 ptlrpc_req_finished(oap->oap_request);
2117 oap->oap_request = NULL;
2121 * page completion may be called only if ->cpo_prep() method was
2122 * executed by osc_io_submit(), that also adds page the to pending list
2124 if (!cfs_list_empty(&oap->oap_pending_item)) {
/* Not in an RPC yet: dequeue immediately and complete with -EINTR. */
2125 cfs_list_del_init(&oap->oap_pending_item);
2126 cfs_list_del_init(&oap->oap_urgent_item);
2129 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2130 &loi->loi_write_lop : &loi->loi_read_lop;
2131 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2132 loi_list_maint(oap->oap_cli, oap->oap_loi);
2133 rc = oap->oap_caller_ops->ap_completion(env,
2134 oap->oap_caller_data,
2135 oap->oap_cmd, NULL, -EINTR);
2141 /* this is trying to propagate async writeback errors back up to the
2142 * application. As an async write fails we record the error code for later if
2143 * the app does an fsync. As long as errors persist we force future rpcs to be
2144 * sync so that the app can get a sync error and break the cycle of queueing
2145 * pages for which writeback will fail. */
2146 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
/* On error: latch force-sync and remember the next xid; errors from RPCs
 * with an older xid are already covered. */
2153 ar->ar_force_sync = 1;
2154 ar->ar_min_xid = ptlrpc_sample_next_xid();
/* A success at or past ar_min_xid clears the force-sync state. */
2159 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2160 ar->ar_force_sync = 0;
/* Queue an async page on its object's pending list (and the urgent list
 * when flagged: HP pages at the head, URGENT pages at the tail), and bump
 * the pending counters. */
2163 void osc_oap_to_pending(struct osc_async_page *oap)
2165 struct loi_oap_pages *lop;
2167 if (oap->oap_cmd & OBD_BRW_WRITE)
2168 lop = &oap->oap_loi->loi_write_lop;
2170 lop = &oap->oap_loi->loi_read_lop;
2172 if (oap->oap_async_flags & ASYNC_HP)
2173 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2174 else if (oap->oap_async_flags & ASYNC_URGENT)
2175 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2176 cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2177 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2180 /* this must be called holding the loi list lock to give coverage to exit_cache,
2181 * async_flag maintenance, and oap_request */
/*
 * Finish one async page after its RPC completes (or fails): drop the
 * request reference, record write errors for later fsync, copy server
 * size/time attributes into the lvb, invoke the caller's completion, and
 * release the page's cache grant.  Caller holds the loi list lock.
 */
2182 static void osc_ap_completion(const struct lu_env *env,
2183 struct client_obd *cli, struct obdo *oa,
2184 struct osc_async_page *oap, int sent, int rc)
2189 if (oap->oap_request != NULL) {
2190 xid = ptlrpc_req_xid(oap->oap_request);
2191 ptlrpc_req_finished(oap->oap_request);
2192 oap->oap_request = NULL;
2195 cfs_spin_lock(&oap->oap_lock);
2196 oap->oap_async_flags = 0;
2197 cfs_spin_unlock(&oap->oap_lock);
2198 oap->oap_interrupted = 0;
/* Record write result in both client-wide and per-object async-rc state. */
2200 if (oap->oap_cmd & OBD_BRW_WRITE) {
2201 osc_process_ar(&cli->cl_ar, xid, rc);
2202 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
/* Mirror server-returned attributes into the cached lvb. */
2205 if (rc == 0 && oa != NULL) {
2206 if (oa->o_valid & OBD_MD_FLBLOCKS)
2207 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2208 if (oa->o_valid & OBD_MD_FLMTIME)
2209 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2210 if (oa->o_valid & OBD_MD_FLATIME)
2211 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2212 if (oa->o_valid & OBD_MD_FLCTIME)
2213 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2216 rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2217 oap->oap_cmd, oa, rc);
2219 /* cl_page_completion() drops PG_locked. so, a new I/O on the page could
2220 * start, but OSC calls it under lock and thus we can add oap back to
2223 /* upper layer wants to leave the page on pending queue */
2224 osc_oap_to_pending(oap);
2226 osc_exit_cache(cli, oap, sent);
/* Deferred-work callback: run the RPC engine for @data (a client_obd)
 * under the loi list lock. */
2230 static int brw_queue_work(const struct lu_env *env, void *data)
2232 struct client_obd *cli = data;
2234 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2236 client_obd_list_lock(&cli->cl_loi_list_lock);
2237 osc_check_rpcs0(env, cli, 1);
2238 client_obd_list_unlock(&cli->cl_loi_list_lock);
/*
 * Reply-interpret callback for async BRW RPCs.  Finishes the request via
 * osc_brw_fini_request(), redoes it on recoverable errors (always for
 * -EINPROGRESS, otherwise up to the resend limit and only if the import
 * generation is unchanged), then completes all attached oaps, updates
 * in-flight counters, and restarts the RPC engine.
 */
2242 static int brw_interpret(const struct lu_env *env,
2243 struct ptlrpc_request *req, void *data, int rc)
2245 struct osc_brw_async_args *aa = data;
2246 struct client_obd *cli;
2250 rc = osc_brw_fini_request(req, rc);
2251 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2252 /* When server return -EINPROGRESS, client should always retry
2253 * regardless of the number of times the bulk was resent already. */
2254 if (osc_recoverable_error(rc)) {
2255 if (req->rq_import_generation !=
2256 req->rq_import->imp_generation) {
2257 CDEBUG(D_HA, "%s: resend cross eviction for object: "
2258 ""LPU64":"LPU64", rc = %d.\n",
2259 req->rq_import->imp_obd->obd_name,
2260 aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
2261 } else if (rc == -EINPROGRESS ||
2262 client_should_resend(aa->aa_resends, aa->aa_cli)) {
2263 rc = osc_brw_redo_request(req, aa);
2265 CERROR("%s: too many resent retries for object: "
2266 ""LPU64":"LPU64", rc = %d.\n",
2267 req->rq_import->imp_obd->obd_name,
2268 aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
2273 else if (rc == -EAGAIN || rc == -EINPROGRESS)
/* Capa reference taken at prep time is no longer needed. */
2278 capa_put(aa->aa_ocapa);
2279 aa->aa_ocapa = NULL;
2283 client_obd_list_lock(&cli->cl_loi_list_lock);
2285 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2286 * is called so we know whether to go to sync BRWs or wait for more
2287 * RPCs to complete */
2288 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2289 cli->cl_w_in_flight--;
2291 cli->cl_r_in_flight--;
2293 async = cfs_list_empty(&aa->aa_oaps);
2294 if (!async) { /* from osc_send_oap_rpc() */
2295 struct osc_async_page *oap, *tmp;
2296 /* the caller may re-use the oap after the completion call so
2297 * we need to clean it up a little */
2298 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
2300 cfs_list_del_init(&oap->oap_rpc_item);
2301 osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2303 OBDO_FREE(aa->aa_oa);
2304 } else { /* from async_internal() */
/* No oaps attached: just release the write grants taken per page. */
2306 for (i = 0; i < aa->aa_page_count; i++)
2307 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2309 osc_wake_cache_waiters(cli);
2310 osc_check_rpcs0(env, cli, 1);
2311 client_obd_list_unlock(&cli->cl_loi_list_lock);
2314 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2315 req->rq_bulk->bd_nob_transferred);
2316 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2317 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
/*
 * Turn a list of ready osc_async_pages into a prepared BRW request:
 * allocates the pga and a cl_req, attaches every page to the cl_req,
 * fills obdo attributes via cl_req_attr_set(), sorts the pages and calls
 * osc_brw_prep_request().  On failure every queued oap is completed with
 * the error.  Returns the request or an ERR_PTR.
 */
2322 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2323 struct client_obd *cli,
2324 cfs_list_t *rpc_list,
2325 int page_count, int cmd)
2327 struct ptlrpc_request *req;
2328 struct brw_page **pga = NULL;
2329 struct osc_brw_async_args *aa;
2330 struct obdo *oa = NULL;
2331 const struct obd_async_page_ops *ops = NULL;
2332 struct osc_async_page *oap;
2333 struct osc_async_page *tmp;
2334 struct cl_req *clerq = NULL;
2335 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2336 struct ldlm_lock *lock = NULL;
2337 struct cl_req_attr crattr;
2338 int i, rc, mpflag = 0;
2341 LASSERT(!cfs_list_empty(rpc_list));
2343 if (cmd & OBD_BRW_MEMALLOC)
2344 mpflag = cfs_memory_pressure_get_and_set();
2346 memset(&crattr, 0, sizeof crattr);
2347 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2349 GOTO(out, req = ERR_PTR(-ENOMEM));
2353 GOTO(out, req = ERR_PTR(-ENOMEM));
/* First pass: allocate the cl_req from the first page, and fill pga. */
2356 cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2357 struct cl_page *page = osc_oap2cl_page(oap);
2359 ops = oap->oap_caller_ops;
2361 clerq = cl_req_alloc(env, page, crt,
2362 1 /* only 1-object rpcs for
2365 GOTO(out, req = (void *)clerq);
2366 lock = oap->oap_ldlm_lock;
2368 pga[i] = &oap->oap_brw_page;
2369 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2370 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2371 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2373 cl_req_page_add(env, clerq, page);
2376 /* always get the data for the obdo for the rpc */
2377 LASSERT(ops != NULL);
2379 crattr.cra_capa = NULL;
2380 cl_req_attr_set(env, clerq, &crattr, ~0ULL);
/* Carry the DLM lock handle so the server can match the client lock. */
2382 oa->o_handle = lock->l_remote_handle;
2383 oa->o_valid |= OBD_MD_FLHANDLE;
2386 rc = cl_req_prep(env, clerq);
2388 CERROR("cl_req_prep failed: %d\n", rc);
2389 GOTO(out, req = ERR_PTR(rc));
2392 sort_brw_pages(pga, page_count);
2393 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2394 pga, &req, crattr.cra_capa, 1, 0);
2396 CERROR("prep_req failed: %d\n", rc);
2397 GOTO(out, req = ERR_PTR(rc));
2400 if (cmd & OBD_BRW_MEMALLOC)
2401 req->rq_memalloc = 1;
2403 /* Need to update the timestamps after the request is built in case
2404 * we race with setattr (locally or in queue at OST). If OST gets
2405 * later setattr before earlier BRW (as determined by the request xid),
2406 * the OST will not use BRW timestamps. Sadly, there is no obvious
2407 * way to do this in a single call. bug 10150 */
2408 cl_req_attr_set(env, clerq, &crattr,
2409 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
/* Hand the oap list over to the request's async args. */
2411 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2412 aa = ptlrpc_req_async_args(req);
2413 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2414 cfs_list_splice(rpc_list, &aa->aa_oaps);
2415 CFS_INIT_LIST_HEAD(rpc_list);
2416 aa->aa_clerq = clerq;
2418 if (cmd & OBD_BRW_MEMALLOC)
2419 cfs_memory_pressure_restore(mpflag);
2421 capa_put(crattr.cra_capa);
2426 OBD_FREE(pga, sizeof(*pga) * page_count);
2427 /* this should happen rarely and is pretty bad, it makes the
2428 * pending list not follow the dirty order */
2429 client_obd_list_lock(&cli->cl_loi_list_lock);
2430 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2431 cfs_list_del_init(&oap->oap_rpc_item);
2433 /* queued sync pages can be torn down while the pages
2434 * were between the pending list and the rpc */
2435 if (oap->oap_interrupted) {
2436 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2437 osc_ap_completion(env, cli, NULL, oap, 0,
2441 osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2443 if (clerq && !IS_ERR(clerq))
2444 cl_req_completion(env, clerq, PTR_ERR(req));
2450 * prepare pages for ASYNC io and put pages in send queue.
2452 * \param cmd OBD_BRW_* macros
2453 * \param lop pending pages
2455 * \return zero if no page added to send queue.
2456 * \return 1 if pages successfully added to send queue.
2457 * \return negative on errors.
2460 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2461 struct lov_oinfo *loi, int cmd,
2462 struct loi_oap_pages *lop, pdl_policy_t pol)
2464 struct ptlrpc_request *req;
2465 obd_count page_count = 0;
2466 struct osc_async_page *oap = NULL, *tmp;
2467 struct osc_brw_async_args *aa;
2468 const struct obd_async_page_ops *ops;
2469 CFS_LIST_HEAD(rpc_list);
2470 int srvlock = 0, mem_tight = 0;
2471 struct cl_object *clob = NULL;
2472 obd_off starting_offset = OBD_OBJECT_EOF;
2473 unsigned int ending_offset;
2474 int starting_page_off = 0;
2477 /* ASYNC_HP pages first. At present, when the lock the pages is
2478 * to be canceled, the pages covered by the lock will be sent out
2479 * with ASYNC_HP. We have to send out them as soon as possible. */
2480 cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2481 if (oap->oap_async_flags & ASYNC_HP)
2482 cfs_list_move(&oap->oap_pending_item, &rpc_list);
2483 else if (!(oap->oap_brw_flags & OBD_BRW_SYNC))
2484 /* only do this for writeback pages. */
2485 cfs_list_move_tail(&oap->oap_pending_item, &rpc_list);
2486 if (++page_count >= cli->cl_max_pages_per_rpc)
2489 cfs_list_splice_init(&rpc_list, &lop->lop_pending);
2492 /* first we find the pages we're allowed to work with */
2493 cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2495 ops = oap->oap_caller_ops;
2497 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2498 "magic 0x%x\n", oap, oap->oap_magic);
2501 /* pin object in memory, so that completion call-backs
2502 * can be safely called under client_obd_list lock. */
2503 clob = osc_oap2cl_page(oap)->cp_obj;
2504 cl_object_get(clob);
2507 if (page_count != 0 &&
2508 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2509 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2510 " oap %p, page %p, srvlock %u\n",
2511 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2515 /* If there is a gap at the start of this page, it can't merge
2516 * with any previous page, so we'll hand the network a
2517 * "fragmented" page array that it can't transfer in 1 RDMA */
2518 if (oap->oap_obj_off < starting_offset) {
2519 if (starting_page_off != 0)
2522 starting_page_off = oap->oap_page_off;
2523 starting_offset = oap->oap_obj_off + starting_page_off;
2524 } else if (oap->oap_page_off != 0)
2527 /* in llite being 'ready' equates to the page being locked
2528 * until completion unlocks it. commit_write submits a page
2529 * as not ready because its unlock will happen unconditionally
2530 * as the call returns. if we race with commit_write giving
2531 * us that page we don't want to create a hole in the page
2532 * stream, so we stop and leave the rpc to be fired by
2533 * another dirtier or kupdated interval (the not ready page
2534 * will still be on the dirty list). we could call in
2535 * at the end of ll_file_write to process the queue again. */
2536 if (!(oap->oap_async_flags & ASYNC_READY)) {
2537 int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2540 CDEBUG(D_INODE, "oap %p page %p returned %d "
2541 "instead of ready\n", oap,
2545 /* llite is telling us that the page is still
2546 * in commit_write and that we should try
2547 * and put it in an rpc again later. we
2548 * break out of the loop so we don't create
2549 * a hole in the sequence of pages in the rpc
2554 /* the io isn't needed.. tell the checks
2555 * below to complete the rpc with EINTR */
2556 cfs_spin_lock(&oap->oap_lock);
2557 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2558 cfs_spin_unlock(&oap->oap_lock);
2559 oap->oap_count = -EINTR;
2562 cfs_spin_lock(&oap->oap_lock);
2563 oap->oap_async_flags |= ASYNC_READY;
2564 cfs_spin_unlock(&oap->oap_lock);
2567 LASSERTF(0, "oap %p page %p returned %d "
2568 "from make_ready\n", oap,
2576 /* take the page out of our book-keeping */
2577 cfs_list_del_init(&oap->oap_pending_item);
2578 lop_update_pending(cli, lop, cmd, -1);
2579 cfs_list_del_init(&oap->oap_urgent_item);
2581 /* ask the caller for the size of the io as the rpc leaves. */
2582 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2584 ops->ap_refresh_count(env, oap->oap_caller_data,
2586 LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2588 if (oap->oap_count <= 0) {
2589 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2591 osc_ap_completion(env, cli, NULL,
2592 oap, 0, oap->oap_count);
2596 /* now put the page back in our accounting */
2597 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2598 if (page_count++ == 0)
2599 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2601 if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
2604 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2605 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2606 * have the same alignment as the initial writes that allocated
2607 * extents on the server. */
2608 ending_offset = oap->oap_obj_off + oap->oap_page_off +
2610 if (!(ending_offset & (PTLRPC_MAX_BRW_SIZE - 1)))
2613 if (page_count >= cli->cl_max_pages_per_rpc)
2616 /* If there is a gap at the end of this page, it can't merge
2617 * with any subsequent pages, so we'll hand the network a
2618 * "fragmented" page array that it can't transfer in 1 RDMA */
2619 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2623 loi_list_maint(cli, loi);
2625 client_obd_list_unlock(&cli->cl_loi_list_lock);
2628 cl_object_put(env, clob);
2630 if (page_count == 0) {
2631 client_obd_list_lock(&cli->cl_loi_list_lock);
2635 req = osc_build_req(env, cli, &rpc_list, page_count,
2636 mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
2638 LASSERT(cfs_list_empty(&rpc_list));
2639 loi_list_maint(cli, loi);
2640 RETURN(PTR_ERR(req));
2643 aa = ptlrpc_req_async_args(req);
2645 starting_offset &= PTLRPC_MAX_BRW_SIZE - 1;
2646 if (cmd == OBD_BRW_READ) {
2647 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2648 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2649 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2650 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2652 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2653 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2654 cli->cl_w_in_flight);
2655 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2656 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2659 client_obd_list_lock(&cli->cl_loi_list_lock);
2661 if (cmd == OBD_BRW_READ)
2662 cli->cl_r_in_flight++;
2664 cli->cl_w_in_flight++;
2666 /* queued sync pages can be torn down while the pages
2667 * were between the pending list and the rpc */
2669 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2670 /* only one oap gets a request reference */
2673 if (oap->oap_interrupted && !req->rq_intr) {
2674 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2676 ptlrpc_mark_interrupted(req);
2680 tmp->oap_request = ptlrpc_request_addref(req);
2682 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2683 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2685 req->rq_interpret_reply = brw_interpret;
2687 /* XXX: Maybe the caller can check the RPC bulk descriptor to see which
2688 * CPU/NUMA node the majority of pages were allocated on, and try
2689 * to assign the async RPC to the CPU core (PDL_POLICY_PREFERRED)
2690 * to reduce cross-CPU memory traffic.
2692 * But on the other hand, we expect that multiple ptlrpcd threads
2693 * and the initial write sponsor can run in parallel, especially
2694 * when data checksum is enabled, which is CPU-bound operation and
2695 * single ptlrpcd thread cannot process in time. So more ptlrpcd
2696 * threads sharing BRW load (with PDL_POLICY_ROUND) seems better.
2698 ptlrpcd_add_req(req, pol, -1);
/* Debug helper: dump a lov_oinfo's readiness and pending read/write page
 * counts along with a caller-supplied message.  Evaluates LOI several
 * times — pass a plain lvalue, not an expression with side effects. */
2702 #define LOI_DEBUG(LOI, STR, args...) \
2703 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2704 !cfs_list_empty(&(LOI)->loi_ready_item) || \
2705 !cfs_list_empty(&(LOI)->loi_hp_ready_item), \
2706 (LOI)->loi_write_lop.lop_num_pending, \
2707 !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2708 (LOI)->loi_read_lop.lop_num_pending, \
2709 !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2712 /* This is called by osc_check_rpcs() to find which objects have pages that
2713 * we could be sending. These lists are maintained by lop_makes_rpc(). */
/*
 * Pick the next object (lov_oinfo) whose pages should be sent, in strict
 * priority order: HP-ready (blocked locks) > ready > objects with queued
 * writes when cache waiters exist > everything when the import is invalid.
 * NOTE(review): caller is expected to hold cl_loi_list_lock — confirm.
 */
2714 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2718 /* First return objects that have blocked locks so that they
2719 * will be flushed quickly and other clients can get the lock,
2720 * then objects which have pages ready to be stuffed into RPCs */
2721 if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
2722 RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
2723 struct lov_oinfo, loi_hp_ready_item));
2724 if (!cfs_list_empty(&cli->cl_loi_ready_list))
2725 RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
2726 struct lov_oinfo, loi_ready_item));
2728 /* then if we have cache waiters, return all objects with queued
2729 * writes. This is especially important when many small files
2730 * have filled up the cache and not been fired into rpcs because
2731 * they don't pass the nr_pending/object threshold */
2732 if (!cfs_list_empty(&cli->cl_cache_waiters) &&
2733 !cfs_list_empty(&cli->cl_loi_write_list))
2734 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2735 struct lov_oinfo, loi_write_item));
2737 /* then return all queued objects when we have an invalid import
2738 * so that they get flushed */
2739 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2740 if (!cfs_list_empty(&cli->cl_loi_write_list))
2741 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2744 if (!cfs_list_empty(&cli->cl_loi_read_list))
2745 RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
2746 struct lov_oinfo, loi_read_item));
/*
 * Return non-zero when the client already has its maximum number of BRW
 * RPCs in flight.  If the object has a high-priority (ASYNC_HP) urgent
 * page on either its write or read list, the limit is raised by one so
 * an HP request can still be sent.
 */
2751 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2753 struct osc_async_page *oap;
2756 if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
2757 oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
2758 struct osc_async_page, oap_urgent_item);
2759 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2762 if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
2763 oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
2764 struct osc_async_page, oap_urgent_item);
2765 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
/* hprpc is 0 or 1, so HP work gets exactly one extra in-flight slot. */
2768 return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2771 /* called with the loi list lock held */
/*
 * Main RPC dispatch loop: while objects have sendable pages and we are
 * under the in-flight limit, fire write and read RPCs for each object in
 * turn.  Called with cl_loi_list_lock held (see file comment above).
 *
 * \param ptlrpc  non-zero when called from a ptlrpcd thread, which selects
 *                PDL_POLICY_SAME so follow-up requests stay on that thread.
 */
2772 static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli, int ptlrpc)
2774 struct lov_oinfo *loi;
2775 int rc = 0, race_counter = 0;
2779 pol = ptlrpc ? PDL_POLICY_SAME : PDL_POLICY_ROUND;
2781 while ((loi = osc_next_loi(cli)) != NULL) {
2782 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2784 if (osc_max_rpc_in_flight(cli, loi))
2787 /* attempt some read/write balancing by alternating between
2788 * reads and writes in an object. The makes_rpc checks here
2789 * would be redundant if we were getting read/write work items
2790 * instead of objects. we don't want send_oap_rpc to drain a
2791 * partial read pending queue when we're given this object to
2792 * do io on writes while there are cache waiters */
2793 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2794 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2795 &loi->loi_write_lop, pol);
2797 CERROR("Write request failed with %d\n", rc);
2799 /* osc_send_oap_rpc failed, mostly because of
2802 * It can't break here, because if:
2803 * - a page was submitted by osc_io_submit, so
2805 * - no request in flight
2806 * - no subsequent request
2807 * The system will be in live-lock state,
2808 * because there is no chance to call
2809 * osc_io_unplug() and osc_check_rpcs() any
2810 * more. pdflush can't help in this case,
2811 * because it might be blocked at grabbing
2812 * the page lock as we mentioned.
2814 * Anyway, continue to drain pages. */
2823 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2824 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2825 &loi->loi_read_lop, pol);
2827 CERROR("Read request failed with %d\n", rc);
2835 /* attempt some inter-object balancing by issuing rpcs
2836 * for each object in turn */
2837 if (!cfs_list_empty(&loi->loi_hp_ready_item))
2838 cfs_list_del_init(&loi->loi_hp_ready_item);
2839 if (!cfs_list_empty(&loi->loi_ready_item))
2840 cfs_list_del_init(&loi->loi_ready_item);
2841 if (!cfs_list_empty(&loi->loi_write_item))
2842 cfs_list_del_init(&loi->loi_write_item);
2843 if (!cfs_list_empty(&loi->loi_read_item))
2844 cfs_list_del_init(&loi->loi_read_item);
2846 loi_list_maint(cli, loi);
2848 /* send_oap_rpc fails with 0 when make_ready tells it to
2849 * back off. llite's make_ready does this when it tries
2850 * to lock a page queued for write that is already locked.
2851 * we want to try sending rpcs from many objects, but we
2852 * don't want to spin failing with 0. */
2853 if (race_counter == 10)
/* Public wrapper around osc_check_rpcs0() for non-ptlrpcd callers
 * (ptlrpc == 0 selects PDL_POLICY_ROUND for the queued requests). */
2858 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2860 osc_check_rpcs0(env, cli, 0);
2864 * Non-blocking version of osc_enter_cache() that consumes grant only when it
/*
 * Try to account one page of dirty cache without blocking: consume one
 * page of write grant when available.  For transient pages the page is
 * counted in the *_transit accounting and marked OBD_BRW_NOCACHE instead
 * of being cached.  Returns whether the grant was available (the exact
 * return path is outside this listing — confirm in full source).
 */
2867 int osc_enter_cache_try(const struct lu_env *env,
2868 struct client_obd *cli, struct lov_oinfo *loi,
2869 struct osc_async_page *oap, int transient)
2873 has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2875 osc_consume_write_grant(cli, &oap->oap_brw_page);
2877 cli->cl_dirty_transit += CFS_PAGE_SIZE;
2878 cfs_atomic_inc(&obd_dirty_transit_pages);
2879 oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2885 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2886 * grant or cache space. */
/*
 * Blocking variant of osc_enter_cache_try(): reserve cache space and write
 * grant for \a oap, sleeping as a cache waiter until space is available.
 * Caller must hold cl_loi_list_lock; the lock is dropped and re-acquired
 * around the wait (see the per-file comment above this function).
 */
2887 static int osc_enter_cache(const struct lu_env *env,
2888 struct client_obd *cli, struct lov_oinfo *loi,
2889 struct osc_async_page *oap)
2891 struct osc_cache_waiter ocw;
2892 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2896 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2897 "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
2898 cli->cl_dirty_max, obd_max_dirty_pages,
2899 cli->cl_lost_grant, cli->cl_avail_grant);
2901 /* force the caller to try sync io. this can jump the list
2902 * of queued writes and create a discontiguous rpc stream */
2903 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
2904 cli->cl_dirty_max < CFS_PAGE_SIZE ||
2905 cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync)
2908 /* Hopefully normal case - cache space and write credits available */
2909 if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2910 cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2911 osc_enter_cache_try(env, cli, loi, oap, 0))
2914 /* We can get here for two reasons: too many dirty pages in cache, or
2915 * run out of grants. In both cases we should write dirty pages out.
2916 * Adding a cache waiter will trigger urgent write-out no matter what
2918 * The exiting condition is no avail grants and no dirty pages caching,
2919 * that really means there is no space on the OST. */
2920 cfs_waitq_init(&ocw.ocw_waitq);
2922 while (cli->cl_dirty > 0) {
2923 cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
/* Kick write-out so a completion will eventually wake this waiter. */
2926 loi_list_maint(cli, loi);
2927 osc_check_rpcs(env, cli);
2928 client_obd_list_unlock(&cli->cl_loi_list_lock);
2930 CDEBUG(D_CACHE, "%s: sleeping for cache space @ %p for %p\n",
2931 cli->cl_import->imp_obd->obd_name, &ocw, oap);
/* Woken when osc_wake_cache_waiters() removes us from the list. */
2933 rc = l_wait_event(ocw.ocw_waitq, cfs_list_empty(&ocw.ocw_entry), &lwi);
2935 client_obd_list_lock(&cli->cl_loi_list_lock);
2936 cfs_list_del_init(&ocw.ocw_entry);
/*
 * Initialize the osc_async_page (oap) bookkeeping for one cache page.
 * When queried for size (path not fully visible here) it returns the
 * rounded size of an oap so the caller can embed one per page.
 *
 * \param offset  byte offset of the page within the object; must be
 *                page-aligned (asserted below)
 * \param ops     caller (llite) callbacks: make_ready/refresh_count/etc.
 */
2949 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2950 struct lov_oinfo *loi, cfs_page_t *page,
2951 obd_off offset, const struct obd_async_page_ops *ops,
2952 void *data, void **res, int nocache,
2953 struct lustre_handle *lockh)
2955 struct osc_async_page *oap;
2960 return cfs_size_round(sizeof(*oap));
2963 oap->oap_magic = OAP_MAGIC;
2964 oap->oap_cli = &exp->exp_obd->u.cli;
2967 oap->oap_caller_ops = ops;
2968 oap->oap_caller_data = data;
2970 oap->oap_page = page;
2971 oap->oap_obj_off = offset;
/* A locally-privileged (CAP_SYS_RESOURCE) client writes without quota
 * enforcement. */
2972 if (!client_is_remote(exp) &&
2973 cfs_capable(CFS_CAP_SYS_RESOURCE))
2974 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2976 LASSERT(!(offset & ~CFS_PAGE_MASK));
2978 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2979 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2980 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2981 CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2983 cfs_spin_lock_init(&oap->oap_lock);
2984 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
/*
 * Queue one prepared async page for read or write.  Validates the oap,
 * checks quota for non-NOQUOTA writes, reserves cache/grant for writes via
 * osc_enter_cache(), then places the page on the object's pending lists
 * and, when an RPC is warranted, queues the writeback work item.
 */
2988 int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp,
2989 struct lov_stripe_md *lsm, struct lov_oinfo *loi,
2990 struct osc_async_page *oap, int cmd, int off,
2991 int count, obd_flag brw_flags, enum async_flags async_flags)
2993 struct client_obd *cli = &exp->exp_obd->u.cli;
2997 if (oap->oap_magic != OAP_MAGIC)
3000 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
/* The page must not already be queued anywhere. */
3003 if (!cfs_list_empty(&oap->oap_pending_item) ||
3004 !cfs_list_empty(&oap->oap_urgent_item) ||
3005 !cfs_list_empty(&oap->oap_rpc_item))
3008 /* check if the file's owner/group is over quota */
3009 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
3010 struct cl_object *obj;
3011 struct cl_attr attr; /* XXX put attr into thread info */
3012 unsigned int qid[MAXQUOTAS];
3014 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
3016 cl_object_attr_lock(obj);
3017 rc = cl_object_attr_get(env, obj, &attr);
3018 cl_object_attr_unlock(obj);
3020 qid[USRQUOTA] = attr.cat_uid;
3021 qid[GRPQUOTA] = attr.cat_gid;
3023 osc_quota_chkdq(cli, qid) == NO_QUOTA)
3030 loi = lsm->lsm_oinfo[0];
3032 client_obd_list_lock(&cli->cl_loi_list_lock);
3034 LASSERT(off + count <= CFS_PAGE_SIZE);
3036 oap->oap_page_off = off;
3037 oap->oap_count = count;
3038 oap->oap_brw_flags = brw_flags;
3039 /* Give a hint to OST that requests are coming from kswapd - bug19529 */
3040 if (cfs_memory_pressure_get())
3041 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
3042 cfs_spin_lock(&oap->oap_lock);
3043 oap->oap_async_flags = async_flags;
3044 cfs_spin_unlock(&oap->oap_lock);
3046 if (cmd & OBD_BRW_WRITE) {
/* May drop and re-take cl_loi_list_lock while waiting for grant. */
3047 rc = osc_enter_cache(env, cli, loi, oap);
3049 client_obd_list_unlock(&cli->cl_loi_list_lock);
3054 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
3057 osc_oap_to_pending(oap);
3058 loi_list_maint(cli, loi);
3059 if (!osc_max_rpc_in_flight(cli, loi) &&
3060 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
3061 LASSERT(cli->cl_writeback_work != NULL);
3062 rc = ptlrpcd_queue_work(cli->cl_writeback_work);
3064 CDEBUG(D_CACHE, "Queued writeback work for client obd %p/%d.\n",
3067 client_obd_list_unlock(&cli->cl_loi_list_lock);
3072 /* aka (~was & now & flag), but this is more clear :) */
3073 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
/*
 * Merge new async flags into \a oap (which must be on a pending list).
 * Newly-set ASYNC_URGENT moves the page onto the urgent list — at the head
 * for ASYNC_HP pages, at the tail otherwise.  The flag word itself is
 * updated under oap_lock.  Caller is expected to hold cl_loi_list_lock
 * (list manipulation below is unguarded otherwise) — confirm at call sites.
 */
3075 int osc_set_async_flags_base(struct client_obd *cli,
3076 struct lov_oinfo *loi, struct osc_async_page *oap,
3077 obd_flag async_flags)
3079 struct loi_oap_pages *lop;
3083 LASSERT(!cfs_list_empty(&oap->oap_pending_item));
3085 if (oap->oap_cmd & OBD_BRW_WRITE) {
3086 lop = &loi->loi_write_lop;
3088 lop = &loi->loi_read_lop;
/* Nothing to do when every requested flag is already set. */
3091 if ((oap->oap_async_flags & async_flags) == async_flags)
3094 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3095 flags |= ASYNC_READY;
3097 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3098 cfs_list_empty(&oap->oap_rpc_item)) {
3099 if (oap->oap_async_flags & ASYNC_HP)
3100 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
3102 cfs_list_add_tail(&oap->oap_urgent_item,
3104 flags |= ASYNC_URGENT;
3105 loi_list_maint(cli, loi);
3107 cfs_spin_lock(&oap->oap_lock);
3108 oap->oap_async_flags |= flags;
3109 cfs_spin_unlock(&oap->oap_lock);
3111 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3112 oap->oap_async_flags);
/*
 * Remove a queued page from all OSC bookkeeping before it is freed.
 * Fails with -EBUSY if the page is already part of an in-flight RPC
 * (still on oap_rpc_item); otherwise releases its cache/grant reservation,
 * wakes any cache waiters, and unlinks it from the urgent/pending lists.
 */
3116 int osc_teardown_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
3117 struct lov_oinfo *loi, struct osc_async_page *oap)
3119 struct client_obd *cli = &exp->exp_obd->u.cli;
3120 struct loi_oap_pages *lop;
3124 if (oap->oap_magic != OAP_MAGIC)
3128 loi = lsm->lsm_oinfo[0];
3130 if (oap->oap_cmd & OBD_BRW_WRITE) {
3131 lop = &loi->loi_write_lop;
3133 lop = &loi->loi_read_lop;
3136 client_obd_list_lock(&cli->cl_loi_list_lock);
3138 if (!cfs_list_empty(&oap->oap_rpc_item))
3139 GOTO(out, rc = -EBUSY);
3141 osc_exit_cache(cli, oap, 0);
3142 osc_wake_cache_waiters(cli);
3144 if (!cfs_list_empty(&oap->oap_urgent_item)) {
3145 cfs_list_del_init(&oap->oap_urgent_item);
3146 cfs_spin_lock(&oap->oap_lock);
3147 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3148 cfs_spin_unlock(&oap->oap_lock);
3150 if (!cfs_list_empty(&oap->oap_pending_item)) {
3151 cfs_list_del_init(&oap->oap_pending_item);
3152 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3154 loi_list_maint(cli, loi);
3155 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3157 client_obd_list_unlock(&cli->cl_loi_list_lock);
/*
 * Attach \a einfo->ei_cbdata to an LDLM lock's l_ast_data, but only if the
 * lock's callbacks and type match what the caller enqueued with (asserted)
 * and l_ast_data is unset or already the same value.  Both the resource
 * lock and the global osc_ast_guard spinlock protect the update.
 */
3161 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
3162 struct ldlm_enqueue_info *einfo)
3164 void *data = einfo->ei_cbdata;
3167 LASSERT(lock != NULL);
3168 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3169 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3170 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3171 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3173 lock_res_and_lock(lock);
3174 cfs_spin_lock(&osc_ast_guard);
3176 if (lock->l_ast_data == NULL)
3177 lock->l_ast_data = data;
3178 if (lock->l_ast_data == data)
3181 cfs_spin_unlock(&osc_ast_guard);
3182 unlock_res_and_lock(lock);
/*
 * Handle-based wrapper for osc_set_lock_data_with_check(): resolve the
 * lock handle, set the AST data, and drop the reference.  A stale handle
 * (lock gone, e.g. after eviction) is logged as an error.
 */
3187 static int osc_set_data_with_check(struct lustre_handle *lockh,
3188 struct ldlm_enqueue_info *einfo)
3190 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3194 set = osc_set_lock_data_with_check(lock, einfo);
3195 LDLM_LOCK_PUT(lock);
3197 CERROR("lockh %p, data %p - client evicted?\n",
3198 lockh, einfo->ei_cbdata);
/*
 * Iterate all LDLM locks on the object named by \a lsm and apply the
 * \a replace callback (typically to swap l_ast_data) with \a data.
 */
3202 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3203 ldlm_iterator_t replace, void *data)
3205 struct ldlm_res_id res_id;
3206 struct obd_device *obd = class_exp2obd(exp);
3208 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3209 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3213 /* find any ldlm lock of the inode in osc
/*
 * Like osc_change_cbdata() but reports whether the iterator found a match:
 * the result is derived from whether ldlm_resource_iterate() returned
 * LDLM_ITER_STOP (callback found what it was looking for) or
 * LDLM_ITER_CONTINUE (scanned everything without a hit).
 */
3217 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3218 ldlm_iterator_t replace, void *data)
3220 struct ldlm_res_id res_id;
3221 struct obd_device *obd = class_exp2obd(exp);
3224 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3225 rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3226 if (rc == LDLM_ITER_STOP)
3228 if (rc == LDLM_ITER_CONTINUE)
/*
 * Finish an OSC lock enqueue: translate an intent ELDLM_LOCK_ABORTED reply
 * into the server's policy result, mark the LVB ready when the glimpse/
 * enqueue supplied one, and invoke the caller's \a upcall with the final
 * status.  \a agl distinguishes asynchronous glimpse locks, which skip the
 * LVB-ready handling on abort.
 */
3233 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3234 obd_enqueue_update_f upcall, void *cookie,
3235 int *flags, int agl, int rc)
3237 int intent = *flags & LDLM_FL_HAS_INTENT;
3241 /* The request was created before ldlm_cli_enqueue call. */
3242 if (rc == ELDLM_LOCK_ABORTED) {
3243 struct ldlm_reply *rep;
3244 rep = req_capsule_server_get(&req->rq_pill,
3247 LASSERT(rep != NULL);
3248 if (rep->lock_policy_res1)
3249 rc = rep->lock_policy_res1;
3253 if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
3255 *flags |= LDLM_FL_LVB_READY;
3256 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3257 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3260 /* Call the update callback. */
3261 rc = (*upcall)(cookie, rc);
/*
 * Reply interpreter for asynchronous lock enqueues.  Copies the handle and
 * mode out of \a aa first (aa may be freed once the upcall runs), takes an
 * extra lock reference so a blocking AST posted by a failed enqueue cannot
 * race past the upcall, completes the enqueue via ldlm_cli_enqueue_fini()
 * and osc_enqueue_fini(), then drops the reference(s).
 */
3265 static int osc_enqueue_interpret(const struct lu_env *env,
3266 struct ptlrpc_request *req,
3267 struct osc_enqueue_args *aa, int rc)
3269 struct ldlm_lock *lock;
3270 struct lustre_handle handle;
3272 struct ost_lvb *lvb;
3274 int *flags = aa->oa_flags;
3276 /* Make a local copy of a lock handle and a mode, because aa->oa_*
3277 * might be freed anytime after lock upcall has been called. */
3278 lustre_handle_copy(&handle, aa->oa_lockh);
3279 mode = aa->oa_ei->ei_mode;
3281 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3283 lock = ldlm_handle2lock(&handle);
3285 /* Take an additional reference so that a blocking AST that
3286 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3287 * to arrive after an upcall has been executed by
3288 * osc_enqueue_fini(). */
3289 ldlm_lock_addref(&handle, mode);
3291 /* Let CP AST to grant the lock first. */
3292 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
/* For an aborted AGL enqueue there is no LVB to unpack. */
3294 if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
3299 lvb_len = sizeof(*aa->oa_lvb);
3302 /* Complete obtaining the lock procedure. */
3303 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3304 mode, flags, lvb, lvb_len, &handle, rc);
3305 /* Complete osc stuff. */
3306 rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
3307 flags, aa->oa_agl, rc);
3309 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3311 /* Release the lock for async request. */
3312 if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3314 * Releases a reference taken by ldlm_cli_enqueue(), if it is
3315 * not already released by
3316 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3318 ldlm_lock_decref(&handle, mode);
3320 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3321 aa->oa_lockh, req, aa);
3322 ldlm_lock_decref(&handle, mode);
3323 LDLM_LOCK_PUT(lock);
/*
 * Post-enqueue bookkeeping for a stripe lock: on success copy the server's
 * LVB into the lov_oinfo and extend the known minimum size (KMS) up to the
 * end of the granted extent; on an aborted intent enqueue just record the
 * glimpsed LVB.  In both cases allow the lock to be matched; on failure
 * (path partly outside this listing) the lock is failed for matching.
 */
3327 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3328 struct lov_oinfo *loi, int flags,
3329 struct ost_lvb *lvb, __u32 mode, int rc)
3331 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3333 if (rc == ELDLM_OK) {
3336 LASSERT(lock != NULL);
3337 loi->loi_lvb = *lvb;
3338 tmp = loi->loi_lvb.lvb_size;
3339 /* Extend KMS up to the end of this lock and no further
3340 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3341 if (tmp > lock->l_policy_data.l_extent.end)
3342 tmp = lock->l_policy_data.l_extent.end + 1;
3343 if (tmp >= loi->loi_kms) {
3344 LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3345 ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3346 loi_kms_set(loi, tmp);
3348 LDLM_DEBUG(lock, "lock acquired, setting rss="
3349 LPU64"; leaving kms="LPU64", end="LPU64,
3350 loi->loi_lvb.lvb_size, loi->loi_kms,
3351 lock->l_policy_data.l_extent.end);
3353 ldlm_lock_allow_match(lock);
3354 } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3355 LASSERT(lock != NULL);
3356 loi->loi_lvb = *lvb;
3357 ldlm_lock_allow_match(lock);
3358 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3359 " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3365 ldlm_lock_fail_match(lock);
3367 LDLM_LOCK_PUT(lock);
3370 EXPORT_SYMBOL(osc_update_enqueue);
/* Sentinel request-set pointer: callers pass PTLRPCD_SET to mean "queue
 * via ptlrpcd" rather than a real set; it is compared by address, never
 * dereferenced (see osc_enqueue_base). */
3372 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3374 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3375 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3376 * other synchronous requests, however keeping some locks and trying to obtain
3377 * others may take a considerable amount of time in a case of ost failure; and
3378 * when other sync requests do not get released lock from a client, the client
3379 * is excluded from the cluster -- such scenarious make the life difficult, so
3380 * release locks just after they are obtained. */
/*
 * Core OSC extent-lock enqueue.  Rounds the extent to page boundaries,
 * first tries to match an already-cached compatible lock (including PW
 * when PR was requested), and only falls back to a server enqueue —
 * synchronous or async via \a rqset / PTLRPCD_SET — when no match exists.
 *
 * \param kms_valid  zero when cached locks must not be reused (fresh or
 *                   evicted object) — see the comment at line 3403 below
 * \param agl        asynchronous glimpse lock mode; an unguaranteed match
 *                   returns -ECANCELED so the caller skips the stripe
 */
3381 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3382 int *flags, ldlm_policy_data_t *policy,
3383 struct ost_lvb *lvb, int kms_valid,
3384 obd_enqueue_update_f upcall, void *cookie,
3385 struct ldlm_enqueue_info *einfo,
3386 struct lustre_handle *lockh,
3387 struct ptlrpc_request_set *rqset, int async, int agl)
3389 struct obd_device *obd = exp->exp_obd;
3390 struct ptlrpc_request *req = NULL;
3391 int intent = *flags & LDLM_FL_HAS_INTENT;
3392 int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
3397 /* Filesystem lock extents are extended to page boundaries so that
3398 * dealing with the page cache is a little smoother. */
3399 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3400 policy->l_extent.end |= ~CFS_PAGE_MASK;
3403 * kms is not valid when either object is completely fresh (so that no
3404 * locks are cached), or object was evicted. In the latter case cached
3405 * lock cannot be used, because it would prime inode state with
3406 * potentially stale LVB.
3411 /* Next, search for already existing extent locks that will cover us */
3412 /* If we're trying to read, we also search for an existing PW lock. The
3413 * VFS and page cache already protect us locally, so lots of readers/
3414 * writers can share a single PW lock.
3416 * There are problems with conversion deadlocks, so instead of
3417 * converting a read lock to a write lock, we'll just enqueue a new
3420 * At some point we should cancel the read lock instead of making them
3421 * send us a blocking callback, but there are problems with canceling
3422 * locks out from other users right now, too. */
3423 mode = einfo->ei_mode;
3424 if (einfo->ei_mode == LCK_PR)
3426 mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
3427 einfo->ei_type, policy, mode, lockh, 0);
3429 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3431 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
3432 /* For AGL, if enqueue RPC is sent but the lock is not
3433 * granted, then skip to process this strpe.
3434 * Return -ECANCELED to tell the caller. */
3435 ldlm_lock_decref(lockh, mode);
3436 LDLM_LOCK_PUT(matched);
3438 } else if (osc_set_lock_data_with_check(matched, einfo)) {
3439 *flags |= LDLM_FL_LVB_READY;
3440 /* addref the lock only if not async requests and PW
3441 * lock is matched whereas we asked for PR. */
3442 if (!rqset && einfo->ei_mode != mode)
3443 ldlm_lock_addref(lockh, LCK_PR);
3445 /* I would like to be able to ASSERT here that
3446 * rss <= kms, but I can't, for reasons which
3447 * are explained in lov_enqueue() */
3450 /* We already have a lock, and it's referenced */
3451 (*upcall)(cookie, ELDLM_OK);
3453 if (einfo->ei_mode != mode)
3454 ldlm_lock_decref(lockh, LCK_PW);
3456 /* For async requests, decref the lock. */
3457 ldlm_lock_decref(lockh, einfo->ei_mode);
3458 LDLM_LOCK_PUT(matched);
3461 ldlm_lock_decref(lockh, mode);
3462 LDLM_LOCK_PUT(matched);
/* No usable cached lock: build and send a real enqueue request. */
3468 CFS_LIST_HEAD(cancels);
3469 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3470 &RQF_LDLM_ENQUEUE_LVB);
3474 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3476 ptlrpc_request_free(req);
3480 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3482 ptlrpc_request_set_replen(req);
3485 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3486 *flags &= ~LDLM_FL_BLOCK_GRANTED;
3488 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3489 sizeof(*lvb), lockh, async);
3492 struct osc_enqueue_args *aa;
3493 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3494 aa = ptlrpc_req_async_args(req);
3497 aa->oa_flags = flags;
3498 aa->oa_upcall = upcall;
3499 aa->oa_cookie = cookie;
3501 aa->oa_lockh = lockh;
3504 req->rq_interpret_reply =
3505 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3506 if (rqset == PTLRPCD_SET)
3507 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3509 ptlrpc_set_add_req(rqset, req);
3510 } else if (intent) {
3511 ptlrpc_req_finished(req);
3516 rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
3518 ptlrpc_req_finished(req);
/* obd_enqueue handler: take an extent lock covering a single stripe.
 * Builds the per-object LDLM resource name from the stripe's object id/seq
 * and delegates to osc_enqueue_base().  Async iff a request set is given.
 * NOTE(review): several lines of this excerpt are elided (braces/RETURN). */
3523 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3524 struct ldlm_enqueue_info *einfo,
3525 struct ptlrpc_request_set *rqset)
3527 struct ldlm_res_id res_id;
3531 osc_build_res_name(oinfo->oi_md->lsm_object_id,
3532 oinfo->oi_md->lsm_object_seq, &res_id);
/* lsm_oinfo[0]: OSC-level lsm always carries exactly one stripe entry. */
3534 rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3535 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3536 oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3537 oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
/* async = (rqset != NULL); trailing 0 = no AGL (async glimpse lock). */
3538 rqset, rqset != NULL, 0);
/* Match an already-granted extent lock covering [start, end] on @res_id
 * without issuing an RPC.  On a match the lock handle is stored in @lockh
 * and (unless LDLM_FL_TEST_LOCK) a reference is held by the caller.
 * NOTE(review): some lines (return paths, trailing parameter) are elided
 * from this excerpt. */
3542 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3543 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3544 int *flags, void *data, struct lustre_handle *lockh,
3547 struct obd_device *obd = exp->exp_obd;
3548 int lflags = *flags;
/* Fault-injection hook: pretend no lock matched. */
3552 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3555 /* Filesystem lock extents are extended to page boundaries so that
3556 * dealing with the page cache is a little smoother */
3557 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3558 policy->l_extent.end |= ~CFS_PAGE_MASK;
3560 /* Next, search for already existing extent locks that will cover us */
3561 /* If we're trying to read, we also search for an existing PW lock. The
3562 * VFS and page cache already protect us locally, so lots of readers/
3563 * writers can share a single PW lock. */
3567 rc = ldlm_lock_match(obd->obd_namespace, lflags,
3568 res_id, type, policy, rc, lockh, unref)
/* Matched: verify/attach our private data; on mismatch drop the ref. */
3571 if (!osc_set_data_with_check(lockh, data)) {
3572 if (!(lflags & LDLM_FL_TEST_LOCK))
3573 ldlm_lock_decref(lockh, rc);
/* Asked for PR but matched PW: convert the reference to PR so the
 * caller's later decref(PR) balances. */
3577 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3578 ldlm_lock_addref(lockh, LCK_PR);
3579 ldlm_lock_decref(lockh, LCK_PW);
/* Drop one reference on @lockh held in @mode.  Group locks are not
 * cancelled by server blocking callbacks, so they must be cancelled
 * explicitly when the last reference goes away.
 * NOTE(review): the else-branch/RETURN lines are elided in this excerpt. */
3586 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3590 if (unlikely(mode == LCK_GROUP))
3591 ldlm_lock_decref_and_cancel(lockh, mode);
3593 ldlm_lock_decref(lockh, mode);
/* obd_cancel handler: thin wrapper over osc_cancel_base(); the stripe md
 * argument is unused at the OSC level. */
3598 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3599 __u32 mode, struct lustre_handle *lockh)
3602 RETURN(osc_cancel_base(lockh, mode));
/* Cancel all unused locks in this namespace, optionally restricted to the
 * resource of @lsm when one is supplied (resp stays NULL otherwise, which
 * means "all resources" to ldlm_cli_cancel_unused()). */
3605 static int osc_cancel_unused(struct obd_export *exp,
3606 struct lov_stripe_md *lsm,
3607 ldlm_cancel_flags_t flags,
3610 struct obd_device *obd = class_exp2obd(exp);
3611 struct ldlm_res_id res_id, *resp = NULL;
3614 resp = osc_build_res_name(lsm->lsm_object_id,
3615 lsm->lsm_object_seq, &res_id);
3618 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
/* Reply interpreter for async OST_STATFS.  Copies the server's obd_statfs
 * into the caller's buffer and maintains the osc_creator state flags
 * (DEGRADED/RDONLY/NOSPC) used to steer object precreation, then invokes
 * the caller's completion callback. */
3621 static int osc_statfs_interpret(const struct lu_env *env,
3622 struct ptlrpc_request *req,
3623 struct osc_async_args *aa, int rc)
3625 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3626 struct obd_statfs *msfs;
3631 /* The request has in fact never been sent
3632 * due to issues at a higher level (LOV).
3633 * Exit immediately since the caller is
3634 * aware of the problem and takes care
3635 * of the clean up */
/* NODELAY statfs (e.g. from procfs) treats "not connected" as success
 * with whatever state we have, rather than an error. */
3638 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3639 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3645 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3647 GOTO(out, rc = -EPROTO);
3650 /* Reinitialize the RDONLY and DEGRADED flags at the client
3651 * on each statfs, so they don't stay set permanently. */
3652 cfs_spin_lock(&cli->cl_oscc.oscc_lock);
3654 if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3655 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3656 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3657 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3659 if (unlikely(msfs->os_state & OS_STATE_READONLY))
3660 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3661 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3662 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3664 /* Add a bit of hysteresis so this flag isn't continually flapping,
3665 * and ensure that new files don't get extremely fragmented due to
3666 * only a small amount of available space in the filesystem.
3667 * We want to set the NOSPC flag when there is less than ~0.1% free
3668 * and clear it when there is at least ~0.2% free space, so:
3669 * avail < ~0.1% max max = avail + used
3670 * 1025 * avail < avail + used used = blocks - free
3671 * 1024 * avail < used
3672 * 1024 * avail < blocks - free
3673 * avail < ((blocks - free) >> 10)
3675 * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
3676 * lose that amount of space so in those cases we report no space left
3677 * if there is less than 1 GB left. */
3678 used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
3679 if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3680 ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3681 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3682 else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3683 (msfs->os_ffree > 64) &&
3684 (msfs->os_bavail > (used << 1)))) {
3685 cli->cl_oscc.oscc_flags &= ~(OSCC_FLAG_NOSPC |
3686 OSCC_FLAG_NOSPC_BLK);
/* Still short on blocks (but not inodes): record the block-only flag. */
3689 if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3690 (msfs->os_bavail < used)))
3691 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC_BLK;
3693 cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
3695 *aa->aa_oi->oi_osfs = *msfs;
3697 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* obd_statfs_async handler: send OST_STATFS without blocking; the reply is
 * handled by osc_statfs_interpret() and delivered via the request set. */
3701 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3702 __u64 max_age, struct ptlrpc_request_set *rqset)
3704 struct ptlrpc_request *req;
3705 struct osc_async_args *aa;
3709 /* We could possibly pass max_age in the request (as an absolute
3710 * timestamp or a "seconds.usec ago") so the target can avoid doing
3711 * extra calls into the filesystem if that isn't necessary (e.g.
3712 * during mount that would help a bit). Having relative timestamps
3713 * is not so great if request processing is slow, while absolute
3714 * timestamps are not ideal because they need time synchronization. */
3715 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3719 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3721 ptlrpc_request_free(req);
3724 ptlrpc_request_set_replen(req);
3725 req->rq_request_portal = OST_CREATE_PORTAL;
3726 ptlrpc_at_set_req_timeout(req);
3728 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3729 /* procfs requests must not wait on a stuck OST: fail fast
3730 * instead of resending/delaying, to avoid deadlock */
3730 req->rq_no_resend = 1;
3731 req->rq_no_delay = 1;
3734 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3735 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3736 aa = ptlrpc_req_async_args(req);
3739 ptlrpc_set_add_req(rqset, req);
/* obd_statfs handler (synchronous): send OST_STATFS and wait for the
 * reply, copying the server's obd_statfs into @osfs. */
3743 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3744 __u64 max_age, __u32 flags)
3746 struct obd_statfs *msfs;
3747 struct ptlrpc_request *req;
3748 struct obd_import *imp = NULL;
3752 /*Since the request might also come from lprocfs, so we need
3753 *sync this with client_disconnect_export Bug15684*/
/* Take a private ref on the import under cl_sem so a concurrent
 * disconnect cannot free it underneath us. */
3754 cfs_down_read(&obd->u.cli.cl_sem);
3755 if (obd->u.cli.cl_import)
3756 imp = class_import_get(obd->u.cli.cl_import);
3757 cfs_up_read(&obd->u.cli.cl_sem);
3761 /* We could possibly pass max_age in the request (as an absolute
3762 * timestamp or a "seconds.usec ago") so the target can avoid doing
3763 * extra calls into the filesystem if that isn't necessary (e.g.
3764 * during mount that would help a bit). Having relative timestamps
3765 * is not so great if request processing is slow, while absolute
3766 * timestamps are not ideal because they need time synchronization. */
3767 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
/* The import ref is only needed for request allocation. */
3769 class_import_put(imp);
3774 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3776 ptlrpc_request_free(req);
3779 ptlrpc_request_set_replen(req);
3780 req->rq_request_portal = OST_CREATE_PORTAL;
3781 ptlrpc_at_set_req_timeout(req);
3783 if (flags & OBD_STATFS_NODELAY) {
3784 /* procfs requests must not wait on a stuck OST: fail fast
3785 * instead of resending/delaying, to avoid deadlock */
3785 req->rq_no_resend = 1;
3786 req->rq_no_delay = 1;
3789 rc = ptlrpc_queue_wait(req);
3793 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3795 GOTO(out, rc = -EPROTO);
3802 ptlrpc_req_finished(req);
3806 /* Retrieve object striping information.
3808 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3809 * the maximum number of OST indices which will fit in the user buffer.
3810 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3812 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3814 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3815 struct lov_user_md_v3 lum, *lumk;
3816 struct lov_user_ost_data_v1 *lmm_objects;
3817 int rc = 0, lum_size;
3823 /* we only need the header part from user space to get lmm_magic and
3824 * lmm_stripe_count, (the header part is common to v1 and v3) */
3825 lum_size = sizeof(struct lov_user_md_v1);
3826 if (cfs_copy_from_user(&lum, lump, lum_size))
/* Only V1 and V3 user layouts are understood here. */
3829 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3830 (lum.lmm_magic != LOV_USER_MAGIC_V3))
3833 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3834 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3835 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3836 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3838 /* we can use lov_mds_md_size() to compute lum_size
3839 * because lov_user_md_vX and lov_mds_md_vX have the same size */
3840 if (lum.lmm_stripe_count > 0) {
3841 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3842 OBD_ALLOC(lumk, lum_size);
/* The objects array starts at a different offset in v1 vs v3. */
3846 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3847 lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3849 lmm_objects = &(lumk->lmm_objects[0]);
3850 lmm_objects->l_object_id = lsm->lsm_object_id;
/* Caller asked for the header only: no per-object entries. */
3852 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3856 lumk->lmm_object_id = lsm->lsm_object_id;
3857 lumk->lmm_object_seq = lsm->lsm_object_seq;
3858 lumk->lmm_stripe_count = 1;
3860 if (cfs_copy_to_user(lump, lumk, lum_size))
3864 OBD_FREE(lumk, lum_size);
/* obd_iocontrol handler: dispatch ioctls aimed at a bare OSC device.
 * Holds a module reference for the duration of the call so the module
 * cannot be unloaded mid-ioctl. */
3870 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3871 void *karg, void *uarg)
3873 struct obd_device *obd = exp->exp_obd;
3874 struct obd_ioctl_data *data = karg;
3878 if (!cfs_try_module_get(THIS_MODULE)) {
3879 CERROR("Can't get module. Is it alive?");
/* Synthesize a single-target lov_desc so LOV-style tools work on a
 * standalone OSC. */
3883 case OBD_IOC_LOV_GET_CONFIG: {
3885 struct lov_desc *desc;
3886 struct obd_uuid uuid;
3890 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3891 GOTO(out, err = -EINVAL);
3893 data = (struct obd_ioctl_data *)buf;
/* Validate user-supplied buffer sizes before writing into them. */
3895 if (sizeof(*desc) > data->ioc_inllen1) {
3896 obd_ioctl_freedata(buf, len);
3897 GOTO(out, err = -EINVAL);
3900 if (data->ioc_inllen2 < sizeof(uuid)) {
3901 obd_ioctl_freedata(buf, len);
3902 GOTO(out, err = -EINVAL);
3905 desc = (struct lov_desc *)data->ioc_inlbuf1;
3906 desc->ld_tgt_count = 1;
3907 desc->ld_active_tgt_count = 1;
3908 desc->ld_default_stripe_count = 1;
3909 desc->ld_default_stripe_size = 0;
3910 desc->ld_default_stripe_offset = 0;
3911 desc->ld_pattern = 0;
3912 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3914 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3916 err = cfs_copy_to_user((void *)uarg, buf, len);
3919 obd_ioctl_freedata(buf, len);
3922 case LL_IOC_LOV_SETSTRIPE:
3923 err = obd_alloc_memmd(exp, karg);
3927 case LL_IOC_LOV_GETSTRIPE:
3928 err = osc_getstripe(karg, uarg);
3930 case OBD_IOC_CLIENT_RECOVER:
3931 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3932 data->ioc_inlbuf1, 0);
3936 case IOC_OSC_SET_ACTIVE:
3937 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3940 case OBD_IOC_POLL_QUOTACHECK:
3941 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
3943 case OBD_IOC_PING_TARGET:
3944 err = ptlrpc_obd_ping(obd);
3947 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3948 cmd, cfs_curproc_comm());
3949 GOTO(out, err = -ENOTTY);
3952 cfs_module_put(THIS_MODULE);
/* obd_get_info handler.  Supported keys:
 *   KEY_LOCK_TO_STRIPE - map a lock to its stripe index (always 0 here);
 *   KEY_LAST_ID        - fetch the last allocated object id from the OST;
 *   KEY_FIEMAP         - forward a fiemap query to the OST.
 * The latter two are synchronous OST_GET_INFO RPCs. */
3956 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3957 void *key, __u32 *vallen, void *val,
3958 struct lov_stripe_md *lsm)
3961 if (!vallen || !val)
3964 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3965 __u32 *stripe = val;
3966 *vallen = sizeof(*stripe);
3969 } else if (KEY_IS(KEY_LAST_ID)) {
3970 struct ptlrpc_request *req;
3975 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3976 &RQF_OST_GET_INFO_LAST_ID);
3980 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3981 RCL_CLIENT, keylen);
3982 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3984 ptlrpc_request_free(req);
3988 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3989 memcpy(tmp, key, keylen);
/* LAST_ID is used in recovery paths: don't queue behind resends. */
3991 req->rq_no_delay = req->rq_no_resend = 1;
3992 ptlrpc_request_set_replen(req);
3993 rc = ptlrpc_queue_wait(req);
3997 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3999 GOTO(out, rc = -EPROTO);
4001 *((obd_id *)val) = *reply;
4003 ptlrpc_req_finished(req);
4005 } else if (KEY_IS(KEY_FIEMAP)) {
4006 struct ptlrpc_request *req;
4007 struct ll_user_fiemap *reply;
4011 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
4012 &RQF_OST_GET_INFO_FIEMAP);
/* Fiemap value buffer travels in both directions: sized for client
 * send and server reply. */
4016 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
4017 RCL_CLIENT, keylen);
4018 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
4019 RCL_CLIENT, *vallen);
4020 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
4021 RCL_SERVER, *vallen);
4023 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
4025 ptlrpc_request_free(req);
4029 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
4030 memcpy(tmp, key, keylen);
4031 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
4032 memcpy(tmp, val, *vallen);
4034 ptlrpc_request_set_replen(req);
4035 rc = ptlrpc_queue_wait(req);
4039 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
4041 GOTO(out1, rc = -EPROTO);
4043 memcpy(val, reply, *vallen);
4045 ptlrpc_req_finished(req);
/* Called when the MDS tells this OSC it is the MDS's OST connection:
 * connect the llog initiator and make the import server-driven (pingable,
 * server timeouts) since the MDS-side OSC behaves differently from a
 * client-side one. */
4053 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
4055 struct llog_ctxt *ctxt;
4059 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
4061 rc = llog_initiator_connect(ctxt);
4062 llog_ctxt_put(ctxt);
4064 /* XXX return an error? skip setting below flags? */
4067 cfs_spin_lock(&imp->imp_lock);
4068 imp->imp_server_timeout = 1;
4069 imp->imp_pingable = 1;
4070 cfs_spin_unlock(&imp->imp_lock);
4071 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
/* Reply interpreter for the KEY_MDS_CONN set_info RPC: finish MDS-OSC
 * import setup once the OST has acknowledged. */
4076 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
4077 struct ptlrpc_request *req,
4084 RETURN(osc_setinfo_mds_connect_import(req->rq_import));
/* obd_set_info_async handler.  Keys handled locally: KEY_NEXT_ID,
 * KEY_CHECKSUM, KEY_SPTLRPC_CONF, KEY_FLUSH_CTX.  All other keys are
 * forwarded to the OST as an OST_SET_INFO RPC; KEY_MDS_CONN and
 * KEY_GRANT_SHRINK get special request setup before sending. */
4087 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
4088 void *key, obd_count vallen, void *val,
4089 struct ptlrpc_request_set *set)
4091 struct ptlrpc_request *req;
4092 struct obd_device *obd = exp->exp_obd;
4093 struct obd_import *imp = class_exp2cliimp(exp);
4098 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
4100 if (KEY_IS(KEY_NEXT_ID)) {
4102 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4104 if (vallen != sizeof(obd_id))
4109 if (vallen != sizeof(obd_id))
4112 /* avoid race between allocate new object and set next id
4113 * from ll_sync thread */
4114 cfs_spin_lock(&oscc->oscc_lock);
/* Only move the precreate cursor forward, never back. */
4115 new_val = *((obd_id*)val) + 1;
4116 if (new_val > oscc->oscc_next_id)
4117 oscc->oscc_next_id = new_val;
4118 cfs_spin_unlock(&oscc->oscc_lock);
4119 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
4120 exp->exp_obd->obd_name,
4121 obd->u.cli.cl_oscc.oscc_next_id);
4126 if (KEY_IS(KEY_CHECKSUM)) {
4127 if (vallen != sizeof(int))
4129 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
4133 if (KEY_IS(KEY_SPTLRPC_CONF)) {
4134 sptlrpc_conf_client_adapt(obd);
4138 if (KEY_IS(KEY_FLUSH_CTX)) {
4139 sptlrpc_import_flush_my_ctx(imp);
/* Keys that go over the wire need a request set (GRANT_SHRINK uses
 * ptlrpcd instead, so it is exempt). */
4143 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
4146 /* We pass all other commands directly to OST. Since nobody calls osc
4147 methods directly and everybody is supposed to go through LOV, we
4148 assume lov checked invalid values for us.
4149 The only recognised values so far are evict_by_nid and mds_conn.
4150 Even if something bad goes through, we'd get a -EINVAL from OST
4153 if (KEY_IS(KEY_GRANT_SHRINK))
4154 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
4156 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
4161 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
4162 RCL_CLIENT, keylen);
4163 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
4164 RCL_CLIENT, vallen);
4165 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
4167 ptlrpc_request_free(req);
4171 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
4172 memcpy(tmp, key, keylen);
4173 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
4174 memcpy(tmp, val, vallen);
4176 if (KEY_IS(KEY_MDS_CONN)) {
4177 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4179 oscc->oscc_oa.o_seq = (*(__u32 *)val);
4180 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
4181 LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
4182 req->rq_no_delay = req->rq_no_resend = 1;
4183 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4184 } else if (KEY_IS(KEY_GRANT_SHRINK)) {
4185 struct osc_grant_args *aa;
4188 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4189 aa = ptlrpc_req_async_args(req);
4192 ptlrpc_req_finished(req);
4195 *oa = ((struct ost_body *)val)->oa;
4197 req->rq_interpret_reply = osc_shrink_grant_interpret;
4200 ptlrpc_request_set_replen(req);
4201 if (!KEY_IS(KEY_GRANT_SHRINK)) {
4202 LASSERT(set != NULL);
4203 ptlrpc_set_add_req(set, req);
4204 ptlrpc_check_set(NULL, set);
/* GRANT_SHRINK is fire-and-forget via the ptlrpcd daemon. */
4206 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
/* llog operations for the size-replication context: only cancel is needed
 * on the client side. */
4212 static struct llog_operations osc_size_repl_logops = {
4213 lop_cancel: llog_obd_repl_cancel
/* Filled in at module init (osc_init) from llog_lvfs_ops plus origin
 * callbacks. */
4216 static struct llog_operations osc_mds_ost_orig_logops;
/* Set up the two llog contexts this OSC uses (MDS-OST originator and size
 * replication).  On failure of the second, the first context is looked up
 * again so it can be torn down; errors are logged with catalog details. */
4218 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4219 struct obd_device *tgt, struct llog_catid *catid)
4224 rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
4225 &catid->lci_logid, &osc_mds_ost_orig_logops);
4227 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4231 rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
4232 NULL, &osc_size_repl_logops);
4234 struct llog_ctxt *ctxt =
4235 llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4238 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4243 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
4244 obd->obd_name, tgt->obd_name, catid, rc);
4245 CERROR("logid "LPX64":0x%x\n",
4246 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
/* obd_llog_init handler: read this OSC's catalog id from the CATLIST file
 * on @disk_obd, initialize the llog contexts, then write the (possibly
 * updated) catalog id back.  Serialized by olg_cat_processing. */
4251 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4252 struct obd_device *disk_obd, int *index)
4254 struct llog_catid catid;
4255 static char name[32] = CATLIST;
4259 LASSERT(olg == &obd->obd_olg);
4261 cfs_mutex_lock(&olg->olg_cat_processing);
4262 rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
4264 CERROR("rc: %d\n", rc);
4268 CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
4269 obd->obd_name, *index, catid.lci_logid.lgl_oid,
4270 catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen);
4272 rc = __osc_llog_init(obd, olg, disk_obd, &catid);
4274 CERROR("rc: %d\n", rc);
4278 rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
4280 CERROR("rc: %d\n", rc);
4285 cfs_mutex_unlock(&olg->olg_cat_processing);
/* obd_llog_finish handler: tear down both llog contexts.  Both cleanups
 * are attempted even if the first fails; their results are kept in
 * separate rc/rc2. */
4290 static int osc_llog_finish(struct obd_device *obd, int count)
4292 struct llog_ctxt *ctxt;
4293 int rc = 0, rc2 = 0;
4296 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4298 rc = llog_cleanup(ctxt);
4300 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4302 rc2 = llog_cleanup(ctxt);
/* obd_reconnect handler: on reconnect, request a grant equal to our
 * current outstanding grant + dirty (or a 2-RPC minimum when both are
 * zero) and report/reset lost grant accumulated while disconnected. */
4309 static int osc_reconnect(const struct lu_env *env,
4310 struct obd_export *exp, struct obd_device *obd,
4311 struct obd_uuid *cluuid,
4312 struct obd_connect_data *data,
4315 struct client_obd *cli = &obd->u.cli;
4317 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4320 client_obd_list_lock(&cli->cl_loi_list_lock);
/* "?:" keeps the current grant if nonzero, else asks for two max-
 * sized RPCs worth of grant as a starting point. */
4321 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
4322 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4323 lost_grant = cli->cl_lost_grant;
4324 cli->cl_lost_grant = 0;
4325 client_obd_list_unlock(&cli->cl_loi_list_lock);
4327 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4328 "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
4329 cli->cl_avail_grant, cli->cl_dirty, lost_grant);
4330 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4331 " ocd_grant: %d\n", data->ocd_connect_flags,
4332 data->ocd_version, data->ocd_grant);
/* obd_disconnect handler: flush pending llog cancels on last disconnect,
 * drop the export, and only then remove this client from the grant-shrink
 * list (see BUG18662 note below for the ordering requirement). */
4338 static int osc_disconnect(struct obd_export *exp)
4340 struct obd_device *obd = class_exp2obd(exp);
4341 struct llog_ctxt *ctxt;
4344 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4346 if (obd->u.cli.cl_conn_count == 1) {
4347 /* Flush any remaining cancel messages out to the
4349 llog_sync(ctxt, exp);
4351 llog_ctxt_put(ctxt);
4353 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4357 rc = client_disconnect_export(exp);
4359 * Initially we put del_shrink_grant before disconnect_export, but it
4360 * causes the following problem if setup (connect) and cleanup
4361 * (disconnect) are tangled together.
4362 * connect p1 disconnect p2
4363 * ptlrpc_connect_import
4364 * ............... class_manual_cleanup
4367 * ptlrpc_connect_interrupt
4369 * add this client to shrink list
4371 * Bang! pinger trigger the shrink.
4372 * So the osc should be disconnected from the shrink list, after we
4373 * are sure the import has been destroyed. BUG18662
4375 if (obd->u.cli.cl_import == NULL)
4376 osc_del_shrink_grant(&obd->u.cli);
/* obd_import_event handler: react to import state changes (disconnect,
 * invalidate, activate, OCD negotiation, ...) and forward notifications
 * to the observer (typically LOV). */
4380 static int osc_import_event(struct obd_device *obd,
4381 struct obd_import *imp,
4382 enum obd_import_event event)
4384 struct client_obd *cli;
4388 LASSERT(imp->imp_obd == obd);
4391 case IMP_EVENT_DISCON: {
4392 /* Only do this on the MDS OSC's */
4393 if (imp->imp_server_timeout) {
4394 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4396 cfs_spin_lock(&oscc->oscc_lock);
4397 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4398 cfs_spin_unlock(&oscc->oscc_lock);
/* All grant state is void once the connection drops. */
4401 client_obd_list_lock(&cli->cl_loi_list_lock);
4402 cli->cl_avail_grant = 0;
4403 cli->cl_lost_grant = 0;
4404 client_obd_list_unlock(&cli->cl_loi_list_lock);
4407 case IMP_EVENT_INACTIVE: {
4408 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4411 case IMP_EVENT_INVALIDATE: {
4412 struct ldlm_namespace *ns = obd->obd_namespace;
4416 env = cl_env_get(&refcheck);
4420 client_obd_list_lock(&cli->cl_loi_list_lock);
4421 /* all pages go to failing rpcs due to the invalid
4423 osc_check_rpcs(env, cli);
4424 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Drop all local locks; the server forgot about them anyway. */
4426 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4427 cl_env_put(env, &refcheck);
4432 case IMP_EVENT_ACTIVE: {
4433 /* Only do this on the MDS OSC's */
4434 if (imp->imp_server_timeout) {
4435 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4437 cfs_spin_lock(&oscc->oscc_lock);
4438 oscc->oscc_flags &= ~(OSCC_FLAG_NOSPC |
4439 OSCC_FLAG_NOSPC_BLK);
4440 cfs_spin_unlock(&oscc->oscc_lock);
4442 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4445 case IMP_EVENT_OCD: {
4446 struct obd_connect_data *ocd = &imp->imp_connect_data;
4448 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4449 osc_init_grant(&obd->u.cli, ocd);
4452 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4453 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
4455 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4458 case IMP_EVENT_DEACTIVATE: {
4459 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
4462 case IMP_EVENT_ACTIVATE: {
4463 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
4467 CERROR("Unknown import event %d\n", event);
4474 * Determine whether the lock can be canceled before replaying the lock
4475 * during recovery, see bug16774 for detailed information.
4477 * \retval zero the lock can't be canceled
4478 * \retval other ok to cancel
4480 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
4482 check_res_locked(lock->l_resource);
4485 * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
4487 * XXX as a future improvement, we can also cancel unused write lock
4488 * if it doesn't have dirty data and active mmaps.
4490 if (lock->l_resource->lr_type == LDLM_EXTENT &&
4491 (lock->l_granted_mode == LCK_PR ||
4492 lock->l_granted_mode == LCK_CR) &&
4493 (osc_dlm_lock_pageref(lock) == 0))
/* obd_setup handler: take a ptlrpcd reference, do generic client setup,
 * start the writeback work item, wire up lprocfs, pre-allocate a small
 * request pool, and register the lock-cancel-for-recovery policy. */
4499 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4501 struct client_obd *cli = &obd->u.cli;
4506 rc = ptlrpcd_addref();
4510 rc = client_obd_setup(obd, lcfg);
4513 handler = ptlrpcd_alloc_work(cli->cl_import,
4514 brw_queue_work, cli);
4515 if (!IS_ERR(handler))
4516 cli->cl_writeback_work = handler;
4518 rc = PTR_ERR(handler);
4522 struct lprocfs_static_vars lvars = { 0 };
4524 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4525 lprocfs_osc_init_vars(&lvars);
4526 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4527 lproc_osc_attach_seqstat(obd);
4528 sptlrpc_lprocfs_cliobd_attach(obd);
4529 ptlrpc_lprocfs_register_obd(obd);
4533 /* We need to allocate a few requests more, because
4534 brw_interpret tries to create new requests before freeing
4535 previous ones. Ideally we want to have 2x max_rpcs_in_flight
4536 reserved, but I afraid that might be too much wasted RAM
4537 in fact, so 2 is just my guess and still should work. */
4538 cli->cl_import->imp_rq_pool =
4539 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4541 ptlrpc_add_rqs_to_pool);
4543 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4545 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
/* obd_precleanup handler: staged teardown.  EARLY deactivates the import
 * and stops pinging; EXPORTS waits out zombie exports, destroys the
 * writeback work item, cleans up the import, procfs, and llogs. */
4553 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4559 case OBD_CLEANUP_EARLY: {
4560 struct obd_import *imp;
4561 imp = obd->u.cli.cl_import;
4562 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4563 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4564 ptlrpc_deactivate_import(imp);
4565 cfs_spin_lock(&imp->imp_lock);
4566 imp->imp_pingable = 0;
4567 cfs_spin_unlock(&imp->imp_lock);
4570 case OBD_CLEANUP_EXPORTS: {
4571 struct client_obd *cli = &obd->u.cli;
4573 * for echo client, export may be on zombie list, wait for
4574 * zombie thread to cull it, because cli.cl_import will be
4575 * cleared in client_disconnect_export():
4576 * class_export_destroy() -> obd_cleanup() ->
4577 * echo_device_free() -> echo_client_cleanup() ->
4578 * obd_disconnect() -> osc_disconnect() ->
4579 * client_disconnect_export()
4581 obd_zombie_barrier();
4582 if (cli->cl_writeback_work) {
4583 ptlrpcd_destroy_work(cli->cl_writeback_work);
4584 cli->cl_writeback_work = NULL;
4586 obd_cleanup_client_import(obd);
4587 ptlrpc_lprocfs_unregister_obd(obd);
4588 lprocfs_obd_cleanup(obd);
4589 rc = obd_llog_finish(obd, 0);
4591 CERROR("failed to cleanup llogging subsystems\n");
/* obd_cleanup handler: free the quota cache and finish generic client
 * teardown. */
4598 int osc_cleanup(struct obd_device *obd)
4604 /* free memory of osc quota cache */
4605 osc_quota_cleanup(obd);
4607 rc = client_obd_cleanup(obd);
/* Apply a configuration log command to this OSC; currently only proc
 * parameter updates (PARAM_OSC prefix) are handled. */
4613 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4615 struct lprocfs_static_vars lvars = { 0 };
4618 lprocfs_osc_init_vars(&lvars);
4620 switch (lcfg->lcfg_command) {
4622 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
/* obd_process_config entry point: thin wrapper over
 * osc_process_config_base(); @len is unused. */
4632 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4634 return osc_process_config_base(obd, buf);
/* The OSC's obd_ops method table: the glue by which LOV/echo-client and
 * the generic OBD layer drive this module. */
4637 struct obd_ops osc_obd_ops = {
4638 .o_owner = THIS_MODULE,
4639 .o_setup = osc_setup,
4640 .o_precleanup = osc_precleanup,
4641 .o_cleanup = osc_cleanup,
4642 .o_add_conn = client_import_add_conn,
4643 .o_del_conn = client_import_del_conn,
4644 .o_connect = client_connect_import,
4645 .o_reconnect = osc_reconnect,
4646 .o_disconnect = osc_disconnect,
4647 .o_statfs = osc_statfs,
4648 .o_statfs_async = osc_statfs_async,
4649 .o_packmd = osc_packmd,
4650 .o_unpackmd = osc_unpackmd,
4651 .o_precreate = osc_precreate,
4652 .o_create = osc_create,
4653 .o_create_async = osc_create_async,
4654 .o_destroy = osc_destroy,
4655 .o_getattr = osc_getattr,
4656 .o_getattr_async = osc_getattr_async,
4657 .o_setattr = osc_setattr,
4658 .o_setattr_async = osc_setattr_async,
4660 .o_punch = osc_punch,
4662 .o_enqueue = osc_enqueue,
4663 .o_change_cbdata = osc_change_cbdata,
4664 .o_find_cbdata = osc_find_cbdata,
4665 .o_cancel = osc_cancel,
4666 .o_cancel_unused = osc_cancel_unused,
4667 .o_iocontrol = osc_iocontrol,
4668 .o_get_info = osc_get_info,
4669 .o_set_info_async = osc_set_info_async,
4670 .o_import_event = osc_import_event,
4671 .o_llog_init = osc_llog_init,
4672 .o_llog_finish = osc_llog_finish,
4673 .o_process_config = osc_process_config,
4674 .o_quotactl = osc_quotactl,
4675 .o_quotacheck = osc_quotacheck,
4676 .o_quota_adjust_qunit = osc_quota_adjust_qunit,
4679 extern struct lu_kmem_descr osc_caches[];
4680 extern cfs_spinlock_t osc_ast_guard;
4681 extern cfs_lock_class_key_t osc_ast_guard_class;
/* Module init: set up slab caches, register the OSC obd type, initialize
 * the AST guard lock, and build osc_mds_ost_orig_logops from llog_lvfs_ops
 * with originator-side callbacks. */
4683 int __init osc_init(void)
4685 struct lprocfs_static_vars lvars = { 0 };
4689 /* print an address of _any_ initialized kernel symbol from this
4690 * module, to allow debugging with gdb that doesn't support data
4691 * symbols from modules.*/
4692 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
4694 rc = lu_kmem_init(osc_caches);
4696 lprocfs_osc_init_vars(&lvars);
4699 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4700 LUSTRE_OSC_NAME, &osc_device_type);
/* Registration failed: undo the cache setup. */
4702 lu_kmem_fini(osc_caches);
4706 cfs_spin_lock_init(&osc_ast_guard);
4707 cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4709 osc_mds_ost_orig_logops = llog_lvfs_ops;
4710 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4711 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4712 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4713 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
/* Module exit: unregister the device type and obd type, then free the
 * slab caches (reverse order of osc_init). */
4719 static void /*__exit*/ osc_exit(void)
4721 lu_device_type_fini(&osc_device_type);
4724 class_unregister_type(LUSTRE_OSC_NAME);
4725 lu_kmem_fini(osc_caches);
4728 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4729 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4730 MODULE_LICENSE("GPL");
4732 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);