4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Whamcloud, Inc.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_OSC
42 #include <libcfs/libcfs.h>
45 # include <liblustre.h>
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include "osc_internal.h"
66 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
67 static int brw_interpret(const struct lu_env *env,
68 struct ptlrpc_request *req, void *data, int rc);
69 static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli,
71 int osc_cleanup(struct obd_device *obd);
73 /* Pack OSC object metadata for disk storage (LE byte order). */
/* Converts the in-memory lov_stripe_md id/seq into a little-endian
 * lov_mds_md at *lmmp, allocating or freeing *lmmp as needed.
 * NOTE(review): this extract is elided — interior lines (size checks,
 * RETURNs) are missing; code left byte-identical. */
74 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
75 struct lov_stripe_md *lsm)
80 lmm_size = sizeof(**lmmp);
/* free path for an existing wire buffer */
85 OBD_FREE(*lmmp, lmm_size);
/* allocate a fresh wire buffer before packing */
91 OBD_ALLOC(*lmmp, lmm_size);
/* object id must be set and the sequence must belong to an MDT */
97 LASSERT(lsm->lsm_object_id);
98 LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
99 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
100 (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
106 /* Unpack OSC object metadata from disk storage (LE byte order). */
/* Validates an on-disk lov_mds_md and fills/allocates *lsmp from it,
 * converting id/seq from little-endian; also derives lsm_maxbytes from
 * the import's connect data when OBD_CONNECT_MAXBYTES was negotiated.
 * NOTE(review): extract is elided — error-return lines are missing. */
107 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
108 struct lov_mds_md *lmm, int lmm_bytes)
111 struct obd_import *imp = class_exp2cliimp(exp);
/* reject short buffers before touching any lmm field */
115 if (lmm_bytes < sizeof (*lmm)) {
116 CERROR("lov_mds_md too small: %d, need %d\n",
117 lmm_bytes, (int)sizeof(*lmm));
120 /* XXX LOV_MAGIC etc check? */
122 if (lmm->lmm_object_id == 0) {
123 CERROR("lov_mds_md: zero lmm_object_id\n");
/* OSC-level md always describes a single stripe */
128 lsm_size = lov_stripe_md_size(1);
/* free path: caller passed an lsm but no wire md */
132 if (*lsmp != NULL && lmm == NULL) {
133 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
134 OBD_FREE(*lsmp, lsm_size);
140 OBD_ALLOC(*lsmp, lsm_size);
143 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
144 if ((*lsmp)->lsm_oinfo[0] == NULL) {
/* undo partial allocation on oinfo alloc failure */
145 OBD_FREE(*lsmp, lsm_size);
148 loi_init((*lsmp)->lsm_oinfo[0]);
152 /* XXX zero *lsmp? */
153 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
154 (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
155 LASSERT((*lsmp)->lsm_object_id);
156 LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
/* prefer the server-advertised max object size when available */
160 (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
161 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
163 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/* Copy an obd_capa into the request's RMF_CAPA1 field and flag the
 * body's oa as carrying an OSS capability.
 * NOTE(review): extract elided — the NULL-capa early return is missing. */
168 static inline void osc_pack_capa(struct ptlrpc_request *req,
169 struct ost_body *body, void *capa)
171 struct obd_capa *oc = (struct obd_capa *)capa;
172 struct lustre_capa *c;
177 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
180 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
181 DEBUG_CAPA(D_SEC, c, "pack");
/* Fill the request's OST body from oinfo: wire-convert the obdo and
 * attach the capability, if any. */
184 static inline void osc_pack_req_body(struct ptlrpc_request *req,
185 struct obd_info *oinfo)
187 struct ost_body *body;
189 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
192 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
193 osc_pack_capa(req, body, oinfo->oi_capa);
/* Shrink the capsule's capability field to zero when no capability is
 * being sent; otherwise the default sizeof(struct obd_capa) stands. */
196 static inline void osc_set_capa_size(struct ptlrpc_request *req,
197 const struct req_msg_field *field,
201 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
203 /* it is already calculated as sizeof struct obd_capa */
/* Reply handler for async getattr: copy the returned attributes into
 * the caller's obdo, then invoke the caller's oi_cb_up upcall.
 * NOTE(review): extract elided — the rc!=0 branch structure is missing. */
207 static int osc_getattr_interpret(const struct lu_env *env,
208 struct ptlrpc_request *req,
209 struct osc_async_args *aa, int rc)
211 struct ost_body *body;
217 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
219 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
220 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
222 /* This should really be sent by the OST */
223 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
224 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
226 CDEBUG(D_INFO, "can't unpack ost_body\n");
/* on failure report no valid attributes to the upcall */
228 aa->aa_oi->oi_oa->o_valid = 0;
231 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Build an OST_GETATTR request and queue it on the caller's request
 * set; osc_getattr_interpret handles the reply asynchronously. */
235 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
236 struct ptlrpc_request_set *set)
238 struct ptlrpc_request *req;
239 struct osc_async_args *aa;
243 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
247 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
248 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
/* pack failure: release the request before returning */
250 ptlrpc_request_free(req);
254 osc_pack_req_body(req, oinfo);
256 ptlrpc_request_set_replen(req);
257 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
/* async args live in the request's embedded scratch space */
259 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
260 aa = ptlrpc_req_async_args(req);
263 ptlrpc_set_add_req(set, req);
/* Synchronous getattr: send OST_GETATTR, wait, and copy the returned
 * attributes into oinfo->oi_oa. */
267 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
268 struct obd_info *oinfo)
270 struct ptlrpc_request *req;
271 struct ost_body *body;
275 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
279 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
280 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
282 ptlrpc_request_free(req);
286 osc_pack_req_body(req, oinfo);
288 ptlrpc_request_set_replen(req);
290 rc = ptlrpc_queue_wait(req);
294 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
/* NULL body means a malformed reply */
296 GOTO(out, rc = -EPROTO);
298 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
299 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
301 /* This should really be sent by the OST */
302 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
303 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
307 ptlrpc_req_finished(req);
/* Synchronous setattr: send OST_SETATTR with the caller's obdo, wait,
 * and read back the (possibly server-adjusted) attributes. */
311 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
312 struct obd_info *oinfo, struct obd_trans_info *oti)
314 struct ptlrpc_request *req;
315 struct ost_body *body;
/* caller must have set the object group before a setattr */
319 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
321 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
325 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
326 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
328 ptlrpc_request_free(req);
332 osc_pack_req_body(req, oinfo);
334 ptlrpc_request_set_replen(req);
336 rc = ptlrpc_queue_wait(req);
340 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
342 GOTO(out, rc = -EPROTO);
344 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
348 ptlrpc_req_finished(req);
/* Reply handler shared by async setattr and punch: copy the returned
 * obdo into sa->sa_oa and invoke the caller's upcall with the rc. */
352 static int osc_setattr_interpret(const struct lu_env *env,
353 struct ptlrpc_request *req,
354 struct osc_setattr_args *sa, int rc)
356 struct ost_body *body;
362 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
364 GOTO(out, rc = -EPROTO);
366 lustre_get_wire_obdo(sa->sa_oa, &body->oa);
368 rc = sa->sa_upcall(sa->sa_cookie, rc);
/* Build and dispatch an async OST_SETATTR. With rqset == NULL the
 * request is fire-and-forget via ptlrpcd; with PTLRPCD_SET it is
 * handed to ptlrpcd with the interpret callback; otherwise it joins
 * the caller's set. upcall/cookie are delivered on completion.
 * NOTE(review): extract elided — the branch structure around the
 * three dispatch modes is partly missing. */
372 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
373 struct obd_trans_info *oti,
374 obd_enqueue_update_f upcall, void *cookie,
375 struct ptlrpc_request_set *rqset)
377 struct ptlrpc_request *req;
378 struct osc_setattr_args *sa;
382 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
386 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
387 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
389 ptlrpc_request_free(req);
/* carry the llog cookie from the MDS transaction, when present */
393 if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
394 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
396 osc_pack_req_body(req, oinfo);
398 ptlrpc_request_set_replen(req);
400 /* do mds to ost setattr asynchronously */
402 /* Do not wait for response. */
403 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
405 req->rq_interpret_reply =
406 (ptlrpc_interpterer_t)osc_setattr_interpret;
408 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
409 sa = ptlrpc_req_async_args(req);
410 sa->sa_oa = oinfo->oi_oa;
411 sa->sa_upcall = upcall;
412 sa->sa_cookie = cookie;
414 if (rqset == PTLRPCD_SET)
415 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
417 ptlrpc_set_add_req(rqset, req);
/* Thin wrapper: async setattr using oinfo's own oi_cb_up as the
 * completion upcall and oinfo itself as the cookie. */
423 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
424 struct obd_trans_info *oti,
425 struct ptlrpc_request_set *rqset)
427 return osc_setattr_async_base(exp, oinfo, oti,
428 oinfo->oi_cb_up, oinfo, rqset);
/* Create an object on the OST: allocates a single-stripe lsm if the
 * caller did not provide one, sends OST_CREATE synchronously, and on
 * success records the new object id/seq in the lsm and any transno /
 * llog cookie in oti.
 * NOTE(review): extract elided — several error/cleanup branches and
 * RETURNs are missing; code left byte-identical. */
431 int osc_real_create(struct obd_export *exp, struct obdo *oa,
432 struct lov_stripe_md **ea, struct obd_trans_info *oti)
434 struct ptlrpc_request *req;
435 struct ost_body *body;
436 struct lov_stripe_md *lsm;
445 rc = obd_alloc_memmd(exp, &lsm);
450 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
452 GOTO(out, rc = -ENOMEM);
454 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
456 ptlrpc_request_free(req);
460 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
462 lustre_set_wire_obdo(&body->oa, oa);
464 ptlrpc_request_set_replen(req);
/* orphan-deletion create must not be replayed or delayed */
466 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
467 oa->o_flags == OBD_FL_DELORPHAN) {
469 "delorphan from OST integration");
470 /* Don't resend the delorphan req */
471 req->rq_no_resend = req->rq_no_delay = 1;
474 rc = ptlrpc_queue_wait(req);
478 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
480 GOTO(out_req, rc = -EPROTO);
482 lustre_get_wire_obdo(oa, &body->oa);
484 /* This should really be sent by the OST */
485 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
486 oa->o_valid |= OBD_MD_FLBLKSZ;
488 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
489 * have valid lsm_oinfo data structs, so don't go touching that.
490 * This needs to be fixed in a big way.
492 lsm->lsm_object_id = oa->o_id;
493 lsm->lsm_object_seq = oa->o_seq;
497 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
/* stash the unlink llog cookie for the MDS recovery log */
499 if (oa->o_valid & OBD_MD_FLCOOKIE) {
500 if (!oti->oti_logcookies)
501 oti_alloc_cookies(oti, 1);
502 *oti->oti_logcookies = oa->o_lcookie;
506 CDEBUG(D_HA, "transno: "LPD64"\n",
507 lustre_msg_get_transno(req->rq_repmsg));
509 ptlrpc_req_finished(req);
512 obd_free_memmd(exp, &lsm);
/* Build and dispatch an async OST_PUNCH (truncate). The completion
 * path reuses osc_setattr_interpret to copy back the obdo and fire
 * the caller's upcall. Request goes to the IO portal (bug 7198). */
516 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
517 obd_enqueue_update_f upcall, void *cookie,
518 struct ptlrpc_request_set *rqset)
520 struct ptlrpc_request *req;
521 struct osc_setattr_args *sa;
522 struct ost_body *body;
526 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
530 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
531 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
533 ptlrpc_request_free(req);
536 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
537 ptlrpc_at_set_req_timeout(req);
539 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
541 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
542 osc_pack_capa(req, body, oinfo->oi_capa);
544 ptlrpc_request_set_replen(req);
546 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
547 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
548 sa = ptlrpc_req_async_args(req);
549 sa->sa_oa = oinfo->oi_oa;
550 sa->sa_upcall = upcall;
551 sa->sa_cookie = cookie;
552 if (rqset == PTLRPCD_SET)
553 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
555 ptlrpc_set_add_req(rqset, req);
/* Truncate entry point: encode the punch extent in the obdo's
 * size/blocks fields (start/end overload) then delegate to
 * osc_punch_base with oinfo's own upcall. */
560 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
561 struct obd_info *oinfo, struct obd_trans_info *oti,
562 struct ptlrpc_request_set *rqset)
564 oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
565 oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
566 oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
567 return osc_punch_base(exp, oinfo,
568 oinfo->oi_cb_up, oinfo, rqset);
/* Reply handler for OST_SYNC: copy the returned obdo back to the
 * caller's obd_info and invoke its oi_cb_up upcall. */
571 static int osc_sync_interpret(const struct lu_env *env,
572 struct ptlrpc_request *req,
575 struct osc_async_args *aa = arg;
576 struct ost_body *body;
582 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
584 CERROR ("can't unpack ost_body\n");
585 GOTO(out, rc = -EPROTO);
/* struct copy: returned attributes overwrite the caller's oa */
588 *aa->aa_oi->oi_oa = body->oa;
590 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an async OST_SYNC for the byte range [start, end], encoded in
 * the oa size/blocks fields; reply handled by osc_sync_interpret. */
594 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
595 struct obd_info *oinfo, obd_size start, obd_size end,
596 struct ptlrpc_request_set *set)
598 struct ptlrpc_request *req;
599 struct ost_body *body;
600 struct osc_async_args *aa;
/* a NULL oa means there is nothing to sync against */
605 CDEBUG(D_INFO, "oa NULL\n");
609 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
613 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
614 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
616 ptlrpc_request_free(req);
620 /* overload the size and blocks fields in the oa with start/end */
621 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
623 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
624 body->oa.o_size = start;
625 body->oa.o_blocks = end;
626 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
627 osc_pack_capa(req, body, oinfo->oi_capa);
629 ptlrpc_request_set_replen(req);
630 req->rq_interpret_reply = osc_sync_interpret;
632 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
633 aa = ptlrpc_req_async_args(req);
636 ptlrpc_set_add_req(set, req);
640 /* Find and cancel locally locks matched by @mode in the resource found by
641 * @objid. Found locks are added into @cancel list. Returns the amount of
642 * locks added to @cancels list. */
643 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
645 ldlm_mode_t mode, int lock_flags)
647 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
648 struct ldlm_res_id res_id;
649 struct ldlm_resource *res;
/* resolve the object's resource from its id/seq pair */
653 osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
654 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
/* hold a debug ref across the local cancel walk */
658 LDLM_RESOURCE_ADDREF(res);
659 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
660 lock_flags, 0, NULL);
661 LDLM_RESOURCE_DELREF(res);
662 ldlm_resource_putref(res);
/* Completion of a destroy RPC: drop the in-flight destroy counter and
 * wake one waiter throttled in osc_destroy(). */
666 static int osc_destroy_interpret(const struct lu_env *env,
667 struct ptlrpc_request *req, void *data,
670 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
672 cfs_atomic_dec(&cli->cl_destroy_in_flight);
673 cfs_waitq_signal(&cli->cl_destroy_waitq);
/* Try to reserve a destroy-RPC slot: optimistically increment the
 * in-flight counter and, if that exceeds max_rpcs_in_flight, back it
 * out again. The two atomics are not covered by one lock, so a racing
 * decrement may have freed a slot between them — re-signal the waitq
 * in that case so no waiter is lost. */
677 static int osc_can_send_destroy(struct client_obd *cli)
679 if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
680 cli->cl_max_rpcs_in_flight) {
681 /* The destroy request can be sent */
684 if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
685 cli->cl_max_rpcs_in_flight) {
687 * The counter has been modified between the two atomic
690 cfs_waitq_signal(&cli->cl_destroy_waitq);
695 /* Destroy requests can be async always on the client, and we don't even really
696 * care about the return code since the client cannot do anything at all about
698 * When the MDS is unlinking a filename, it saves the file objects into a
699 * recovery llog, and these object records are cancelled when the OST reports
700 * they were destroyed and sync'd to disk (i.e. transaction committed).
701 * If the client dies, or the OST is down when the object should be destroyed,
702 * the records are not cancelled, and when the OST reconnects to the MDS next,
703 * it will retrieve the llog unlink logs and then sends the log cancellation
704 * cookies to the MDS after committing destroy transactions. */
/* Cancel matching local PW locks (discarding cached data), then send
 * OST_DESTROY asynchronously via ptlrpcd. Destroys from a plain client
 * are throttled to max_rpcs_in_flight; MDT-originated destroys are not.
 * NOTE(review): extract elided — some error paths/RETURNs missing. */
705 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
706 struct obdo *oa, struct lov_stripe_md *ea,
707 struct obd_trans_info *oti, struct obd_export *md_export,
710 struct client_obd *cli = &exp->exp_obd->u.cli;
711 struct ptlrpc_request *req;
712 struct ost_body *body;
713 CFS_LIST_HEAD(cancels);
718 CDEBUG(D_INFO, "oa NULL\n");
/* cancel local locks first so cached dirty pages are discarded */
722 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
723 LDLM_FL_DISCARD_DATA);
725 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* allocation failed: put back the lock refs taken above */
727 ldlm_lock_list_put(&cancels, l_bl_ast, count);
731 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
732 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
735 ptlrpc_request_free(req);
739 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
740 ptlrpc_at_set_req_timeout(req);
/* forward the MDS unlink llog cookie with the destroy */
742 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
743 oa->o_lcookie = *oti->oti_logcookies;
744 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
746 lustre_set_wire_obdo(&body->oa, oa);
748 osc_pack_capa(req, body, (struct obd_capa *)capa);
749 ptlrpc_request_set_replen(req);
751 /* don't throttle destroy RPCs for the MDT */
752 if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
753 req->rq_interpret_reply = osc_destroy_interpret;
754 if (!osc_can_send_destroy(cli)) {
755 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
759 * Wait until the number of on-going destroy RPCs drops
760 * under max_rpc_in_flight
762 l_wait_event_exclusive(cli->cl_destroy_waitq,
763 osc_can_send_destroy(cli), &lwi);
767 /* Do not wait for response */
768 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
/* Report this client's cache state (dirty, undirty headroom, grant,
 * lost grant) in the outgoing obdo, under the loi list lock. Sanity
 * CERRORs fire when the per-OSC or system dirty accounting looks
 * inconsistent.
 * NOTE(review): extract elided — the branch that clears/limits the
 * values after each CERROR is missing from view. */
772 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
775 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
777 LASSERT(!(oa->o_valid & bits));
780 client_obd_list_lock(&cli->cl_loi_list_lock);
781 oa->o_dirty = cli->cl_dirty;
782 if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
783 CERROR("dirty %lu - %lu > dirty_max %lu\n",
784 cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
786 } else if (cfs_atomic_read(&obd_dirty_pages) -
787 cfs_atomic_read(&obd_dirty_transit_pages) >
788 obd_max_dirty_pages + 1){
789 /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
790 * not covered by a lock thus they may safely race and trip
791 * this CERROR() unless we add in a small fudge factor (+1). */
792 CERROR("dirty %d - %d > system dirty_max %d\n",
793 cfs_atomic_read(&obd_dirty_pages),
794 cfs_atomic_read(&obd_dirty_transit_pages),
795 obd_max_dirty_pages);
797 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
798 CERROR("dirty %lu - dirty_max %lu too big???\n",
799 cli->cl_dirty, cli->cl_dirty_max);
/* normal case: advertise how much more we could dirty */
802 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
803 (cli->cl_max_rpcs_in_flight + 1);
804 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
806 oa->o_grant = cli->cl_avail_grant;
807 oa->o_dropped = cli->cl_lost_grant;
/* lost grant is reported once, then reset */
808 cli->cl_lost_grant = 0;
809 client_obd_list_unlock(&cli->cl_loi_list_lock);
810 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
811 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Push the next grant-shrink deadline forward by the configured
 * interval from now. */
815 static void osc_update_next_shrink(struct client_obd *cli)
817 cli->cl_next_shrink_grant =
818 cfs_time_shift(cli->cl_grant_shrink_interval);
819 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
820 cli->cl_next_shrink_grant);
823 /* caller must hold loi_list_lock */
/* Account one page of dirty data against the available grant: bumps
 * global and per-client dirty counters, debits one page of grant, and
 * marks the brw page as grant-backed. Defers the next shrink since we
 * are actively consuming grant. */
824 static void osc_consume_write_grant(struct client_obd *cli,
825 struct brw_page *pga)
827 LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
/* a page must not be granted twice */
828 LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
829 cfs_atomic_inc(&obd_dirty_pages);
830 cli->cl_dirty += CFS_PAGE_SIZE;
831 cli->cl_avail_grant -= CFS_PAGE_SIZE;
832 pga->flag |= OBD_BRW_FROM_GRANT;
833 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
834 CFS_PAGE_SIZE, pga, pga->pg);
835 LASSERT(cli->cl_avail_grant >= 0);
836 osc_update_next_shrink(cli);
839 /* the companion to osc_consume_write_grant, called when a brw has completed.
840 * must be called with the loi lock held. */
/* Undo the per-page grant accounting. Pages that were never sent have
 * their grant reclaimed into cl_avail_grant (up to PTLRPC_MAX_BRW_SIZE)
 * or counted as lost; short writes that don't cover a whole OST block
 * also contribute to lost grant to match server-side accounting.
 * NOTE(review): extract elided — the sent/unsent branch heads are
 * missing from view; code left byte-identical. */
841 static void osc_release_write_grant(struct client_obd *cli,
842 struct brw_page *pga, int sent)
844 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
847 LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
/* nothing to release for pages written without grant backing */
848 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
853 pga->flag &= ~OBD_BRW_FROM_GRANT;
854 cfs_atomic_dec(&obd_dirty_pages);
855 cli->cl_dirty -= CFS_PAGE_SIZE;
856 if (pga->flag & OBD_BRW_NOCACHE) {
857 pga->flag &= ~OBD_BRW_NOCACHE;
858 cfs_atomic_dec(&obd_dirty_transit_pages);
859 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
862 /* Reclaim grant from truncated pages. This is used to solve
863 * write-truncate and grant all gone(to lost_grant) problem.
864 * For a vfs write this problem can be easily solved by a sync
865 * write, however, this is not an option for page_mkwrite()
866 * because grant has to be allocated before a page becomes
868 if (cli->cl_avail_grant < PTLRPC_MAX_BRW_SIZE)
869 cli->cl_avail_grant += CFS_PAGE_SIZE;
871 cli->cl_lost_grant += CFS_PAGE_SIZE;
872 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
873 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
874 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
875 /* For short writes we shouldn't count parts of pages that
876 * span a whole block on the OST side, or our accounting goes
877 * wrong. Should match the code in filter_grant_check. */
878 int offset = pga->off & ~CFS_PAGE_MASK;
879 int count = pga->count + (offset & (blocksize - 1));
880 int end = (offset + pga->count) & (blocksize - 1);
882 count += blocksize - end;
884 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
885 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
886 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
887 cli->cl_avail_grant, cli->cl_dirty);
/* Total read + write RPCs currently in flight for this client. */
893 static unsigned long rpcs_in_flight(struct client_obd *cli)
895 return cli->cl_r_in_flight + cli->cl_w_in_flight;
898 /* caller must hold loi_list_lock */
/* Walk the cache-waiter list: waiters get grant (and are woken) while
 * dirty limits allow; when grant is exhausted but writes are still in
 * flight the walk stops to wait for returned grant; otherwise waiters
 * are woken with -EDQUOT so they fall back to sync IO. */
899 void osc_wake_cache_waiters(struct client_obd *cli)
902 struct osc_cache_waiter *ocw;
905 cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
906 /* if we can't dirty more, we must wait until some is written */
907 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
908 (cfs_atomic_read(&obd_dirty_pages) + 1 >
909 obd_max_dirty_pages)) {
910 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
911 "osc max %ld, sys max %d\n", cli->cl_dirty,
912 cli->cl_dirty_max, obd_max_dirty_pages);
916 /* if still dirty cache but no grant wait for pending RPCs that
917 * may yet return us some grant before doing sync writes */
918 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
919 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
920 cli->cl_w_in_flight);
924 ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
925 cfs_list_del_init(&ocw->ocw_entry);
926 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
927 /* no more RPCs in flight to return grant, do sync IO */
928 ocw->ocw_rc = -EDQUOT;
929 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
/* grant available: charge the waiter's page before waking it */
931 osc_consume_write_grant(cli,
932 &ocw->ocw_oap->oap_brw_page);
935 CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld\n",
936 ocw, ocw->ocw_oap, cli->cl_avail_grant);
938 cfs_waitq_signal(&ocw->ocw_waitq);
/* Add server-returned grant to cl_avail_grant under the list lock. */
944 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
946 client_obd_list_lock(&cli->cl_loi_list_lock);
947 cli->cl_avail_grant += grant;
948 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Apply extra grant carried in a reply body, when the OBD_MD_FLGRANT
 * valid bit is set. */
951 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
953 if (body->oa.o_valid & OBD_MD_FLGRANT) {
954 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
955 __osc_update_grant(cli, body->oa.o_grant);
959 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
960 obd_count keylen, void *key, obd_count vallen,
961 void *val, struct ptlrpc_request_set *set);
/* Completion of a grant-shrink set_info RPC: on failure the locally
 * deducted grant is restored from the saved obdo; on success any grant
 * in the reply body is applied.
 * NOTE(review): extract elided — the rc branch head is missing. */
963 static int osc_shrink_grant_interpret(const struct lu_env *env,
964 struct ptlrpc_request *req,
967 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
968 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
969 struct ost_body *body;
/* failure path: give the shrunk grant back to the client */
972 __osc_update_grant(cli, oa->o_grant);
976 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
978 osc_update_grant(cli, body);
/* Give back a quarter of the available grant locally: move it from
 * cl_avail_grant into the obdo and tag the oa with SHRINK_GRANT so the
 * server reclaims it. */
984 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
986 client_obd_list_lock(&cli->cl_loi_list_lock);
987 oa->o_grant = cli->cl_avail_grant / 4;
988 cli->cl_avail_grant -= oa->o_grant;
989 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* make sure o_flags is valid before OR-ing the shrink flag in */
990 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
991 oa->o_valid |= OBD_MD_FLFLAGS;
994 oa->o_flags |= OBD_FL_SHRINK_GRANT;
995 osc_update_next_shrink(cli);
998 /* Shrink the current grant, either from some large amount to enough for a
999 * full set of in-flight RPCs, or if we have already shrunk to that limit
1000 * then to enough for a single RPC. This avoids keeping more grant than
1001 * needed, and avoids shrinking the grant piecemeal. */
1002 static int osc_shrink_grant(struct client_obd *cli)
1004 long target = (cli->cl_max_rpcs_in_flight + 1) *
1005 cli->cl_max_pages_per_rpc;
1007 client_obd_list_lock(&cli->cl_loi_list_lock);
/* already at/below the full-set target: fall back to one-RPC target */
1008 if (cli->cl_avail_grant <= target)
1009 target = cli->cl_max_pages_per_rpc;
1010 client_obd_list_unlock(&cli->cl_loi_list_lock);
1012 return osc_shrink_grant_to_target(cli, target);
/* Shrink available grant down to @target by sending the excess back to
 * the server via a KEY_GRANT_SHRINK set_info RPC. The grant is debited
 * locally first; osc_shrink_grant_interpret restores it if the RPC
 * fails. No-op when already at or below target. */
1015 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
1018 struct ost_body *body;
1021 client_obd_list_lock(&cli->cl_loi_list_lock);
1022 /* Don't shrink if we are already above or below the desired limit
1023 * We don't want to shrink below a single RPC, as that will negatively
1024 * impact block allocation and long-term performance. */
1025 if (target < cli->cl_max_pages_per_rpc)
1026 target = cli->cl_max_pages_per_rpc;
1028 if (target >= cli->cl_avail_grant) {
1029 client_obd_list_unlock(&cli->cl_loi_list_lock);
1032 client_obd_list_unlock(&cli->cl_loi_list_lock);
1034 OBD_ALLOC_PTR(body);
/* snapshot current cache state into the outgoing body */
1038 osc_announce_cached(cli, &body->oa, 0);
1040 client_obd_list_lock(&cli->cl_loi_list_lock);
1041 body->oa.o_grant = cli->cl_avail_grant - target;
1042 cli->cl_avail_grant = target;
1043 client_obd_list_unlock(&cli->cl_loi_list_lock);
1044 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
1045 body->oa.o_valid |= OBD_MD_FLFLAGS;
1046 body->oa.o_flags = 0;
1048 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
1049 osc_update_next_shrink(cli);
1051 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
1052 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
1053 sizeof(*body), body, NULL);
/* sync failure: restore the grant we deducted above */
1055 __osc_update_grant(cli, body->oa.o_grant);
1060 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
/* Decide whether it is time to shrink grant: requires the server to
 * support GRANT_SHRINK, the shrink deadline (minus a small tick slack)
 * to have passed, a FULL import, and more grant held than the limit.
 * An expired deadline with nothing to shrink just re-arms the timer. */
1061 static int osc_should_shrink_grant(struct client_obd *client)
1063 cfs_time_t time = cfs_time_current();
1064 cfs_time_t next_shrink = client->cl_next_shrink_grant;
1066 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
1067 OBD_CONNECT_GRANT_SHRINK) == 0)
1070 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1071 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1072 client->cl_avail_grant > GRANT_SHRINK_LIMIT)
1075 osc_update_next_shrink(client);
/* Periodic timeout callback: shrink grant on every registered client
 * that currently qualifies. */
1080 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1082 struct client_obd *client;
1084 cfs_list_for_each_entry(client, &item->ti_obd_list,
1085 cl_grant_shrink_list) {
1086 if (osc_should_shrink_grant(client))
1087 osc_shrink_grant(client);
/* Register this client with the periodic grant-shrink timeout so
 * osc_grant_shrink_grant_cb will consider it; arms the first deadline
 * on success. */
1092 static int osc_add_shrink_grant(struct client_obd *client)
1096 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1098 osc_grant_shrink_grant_cb, NULL,
1099 &client->cl_grant_shrink_list);
1101 CERROR("add grant client %s error %d\n",
1102 client->cl_import->imp_obd->obd_name, rc);
1105 CDEBUG(D_CACHE, "add grant client %s \n",
1106 client->cl_import->imp_obd->obd_name);
1107 osc_update_next_shrink(client);
/* Unregister this client from the periodic grant-shrink timeout. */
1111 static int osc_del_shrink_grant(struct client_obd *client)
1113 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/* Initialize cl_avail_grant from the server's connect data. After an
 * eviction ocd_grant is the fresh total; otherwise outstanding dirty
 * pages are subtracted. Negative results indicate an old (pre-bug20278)
 * server and are worked around. Registers the client for grant
 * shrinking when the server supports it. */
1117 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1120 * ocd_grant is the total grant amount we're expect to hold: if we've
1121 * been evicted, it's the new avail_grant amount, cl_dirty will drop
1122 * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1124 * race is tolerable here: if we're evicted, but imp_state already
1125 * left EVICTED state, then cl_dirty must be 0 already.
1127 client_obd_list_lock(&cli->cl_loi_list_lock);
1128 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1129 cli->cl_avail_grant = ocd->ocd_grant;
1131 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1133 if (cli->cl_avail_grant < 0) {
1134 CWARN("%s: available grant < 0, the OSS is probably not running"
1135 " with patch from bug20278 (%ld) \n",
1136 cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
1137 /* workaround for 1.6 servers which do not have
1138 * the patch from bug20278 */
1139 cli->cl_avail_grant = ocd->ocd_grant;
1142 client_obd_list_unlock(&cli->cl_loi_list_lock);
1144 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1145 cli->cl_import->imp_obd->obd_name,
1146 cli->cl_avail_grant, cli->cl_lost_grant);
1148 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1149 cfs_list_empty(&cli->cl_grant_shrink_list))
1150 osc_add_shrink_grant(cli);
1153 /* We assume that the reason this OSC got a short read is because it read
1154 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1155 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1156 * this stripe never got written at or beyond this stripe offset yet. */
/* Zero-fill the unread tail: skip fully-read pages, zero the tail of
 * the page where EOF landed, then zero every remaining page.
 * NOTE(review): extract elided — loop increments/break are missing. */
1157 static void handle_short_read(int nob_read, obd_count page_count,
1158 struct brw_page **pga)
1163 /* skip bytes read OK */
1164 while (nob_read > 0) {
1165 LASSERT (page_count > 0);
1167 if (pga[i]->count > nob_read) {
1168 /* EOF inside this page */
1169 ptr = cfs_kmap(pga[i]->pg) +
1170 (pga[i]->off & ~CFS_PAGE_MASK);
1171 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1172 cfs_kunmap(pga[i]->pg);
1178 nob_read -= pga[i]->count;
1183 /* zero remaining pages */
1184 while (page_count-- > 0) {
1185 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1186 memset(ptr, 0, pga[i]->count);
1187 cfs_kunmap(pga[i]->pg);
/* Validate the per-niobuf rc vector in a BRW_WRITE reply: the vector
 * must be present and correctly sized, every rc must be zero (negative
 * values are propagated, positive ones are protocol errors), and the
 * bulk transfer must have moved exactly the requested byte count. */
1192 static int check_write_rcs(struct ptlrpc_request *req,
1193 int requested_nob, int niocount,
1194 obd_count page_count, struct brw_page **pga)
1199 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1200 sizeof(*remote_rcs) *
1202 if (remote_rcs == NULL) {
1203 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1207 /* return error if any niobuf was in error */
1208 for (i = 0; i < niocount; i++) {
1209 if ((int)remote_rcs[i] < 0)
1210 return(remote_rcs[i]);
1212 if (remote_rcs[i] != 0) {
1213 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1214 i, remote_rcs[i], req);
1219 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1220 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1221 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Two brw pages can share one niobuf iff they are byte-contiguous and
 * differ only in flags known to combine safely; unknown flag
 * differences are warned about and prevent merging. */
1228 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1230 if (p1->flag != p2->flag) {
1231 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1232 OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1234 /* warn if we try to combine flags that we don't know to be
1235 * safe to combine */
1236 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1237 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1238 "report this at http://bugs.whamcloud.com/\n",
1239 p1->flag, p2->flag);
1244 return (p1->off + p1->count == p2->off);
/* Compute the checksum of a bulk IO over up to @nob bytes of the page
 * array, honoring per-page offsets. Fault-injection hooks deliberately
 * corrupt read data (CHECKSUM_RECEIVE) or the final write checksum
 * (CHECKSUM_SEND) to exercise the resend path. */
1247 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1248 struct brw_page **pga, int opc,
1249 cksum_type_t cksum_type)
1254 LASSERT (pg_count > 0);
1255 cksum = init_checksum(cksum_type);
1256 while (nob > 0 && pg_count > 0) {
1257 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1258 int off = pga[i]->off & ~CFS_PAGE_MASK;
1259 int count = pga[i]->count > nob ? nob : pga[i]->count;
1261 /* corrupt the data before we compute the checksum, to
1262 * simulate an OST->client data error */
1263 if (i == 0 && opc == OST_READ &&
1264 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1265 memcpy(ptr + off, "bad1", min(4, nob));
1266 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1267 cfs_kunmap(pga[i]->pg);
1268 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1271 nob -= pga[i]->count;
1275 /* For sending we only compute the wrong checksum instead
1276 * of corrupting the data so it is still correct on a redo */
1277 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1280 return fini_checksum(cksum, cksum_type);
/* Build (but do not send) a BRW read or write RPC for @page_count
 * sorted, non-overlapping brw pages.  Allocates the request (from the
 * import's emergency pool for writes), packs the obdo/ioobj/niobuf
 * triple, attaches a bulk descriptor with one entry per page, merges
 * contiguous pages into shared niobufs, optionally computes/requests
 * checksums, and primes the async-args cookie.  On success *reqp owns
 * the request.  NOTE(review): this view of the source is missing a
 * number of interior lines (error branches, loop tails, RETURNs). */
1283 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1284 struct lov_stripe_md *lsm, obd_count page_count,
1285 struct brw_page **pga,
1286 struct ptlrpc_request **reqp,
1287 struct obd_capa *ocapa, int reserve,
1290 struct ptlrpc_request *req;
1291 struct ptlrpc_bulk_desc *desc;
1292 struct ost_body *body;
1293 struct obd_ioobj *ioobj;
1294 struct niobuf_remote *niobuf;
1295 int niocount, i, requested_nob, opc, rc;
1296 struct osc_brw_async_args *aa;
1297 struct req_capsule *pill;
1298 struct brw_page *pg_prev;
/* fault-injection hooks for testing prep failure paths */
1301 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1302 RETURN(-ENOMEM); /* Recoverable */
1303 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1304 RETURN(-EINVAL); /* Fatal */
/* writes allocate from the pre-allocated pool so that dirty-page
 * writeback can make progress even under memory pressure */
1306 if ((cmd & OBD_BRW_WRITE) != 0) {
1308 req = ptlrpc_request_alloc_pool(cli->cl_import,
1309 cli->cl_import->imp_rq_pool,
1310 &RQF_OST_BRW_WRITE);
1313 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* count remote niobufs: adjacent mergeable pages share one niobuf */
1318 for (niocount = i = 1; i < page_count; i++) {
1319 if (!can_merge_pages(pga[i - 1], pga[i]))
1323 pill = &req->rq_pill;
1324 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1326 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1327 niocount * sizeof(*niobuf));
1328 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1330 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1332 ptlrpc_request_free(req);
1335 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1336 ptlrpc_at_set_req_timeout(req);
/* bulk direction depends on opcode: server pulls for writes,
 * pushes for reads */
1338 if (opc == OST_WRITE)
1339 desc = ptlrpc_prep_bulk_imp(req, page_count,
1340 BULK_GET_SOURCE, OST_BULK_PORTAL);
1342 desc = ptlrpc_prep_bulk_imp(req, page_count,
1343 BULK_PUT_SINK, OST_BULK_PORTAL);
1346 GOTO(out, rc = -ENOMEM);
1347 /* NB request now owns desc and will free it when it gets freed */
1349 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1350 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1351 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1352 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1354 lustre_set_wire_obdo(&body->oa, oa);
1356 obdo_to_ioobj(oa, ioobj);
1357 ioobj->ioo_bufcnt = niocount;
1358 osc_pack_capa(req, body, ocapa);
1359 LASSERT (page_count > 0);
1361 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1362 struct brw_page *pg = pga[i];
1363 int poff = pg->off & ~CFS_PAGE_MASK;
1365 LASSERT(pg->count > 0);
1366 /* make sure there is no gap in the middle of page array */
1367 LASSERTF(page_count == 1 ||
1368 (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
1369 ergo(i > 0 && i < page_count - 1,
1370 poff == 0 && pg->count == CFS_PAGE_SIZE) &&
1371 ergo(i == page_count - 1, poff == 0)),
1372 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1373 i, page_count, pg, pg->off, pg->count);
/* pages must arrive strictly sorted by file offset */
1375 LASSERTF(i == 0 || pg->off > pg_prev->off,
1376 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1377 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1379 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1380 pg_prev->pg, page_private(pg_prev->pg),
1381 pg_prev->pg->index, pg_prev->off);
/* NOTE(review): this assertion duplicates the one above with a
 * terser message — candidate for removal in a code change */
1383 LASSERTF(i == 0 || pg->off > pg_prev->off,
1384 "i %d p_c %u\n", i, page_count);
/* SRVLOCK must be uniform across the whole RPC */
1386 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1387 (pg->flag & OBD_BRW_SRVLOCK));
1389 ptlrpc_prep_bulk_page(desc, pg->pg, poff, pg->count);
1390 requested_nob += pg->count;
/* extend the current niobuf when contiguous, else start a new one */
1392 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1394 niobuf->len += pg->count;
1396 niobuf->offset = pg->off;
1397 niobuf->len = pg->count;
1398 niobuf->flags = pg->flag;
/* verify we consumed exactly the niobuf array we reserved */
1403 LASSERTF((void *)(niobuf - niocount) ==
1404 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1405 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1406 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1408 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1410 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1411 body->oa.o_valid |= OBD_MD_FLFLAGS;
1412 body->oa.o_flags = 0;
/* presumably set only on the resend path — TODO confirm against
 * the missing surrounding condition */
1414 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1417 if (osc_should_shrink_grant(cli))
1418 osc_shrink_grant_local(cli, &body->oa);
1420 /* size[REQ_REC_OFF] still sizeof (*body) */
1421 if (opc == OST_WRITE) {
1422 if (cli->cl_checksum &&
1423 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1424 /* store cl_cksum_type in a local variable since
1425 * it can be changed via lprocfs */
1426 cksum_type_t cksum_type = cli->cl_cksum_type;
1428 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1429 oa->o_flags &= OBD_FL_LOCAL_MASK;
1430 body->oa.o_flags = 0;
1432 body->oa.o_flags |= cksum_type_pack(cksum_type);
1433 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1434 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1438 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1440 /* save this in 'oa', too, for later checking */
1441 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1442 oa->o_flags |= cksum_type_pack(cksum_type);
1444 /* clear out the checksum flag, in case this is a
1445 * resend but cl_checksum is no longer set. b=11238 */
1446 oa->o_valid &= ~OBD_MD_FLCKSUM;
1448 oa->o_cksum = body->oa.o_cksum;
1449 /* 1 RC per niobuf */
1450 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1451 sizeof(__u32) * niocount);
/* reads: ask the server to checksum the reply bulk */
1453 if (cli->cl_checksum &&
1454 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1455 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1456 body->oa.o_flags = 0;
1457 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1458 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1461 ptlrpc_request_set_replen(req);
/* stash bookkeeping for brw_interpret()/osc_brw_fini_request() */
1463 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1464 aa = ptlrpc_req_async_args(req);
1466 aa->aa_requested_nob = requested_nob;
1467 aa->aa_nio_count = niocount;
1468 aa->aa_page_count = page_count;
1472 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1473 if (ocapa && reserve)
1474 aa->aa_ocapa = capa_get(ocapa);
1480 ptlrpc_req_finished(req);
/* Diagnose a write-checksum mismatch between client and server.
 * Recomputes the checksum over the local pages with the server's
 * checksum type and classifies the failure: server used the wrong
 * type, data changed locally after checksumming (likely mmap I/O),
 * data corrupted in transit, or both.  Emits console/CERROR
 * diagnostics; the (missing-from-view) return value tells the caller
 * whether to treat this as a real error. */
1484 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1485 __u32 client_cksum, __u32 server_cksum, int nob,
1486 obd_count page_count, struct brw_page **pga,
1487 cksum_type_t client_cksum_type)
1491 cksum_type_t cksum_type;
/* fast path: checksums agree, nothing to diagnose */
1493 if (server_cksum == client_cksum) {
1494 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* recompute with the type the SERVER used (from oa->o_flags) */
1498 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1500 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1503 if (cksum_type != client_cksum_type)
1504 msg = "the server did not use the checksum type specified in "
1505 "the original request - likely a protocol problem";
1506 else if (new_cksum == server_cksum)
1507 msg = "changed on the client after we checksummed it - "
1508 "likely false positive due to mmap IO (bug 11742)";
1509 else if (new_cksum == client_cksum)
1510 msg = "changed in transit before arrival at OST";
1512 msg = "changed in transit AND doesn't match the original - "
1513 "likely false positive due to mmap IO (bug 11742)";
1515 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1516 " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1517 msg, libcfs_nid2str(peer->nid),
1518 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1519 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1520 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1522 oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1524 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1525 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1526 "client csum now %x\n", client_cksum, client_cksum_type,
1527 server_cksum, cksum_type, new_cksum);
1531 /* Note rc enters this function as number of bytes transferred */
/* Post-process a completed BRW RPC: unpack the reply body, update
 * quota and grant state, verify checksums (both directions), validate
 * per-niobuf rcs for writes and transferred-byte counts for reads,
 * zero-fill short reads, and copy the wire obdo back to the caller.
 * Returns 0 / transferred bytes on success, -EAGAIN to trigger a
 * resend, or a negative errno.  NOTE(review): several interior lines
 * (GOTOs, declarations, closing braces) are missing from this view. */
1532 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1534 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1535 const lnet_process_id_t *peer =
1536 &req->rq_import->imp_connection->c_peer;
1537 struct client_obd *cli = aa->aa_cli;
1538 struct ost_body *body;
1539 __u32 client_cksum = 0;
/* -EDQUOT replies still carry a body we must process for quota flags */
1542 if (rc < 0 && rc != -EDQUOT) {
1543 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1547 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1548 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1550 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1554 /* set/clear over quota flag for a uid/gid */
1555 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1556 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1557 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1559 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1560 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1562 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1565 osc_update_grant(cli, body);
/* remember the checksum we sent, for the write-verification below */
1570 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1571 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1573 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
/* writes never return a positive byte count */
1575 CERROR("Unexpected +ve rc %d\n", rc);
1578 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1580 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
/* server echoed a checksum: verify it against what we sent */
1583 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1584 check_write_checksum(&body->oa, peer, client_cksum,
1585 body->oa.o_cksum, aa->aa_requested_nob,
1586 aa->aa_page_count, aa->aa_ppga,
1587 cksum_type_unpack(aa->aa_oa->o_flags)))
1590 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1591 aa->aa_page_count, aa->aa_ppga);
1595 /* The rest of this function executes only for OST_READs */
1597 /* if unwrap_bulk failed, return -EAGAIN to retry */
1598 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1600 GOTO(out, rc = -EAGAIN);
/* rc is the byte count here; it can never exceed what we asked for */
1602 if (rc > aa->aa_requested_nob) {
1603 CERROR("Unexpected rc %d (%d requested)\n", rc,
1604 aa->aa_requested_nob);
1608 if (rc != req->rq_bulk->bd_nob_transferred) {
1609 CERROR ("Unexpected rc %d (%d transferred)\n",
1610 rc, req->rq_bulk->bd_nob_transferred);
/* short read: zero-fill the tail pages the server did not send */
1614 if (rc < aa->aa_requested_nob)
1615 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1617 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1618 static int cksum_counter;
1619 __u32 server_cksum = body->oa.o_cksum;
1622 cksum_type_t cksum_type;
1624 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1625 body->oa.o_flags : 0);
1626 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1627 aa->aa_ppga, OST_READ,
/* identify whether the bulk came direct or via an LNET router */
1630 if (peer->nid == req->rq_bulk->bd_sender) {
1634 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1637 if (server_cksum == ~0 && rc > 0) {
1638 CERROR("Protocol error: server %s set the 'checksum' "
1639 "bit, but didn't send a checksum. Not fatal, "
1640 "but please notify on http://bugs.whamcloud.com/\n",
1641 libcfs_nid2str(peer->nid));
1642 } else if (server_cksum != client_cksum) {
1643 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1644 "%s%s%s inode "DFID" object "
1645 LPU64"/"LPU64" extent "
1646 "["LPU64"-"LPU64"]\n",
1647 req->rq_import->imp_obd->obd_name,
1648 libcfs_nid2str(peer->nid),
1650 body->oa.o_valid & OBD_MD_FLFID ?
1651 body->oa.o_parent_seq : (__u64)0,
1652 body->oa.o_valid & OBD_MD_FLFID ?
1653 body->oa.o_parent_oid : 0,
1654 body->oa.o_valid & OBD_MD_FLFID ?
1655 body->oa.o_parent_ver : 0,
1657 body->oa.o_valid & OBD_MD_FLGROUP ?
1658 body->oa.o_seq : (__u64)0,
1659 aa->aa_ppga[0]->off,
1660 aa->aa_ppga[aa->aa_page_count-1]->off +
1661 aa->aa_ppga[aa->aa_page_count-1]->count -
1663 CERROR("client %x, server %x, cksum_type %x\n",
1664 client_cksum, server_cksum, cksum_type);
1666 aa->aa_oa->o_cksum = client_cksum;
1670 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* we asked for a checksum but the server didn't send one */
1673 } else if (unlikely(client_cksum)) {
1674 static int cksum_missed;
/* rate-limit: log only at power-of-two miss counts */
1677 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1678 CERROR("Checksum %u requested from %s but not sent\n",
1679 cksum_missed, libcfs_nid2str(peer->nid));
/* propagate server-updated obdo fields back to the caller */
1685 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
/* Synchronous BRW: build the request, queue it, wait for completion,
 * and retry on recoverable errors (with a growing delay per resend).
 * -EINPROGRESS from the server is always retried regardless of the
 * resend budget; other recoverable errors stop after
 * client_should_resend() says so or the import generation changed
 * (eviction).  NOTE(review): loop structure / goto targets are partly
 * missing from this view. */
1690 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1691 struct lov_stripe_md *lsm,
1692 obd_count page_count, struct brw_page **pga,
1693 struct obd_capa *ocapa)
1695 struct ptlrpc_request *req;
1698 int generation, resends = 0;
1699 struct l_wait_info lwi;
1703 cfs_waitq_init(&waitq);
/* snapshot the import generation to detect eviction across resends */
1704 generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1707 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1708 page_count, pga, &req, ocapa, 0, resends);
1713 req->rq_generation_set = 1;
1714 req->rq_import_generation = generation;
/* delay the (re)send by 'resends' seconds via rq_sent */
1715 req->rq_sent = cfs_time_current_sec() + resends;
1718 rc = ptlrpc_queue_wait(req);
1720 if (rc == -ETIMEDOUT && req->rq_resend) {
1721 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1722 ptlrpc_req_finished(req);
1726 rc = osc_brw_fini_request(req, rc);
1728 ptlrpc_req_finished(req);
1729 /* When server return -EINPROGRESS, client should always retry
1730 * regardless of the number of times the bulk was resent already.*/
1731 if (osc_recoverable_error(rc)) {
1733 if (rc != -EINPROGRESS &&
1734 !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1735 CERROR("%s: too many resend retries for object: "
1736 ""LPU64":"LPU64", rc = %d.\n",
1737 exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
/* import generation changed => we were evicted; don't resend */
1741 exp->exp_obd->u.cli.cl_import->imp_generation) {
1742 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1743 ""LPU64":"LPU64", rc = %d.\n",
1744 exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
/* back off before retrying: sleep 'resends' seconds */
1748 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1750 l_wait_event(waitq, 0, &lwi);
1755 if (rc == -EAGAIN || rc == -EINPROGRESS)
/* Rebuild and requeue a failed async BRW RPC after a recoverable
 * error.  A fresh request is prepared from the old one's async args;
 * the oap list, pga, capa and interpret callback are transferred to
 * the new request, per-oap request references are swapped, and the new
 * request is added to the original's request set.  Aborts (freeing the
 * new request) if any oap was interrupted in the meantime. */
1760 int osc_brw_redo_request(struct ptlrpc_request *request,
1761 struct osc_brw_async_args *aa)
1763 struct ptlrpc_request *new_req;
1764 struct ptlrpc_request_set *set = request->rq_set;
1765 struct osc_brw_async_args *new_aa;
1766 struct osc_async_page *oap;
1770 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1772 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1773 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1774 aa->aa_cli, aa->aa_oa,
1775 NULL /* lsm unused by osc currently */,
1776 aa->aa_page_count, aa->aa_ppga,
1777 &new_req, aa->aa_ocapa, 0, 1);
/* the loi list lock covers the oap list walk and the set_add below */
1781 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1783 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1784 if (oap->oap_request != NULL) {
1785 LASSERTF(request == oap->oap_request,
1786 "request %p != oap_request %p\n",
1787 request, oap->oap_request);
/* an interrupted oap means the whole redo must be abandoned */
1788 if (oap->oap_interrupted) {
1789 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1790 ptlrpc_req_finished(new_req);
1795 /* New request takes over pga and oaps from old request.
1796 * Note that copying a list_head doesn't work, need to move it... */
1798 new_req->rq_interpret_reply = request->rq_interpret_reply;
1799 new_req->rq_async_args = request->rq_async_args;
/* delay resend by aa_resends seconds */
1800 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1801 new_req->rq_generation_set = 1;
1802 new_req->rq_import_generation = request->rq_import_generation;
1804 new_aa = ptlrpc_req_async_args(new_req);
1806 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1807 cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1808 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
/* each oap drops its ref on the old request and takes one on the new */
1810 cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1811 if (oap->oap_request) {
1812 ptlrpc_req_finished(oap->oap_request);
1813 oap->oap_request = ptlrpc_request_addref(new_req);
/* ownership of the capa reference moves to the new request */
1817 new_aa->aa_ocapa = aa->aa_ocapa;
1818 aa->aa_ocapa = NULL;
1820 /* use ptlrpc_set_add_req is safe because interpret functions work
1821 * in check_set context. only one way exist with access to request
1822 * from different thread got -EINTR - this way protected with
1823 * cl_loi_list_lock */
1824 ptlrpc_set_add_req(set, new_req);
1826 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1828 DEBUG_REQ(D_INFO, new_req, "new request");
1833 * ugh, we want disk allocation on the target to happen in offset order. we'll
1834 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1835 * fine for our small page arrays and doesn't require allocation. it's an
1836 * insertion sort that swaps elements that are strides apart, shrinking the
1837 * stride down until it's '1' and the array is sorted.
/* Sort @array of @num brw pages in ascending file-offset order using
 * an in-place shellsort (no allocation; fine for small arrays). */
1839 static void sort_brw_pages(struct brw_page **array, int num)
1842 struct brw_page *tmp;
/* grow the stride using the 3h+1 sequence, then shrink it back down */
1846 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1851 for (i = stride ; i < num ; i++) {
/* insertion step: shift larger stride-apart elements right */
1854 while (j >= stride && array[j - stride]->off > tmp->off) {
1855 array[j] = array[j - stride];
1860 } while (stride > 1);
/* Return the length of the longest leading run of @pg that can go into
 * one RPC without fragmenting pages: after the first page, each page
 * must start on a page boundary, and only the final page may end
 * before a page boundary.  NOTE(review): loop body / count updates are
 * partially missing from this view. */
1863 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1869 LASSERT (pages > 0);
/* offset of the first page within its CFS page */
1870 offset = pg[i]->off & ~CFS_PAGE_MASK;
1874 if (pages == 0) /* that's all */
1877 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1878 return count; /* doesn't end on page boundary */
1881 offset = pg[i]->off & ~CFS_PAGE_MASK;
1882 if (offset != 0) /* doesn't start on page boundary */
/* Allocate and return an array of @count pointers into the flat brw
 * page array @pga (one pointer per page), so callers can sort and
 * window the pages without copying them.  Returns NULL on allocation
 * failure; freed by osc_release_ppga(). */
1889 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1891 struct brw_page **ppga;
1894 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1898 for (i = 0; i < count; i++)
/* Free a pointer array previously built by osc_build_ppga().  @count
 * must be the count passed to the allocator so the size matches. */
1903 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1905 LASSERT(ppga != NULL);
1906 OBD_FREE(ppga, sizeof(*ppga) * count);
/* Top-level synchronous BRW entry point.  Builds a sorted pointer
 * array over @pga and issues one or more osc_brw_internal() RPCs,
 * each at most cl_max_pages_per_rpc pages and never fragmenting a page
 * across RPCs.  Because the RPC clobbers the obdo, a copy is saved and
 * restored between chunks.  OBD_BRW_CHECK only probes whether I/O has
 * a chance to succeed (import valid) without sending anything. */
1909 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1910 obd_count page_count, struct brw_page *pga,
1911 struct obd_trans_info *oti)
1913 struct obdo *saved_oa = NULL;
1914 struct brw_page **ppga, **orig;
1915 struct obd_import *imp = class_exp2cliimp(exp);
1916 struct client_obd *cli;
1917 int rc, page_count_orig;
1920 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1921 cli = &imp->imp_obd->u.cli;
1923 if (cmd & OBD_BRW_CHECK) {
1924 /* The caller just wants to know if there's a chance that this
1925 * I/O can succeed */
1927 if (imp->imp_invalid)
1932 /* test_brw with a failed create can trip this, maybe others. */
1933 LASSERT(cli->cl_max_pages_per_rpc);
1937 orig = ppga = osc_build_ppga(pga, page_count);
/* remember the original count: release must use it, not the
 * decremented loop counter */
1940 page_count_orig = page_count;
1942 sort_brw_pages(ppga, page_count);
1943 while (page_count) {
1944 obd_count pages_per_brw;
1946 if (page_count > cli->cl_max_pages_per_rpc)
1947 pages_per_brw = cli->cl_max_pages_per_rpc;
1949 pages_per_brw = page_count;
/* shrink the chunk so no page is split across RPC boundaries */
1951 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1953 if (saved_oa != NULL) {
1954 /* restore previously saved oa */
1955 *oinfo->oi_oa = *saved_oa;
1956 } else if (page_count > pages_per_brw) {
1957 /* save a copy of oa (brw will clobber it) */
1958 OBDO_ALLOC(saved_oa);
1959 if (saved_oa == NULL)
1960 GOTO(out, rc = -ENOMEM);
1961 *saved_oa = *oinfo->oi_oa;
1964 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1965 pages_per_brw, ppga, oinfo->oi_capa);
/* advance the window over the pointer array */
1970 page_count -= pages_per_brw;
1971 ppga += pages_per_brw;
1975 osc_release_ppga(orig, page_count_orig);
1977 if (saved_oa != NULL)
1978 OBDO_FREE(saved_oa);
1983 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1984 * the dirty accounting. Writeback completes or truncate happens before
1985 * writing starts. Must be called with the loi lock held. */
1986 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
/* hand the page's write grant back; @sent tells the grant code
 * whether the page actually made it to the server */
1989 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1993 /* This maintains the lists of pending pages to read/write for a given object
1994 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1995 * to quickly find objects that are ready to send an RPC. */
/* Return non-zero when @lop has enough/urgent-enough pending pages to
 * justify sending an RPC now for direction @cmd.  NOTE(review): the
 * read-direction branch and final return are missing from this view. */
1996 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
/* nothing queued => nothing to send */
2001 if (lop->lop_num_pending == 0)
2004 /* if we have an invalid import we want to drain the queued pages
2005 * by forcing them through rpcs that immediately fail and complete
2006 * the pages. recovery relies on this to empty the queued pages
2007 * before canceling the locks and evicting down the llite pages */
2008 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2011 /* stream rpcs in queue order as long as as there is an urgent page
2012 * queued. this is our cheap solution for good batching in the case
2013 * where writepage marks some random page in the middle of the file
2014 * as urgent because of, say, memory pressure */
2015 if (!cfs_list_empty(&lop->lop_urgent)) {
2016 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
2020 if (cmd & OBD_BRW_WRITE) {
2021 /* trigger a write rpc stream as long as there are dirtiers
2022 * waiting for space. as they're waiting, they're not going to
2023 * create more pages to coalesce with what's waiting.. */
2024 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
2025 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
/* a full RPC's worth of pages is always worth sending */
2029 if (lop->lop_num_pending >= cli->cl_max_pages_per_rpc)
/* Return non-zero when the first urgent page on @lop is flagged
 * high-priority (ASYNC_HP), meaning an HP RPC should be sent. */
2035 static int lop_makes_hprpc(struct loi_oap_pages *lop)
2037 struct osc_async_page *oap;
2040 if (cfs_list_empty(&lop->lop_urgent))
/* only the head of the urgent list needs checking: HP pages are
 * queued at the front (see osc_oap_to_pending) */
2043 oap = cfs_list_entry(lop->lop_urgent.next,
2044 struct osc_async_page, oap_urgent_item);
2046 if (oap->oap_async_flags & ASYNC_HP) {
2047 CDEBUG(D_CACHE, "hp request forcing RPC\n");
/* Idempotently add @item to @list or remove it, so that its membership
 * matches the boolean @should_be_on.  No-op when already consistent. */
2054 static void on_list(cfs_list_t *item, cfs_list_t *list,
2057 if (cfs_list_empty(item) && should_be_on)
2058 cfs_list_add_tail(item, list);
2059 else if (!cfs_list_empty(item) && !should_be_on)
2060 cfs_list_del_init(item);
2063 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2064 * can find pages to build into rpcs quickly */
/* Re-evaluate which of the client's ready/hp-ready/write/read lists
 * this loi belongs on, based on its pending read/write pages.  An loi
 * is on the HP-ready list XOR the ready list, never both. */
2065 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
2067 if (lop_makes_hprpc(&loi->loi_write_lop) ||
2068 lop_makes_hprpc(&loi->loi_read_lop)) {
/* HP work trumps normal readiness: move to the HP list only */
2070 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2071 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2073 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
2074 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2075 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
2076 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
/* membership on the write/read lists just tracks "has pending pages" */
2079 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2080 loi->loi_write_lop.lop_num_pending);
2082 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2083 loi->loi_read_lop.lop_num_pending);
/* Adjust the pending-page counters by @delta (positive on queue,
 * negative on completion), both on the per-object lop and on the
 * client-wide read or write total selected by @cmd. */
2086 static void lop_update_pending(struct client_obd *cli,
2087 struct loi_oap_pages *lop, int cmd, int delta)
2089 lop->lop_num_pending += delta;
2090 if (cmd & OBD_BRW_WRITE)
2091 cli->cl_pending_w_pages += delta;
2093 cli->cl_pending_r_pages += delta;
2097 * this is called when a sync waiter receives an interruption. Its job is to
2098 * get the caller woken as soon as possible. If its page hasn't been put in an
2099 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
2100 * desiring interruption which will forcefully complete the rpc once the rpc
/* Handle an interrupt (signal) on a queued-but-unsent or in-flight
 * async page: either mark the owning RPC interrupted, or dequeue the
 * page and complete it with -EINTR immediately. */
2103 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2105 struct loi_oap_pages *lop;
2106 struct lov_oinfo *loi;
2110 LASSERT(!oap->oap_interrupted);
2111 oap->oap_interrupted = 1;
2113 /* ok, it's been put in an rpc. only one oap gets a request reference */
2114 if (oap->oap_request != NULL) {
/* wake ptlrpcd so it notices the interrupted request promptly */
2115 ptlrpc_mark_interrupted(oap->oap_request);
2116 ptlrpcd_wake(oap->oap_request);
2117 ptlrpc_req_finished(oap->oap_request);
2118 oap->oap_request = NULL;
2122 * page completion may be called only if ->cpo_prep() method was
2123 * executed by osc_io_submit(), that also adds page the to pending list
2125 if (!cfs_list_empty(&oap->oap_pending_item)) {
2126 cfs_list_del_init(&oap->oap_pending_item);
2127 cfs_list_del_init(&oap->oap_urgent_item);
/* fix up pending counts and list membership for the owning loi */
2130 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2131 &loi->loi_write_lop : &loi->loi_read_lop;
2132 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2133 loi_list_maint(oap->oap_cli, oap->oap_loi);
/* report -EINTR to the upper layer to wake the waiter */
2134 rc = oap->oap_caller_ops->ap_completion(env,
2135 oap->oap_caller_data,
2136 oap->oap_cmd, NULL, -EINTR);
2142 /* this is trying to propogate async writeback errors back up to the
2143 * application. As an async write fails we record the error code for later if
2144 * the app does an fsync. As long as errors persist we force future rpcs to be
2145 * sync so that the app can get a sync error and break the cycle of queueing
2146 * pages for which writeback will fail. */
/* Record the outcome (@rc) of the write with transaction id @xid in
 * the async-rc state @ar.  NOTE(review): the error branch that stores
 * rc is missing from this view; only the force-sync bookkeeping and
 * its clearing condition are visible. */
2147 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
/* on failure: force subsequent RPCs sync until a write with xid past
 * the failure point succeeds */
2154 ar->ar_force_sync = 1;
2155 ar->ar_min_xid = ptlrpc_sample_next_xid();
/* a successful write at or past ar_min_xid clears the force-sync state */
2160 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2161 ar->ar_force_sync = 0;
/* Queue @oap on its loi's read or write pending list and bump the
 * pending counters.  HP pages go to the FRONT of the urgent list (so
 * lop_makes_hprpc() sees them first), plain-urgent pages to its tail;
 * every page also joins the pending list. */
2164 void osc_oap_to_pending(struct osc_async_page *oap)
2166 struct loi_oap_pages *lop;
2168 if (oap->oap_cmd & OBD_BRW_WRITE)
2169 lop = &oap->oap_loi->loi_write_lop;
2171 lop = &oap->oap_loi->loi_read_lop;
2173 if (oap->oap_async_flags & ASYNC_HP)
2174 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2175 else if (oap->oap_async_flags & ASYNC_URGENT)
2176 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2177 cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2178 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2181 /* this must be called holding the loi list lock to give coverage to exit_cache,
2182 * async_flag maintenance, and oap_request */
/* Finish one async page after its RPC completes (or fails): drop the
 * request reference, reset flags, fold write results into the async-rc
 * error tracking, refresh the loi's cached lvb attributes from @oa,
 * and invoke the upper layer's ap_completion.  If the upper layer asks
 * (non-zero return — cannot confirm the exact value from this view),
 * the page is requeued instead of leaving the cache. */
2183 static void osc_ap_completion(const struct lu_env *env,
2184 struct client_obd *cli, struct obdo *oa,
2185 struct osc_async_page *oap, int sent, int rc)
2190 if (oap->oap_request != NULL) {
/* capture the xid before dropping our reference */
2191 xid = ptlrpc_req_xid(oap->oap_request);
2192 ptlrpc_req_finished(oap->oap_request);
2193 oap->oap_request = NULL;
/* oap_lock protects oap_async_flags against concurrent updates */
2196 cfs_spin_lock(&oap->oap_lock);
2197 oap->oap_async_flags = 0;
2198 cfs_spin_unlock(&oap->oap_lock);
2199 oap->oap_interrupted = 0;
2201 if (oap->oap_cmd & OBD_BRW_WRITE) {
/* track write errors both client-wide and per-object */
2202 osc_process_ar(&cli->cl_ar, xid, rc);
2203 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2206 if (rc == 0 && oa != NULL) {
2207 if (oa->o_valid & OBD_MD_FLBLOCKS)
2208 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2209 if (oa->o_valid & OBD_MD_FLMTIME)
2210 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2211 if (oa->o_valid & OBD_MD_FLATIME)
2212 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2213 if (oa->o_valid & OBD_MD_FLCTIME)
2214 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2217 rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2218 oap->oap_cmd, oa, rc);
2220 /* cl_page_completion() drops PG_locked. so, a new I/O on the page could
2221 * start, but OSC calls it under lock and thus we can add oap back to
2224 /* upper layer wants to leave the page on pending queue */
2225 osc_oap_to_pending(oap);
2227 osc_exit_cache(cli, oap, sent);
/* Deferred-work callback: kick the RPC engine for @data's client_obd
 * under the loi list lock.  Used to run writeback outside the caller's
 * context. */
2231 static int brw_queue_work(const struct lu_env *env, void *data)
2233 struct client_obd *cli = data;
2235 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2237 client_obd_list_lock(&cli->cl_loi_list_lock);
2238 osc_check_rpcs0(env, cli, 1);
2239 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Interpret callback for async BRW RPCs.  Finalizes the reply via
 * osc_brw_fini_request(), retries recoverable errors through
 * osc_brw_redo_request() (always for -EINPROGRESS), then under the loi
 * list lock: decrements the in-flight counter, completes each attached
 * oap (or releases grants for the pga-only path), wakes cache waiters
 * and kicks the next batch of RPCs.  NOTE(review): several interior
 * lines (RETURNs, brace closures) are missing from this view. */
2243 static int brw_interpret(const struct lu_env *env,
2244 struct ptlrpc_request *req, void *data, int rc)
2246 struct osc_brw_async_args *aa = data;
2247 struct client_obd *cli;
2251 rc = osc_brw_fini_request(req, rc);
2252 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2253 /* When server return -EINPROGRESS, client should always retry
2254 * regardless of the number of times the bulk was resent already. */
2255 if (osc_recoverable_error(rc)) {
/* eviction (generation bump) makes a resend pointless */
2256 if (req->rq_import_generation !=
2257 req->rq_import->imp_generation) {
2258 CDEBUG(D_HA, "%s: resend cross eviction for object: "
2259 ""LPU64":"LPU64", rc = %d.\n",
2260 req->rq_import->imp_obd->obd_name,
2261 aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
2262 } else if (rc == -EINPROGRESS ||
2263 client_should_resend(aa->aa_resends, aa->aa_cli)) {
2264 rc = osc_brw_redo_request(req, aa);
2266 CERROR("%s: too many resent retries for object: "
2267 ""LPU64":"LPU64", rc = %d.\n",
2268 req->rq_import->imp_obd->obd_name,
2269 aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
2274 else if (rc == -EAGAIN || rc == -EINPROGRESS)
/* capa reference taken at prep time is dropped here */
2279 capa_put(aa->aa_ocapa);
2280 aa->aa_ocapa = NULL;
2284 client_obd_list_lock(&cli->cl_loi_list_lock);
2286 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2287 * is called so we know whether to go to sync BRWs or wait for more
2288 * RPCs to complete */
2289 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2290 cli->cl_w_in_flight--;
2292 cli->cl_r_in_flight--;
/* empty oap list distinguishes the async_internal() path from the
 * osc_send_oap_rpc() path */
2294 async = cfs_list_empty(&aa->aa_oaps);
2295 if (!async) { /* from osc_send_oap_rpc() */
2296 struct osc_async_page *oap, *tmp;
2297 /* the caller may re-use the oap after the completion call so
2298 * we need to clean it up a little */
2299 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
2301 cfs_list_del_init(&oap->oap_rpc_item);
2302 osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2304 OBDO_FREE(aa->aa_oa);
2305 } else { /* from async_internal() */
2307 for (i = 0; i < aa->aa_page_count; i++)
2308 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2310 osc_wake_cache_waiters(cli);
2311 osc_check_rpcs0(env, cli, 1);
2312 client_obd_list_unlock(&cli->cl_loi_list_lock);
2315 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2316 req->rq_bulk->bd_nob_transferred);
2317 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2318 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
/* Assemble an async BRW RPC from the oaps on @rpc_list: builds the pga
 * array, allocates a cl_req covering all pages, fills the obdo via
 * cl_req_attr_set (including DLM lock handle and jobid), sorts pages,
 * and calls osc_brw_prep_request().  On success the request takes
 * ownership of the oap list; on failure every oap is completed with
 * the error and the cl_req is torn down.  Returns the request or an
 * ERR_PTR.  NOTE(review): interior lines (oa allocation, loop tails,
 * RETURN) are missing from this view. */
2323 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2324 struct client_obd *cli,
2325 cfs_list_t *rpc_list,
2326 int page_count, int cmd)
2328 struct ptlrpc_request *req;
2329 struct brw_page **pga = NULL;
2330 struct osc_brw_async_args *aa;
2331 struct obdo *oa = NULL;
2332 const struct obd_async_page_ops *ops = NULL;
2333 struct osc_async_page *oap;
2334 struct osc_async_page *tmp;
2335 struct cl_req *clerq = NULL;
2336 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2337 struct ldlm_lock *lock = NULL;
2338 struct cl_req_attr crattr;
2339 int i, rc, mpflag = 0;
2342 LASSERT(!cfs_list_empty(rpc_list));
/* under memory pressure, mark this task so allocations may dip into
 * reserves while building the writeback RPC */
2344 if (cmd & OBD_BRW_MEMALLOC)
2345 mpflag = cfs_memory_pressure_get_and_set();
2347 memset(&crattr, 0, sizeof crattr);
2348 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2350 GOTO(out, req = ERR_PTR(-ENOMEM));
2354 GOTO(out, req = ERR_PTR(-ENOMEM));
/* first pass: build pga[] and attach every page to one cl_req */
2357 cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2358 struct cl_page *page = osc_oap2cl_page(oap);
2360 ops = oap->oap_caller_ops;
2362 clerq = cl_req_alloc(env, page, crt,
2363 1 /* only 1-object rpcs for
2366 GOTO(out, req = (void *)clerq);
2367 lock = oap->oap_ldlm_lock;
2369 pga[i] = &oap->oap_brw_page;
2370 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2371 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2372 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2374 cl_req_page_add(env, clerq, page);
2377 /* always get the data for the obdo for the rpc */
2378 LASSERT(ops != NULL);
2380 crattr.cra_capa = NULL;
2381 memset(crattr.cra_jobid, 0, JOBSTATS_JOBID_SIZE);
2382 cl_req_attr_set(env, clerq, &crattr, ~0ULL);
/* pass the DLM lock handle so the server can match the lock */
2384 oa->o_handle = lock->l_remote_handle;
2385 oa->o_valid |= OBD_MD_FLHANDLE;
2388 rc = cl_req_prep(env, clerq);
2390 CERROR("cl_req_prep failed: %d\n", rc);
2391 GOTO(out, req = ERR_PTR(rc));
2394 sort_brw_pages(pga, page_count);
2395 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2396 pga, &req, crattr.cra_capa, 1, 0);
2398 CERROR("prep_req failed: %d\n", rc);
2399 GOTO(out, req = ERR_PTR(rc));
2402 if (cmd & OBD_BRW_MEMALLOC)
2403 req->rq_memalloc = 1;
2405 /* Need to update the timestamps after the request is built in case
2406 * we race with setattr (locally or in queue at OST). If OST gets
2407 * later setattr before earlier BRW (as determined by the request xid),
2408 * the OST will not use BRW timestamps. Sadly, there is no obvious
2409 * way to do this in a single call. bug 10150 */
2410 cl_req_attr_set(env, clerq, &crattr,
2411 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2413 lustre_msg_set_jobid(req->rq_reqmsg, crattr.cra_jobid);
/* move oap ownership onto the request's async args */
2415 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2416 aa = ptlrpc_req_async_args(req);
2417 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2418 cfs_list_splice(rpc_list, &aa->aa_oaps);
2419 CFS_INIT_LIST_HEAD(rpc_list);
2420 aa->aa_clerq = clerq;
2422 if (cmd & OBD_BRW_MEMALLOC)
2423 cfs_memory_pressure_restore(mpflag);
2425 capa_put(crattr.cra_capa);
/* error path: unwind allocations and complete each oap with the error */
2430 OBD_FREE(pga, sizeof(*pga) * page_count);
2431 /* this should happen rarely and is pretty bad, it makes the
2432 * pending list not follow the dirty order */
2433 client_obd_list_lock(&cli->cl_loi_list_lock);
2434 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2435 cfs_list_del_init(&oap->oap_rpc_item);
2437 /* queued sync pages can be torn down while the pages
2438 * were between the pending list and the rpc */
2439 if (oap->oap_interrupted) {
2440 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2441 osc_ap_completion(env, cli, NULL, oap, 0,
2445 osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2447 if (clerq && !IS_ERR(clerq))
2448 cl_req_completion(env, clerq, PTR_ERR(req));
2454 * prepare pages for ASYNC io and put pages in send queue.
2456 * \param cmd OBD_BRW_* macros
2457 * \param lop pending pages
2459 * \return zero if no page added to send queue.
2460 * \return 1 if pages successfully added to send queue.
2461 * \return negative on errors.
/* NOTE(review): presumably entered with cli->cl_loi_list_lock held -- the
 * lock is dropped below around osc_build_req() and re-taken afterwards;
 * confirm against callers (osc_check_rpcs0). */
2464 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2465 struct lov_oinfo *loi, int cmd,
2466 struct loi_oap_pages *lop, pdl_policy_t pol)
2468 struct ptlrpc_request *req;
2469 obd_count page_count = 0;
2470 struct osc_async_page *oap = NULL, *tmp;
2471 struct osc_brw_async_args *aa;
2472 const struct obd_async_page_ops *ops;
2473 CFS_LIST_HEAD(rpc_list);
2474 int srvlock = 0, mem_tight = 0;
2475 struct cl_object *clob = NULL;
/* starting_offset tracks the lowest byte offset seen; seeded with EOF so
 * the first page always takes the "< starting_offset" branch below. */
2476 obd_off starting_offset = OBD_OBJECT_EOF;
2477 unsigned int ending_offset;
2478 int starting_page_off = 0;
2481 /* ASYNC_HP pages first. At present, when the lock the pages is
2482 * to be canceled, the pages covered by the lock will be sent out
2483 * with ASYNC_HP. We have to send out them as soon as possible. */
2484 cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2485 if (oap->oap_async_flags & ASYNC_HP)
/* high-priority pages jump to the head of the candidate list */
2486 cfs_list_move(&oap->oap_pending_item, &rpc_list);
2487 else if (!(oap->oap_brw_flags & OBD_BRW_SYNC))
2488 /* only do this for writeback pages. */
2489 cfs_list_move_tail(&oap->oap_pending_item, &rpc_list);
2490 if (++page_count >= cli->cl_max_pages_per_rpc)
/* re-queue the reordered candidates at the front of lop_pending so the
 * scan below picks them up first */
2493 cfs_list_splice_init(&rpc_list, &lop->lop_pending);
2496 /* first we find the pages we're allowed to work with */
2497 cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2499 ops = oap->oap_caller_ops;
2501 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2502 "magic 0x%x\n", oap, oap->oap_magic);
2505 /* pin object in memory, so that completion call-backs
2506 * can be safely called under client_obd_list lock. */
2507 clob = osc_oap2cl_page(oap)->cp_obj;
2508 cl_object_get(clob);
/* all pages in one RPC must agree on the SRVLOCK flag; a mismatch ends
 * the batch here */
2511 if (page_count != 0 &&
2512 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2513 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2514 " oap %p, page %p, srvlock %u\n",
2515 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2519 /* If there is a gap at the start of this page, it can't merge
2520 * with any previous page, so we'll hand the network a
2521 * "fragmented" page array that it can't transfer in 1 RDMA */
2522 if (oap->oap_obj_off < starting_offset) {
2523 if (starting_page_off != 0)
2526 starting_page_off = oap->oap_page_off;
2527 starting_offset = oap->oap_obj_off + starting_page_off;
2528 } else if (oap->oap_page_off != 0)
2531 /* in llite being 'ready' equates to the page being locked
2532 * until completion unlocks it. commit_write submits a page
2533 * as not ready because its unlock will happen unconditionally
2534 * as the call returns. if we race with commit_write giving
2535 * us that page we don't want to create a hole in the page
2536 * stream, so we stop and leave the rpc to be fired by
2537 * another dirtier or kupdated interval (the not ready page
2538 * will still be on the dirty list). we could call in
2539 * at the end of ll_file_write to process the queue again. */
2540 if (!(oap->oap_async_flags & ASYNC_READY)) {
2541 int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2544 CDEBUG(D_INODE, "oap %p page %p returned %d "
2545 "instead of ready\n", oap,
2549 /* llite is telling us that the page is still
2550 * in commit_write and that we should try
2551 * and put it in an rpc again later. we
2552 * break out of the loop so we don't create
2553 * a hole in the sequence of pages in the rpc
2558 /* the io isn't needed.. tell the checks
2559 * below to complete the rpc with EINTR */
2560 cfs_spin_lock(&oap->oap_lock);
2561 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2562 cfs_spin_unlock(&oap->oap_lock);
2563 oap->oap_count = -EINTR;
2566 cfs_spin_lock(&oap->oap_lock);
2567 oap->oap_async_flags |= ASYNC_READY;
2568 cfs_spin_unlock(&oap->oap_lock);
/* any other make_ready result is a caller bug */
2571 LASSERTF(0, "oap %p page %p returned %d "
2572 "from make_ready\n", oap,
2580 /* take the page out of our book-keeping */
2581 cfs_list_del_init(&oap->oap_pending_item);
2582 lop_update_pending(cli, lop, cmd, -1);
2583 cfs_list_del_init(&oap->oap_urgent_item);
2585 /* ask the caller for the size of the io as the rpc leaves. */
2586 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2588 ops->ap_refresh_count(env, oap->oap_caller_data,
2590 LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
/* zero/negative count means the io was truncated away; complete the
 * page immediately instead of adding it to the RPC */
2592 if (oap->oap_count <= 0) {
2593 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2595 osc_ap_completion(env, cli, NULL,
2596 oap, 0, oap->oap_count);
2600 /* now put the page back in our accounting */
2601 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2602 if (page_count++ == 0)
2603 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2605 if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
2608 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2609 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2610 * have the same alignment as the initial writes that allocated
2611 * extents on the server. */
2612 ending_offset = oap->oap_obj_off + oap->oap_page_off +
2614 if (!(ending_offset & (PTLRPC_MAX_BRW_SIZE - 1)))
2617 if (page_count >= cli->cl_max_pages_per_rpc)
2620 /* If there is a gap at the end of this page, it can't merge
2621 * with any subsequent pages, so we'll hand the network a
2622 * "fragmented" page array that it can't transfer in 1 RDMA */
2623 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2627 loi_list_maint(cli, loi);
/* drop the list lock: osc_build_req() allocates and may block */
2629 client_obd_list_unlock(&cli->cl_loi_list_lock);
2632 cl_object_put(env, clob);
2634 if (page_count == 0) {
2635 client_obd_list_lock(&cli->cl_loi_list_lock);
2639 req = osc_build_req(env, cli, &rpc_list, page_count,
2640 mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
/* on failure osc_build_req cleaned up rpc_list itself */
2642 LASSERT(cfs_list_empty(&rpc_list));
2643 loi_list_maint(cli, loi);
2644 RETURN(PTR_ERR(req));
2647 aa = ptlrpc_req_async_args(req);
/* record RPC size/offset histograms for /proc stats */
2649 starting_offset &= PTLRPC_MAX_BRW_SIZE - 1;
2650 if (cmd == OBD_BRW_READ) {
2651 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2652 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2653 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2654 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2656 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2657 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2658 cli->cl_w_in_flight);
2659 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2660 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2663 client_obd_list_lock(&cli->cl_loi_list_lock);
2665 if (cmd == OBD_BRW_READ)
2666 cli->cl_r_in_flight++;
2668 cli->cl_w_in_flight++;
2670 /* queued sync pages can be torn down while the pages
2671 * were between the pending list and the rpc */
2673 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2674 /* only one oap gets a request reference */
2677 if (oap->oap_interrupted && !req->rq_intr) {
2678 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2680 ptlrpc_mark_interrupted(req);
2684 tmp->oap_request = ptlrpc_request_addref(req);
2686 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2687 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2689 req->rq_interpret_reply = brw_interpret;
2691 /* XXX: Maybe the caller can check the RPC bulk descriptor to see which
2692 * CPU/NUMA node the majority of pages were allocated on, and try
2693 * to assign the async RPC to the CPU core (PDL_POLICY_PREFERRED)
2694 * to reduce cross-CPU memory traffic.
2696 * But on the other hand, we expect that multiple ptlrpcd threads
2697 * and the initial write sponsor can run in parallel, especially
2698 * when data checksum is enabled, which is CPU-bound operation and
2699 * single ptlrpcd thread cannot process in time. So more ptlrpcd
2700 * threads sharing BRW load (with PDL_POLICY_ROUND) seems better.
2702 ptlrpcd_add_req(req, pol, -1);
/* Debug helper: dump a lov_oinfo's readiness and pending read/write page
 * counts (plus urgent-list state) with a caller-supplied format tail. */
2706 #define LOI_DEBUG(LOI, STR, args...) \
2707 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2708 !cfs_list_empty(&(LOI)->loi_ready_item) || \
2709 !cfs_list_empty(&(LOI)->loi_hp_ready_item), \
2710 (LOI)->loi_write_lop.lop_num_pending, \
2711 !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2712 (LOI)->loi_read_lop.lop_num_pending, \
2713 !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2716 /* This is called by osc_check_rpcs() to find which objects have pages that
2717 * we could be sending. These lists are maintained by lop_makes_rpc(). */
/* Selection priority: HP-ready objects, then ready objects, then (when
 * cache waiters exist) objects with queued writes, and finally -- with an
 * invalid/absent import -- any object with queued writes or reads so
 * everything gets flushed. */
2718 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2722 /* First return objects that have blocked locks so that they
2723 * will be flushed quickly and other clients can get the lock,
2724 * then objects which have pages ready to be stuffed into RPCs */
2725 if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
2726 RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
2727 struct lov_oinfo, loi_hp_ready_item));
2728 if (!cfs_list_empty(&cli->cl_loi_ready_list))
2729 RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
2730 struct lov_oinfo, loi_ready_item));
2732 /* then if we have cache waiters, return all objects with queued
2733 * writes. This is especially important when many small files
2734 * have filled up the cache and not been fired into rpcs because
2735 * they don't pass the nr_pending/object threshold */
2736 if (!cfs_list_empty(&cli->cl_cache_waiters) &&
2737 !cfs_list_empty(&cli->cl_loi_write_list))
2738 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2739 struct lov_oinfo, loi_write_item));
2741 /* then return all queued objects when we have an invalid import
2742 * so that they get flushed */
2743 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2744 if (!cfs_list_empty(&cli->cl_loi_write_list))
2745 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2748 if (!cfs_list_empty(&cli->cl_loi_read_list))
2749 RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
2750 struct lov_oinfo, loi_read_item));
/* Return non-zero when this client is already at its RPC-in-flight limit.
 * A high-priority (ASYNC_HP) page at the head of either urgent list earns
 * one extra in-flight slot, so HP flushes are never starved by the limit. */
2755 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2757 struct osc_async_page *oap;
2760 if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
2761 oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
2762 struct osc_async_page, oap_urgent_item);
2763 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2766 if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
2767 oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
2768 struct osc_async_page, oap_urgent_item);
2769 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2772 return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2775 /* called with the loi list lock held */
/* Walk objects returned by osc_next_loi() and fire write then read RPCs
 * for each, alternating for read/write balance, until the in-flight limit
 * is hit or a race_counter heuristic says to stop spinning.
 * \param ptlrpc non-zero when invoked from a ptlrpcd thread; selects
 *        PDL_POLICY_SAME instead of PDL_POLICY_ROUND for the new RPCs. */
2776 static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli, int ptlrpc)
2778 struct lov_oinfo *loi;
2779 int rc = 0, race_counter = 0;
2783 pol = ptlrpc ? PDL_POLICY_SAME : PDL_POLICY_ROUND;
2785 while ((loi = osc_next_loi(cli)) != NULL) {
2786 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2788 if (osc_max_rpc_in_flight(cli, loi))
2791 /* attempt some read/write balancing by alternating between
2792 * reads and writes in an object. The makes_rpc checks here
2793 * would be redundant if we were getting read/write work items
2794 * instead of objects. we don't want send_oap_rpc to drain a
2795 * partial read pending queue when we're given this object to
2796 * do io on writes while there are cache waiters */
2797 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2798 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2799 &loi->loi_write_lop, pol);
2801 CERROR("Write request failed with %d\n", rc);
2803 /* osc_send_oap_rpc failed, mostly because of
2806 * It can't break here, because if:
2807 * - a page was submitted by osc_io_submit, so
2809 * - no request in flight
2810 * - no subsequent request
2811 * The system will be in live-lock state,
2812 * because there is no chance to call
2813 * osc_io_unplug() and osc_check_rpcs() any
2814 * more. pdflush can't help in this case,
2815 * because it might be blocked at grabbing
2816 * the page lock as we mentioned.
2818 * Anyway, continue to drain pages. */
2827 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2828 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2829 &loi->loi_read_lop, pol);
2831 CERROR("Read request failed with %d\n", rc);
2839 /* attempt some inter-object balancing by issuing rpcs
2840 * for each object in turn */
2841 if (!cfs_list_empty(&loi->loi_hp_ready_item))
2842 cfs_list_del_init(&loi->loi_hp_ready_item);
2843 if (!cfs_list_empty(&loi->loi_ready_item))
2844 cfs_list_del_init(&loi->loi_ready_item);
2845 if (!cfs_list_empty(&loi->loi_write_item))
2846 cfs_list_del_init(&loi->loi_write_item);
2847 if (!cfs_list_empty(&loi->loi_read_item))
2848 cfs_list_del_init(&loi->loi_read_item);
2850 loi_list_maint(cli, loi);
2852 /* send_oap_rpc fails with 0 when make_ready tells it to
2853 * back off. llite's make_ready does this when it tries
2854 * to lock a page queued for write that is already locked.
2855 * we want to try sending rpcs from many objects, but we
2856 * don't want to spin failing with 0. */
2857 if (race_counter == 10)
/* Public wrapper: run the RPC-dispatch scan from a non-ptlrpcd context
 * (ptlrpc = 0 selects PDL_POLICY_ROUND in osc_check_rpcs0()). */
2862 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2864 osc_check_rpcs0(env, cli, 0);
2868 * Non-blocking version of osc_enter_cache() that consumes grant only when it
/* Consumes one page of write grant if cl_avail_grant allows; with
 * \a transient set it also accounts the page as "transit" dirty and marks
 * the oap OBD_BRW_NOCACHE. Returns whether the grant was available.
 * NOTE(review): presumably called under cli->cl_loi_list_lock -- confirm. */
2871 int osc_enter_cache_try(const struct lu_env *env,
2872 struct client_obd *cli, struct lov_oinfo *loi,
2873 struct osc_async_page *oap, int transient)
2877 has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2879 osc_consume_write_grant(cli, &oap->oap_brw_page);
2881 cli->cl_dirty_transit += CFS_PAGE_SIZE;
2882 cfs_atomic_inc(&obd_dirty_transit_pages);
2883 oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2889 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2890 * grant or cache space. */
2891 static int osc_enter_cache(const struct lu_env *env,
2892 struct client_obd *cli, struct lov_oinfo *loi,
2893 struct osc_async_page *oap)
2895 struct osc_cache_waiter ocw;
2896 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2900 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2901 "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
2902 cli->cl_dirty_max, obd_max_dirty_pages,
2903 cli->cl_lost_grant, cli->cl_avail_grant);
2905 /* force the caller to try sync io. this can jump the list
2906 * of queued writes and create a discontiguous rpc stream */
2907 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
2908 cli->cl_dirty_max < CFS_PAGE_SIZE ||
2909 cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync)
2912 /* Hopefully normal case - cache space and write credits available */
2913 if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2914 cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2915 osc_enter_cache_try(env, cli, loi, oap, 0))
2918 /* We can get here for two reasons: too many dirty pages in cache, or
2919 * run out of grants. In both cases we should write dirty pages out.
2920 * Adding a cache waiter will trigger urgent write-out no matter what
2922 * The exiting condition is no avail grants and no dirty pages caching,
2923 * that really means there is no space on the OST. */
2924 cfs_waitq_init(&ocw.ocw_waitq);
2926 while (cli->cl_dirty > 0) {
/* queue ourselves; cache waiters trigger urgent write-out (see
 * osc_next_loi) and are woken when space/grant is released */
2927 cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2930 loi_list_maint(cli, loi);
2931 osc_check_rpcs(env, cli);
/* drop the list lock across the (interruptible) sleep */
2932 client_obd_list_unlock(&cli->cl_loi_list_lock);
2934 CDEBUG(D_CACHE, "%s: sleeping for cache space @ %p for %p\n",
2935 cli->cl_import->imp_obd->obd_name, &ocw, oap);
2937 rc = l_wait_event(ocw.ocw_waitq, cfs_list_empty(&ocw.ocw_entry), &lwi);
2939 client_obd_list_lock(&cli->cl_loi_list_lock);
2940 cfs_list_del_init(&ocw.ocw_entry);
/* Initialize an osc_async_page descriptor for \a page at object offset
 * \a offset. When called just for sizing (no result slot in view here),
 * returns the rounded size of the oap structure. The page offset must be
 * CFS_PAGE_SIZE aligned. */
2953 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2954 struct lov_oinfo *loi, cfs_page_t *page,
2955 obd_off offset, const struct obd_async_page_ops *ops,
2956 void *data, void **res, int nocache,
2957 struct lustre_handle *lockh)
2959 struct osc_async_page *oap;
2964 return cfs_size_round(sizeof(*oap));
2967 oap->oap_magic = OAP_MAGIC;
2968 oap->oap_cli = &exp->exp_obd->u.cli;
2971 oap->oap_caller_ops = ops;
2972 oap->oap_caller_data = data;
2974 oap->oap_page = page;
2975 oap->oap_obj_off = offset;
/* local root-equivalent processes bypass quota on this page */
2976 if (!client_is_remote(exp) &&
2977 cfs_capable(CFS_CAP_SYS_RESOURCE))
2978 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2980 LASSERT(!(offset & ~CFS_PAGE_MASK));
2982 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2983 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2984 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2985 CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2987 cfs_spin_lock_init(&oap->oap_lock);
2988 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
/* Queue a prepared async page for read or write. For writes this checks
 * quota, enters the dirty-page cache (possibly blocking in
 * osc_enter_cache()), then moves the oap onto the pending lists and kicks
 * the writeback work item when an RPC can be formed. */
2992 int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp,
2993 struct lov_stripe_md *lsm, struct lov_oinfo *loi,
2994 struct osc_async_page *oap, int cmd, int off,
2995 int count, obd_flag brw_flags, enum async_flags async_flags)
2997 struct client_obd *cli = &exp->exp_obd->u.cli;
3001 if (oap->oap_magic != OAP_MAGIC)
3004 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
/* reject a page that is already queued somewhere */
3007 if (!cfs_list_empty(&oap->oap_pending_item) ||
3008 !cfs_list_empty(&oap->oap_urgent_item) ||
3009 !cfs_list_empty(&oap->oap_rpc_item))
3012 /* check if the file's owner/group is over quota */
3013 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
3014 struct cl_object *obj;
3015 struct cl_attr attr; /* XXX put attr into thread info */
3016 unsigned int qid[MAXQUOTAS];
3018 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
3020 cl_object_attr_lock(obj);
3021 rc = cl_object_attr_get(env, obj, &attr);
3022 cl_object_attr_unlock(obj);
3024 qid[USRQUOTA] = attr.cat_uid;
3025 qid[GRPQUOTA] = attr.cat_gid;
3027 osc_quota_chkdq(cli, qid) == NO_QUOTA)
3034 loi = lsm->lsm_oinfo[0];
3036 client_obd_list_lock(&cli->cl_loi_list_lock);
3038 LASSERT(off + count <= CFS_PAGE_SIZE);
3040 oap->oap_page_off = off;
3041 oap->oap_count = count;
3042 oap->oap_brw_flags = brw_flags;
3043 /* Give a hint to OST that requests are coming from kswapd - bug19529 */
3044 if (cfs_memory_pressure_get())
3045 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
/* oap_async_flags is read under oap_lock elsewhere; update it locked */
3046 cfs_spin_lock(&oap->oap_lock);
3047 oap->oap_async_flags = async_flags;
3048 cfs_spin_unlock(&oap->oap_lock);
3050 if (cmd & OBD_BRW_WRITE) {
3051 rc = osc_enter_cache(env, cli, loi, oap);
3053 client_obd_list_unlock(&cli->cl_loi_list_lock);
3058 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
3061 osc_oap_to_pending(oap);
3062 loi_list_maint(cli, loi);
/* if an RPC could be formed now and we are under the in-flight limit,
 * punt actual sending to the ptlrpcd writeback work item */
3063 if (!osc_max_rpc_in_flight(cli, loi) &&
3064 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
3065 LASSERT(cli->cl_writeback_work != NULL);
3066 rc = ptlrpcd_queue_work(cli->cl_writeback_work);
3068 CDEBUG(D_CACHE, "Queued writeback work for client obd %p/%d.\n",
3071 client_obd_list_unlock(&cli->cl_loi_list_lock);
3076 /* aka (~was & now & flag), but this is more clear :) */
3077 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
/* Apply newly-set async flags (ASYNC_READY / ASYNC_URGENT) to a pending
 * oap, moving it onto the proper urgent list when it becomes urgent.
 * NOTE(review): presumably called under cli->cl_loi_list_lock (the lists
 * touched here are guarded by it elsewhere in this file) -- confirm. */
3079 int osc_set_async_flags_base(struct client_obd *cli,
3080 struct lov_oinfo *loi, struct osc_async_page *oap,
3081 obd_flag async_flags)
3083 struct loi_oap_pages *lop;
3087 LASSERT(!cfs_list_empty(&oap->oap_pending_item));
3089 if (oap->oap_cmd & OBD_BRW_WRITE) {
3090 lop = &loi->loi_write_lop;
3092 lop = &loi->loi_read_lop;
/* nothing to do when every requested flag is already set */
3095 if ((oap->oap_async_flags & async_flags) == async_flags)
3098 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3099 flags |= ASYNC_READY;
3101 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3102 cfs_list_empty(&oap->oap_rpc_item)) {
/* HP pages go to the head of the urgent list, others to the tail */
3103 if (oap->oap_async_flags & ASYNC_HP)
3104 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
3106 cfs_list_add_tail(&oap->oap_urgent_item,
3108 flags |= ASYNC_URGENT;
3109 loi_list_maint(cli, loi);
3111 cfs_spin_lock(&oap->oap_lock);
3112 oap->oap_async_flags |= flags;
3113 cfs_spin_unlock(&oap->oap_lock);
3115 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3116 oap->oap_async_flags);
/* Remove an async page from all osc queues and release its cache/grant
 * accounting. Fails with -EBUSY if the page is already part of an RPC in
 * flight (on oap_rpc_item). */
3120 int osc_teardown_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
3121 struct lov_oinfo *loi, struct osc_async_page *oap)
3123 struct client_obd *cli = &exp->exp_obd->u.cli;
3124 struct loi_oap_pages *lop;
3128 if (oap->oap_magic != OAP_MAGIC)
3132 loi = lsm->lsm_oinfo[0];
3134 if (oap->oap_cmd & OBD_BRW_WRITE) {
3135 lop = &loi->loi_write_lop;
3137 lop = &loi->loi_read_lop;
3140 client_obd_list_lock(&cli->cl_loi_list_lock);
/* can't tear down a page that an RPC currently owns */
3142 if (!cfs_list_empty(&oap->oap_rpc_item))
3143 GOTO(out, rc = -EBUSY);
3145 osc_exit_cache(cli, oap, 0);
3146 osc_wake_cache_waiters(cli);
3148 if (!cfs_list_empty(&oap->oap_urgent_item)) {
3149 cfs_list_del_init(&oap->oap_urgent_item);
3150 cfs_spin_lock(&oap->oap_lock);
3151 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3152 cfs_spin_unlock(&oap->oap_lock);
3154 if (!cfs_list_empty(&oap->oap_pending_item)) {
3155 cfs_list_del_init(&oap->oap_pending_item);
3156 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3158 loi_list_maint(cli, loi);
3159 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3161 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Attach \a einfo->ei_cbdata to the DLM lock's l_ast_data, but only after
 * asserting the lock's callbacks match those in \a einfo (i.e. the lock
 * really belongs to this enqueue). Returns whether l_ast_data now equals
 * the requested data. */
3165 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
3166 struct ldlm_enqueue_info *einfo)
3168 void *data = einfo->ei_cbdata;
3171 LASSERT(lock != NULL);
3172 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3173 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3174 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3175 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
/* l_ast_data is guarded by osc_ast_guard in addition to the lock/res
 * locks; take all of them while we test-and-set it */
3177 lock_res_and_lock(lock);
3178 cfs_spin_lock(&osc_ast_guard);
3180 if (lock->l_ast_data == NULL)
3181 lock->l_ast_data = data;
3182 if (lock->l_ast_data == data)
3185 cfs_spin_unlock(&osc_ast_guard);
3186 unlock_res_and_lock(lock);
/* Handle-based wrapper around osc_set_lock_data_with_check(); logs when
 * the handle no longer resolves to a lock (likely client eviction). */
3191 static int osc_set_data_with_check(struct lustre_handle *lockh,
3192 struct ldlm_enqueue_info *einfo)
3194 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3198 set = osc_set_lock_data_with_check(lock, einfo);
3199 LDLM_LOCK_PUT(lock);
3201 CERROR("lockh %p, data %p - client evicted?\n",
3202 lockh, einfo->ei_cbdata);
/* Iterate over all cached DLM locks on this stripe's resource, applying
 * \a replace (typically swapping l_ast_data) with \a data. */
3206 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3207 ldlm_iterator_t replace, void *data)
3209 struct ldlm_res_id res_id;
3210 struct obd_device *obd = class_exp2obd(exp);
3212 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3213 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3217 /* find any ldlm lock of the inode in osc
/* Like osc_change_cbdata() but reports whether the iterator stopped early
 * (a matching lock was found): LDLM_ITER_STOP -> found, LDLM_ITER_CONTINUE
 * -> not found. */
3221 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3222 ldlm_iterator_t replace, void *data)
3224 struct ldlm_res_id res_id;
3225 struct obd_device *obd = class_exp2obd(exp);
3228 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3229 rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3230 if (rc == LDLM_ITER_STOP)
3232 if (rc == LDLM_ITER_CONTINUE)
/* Post-process an enqueue reply: translate an intent-aborted reply into
 * its server-side result, mark the LVB ready for glimpse results, and run
 * the caller's update upcall with the final rc. */
3237 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3238 obd_enqueue_update_f upcall, void *cookie,
3239 int *flags, int agl, int rc)
3241 int intent = *flags & LDLM_FL_HAS_INTENT;
3245 /* The request was created before ldlm_cli_enqueue call. */
3246 if (rc == ELDLM_LOCK_ABORTED) {
3247 struct ldlm_reply *rep;
3248 rep = req_capsule_server_get(&req->rq_pill,
3251 LASSERT(rep != NULL);
/* the server's intent disposition carries the real result */
3252 if (rep->lock_policy_res1)
3253 rc = rep->lock_policy_res1;
3257 if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
3259 *flags |= LDLM_FL_LVB_READY;
3260 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3261 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3264 /* Call the update callback. */
3265 rc = (*upcall)(cookie, rc);
/* Async-enqueue reply interpreter (runs in ptlrpcd context): finishes the
 * ldlm enqueue, runs osc_enqueue_fini(), and drops the references taken on
 * the lock for the async path. */
3269 static int osc_enqueue_interpret(const struct lu_env *env,
3270 struct ptlrpc_request *req,
3271 struct osc_enqueue_args *aa, int rc)
3273 struct ldlm_lock *lock;
3274 struct lustre_handle handle;
3276 struct ost_lvb *lvb;
3278 int *flags = aa->oa_flags;
3280 /* Make a local copy of a lock handle and a mode, because aa->oa_*
3281 * might be freed anytime after lock upcall has been called. */
3282 lustre_handle_copy(&handle, aa->oa_lockh);
3283 mode = aa->oa_ei->ei_mode;
3285 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3287 lock = ldlm_handle2lock(&handle);
3289 /* Take an additional reference so that a blocking AST that
3290 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3291 * to arrive after an upcall has been executed by
3292 * osc_enqueue_fini(). */
3293 ldlm_lock_addref(&handle, mode);
3295 /* Let CP AST to grant the lock first. */
3296 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
/* aborted AGL enqueues still need their LVB unpacked */
3298 if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
3303 lvb_len = sizeof(*aa->oa_lvb);
3306 /* Complete obtaining the lock procedure. */
3307 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3308 mode, flags, lvb, lvb_len, &handle, rc);
3309 /* Complete osc stuff. */
3310 rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
3311 flags, aa->oa_agl, rc);
3313 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3315 /* Release the lock for async request. */
3316 if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3318 * Releases a reference taken by ldlm_cli_enqueue(), if it is
3319 * not already released by
3320 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3322 ldlm_lock_decref(&handle, mode);
3324 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3325 aa->oa_lockh, req, aa);
/* drop the extra reference taken above, then the handle2lock ref */
3326 ldlm_lock_decref(&handle, mode);
3327 LDLM_LOCK_PUT(lock);
/* Update per-stripe state (LVB copy and known-minimum-size) after an
 * enqueue completes, then allow the lock to be matched; on failure the
 * lock is marked unmatched instead. */
3331 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3332 struct lov_oinfo *loi, int flags,
3333 struct ost_lvb *lvb, __u32 mode, int rc)
3335 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3337 if (rc == ELDLM_OK) {
3340 LASSERT(lock != NULL);
3341 loi->loi_lvb = *lvb;
3342 tmp = loi->loi_lvb.lvb_size;
3343 /* Extend KMS up to the end of this lock and no further
3344 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3345 if (tmp > lock->l_policy_data.l_extent.end)
3346 tmp = lock->l_policy_data.l_extent.end + 1;
3347 if (tmp >= loi->loi_kms) {
3348 LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3349 ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3350 loi_kms_set(loi, tmp);
3352 LDLM_DEBUG(lock, "lock acquired, setting rss="
3353 LPU64"; leaving kms="LPU64", end="LPU64,
3354 loi->loi_lvb.lvb_size, loi->loi_kms,
3355 lock->l_policy_data.l_extent.end);
3357 ldlm_lock_allow_match(lock);
3358 } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
/* glimpse result: take the LVB but do not touch kms */
3359 LASSERT(lock != NULL);
3360 loi->loi_lvb = *lvb;
3361 ldlm_lock_allow_match(lock);
3362 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3363 " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3369 ldlm_lock_fail_match(lock);
3371 LDLM_LOCK_PUT(lock);
3374 EXPORT_SYMBOL(osc_update_enqueue);
/* Sentinel request-set pointer: "send this async request via ptlrpcd"
 * rather than a caller-owned set (compared by address below). */
3376 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3378 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3379 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3380 * other synchronous requests, however keeping some locks and trying to obtain
3381 * others may take a considerable amount of time in a case of ost failure; and
3382 * when other sync requests do not get released lock from a client, the client
3383 * is excluded from the cluster -- such scenarios make the life difficult, so
3384 * release locks just after they are obtained. */
/* Core extent-lock enqueue: first try to match a compatible cached lock
 * (reads may piggyback on a PW lock); otherwise build and send an
 * LDLM_ENQUEUE (with LVB) either synchronously or via \a rqset /
 * PTLRPCD_SET. \a agl != 0 marks asynchronous glimpse locking. */
3385 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3386 int *flags, ldlm_policy_data_t *policy,
3387 struct ost_lvb *lvb, int kms_valid,
3388 obd_enqueue_update_f upcall, void *cookie,
3389 struct ldlm_enqueue_info *einfo,
3390 struct lustre_handle *lockh,
3391 struct ptlrpc_request_set *rqset, int async, int agl)
3393 struct obd_device *obd = exp->exp_obd;
3394 struct ptlrpc_request *req = NULL;
3395 int intent = *flags & LDLM_FL_HAS_INTENT;
3396 int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
3401 /* Filesystem lock extents are extended to page boundaries so that
3402 * dealing with the page cache is a little smoother. */
3403 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3404 policy->l_extent.end |= ~CFS_PAGE_MASK;
3407 * kms is not valid when either object is completely fresh (so that no
3408 * locks are cached), or object was evicted. In the latter case cached
3409 * lock cannot be used, because it would prime inode state with
3410 * potentially stale LVB.
3415 /* Next, search for already existing extent locks that will cover us */
3416 /* If we're trying to read, we also search for an existing PW lock. The
3417 * VFS and page cache already protect us locally, so lots of readers/
3418 * writers can share a single PW lock.
3420 * There are problems with conversion deadlocks, so instead of
3421 * converting a read lock to a write lock, we'll just enqueue a new
3424 * At some point we should cancel the read lock instead of making them
3425 * send us a blocking callback, but there are problems with canceling
3426 * locks out from other users right now, too. */
3427 mode = einfo->ei_mode;
3428 if (einfo->ei_mode == LCK_PR)
3430 mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
3431 einfo->ei_type, policy, mode, lockh, 0);
3433 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3435 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
3436 /* For AGL, if enqueue RPC is sent but the lock is not
3437 * granted, then skip to process this stripe.
3438 * Return -ECANCELED to tell the caller. */
3439 ldlm_lock_decref(lockh, mode);
3440 LDLM_LOCK_PUT(matched);
3442 } else if (osc_set_lock_data_with_check(matched, einfo)) {
3443 *flags |= LDLM_FL_LVB_READY;
3444 /* addref the lock only if not async requests and PW
3445 * lock is matched whereas we asked for PR. */
3446 if (!rqset && einfo->ei_mode != mode)
3447 ldlm_lock_addref(lockh, LCK_PR);
3449 /* I would like to be able to ASSERT here that
3450 * rss <= kms, but I can't, for reasons which
3451 * are explained in lov_enqueue() */
3454 /* We already have a lock, and it's referenced */
3455 (*upcall)(cookie, ELDLM_OK);
3457 if (einfo->ei_mode != mode)
3458 ldlm_lock_decref(lockh, LCK_PW);
3460 /* For async requests, decref the lock. */
3461 ldlm_lock_decref(lockh, einfo->ei_mode);
3462 LDLM_LOCK_PUT(matched);
/* matched lock unusable for this enqueue: drop it and fall through
 * to a real enqueue RPC */
3465 ldlm_lock_decref(lockh, mode);
3466 LDLM_LOCK_PUT(matched);
3472 CFS_LIST_HEAD(cancels);
3473 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3474 &RQF_LDLM_ENQUEUE_LVB);
3478 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3480 ptlrpc_request_free(req);
3484 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3486 ptlrpc_request_set_replen(req);
3489 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3490 *flags &= ~LDLM_FL_BLOCK_GRANTED;
3492 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3493 sizeof(*lvb), lockh, async);
3496 struct osc_enqueue_args *aa;
3497 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3498 aa = ptlrpc_req_async_args(req);
3501 aa->oa_flags = flags;
3502 aa->oa_upcall = upcall;
3503 aa->oa_cookie = cookie;
3505 aa->oa_lockh = lockh;
3508 req->rq_interpret_reply =
3509 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3510 if (rqset == PTLRPCD_SET)
3511 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3513 ptlrpc_set_add_req(rqset, req);
3514 } else if (intent) {
3515 ptlrpc_req_finished(req);
/* synchronous path: finish up inline */
3520 rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
3522 ptlrpc_req_finished(req);
/* obd_ops lock-enqueue entry point.  Builds the LDLM resource id from the
 * stripe metadata (object id/seq) and delegates to osc_enqueue_base() with
 * the per-stripe LVB and KMS-valid flag from stripe 0.  The request is
 * issued asynchronously iff a request set (rqset) is supplied.
 * NOTE(review): original line numbering shows elided lines in this listing. */
3527 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3528 struct ldlm_enqueue_info *einfo,
3529 struct ptlrpc_request_set *rqset)
3531 struct ldlm_res_id res_id;
3535 osc_build_res_name(oinfo->oi_md->lsm_object_id,
3536 oinfo->oi_md->lsm_object_seq, &res_id);
3538 rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3539 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3540 oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3541 oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
/* rqset != NULL => async enqueue; final arg 0 => not an AGL request */
3542 rqset, rqset != NULL, 0);
/* Try to match an already-granted extent lock for [start,end] on res_id
 * without sending an RPC.  On a successful match, verifies/sets the lock's
 * private data via osc_set_data_with_check(); for a non-TEST match where a
 * PW lock satisfied a PR request, converts the reference from PW to PR. */
3546 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3547 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3548 int *flags, void *data, struct lustre_handle *lockh,
3551 struct obd_device *obd = exp->exp_obd;
3552 int lflags = *flags;
/* fault-injection hook for testing: pretend no lock matched */
3556 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3559 /* Filesystem lock extents are extended to page boundaries so that
3560 * dealing with the page cache is a little smoother */
3561 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3562 policy->l_extent.end |= ~CFS_PAGE_MASK;
3564 /* Next, search for already existing extent locks that will cover us */
3565 /* If we're trying to read, we also search for an existing PW lock. The
3566 * VFS and page cache already protect us locally, so lots of readers/
3567 * writers can share a single PW lock. */
3571 rc = ldlm_lock_match(obd->obd_namespace, lflags,
3572 res_id, type, policy, rc, lockh, unref);
3575 if (!osc_set_data_with_check(lockh, data)) {
/* data check failed: drop the reference we just took (unless TEST_LOCK,
 * which takes no reference) */
3576 if (!(lflags & LDLM_FL_TEST_LOCK))
3577 ldlm_lock_decref(lockh, rc)
3581 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
/* matched PW while caller asked for PR: re-reference as PR, drop PW */
3582 ldlm_lock_addref(lockh, LCK_PR);
3583 ldlm_lock_decref(lockh, LCK_PW);
/* Release one reference on a lock handle.  GROUP locks are never released
 * by blocking callbacks, so they must be explicitly decref'd AND cancelled;
 * all other modes just drop the reference. */
3590 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3594 if (unlikely(mode == LCK_GROUP))
3595 ldlm_lock_decref_and_cancel(lockh, mode);
3597 ldlm_lock_decref(lockh, mode);
/* obd_ops cancel entry point: thin wrapper around osc_cancel_base().
 * The export and stripe md arguments are unused here. */
3602 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3603 __u32 mode, struct lustre_handle *lockh)
3606 RETURN(osc_cancel_base(lockh, mode));
/* Cancel all unused locks for this OSC, optionally restricted to the
 * resource named by lsm (resp stays NULL when no lsm is given, which
 * cancels across the whole namespace). */
3609 static int osc_cancel_unused(struct obd_export *exp,
3610 struct lov_stripe_md *lsm,
3611 ldlm_cancel_flags_t flags,
3614 struct obd_device *obd = class_exp2obd(exp);
3615 struct ldlm_res_id res_id, *resp = NULL;
3618 resp = osc_build_res_name(lsm->lsm_object_id,
3619 lsm->lsm_object_seq, &res_id);
3622 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
/* Reply interpreter for the async OST_STATFS RPC: unpacks the obd_statfs
 * reply, updates the object-creator (oscc) DEGRADED/RDONLY/NOSPC state
 * flags under oscc_lock, copies the statfs result to the caller's buffer,
 * and invokes the caller's completion callback. */
3625 static int osc_statfs_interpret(const struct lu_env *env,
3626 struct ptlrpc_request *req,
3627 struct osc_async_args *aa, int rc)
3629 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3630 struct obd_statfs *msfs;
3635 /* The request has in fact never been sent
3636 * due to issues at a higher level (LOV).
3637 * Exit immediately since the caller is
3638 * aware of the problem and takes care
3639 * of the clean up */
/* NODELAY statfs callers tolerate transient connection errors */
3642 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3643 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3649 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3651 GOTO(out, rc = -EPROTO);
3654 /* Reinitialize the RDONLY and DEGRADED flags at the client
3655 * on each statfs, so they don't stay set permanently. */
3656 cfs_spin_lock(&cli->cl_oscc.oscc_lock);
3658 if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3659 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3660 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3661 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3663 if (unlikely(msfs->os_state & OS_STATE_READONLY))
3664 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3665 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3666 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3668 /* Add a bit of hysteresis so this flag isn't continually flapping,
3669 * and ensure that new files don't get extremely fragmented due to
3670 * only a small amount of available space in the filesystem.
3671 * We want to set the NOSPC flag when there is less than ~0.1% free
3672 * and clear it when there is at least ~0.2% free space, so:
3673 * avail < ~0.1% max max = avail + used
3674 * 1025 * avail < avail + used used = blocks - free
3675 * 1024 * avail < used
3676 * 1024 * avail < blocks - free
3677 * avail < ((blocks - free) >> 10)
3679 * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
3680 * lose that amount of space so in those cases we report no space left
3681 * if there is less than 1 GB left. */
3682 used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
/* low on either inodes (<32) or blocks => stop object creation */
3683 if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3684 ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3685 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3686 else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3687 (msfs->os_ffree > 64) &&
3688 (msfs->os_bavail > (used << 1)))) {
3689 cli->cl_oscc.oscc_flags &= ~(OSCC_FLAG_NOSPC |
3690 OSCC_FLAG_NOSPC_BLK);
/* still NOSPC and specifically out of blocks (not just inodes) */
3693 if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3694 (msfs->os_bavail < used)))
3695 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC_BLK;
3697 cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
3699 *aa->aa_oi->oi_osfs = *msfs;
3701 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Issue an asynchronous OST_STATFS request on the given request set.
 * The reply is handled by osc_statfs_interpret() above.  max_age is
 * currently unused on the wire (see comment below). */
3705 static int osc_statfs_async(struct obd_export *exp,
3706 struct obd_info *oinfo, __u64 max_age,
3707 struct ptlrpc_request_set *rqset)
3709 struct obd_device *obd = class_exp2obd(exp);
3710 struct ptlrpc_request *req;
3711 struct osc_async_args *aa;
3715 /* We could possibly pass max_age in the request (as an absolute
3716 * timestamp or a "seconds.usec ago") so the target can avoid doing
3717 * extra calls into the filesystem if that isn't necessary (e.g.
3718 * during mount that would help a bit). Having relative timestamps
3719 * is not so great if request processing is slow, while absolute
3720 * timestamps are not ideal because they need time synchronization. */
3721 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3725 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3727 ptlrpc_request_free(req);
3730 ptlrpc_request_set_replen(req);
3731 req->rq_request_portal = OST_CREATE_PORTAL;
3732 ptlrpc_at_set_req_timeout(req);
3734 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3735 /* procfs-originated requests must not block waiting for the
 * target, to avoid deadlock: fail fast instead */
3736 req->rq_no_resend = 1;
3737 req->rq_no_delay = 1;
3740 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3741 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3742 aa = ptlrpc_req_async_args(req);
3745 ptlrpc_set_add_req(rqset, req);
/* Synchronous statfs: sends OST_STATFS and waits for the reply.  Takes a
 * reference on the import under cl_sem to serialize against concurrent
 * disconnect (see bug 15684 note below). */
3749 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
3750 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
3752 struct obd_device *obd = class_exp2obd(exp);
3753 struct obd_statfs *msfs;
3754 struct ptlrpc_request *req;
3755 struct obd_import *imp = NULL;
3759 /*Since the request might also come from lprocfs, so we need
3760 *sync this with client_disconnect_export Bug15684*/
3761 cfs_down_read(&obd->u.cli.cl_sem);
3762 if (obd->u.cli.cl_import)
3763 imp = class_import_get(obd->u.cli.cl_import);
3764 cfs_up_read(&obd->u.cli.cl_sem);
3768 /* We could possibly pass max_age in the request (as an absolute
3769 * timestamp or a "seconds.usec ago") so the target can avoid doing
3770 * extra calls into the filesystem if that isn't necessary (e.g.
3771 * during mount that would help a bit). Having relative timestamps
3772 * is not so great if request processing is slow, while absolute
3773 * timestamps are not ideal because they need time synchronization. */
3774 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
/* import reference only needed for allocation; drop it now */
3776 class_import_put(imp);
3781 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3783 ptlrpc_request_free(req);
3786 ptlrpc_request_set_replen(req);
3787 req->rq_request_portal = OST_CREATE_PORTAL;
3788 ptlrpc_at_set_req_timeout(req);
3790 if (flags & OBD_STATFS_NODELAY) {
3791 /* procfs-originated requests must not block waiting for the
 * target, to avoid deadlock: fail fast instead */
3792 req->rq_no_resend = 1;
3793 req->rq_no_delay = 1;
3796 rc = ptlrpc_queue_wait(req);
3800 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3802 GOTO(out, rc = -EPROTO);
3809 ptlrpc_req_finished(req);
3813 /* Retrieve object striping information.
3815 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3816 * the maximum number of OST indices which will fit in the user buffer.
3817 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
 *
 * Copies the user's lov_user_md header in, validates the magic (v1 or v3),
 * builds a single-stripe reply describing this object, and copies it back
 * to user space.  Returns 0 on success or a negative errno. */
3819 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3821 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3822 struct lov_user_md_v3 lum, *lumk;
3823 struct lov_user_ost_data_v1 *lmm_objects;
3824 int rc = 0, lum_size;
3830 /* we only need the header part from user space to get lmm_magic and
3831 * lmm_stripe_count, (the header part is common to v1 and v3) */
3832 lum_size = sizeof(struct lov_user_md_v1);
3833 if (cfs_copy_from_user(&lum, lump, lum_size))
3836 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3837 (lum.lmm_magic != LOV_USER_MAGIC_V3))
3840 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3841 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3842 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3843 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3845 /* we can use lov_mds_md_size() to compute lum_size
3846 * because lov_user_md_vX and lov_mds_md_vX have the same size */
3847 if (lum.lmm_stripe_count > 0) {
3848 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3849 OBD_ALLOC(lumk, lum_size);
/* lmm_objects lives at a different offset in v1 vs v3 layouts */
3853 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3854 lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3856 lmm_objects = &(lumk->lmm_objects[0]);
3857 lmm_objects->l_object_id = lsm->lsm_object_id;
3859 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3863 lumk->lmm_object_id = lsm->lsm_object_id;
3864 lumk->lmm_object_seq = lsm->lsm_object_seq;
3865 lumk->lmm_stripe_count = 1;
3867 if (cfs_copy_to_user(lump, lumk, lum_size))
3871 OBD_FREE(lumk, lum_size);
/* ioctl dispatcher for the OSC device.  Pins the module for the duration
 * of the call, switches on cmd, and returns -ENOTTY for unknown ioctls. */
3877 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3878 void *karg, void *uarg)
3880 struct obd_device *obd = exp->exp_obd;
3881 struct obd_ioctl_data *data = karg;
3885 if (!cfs_try_module_get(THIS_MODULE)) {
3886 CERROR("Can't get module. Is it alive?");
/* Fabricate a single-target lov_desc so LOV-level tools work when
 * pointed directly at an OSC. */
3890 case OBD_IOC_LOV_GET_CONFIG: {
3892 struct lov_desc *desc;
3893 struct obd_uuid uuid;
3897 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3898 GOTO(out, err = -EINVAL);
3900 data = (struct obd_ioctl_data *)buf;
3902 if (sizeof(*desc) > data->ioc_inllen1) {
3903 obd_ioctl_freedata(buf, len);
3904 GOTO(out, err = -EINVAL);
3907 if (data->ioc_inllen2 < sizeof(uuid)) {
3908 obd_ioctl_freedata(buf, len);
3909 GOTO(out, err = -EINVAL);
3912 desc = (struct lov_desc *)data->ioc_inlbuf1;
3913 desc->ld_tgt_count = 1;
3914 desc->ld_active_tgt_count = 1;
3915 desc->ld_default_stripe_count = 1;
3916 desc->ld_default_stripe_size = 0;
3917 desc->ld_default_stripe_offset = 0;
3918 desc->ld_pattern = 0;
3919 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3921 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3923 err = cfs_copy_to_user((void *)uarg, buf, len);
3926 obd_ioctl_freedata(buf, len);
3929 case LL_IOC_LOV_SETSTRIPE:
3930 err = obd_alloc_memmd(exp, karg);
3934 case LL_IOC_LOV_GETSTRIPE:
3935 err = osc_getstripe(karg, uarg);
3937 case OBD_IOC_CLIENT_RECOVER:
3938 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3939 data->ioc_inlbuf1, 0);
3943 case IOC_OSC_SET_ACTIVE:
3944 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3947 case OBD_IOC_POLL_QUOTACHECK:
3948 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
3950 case OBD_IOC_PING_TARGET:
3951 err = ptlrpc_obd_ping(obd);
3954 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3955 cmd, cfs_curproc_comm());
3956 GOTO(out, err = -ENOTTY);
3959 cfs_module_put(THIS_MODULE);
/* obd_get_info handler.  Supported keys:
 *   KEY_LOCK_TO_STRIPE - answers locally (stripe index of a lock);
 *   KEY_LAST_ID        - synchronous OST_GET_INFO RPC for the last
 *                        precreated object id;
 *   KEY_FIEMAP         - synchronous OST_GET_INFO RPC carrying a fiemap
 *                        request/reply in val (size *vallen both ways). */
3963 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
3964 obd_count keylen, void *key, __u32 *vallen, void *val,
3965 struct lov_stripe_md *lsm)
3968 if (!vallen || !val)
3971 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3972 __u32 *stripe = val;
3973 *vallen = sizeof(*stripe);
3976 } else if (KEY_IS(KEY_LAST_ID)) {
3977 struct ptlrpc_request *req;
3982 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3983 &RQF_OST_GET_INFO_LAST_ID);
3987 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3988 RCL_CLIENT, keylen);
3989 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3991 ptlrpc_request_free(req);
3995 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3996 memcpy(tmp, key, keylen);
/* fail fast: callers of LAST_ID don't want to wait through recovery */
3998 req->rq_no_delay = req->rq_no_resend = 1;
3999 ptlrpc_request_set_replen(req);
4000 rc = ptlrpc_queue_wait(req);
4004 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
4006 GOTO(out, rc = -EPROTO);
4008 *((obd_id *)val) = *reply;
4010 ptlrpc_req_finished(req);
4012 } else if (KEY_IS(KEY_FIEMAP)) {
4013 struct ptlrpc_request *req;
4014 struct ll_user_fiemap *reply;
4018 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
4019 &RQF_OST_GET_INFO_FIEMAP);
/* fiemap value buffer is sent to and returned from the server,
 * so size both the client and server sides of the capsule */
4023 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
4024 RCL_CLIENT, keylen);
4025 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
4026 RCL_CLIENT, *vallen);
4027 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
4028 RCL_SERVER, *vallen);
4030 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
4032 ptlrpc_request_free(req);
4036 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
4037 memcpy(tmp, key, keylen);
4038 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
4039 memcpy(tmp, val, *vallen);
4041 ptlrpc_request_set_replen(req);
4042 rc = ptlrpc_queue_wait(req);
4046 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
4048 GOTO(out1, rc = -EPROTO);
4050 memcpy(val, reply, *vallen);
4052 ptlrpc_req_finished(req);
/* Called when the MDS connects through this OSC: connect the llog
 * initiator context and mark the import as server-timeout/pingable so
 * the pinger keeps the OST connection alive. */
4060 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
4062 struct llog_ctxt *ctxt;
4066 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
4068 rc = llog_initiator_connect(ctxt);
4069 llog_ctxt_put(ctxt);
4071 /* XXX return an error? skip setting below flags? */
4074 cfs_spin_lock(&imp->imp_lock);
4075 imp->imp_server_timeout = 1;
4076 imp->imp_pingable = 1;
4077 cfs_spin_unlock(&imp->imp_lock);
4078 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
/* Reply interpreter for the KEY_MDS_CONN set_info RPC: on success,
 * completes the MDS-connection setup on the request's import. */
4083 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
4084 struct ptlrpc_request *req,
4091 RETURN(osc_setinfo_mds_connect_import(req->rq_import));
/* obd_set_info_async handler.  Keys NEXT_ID, CHECKSUM, SPTLRPC_CONF and
 * FLUSH_CTX are handled locally; everything else is forwarded to the OST
 * as an OST_SET_INFO RPC.  GRANT_SHRINK requests are sent via ptlrpcd;
 * all other forwarded keys require a caller-supplied request set. */
4094 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
4095 obd_count keylen, void *key, obd_count vallen,
4096 void *val, struct ptlrpc_request_set *set)
4098 struct ptlrpc_request *req;
4099 struct obd_device *obd = exp->exp_obd;
4100 struct obd_import *imp = class_exp2cliimp(exp);
4105 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
4107 if (KEY_IS(KEY_NEXT_ID)) {
4109 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4111 if (vallen != sizeof(obd_id))
4116 if (vallen != sizeof(obd_id))
4119 /* avoid race between allocate new object and set next id
4120 * from ll_sync thread */
4121 cfs_spin_lock(&oscc->oscc_lock);
/* only ever move the precreate pointer forward */
4122 new_val = *((obd_id*)val) + 1;
4123 if (new_val > oscc->oscc_next_id)
4124 oscc->oscc_next_id = new_val;
4125 cfs_spin_unlock(&oscc->oscc_lock);
4126 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
4127 exp->exp_obd->obd_name,
4128 obd->u.cli.cl_oscc.oscc_next_id);
4133 if (KEY_IS(KEY_CHECKSUM)) {
4134 if (vallen != sizeof(int))
4136 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
4140 if (KEY_IS(KEY_SPTLRPC_CONF)) {
4141 sptlrpc_conf_client_adapt(obd);
4145 if (KEY_IS(KEY_FLUSH_CTX)) {
4146 sptlrpc_import_flush_my_ctx(imp);
4150 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
4153 /* We pass all other commands directly to OST. Since nobody calls osc
4154 methods directly and everybody is supposed to go through LOV, we
4155 assume lov checked invalid values for us.
4156 The only recognised values so far are evict_by_nid and mds_conn.
4157 Even if something bad goes through, we'd get a -EINVAL from OST
4160 if (KEY_IS(KEY_GRANT_SHRINK))
4161 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
4163 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
4168 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
4169 RCL_CLIENT, keylen);
4170 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
4171 RCL_CLIENT, vallen);
4172 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
4174 ptlrpc_request_free(req);
4178 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
4179 memcpy(tmp, key, keylen);
4180 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
4181 memcpy(tmp, val, vallen);
4183 if (KEY_IS(KEY_MDS_CONN)) {
4184 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4186 oscc->oscc_oa.o_seq = (*(__u32 *)val);
4187 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
4188 LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
4189 req->rq_no_delay = req->rq_no_resend = 1;
4190 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4191 } else if (KEY_IS(KEY_GRANT_SHRINK)) {
4192 struct osc_grant_args *aa;
4195 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4196 aa = ptlrpc_req_async_args(req);
4199 ptlrpc_req_finished(req);
4202 *oa = ((struct ost_body *)val)->oa;
4204 req->rq_interpret_reply = osc_shrink_grant_interpret;
4207 ptlrpc_request_set_replen(req);
4208 if (!KEY_IS(KEY_GRANT_SHRINK)) {
4209 LASSERT(set != NULL);
4210 ptlrpc_set_add_req(set, req);
4211 ptlrpc_check_set(NULL, set);
/* grant shrink is fire-and-forget through the ptlrpcd daemon */
4213 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
/* llog operations for the size-replication context: only cancel is needed
 * on the client side.  osc_mds_ost_orig_logops is filled in at module init
 * (see osc_init) from llog_lvfs_ops with origin-side hooks overridden. */
4219 static struct llog_operations osc_size_repl_logops = {
4220 lop_cancel: llog_obd_repl_cancel
4223 static struct llog_operations osc_mds_ost_orig_logops;
/* Set up the two llog contexts used by the OSC: the MDS->OST originator
 * context (with the catalog id from catid) and the size-replication
 * context.  On failure of the second, the first context is torn down
 * and diagnostic information is printed. */
4225 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4226 struct obd_device *tgt, struct llog_catid *catid)
4231 rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
4232 &catid->lci_logid, &osc_mds_ost_orig_logops);
4234 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4238 rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
4239 NULL, &osc_size_repl_logops);
4241 struct llog_ctxt *ctxt =
4242 llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4245 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4250 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
4251 obd->obd_name, tgt->obd_name, catid, rc);
4252 CERROR("logid "LPX64":0x%x\n",
4253 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
/* obd llog_init hook: read the catalog id for this index from disk,
 * initialize the llog contexts via __osc_llog_init(), then write the
 * (possibly updated) catalog id back.  Serialized by olg_cat_processing. */
4258 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4259 struct obd_device *disk_obd, int *index)
4261 struct llog_catid catid;
4262 static char name[32] = CATLIST;
4266 LASSERT(olg == &obd->obd_olg);
4268 cfs_mutex_lock(&olg->olg_cat_processing);
4269 rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
4271 CERROR("rc: %d\n", rc);
4275 CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
4276 obd->obd_name, *index, catid.lci_logid.lgl_oid,
4277 catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen);
4279 rc = __osc_llog_init(obd, olg, disk_obd, &catid);
4281 CERROR("rc: %d\n", rc);
4285 rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
4287 CERROR("rc: %d\n", rc);
4292 cfs_mutex_unlock(&olg->olg_cat_processing);
/* Tear down both llog contexts set up by __osc_llog_init().  Both cleanups
 * are attempted even if the first fails; rc/rc2 hold the results. */
4297 static int osc_llog_finish(struct obd_device *obd, int count)
4299 struct llog_ctxt *ctxt;
4300 int rc = 0, rc2 = 0;
4303 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4305 rc = llog_cleanup(ctxt);
4307 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4309 rc2 = llog_cleanup(ctxt);
/* Reconnect hook: if the server supports grants, request a grant covering
 * current avail+dirty (or a 2-RPC minimum when both are zero) and reset
 * the lost-grant counter, all under cl_loi_list_lock. */
4316 static int osc_reconnect(const struct lu_env *env,
4317 struct obd_export *exp, struct obd_device *obd,
4318 struct obd_uuid *cluuid,
4319 struct obd_connect_data *data,
4322 struct client_obd *cli = &obd->u.cli;
4324 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4327 client_obd_list_lock(&cli->cl_loi_list_lock);
/* ?: fallback: when no grant/dirty, ask for two full RPCs worth */
4328 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
4329 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4330 lost_grant = cli->cl_lost_grant;
4331 cli->cl_lost_grant = 0;
4332 client_obd_list_unlock(&cli->cl_loi_list_lock);
4334 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4335 "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
4336 cli->cl_avail_grant, cli->cl_dirty, lost_grant);
4337 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4338 " ocd_grant: %d\n", data->ocd_connect_flags,
4339 data->ocd_version, data->ocd_grant);
/* Disconnect hook: flush pending llog cancels on the last connection,
 * disconnect the export, and only then remove this client from the grant
 * shrink list (ordering rationale in the bug 18662 comment below). */
4345 static int osc_disconnect(struct obd_export *exp)
4347 struct obd_device *obd = class_exp2obd(exp);
4348 struct llog_ctxt *ctxt;
4351 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4353 if (obd->u.cli.cl_conn_count == 1) {
4354 /* Flush any remaining cancel messages out to the
4356 llog_sync(ctxt, exp);
4358 llog_ctxt_put(ctxt);
4360 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4364 rc = client_disconnect_export(exp);
4366 * Initially we put del_shrink_grant before disconnect_export, but it
4367 * causes the following problem if setup (connect) and cleanup
4368 * (disconnect) are tangled together.
4369 * connect p1 disconnect p2
4370 * ptlrpc_connect_import
4371 * ............... class_manual_cleanup
4374 * ptlrpc_connect_interrupt
4376 * add this client to shrink list
4378 * Bang! pinger trigger the shrink.
4379 * So the osc should be disconnected from the shrink list, after we
4380 * are sure the import has been destroyed. BUG18662
4382 if (obd->u.cli.cl_import == NULL)
4383 osc_del_shrink_grant(&obd->u.cli);
/* Import state-machine callback: reacts to connection events on the OST
 * import (disconnect, inactive, invalidate, active, OCD negotiation,
 * activate/deactivate) and propagates notifications to the observer. */
4387 static int osc_import_event(struct obd_device *obd,
4388 struct obd_import *imp,
4389 enum obd_import_event event)
4391 struct client_obd *cli;
4395 LASSERT(imp->imp_obd == obd);
4398 case IMP_EVENT_DISCON: {
4399 /* Only do this on the MDS OSC's */
4400 if (imp->imp_server_timeout) {
4401 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4403 cfs_spin_lock(&oscc->oscc_lock);
4404 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4405 cfs_spin_unlock(&oscc->oscc_lock);
/* grants are meaningless while disconnected; drop them */
4408 client_obd_list_lock(&cli->cl_loi_list_lock);
4409 cli->cl_avail_grant = 0;
4410 cli->cl_lost_grant = 0;
4411 client_obd_list_unlock(&cli->cl_loi_list_lock);
4414 case IMP_EVENT_INACTIVE: {
4415 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4418 case IMP_EVENT_INVALIDATE: {
4419 struct ldlm_namespace *ns = obd->obd_namespace;
4423 env = cl_env_get(&refcheck);
4427 client_obd_list_lock(&cli->cl_loi_list_lock);
4428 /* all pages go to failing rpcs due to the invalid
4430 osc_check_rpcs(env, cli);
4431 client_obd_list_unlock(&cli->cl_loi_list_lock);
4433 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4434 cl_env_put(env, &refcheck);
4439 case IMP_EVENT_ACTIVE: {
4440 /* Only do this on the MDS OSC's */
4441 if (imp->imp_server_timeout) {
4442 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4444 cfs_spin_lock(&oscc->oscc_lock);
4445 oscc->oscc_flags &= ~(OSCC_FLAG_NOSPC |
4446 OSCC_FLAG_NOSPC_BLK);
4447 cfs_spin_unlock(&oscc->oscc_lock);
4449 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4452 case IMP_EVENT_OCD: {
4453 struct obd_connect_data *ocd = &imp->imp_connect_data;
4455 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4456 osc_init_grant(&obd->u.cli, ocd);
4459 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4460 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
4462 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4465 case IMP_EVENT_DEACTIVATE: {
4466 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
4469 case IMP_EVENT_ACTIVATE: {
4470 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
4474 CERROR("Unknown import event %d\n", event);
4481 * Determine whether the lock can be canceled before replaying the lock
4482 * during recovery, see bug16774 for detailed information.
4484 * \retval zero the lock can't be canceled
4485 * \retval other ok to cancel
4487 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
4489 check_res_locked(lock->l_resource);
4492 * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
4494 * XXX as a future improvement, we can also cancel unused write lock
4495 * if it doesn't have dirty data and active mmaps.
/* only read-mode extent locks with no pages pinned are safe to drop */
4497 if (lock->l_resource->lr_type == LDLM_EXTENT &&
4498 (lock->l_granted_mode == LCK_PR ||
4499 lock->l_granted_mode == LCK_CR) &&
4500 (osc_dlm_lock_pageref(lock) == 0))
/* Device setup: take a ptlrpcd reference, run generic client setup,
 * create the writeback ptlrpcd work item, register lprocfs entries,
 * pre-allocate a small request pool for brw_interpret, and install the
 * recovery-time lock-cancel policy on the namespace. */
4506 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4508 struct client_obd *cli = &obd->u.cli;
4513 rc = ptlrpcd_addref();
4517 rc = client_obd_setup(obd, lcfg);
4520 handler = ptlrpcd_alloc_work(cli->cl_import,
4521 brw_queue_work, cli);
4522 if (!IS_ERR(handler))
4523 cli->cl_writeback_work = handler;
4525 rc = PTR_ERR(handler);
4529 struct lprocfs_static_vars lvars = { 0 };
4531 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4532 lprocfs_osc_init_vars(&lvars);
4533 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4534 lproc_osc_attach_seqstat(obd);
4535 sptlrpc_lprocfs_cliobd_attach(obd);
4536 ptlrpc_lprocfs_register_obd(obd);
4540 /* We need to allocate a few requests more, because
4541 brw_interpret tries to create new requests before freeing
4542 previous ones. Ideally we want to have 2x max_rpcs_in_flight
4543 reserved, but I afraid that might be too much wasted RAM
4544 in fact, so 2 is just my guess and still should work. */
4545 cli->cl_import->imp_rq_pool =
4546 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4548 ptlrpc_add_rqs_to_pool);
4550 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4552 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
/* Staged pre-cleanup.  EARLY: deactivate the import and stop pinging.
 * EXPORTS: wait out zombie exports, destroy the writeback work item,
 * clean up the client import, lprocfs entries and llog contexts. */
4560 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4566 case OBD_CLEANUP_EARLY: {
4567 struct obd_import *imp;
4568 imp = obd->u.cli.cl_import;
4569 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4570 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4571 ptlrpc_deactivate_import(imp);
4572 cfs_spin_lock(&imp->imp_lock);
4573 imp->imp_pingable = 0;
4574 cfs_spin_unlock(&imp->imp_lock);
4577 case OBD_CLEANUP_EXPORTS: {
4578 struct client_obd *cli = &obd->u.cli;
4580 * for echo client, export may be on zombie list, wait for
4581 * zombie thread to cull it, because cli.cl_import will be
4582 * cleared in client_disconnect_export():
4583 * class_export_destroy() -> obd_cleanup() ->
4584 * echo_device_free() -> echo_client_cleanup() ->
4585 * obd_disconnect() -> osc_disconnect() ->
4586 * client_disconnect_export()
4588 obd_zombie_barrier();
4589 if (cli->cl_writeback_work) {
4590 ptlrpcd_destroy_work(cli->cl_writeback_work);
4591 cli->cl_writeback_work = NULL;
4593 obd_cleanup_client_import(obd);
4594 ptlrpc_lprocfs_unregister_obd(obd);
4595 lprocfs_obd_cleanup(obd);
4596 rc = obd_llog_finish(obd, 0);
4598 CERROR("failed to cleanup llogging subsystems\n");
/* Final device cleanup: release the quota cache, then run the generic
 * client obd teardown. */
4605 int osc_cleanup(struct obd_device *obd)
4611 /* free memory of osc quota cache */
4612 osc_quota_cleanup(obd);
4614 rc = client_obd_cleanup(obd);
/* Apply a configuration log record to this device; PARAM commands are
 * routed through the lprocfs parameter handler. */
4620 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4622 struct lprocfs_static_vars lvars = { 0 };
4625 lprocfs_osc_init_vars(&lvars);
4627 switch (lcfg->lcfg_command) {
4629 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
/* obd_ops process_config wrapper around osc_process_config_base(). */
4639 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4641 return osc_process_config_base(obd, buf);
/* Method table exported by the OSC obd type; entries not defined in this
 * file (osc_packmd, osc_create, ...) come from earlier in this source. */
4644 struct obd_ops osc_obd_ops = {
4645 .o_owner = THIS_MODULE,
4646 .o_setup = osc_setup,
4647 .o_precleanup = osc_precleanup,
4648 .o_cleanup = osc_cleanup,
4649 .o_add_conn = client_import_add_conn,
4650 .o_del_conn = client_import_del_conn,
4651 .o_connect = client_connect_import,
4652 .o_reconnect = osc_reconnect,
4653 .o_disconnect = osc_disconnect,
4654 .o_statfs = osc_statfs,
4655 .o_statfs_async = osc_statfs_async,
4656 .o_packmd = osc_packmd,
4657 .o_unpackmd = osc_unpackmd,
4658 .o_precreate = osc_precreate,
4659 .o_create = osc_create,
4660 .o_create_async = osc_create_async,
4661 .o_destroy = osc_destroy,
4662 .o_getattr = osc_getattr,
4663 .o_getattr_async = osc_getattr_async,
4664 .o_setattr = osc_setattr,
4665 .o_setattr_async = osc_setattr_async,
4667 .o_punch = osc_punch,
4669 .o_enqueue = osc_enqueue,
4670 .o_change_cbdata = osc_change_cbdata,
4671 .o_find_cbdata = osc_find_cbdata,
4672 .o_cancel = osc_cancel,
4673 .o_cancel_unused = osc_cancel_unused,
4674 .o_iocontrol = osc_iocontrol,
4675 .o_get_info = osc_get_info,
4676 .o_set_info_async = osc_set_info_async,
4677 .o_import_event = osc_import_event,
4678 .o_llog_init = osc_llog_init,
4679 .o_llog_finish = osc_llog_finish,
4680 .o_process_config = osc_process_config,
4681 .o_quotactl = osc_quotactl,
4682 .o_quotacheck = osc_quotacheck,
4683 .o_quota_adjust_qunit = osc_quota_adjust_qunit,
4686 extern struct lu_kmem_descr osc_caches[];
4687 extern cfs_spinlock_t osc_ast_guard;
4688 extern cfs_lock_class_key_t osc_ast_guard_class;
/* Module init: create cl-object caches, register the OSC obd type,
 * initialize the AST guard lock, and build osc_mds_ost_orig_logops from
 * llog_lvfs_ops with origin-side hooks overridden. */
4690 int __init osc_init(void)
4692 struct lprocfs_static_vars lvars = { 0 };
4696 /* print an address of _any_ initialized kernel symbol from this
4697 * module, to allow debugging with gdb that doesn't support data
4698 * symbols from modules.*/
4699 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
4701 rc = lu_kmem_init(osc_caches);
4703 lprocfs_osc_init_vars(&lvars);
4706 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4707 LUSTRE_OSC_NAME, &osc_device_type);
/* registration failed: undo the cache initialization */
4709 lu_kmem_fini(osc_caches);
4713 cfs_spin_lock_init(&osc_ast_guard);
4714 cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4716 osc_mds_ost_orig_logops = llog_lvfs_ops;
4717 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4718 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4719 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4720 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
/* Module exit: unwind osc_init() — device type, class registration,
 * then the cl-object caches. */
4726 static void /*__exit*/ osc_exit(void)
4728 lu_device_type_fini(&osc_device_type);
4731 class_unregister_type(LUSTRE_OSC_NAME);
4732 lu_kmem_fini(osc_caches);
4735 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4736 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4737 MODULE_LICENSE("GPL");
4739 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);