1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * Copyright (c) 2011, 2012, Intel Corporation.
36 * This file is part of Lustre, http://www.lustre.org/
37 * Lustre is a trademark of Sun Microsystems, Inc.
41 # define EXPORT_SYMTAB
43 #define DEBUG_SUBSYSTEM S_OSC
45 #include <libcfs/libcfs.h>
48 # include <liblustre.h>
51 #include <lustre_dlm.h>
52 #include <lustre_net.h>
53 #include <lustre/lustre_user.h>
54 #include <obd_cksum.h>
62 #include <lustre_ha.h>
63 #include <lprocfs_status.h>
64 #include <lustre_log.h>
65 #include <lustre_debug.h>
66 #include <lustre_param.h>
67 #include "osc_internal.h"
69 static quota_interface_t *quota_interface = NULL;
70 extern quota_interface_t osc_quota_interface;
72 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
73 static int brw_interpret(const struct lu_env *env,
74 struct ptlrpc_request *req, void *data, int rc);
75 int osc_cleanup(struct obd_device *obd);
77 /* Pack OSC object metadata for disk storage (LE byte order).
 *
 * Stores lsm->lsm_object_id and lsm->lsm_object_seq into **lmmp, each
 * converted with cpu_to_le64().  *lmmp is both allocated and freed with
 * lmm_size == sizeof(**lmmp).
 *
 * NOTE(review): the conditions choosing between the free, allocate and
 * fill steps, and the return value, are not visible in this listing. */
78 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
79 struct lov_stripe_md *lsm)
84 lmm_size = sizeof(**lmmp);
89 OBD_FREE(*lmmp, lmm_size);
95 OBD_ALLOC(*lmmp, lmm_size);
/* Both assertions are evaluated before anything is written to *lmmp. */
101 LASSERT(lsm->lsm_object_id);
102 LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
103 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
104 (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
110 /* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * Builds a single-stripe lov_stripe_md (size from lov_stripe_md_size(1))
 * with one separately allocated lov_oinfo, and fills lsm_object_id and
 * lsm_object_seq from @lmm using le64_to_cpu().  When *lsmp is set and
 * @lmm is NULL, the existing lsm and its lsm_oinfo[0] are freed instead.
 *
 * NOTE(review): @lmm is dereferenced by the validity checks below yet may
 * be NULL on the free path; presumably a guard on @lmm is not shown in
 * this listing - confirm. */
111 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
112 struct lov_mds_md *lmm, int lmm_bytes)
115 struct obd_import *imp = class_exp2cliimp(exp);
/* Reject a buffer that is shorter than one lov_mds_md. */
119 if (lmm_bytes < sizeof (*lmm)) {
120 CERROR("lov_mds_md too small: %d, need %d\n",
121 lmm_bytes, (int)sizeof(*lmm));
124 /* XXX LOV_MAGIC etc check? */
126 if (lmm->lmm_object_id == 0) {
127 CERROR("lov_mds_md: zero lmm_object_id\n");
132 lsm_size = lov_stripe_md_size(1);
/* Free path: release the oinfo first, then the lsm that points to it. */
136 if (*lsmp != NULL && lmm == NULL) {
137 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
138 OBD_FREE(*lsmp, lsm_size);
144 OBD_ALLOC(*lsmp, lsm_size);
147 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
/* The oinfo allocation failed: give back the lsm allocated just above. */
148 if ((*lsmp)->lsm_oinfo[0] == NULL) {
149 OBD_FREE(*lsmp, lsm_size);
152 loi_init((*lsmp)->lsm_oinfo[0]);
156 /* XXX zero *lsmp? */
157 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
158 (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
159 LASSERT((*lsmp)->lsm_object_id);
160 LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
/* lsm_maxbytes is taken from the import connect data when the
 * OBD_CONNECT_MAXBYTES flag is set; the other visible assignment uses
 * LUSTRE_STRIPE_MAXBYTES (presumably the fallback branch - confirm). */
164 (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
165 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
167 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/* Pack capability @capa (treated as a struct obd_capa) into @req.
 *
 * Fetches the RMF_CAPA1 client buffer of the request, marks the body with
 * OBD_MD_FLOSSCAPA in oa.o_valid and logs the capability via DEBUG_CAPA.
 *
 * NOTE(review): the statement that copies the capability into the buffer
 * and any early return (e.g. for a NULL @capa) are not visible here. */
172 static inline void osc_pack_capa(struct ptlrpc_request *req,
173 struct ost_body *body, void *capa)
175 struct obd_capa *oc = (struct obd_capa *)capa;
176 struct lustre_capa *c;
181 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
184 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
185 DEBUG_CAPA(D_SEC, c, "pack");
/* Fill the RMF_OST_BODY client buffer of @req from @oinfo: oi_oa is
 * converted with lustre_set_wire_obdo() and oi_capa is handed on to
 * osc_pack_capa(). */
188 static inline void osc_pack_req_body(struct ptlrpc_request *req,
189 struct obd_info *oinfo)
191 struct ost_body *body;
193 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
196 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
197 osc_pack_capa(req, body, oinfo->oi_capa);
/* Adjust the client-side (RCL_CLIENT) size of capsule @field in @req.
 * The only visible call sets that size to 0.
 *
 * NOTE(review): the third parameter and the condition guarding the call
 * are not shown in this listing; presumably the field is emptied when no
 * capability is supplied - confirm. */
200 static inline void osc_set_capa_size(struct ptlrpc_request *req,
201 const struct req_msg_field *field,
205 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
207 /* it is already calculated as sizeof struct obd_capa */
/* Reply interpreter for an asynchronous OST_GETATTR (installed as
 * rq_interpret_reply by osc_getattr_async()).
 *
 * One path copies the reply obdo into aa->aa_oi->oi_oa and forces
 * o_blksize to PTLRPC_MAX_BRW_SIZE; another logs that the body could not
 * be unpacked and clears o_valid.  In the visible code the result is then
 * passed through the oi_cb_up callback, whose return value replaces rc.
 *
 * NOTE(review): the branch conditions (rc check, NULL body) are not
 * visible in this listing. */
211 static int osc_getattr_interpret(const struct lu_env *env,
212 struct ptlrpc_request *req,
213 struct osc_async_args *aa, int rc)
215 struct ost_body *body;
221 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
223 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
224 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
226 /* This should really be sent by the OST */
227 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
228 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
230 CDEBUG(D_INFO, "can't unpack ost_body\n");
/* No attribute is reported as valid on this path. */
232 aa->aa_oi->oi_oa->o_valid = 0;
235 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an asynchronous OST_GETATTR for @oinfo on request set @set.
 *
 * The request is allocated on the client import of @exp, packed with the
 * body and capability from @oinfo, and added to @set with
 * osc_getattr_interpret() as its reply interpreter.
 *
 * NOTE(review): error checks after allocation/packing, the assignment of
 * the async args and the return value are not visible in this listing. */
239 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
240 struct ptlrpc_request_set *set)
242 struct ptlrpc_request *req;
243 struct osc_async_args *aa;
247 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
/* The capability field must be sized before the request is packed. */
251 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
252 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
254 ptlrpc_request_free(req);
258 osc_pack_req_body(req, oinfo);
260 ptlrpc_request_set_replen(req);
261 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
/* The per-request async args live inside req->rq_async_args, so they
 * must not be larger than that storage. */
263 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
264 aa = ptlrpc_req_async_args(req);
267 ptlrpc_set_add_req(set, req);
/* Synchronous OST_GETATTR: builds the request from @oinfo, waits for the
 * reply with ptlrpc_queue_wait() and copies the returned obdo into
 * oinfo->oi_oa, forcing o_blksize to PTLRPC_MAX_BRW_SIZE.
 *
 * -EPROTO is produced on one failure path after fetching the reply body
 * (presumably when the body is missing - the test is not visible).  The
 * request reference is dropped with ptlrpc_req_finished() at the end. */
271 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
273 struct ptlrpc_request *req;
274 struct ost_body *body;
278 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
282 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
283 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
285 ptlrpc_request_free(req);
289 osc_pack_req_body(req, oinfo);
291 ptlrpc_request_set_replen(req);
293 rc = ptlrpc_queue_wait(req);
297 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
299 GOTO(out, rc = -EPROTO);
301 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
302 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
304 /* This should really be sent by the OST */
305 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
306 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
310 ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR: sends the attributes in oinfo->oi_oa, waits
 * for the reply and copies the returned obdo back into oinfo->oi_oa.
 *
 * The caller must have OBD_MD_FLGROUP set in oi_oa->o_valid (asserted).
 * @oti is not referenced by any visible line.
 *
 * NOTE(review): the tests guarding the free and -EPROTO paths are not
 * visible in this listing. */
314 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
315 struct obd_trans_info *oti)
317 struct ptlrpc_request *req;
318 struct ost_body *body;
322 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
324 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
328 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
329 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
331 ptlrpc_request_free(req);
335 osc_pack_req_body(req, oinfo);
337 ptlrpc_request_set_replen(req);
339 rc = ptlrpc_queue_wait(req);
343 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
345 GOTO(out, rc = -EPROTO);
347 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
351 ptlrpc_req_finished(req);
/* Reply interpreter shared by the asynchronous setattr and punch paths
 * (installed by osc_setattr_async_base() and osc_punch_base()).
 *
 * Copies the reply obdo into sa->sa_oa, or produces -EPROTO on a failure
 * path whose test is not visible here, then reports the result through
 * sa->sa_upcall(sa->sa_cookie, rc); the upcall result replaces rc. */
355 static int osc_setattr_interpret(const struct lu_env *env,
356 struct ptlrpc_request *req,
357 struct osc_setattr_args *sa, int rc)
359 struct ost_body *body;
365 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
367 GOTO(out, rc = -EPROTO);
369 lustre_get_wire_obdo(sa->sa_oa, &body->oa);
371 rc = sa->sa_upcall(sa->sa_cookie, rc);
/* Build and dispatch an asynchronous OST_SETATTR.
 *
 * @upcall and @cookie are stored in the request async args and invoked by
 * osc_setattr_interpret() when the reply arrives.  Two dispatch styles
 * are visible: a fire-and-forget hand-off to ptlrpcd with no interpreter,
 * and an interpreted request that goes to ptlrpcd when @rqset is
 * PTLRPCD_SET or is added to @rqset with ptlrpc_set_add_req().
 *
 * NOTE(review): the condition selecting fire-and-forget (presumably a
 * NULL @rqset) and the else keywords are not visible in this listing. */
375 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
376 struct obd_trans_info *oti,
377 obd_enqueue_update_f upcall, void *cookie,
378 struct ptlrpc_request_set *rqset)
380 struct ptlrpc_request *req;
381 struct osc_setattr_args *sa;
385 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
389 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
390 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
392 ptlrpc_request_free(req);
/* Carry the first llog cookie of @oti in the obdo when the caller
 * flagged OBD_MD_FLCOOKIE. */
396 if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
397 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
399 osc_pack_req_body(req, oinfo);
401 ptlrpc_request_set_replen(req);
403 /* do mds to ost setattr asynchronously */
405 /* Do not wait for response. */
406 ptlrpcd_add_req(req, PSCOPE_OTHER);
408 req->rq_interpret_reply =
409 (ptlrpc_interpterer_t)osc_setattr_interpret;
/* The setattr args are stored in req->rq_async_args and must fit. */
411 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
412 sa = ptlrpc_req_async_args(req);
413 sa->sa_oa = oinfo->oi_oa;
414 sa->sa_upcall = upcall;
415 sa->sa_cookie = cookie;
417 if (rqset == PTLRPCD_SET)
418 ptlrpcd_add_req(req, PSCOPE_OTHER);
420 ptlrpc_set_add_req(rqset, req);
/* Asynchronous setattr entry point: forwards to osc_setattr_async_base()
 * with oinfo->oi_cb_up as the completion upcall and @oinfo itself as the
 * upcall cookie. */
426 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
427 struct obd_trans_info *oti,
428 struct ptlrpc_request_set *rqset)
430 return osc_setattr_async_base(exp, oinfo, oti,
431 oinfo->oi_cb_up, oinfo, rqset);
/* Synchronous OST_CREATE for the object described by @oa.
 *
 * Sends @oa to the OST, waits for the reply, copies the returned obdo
 * back into @oa (forcing o_blksize to PTLRPC_MAX_BRW_SIZE) and records
 * the resulting object id and sequence in the stripe md.  When @oa has
 * OBD_MD_FLCOOKIE set, the llog cookie is stored in @oti, allocating
 * cookie space with oti_alloc_cookies() if none is present.
 *
 * NOTE(review): several guards (whether the lsm is freshly allocated or
 * taken from *ea, the failure tests, the NULL check on @oti) and the
 * return statement are not visible in this listing. */
434 int osc_real_create(struct obd_export *exp, struct obdo *oa,
435 struct lov_stripe_md **ea, struct obd_trans_info *oti)
437 struct ptlrpc_request *req;
438 struct ost_body *body;
439 struct lov_stripe_md *lsm;
448 rc = obd_alloc_memmd(exp, &lsm);
453 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
455 GOTO(out, rc = -ENOMEM);
457 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
459 ptlrpc_request_free(req);
463 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
465 lustre_set_wire_obdo(&body->oa, oa);
467 ptlrpc_request_set_replen(req);
/* An orphan-deletion request is recognised by o_flags being exactly
 * OBD_FL_DELORPHAN; it is sent with both no-resend and no-delay set. */
469 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
470 oa->o_flags == OBD_FL_DELORPHAN) {
472 "delorphan from OST integration");
473 /* Don't resend the delorphan req */
474 req->rq_no_resend = req->rq_no_delay = 1;
477 rc = ptlrpc_queue_wait(req);
481 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
483 GOTO(out_req, rc = -EPROTO);
485 lustre_get_wire_obdo(oa, &body->oa);
487 /* This should really be sent by the OST */
488 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
489 oa->o_valid |= OBD_MD_FLBLKSZ;
491 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
492 * have valid lsm_oinfo data structs, so don't go touching that.
493 * This needs to be fixed in a big way.
495 lsm->lsm_object_id = oa->o_id;
496 lsm->lsm_object_seq = oa->o_seq;
500 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
502 if (oa->o_valid & OBD_MD_FLCOOKIE) {
503 if (!oti->oti_logcookies)
504 oti_alloc_cookies(oti, 1);
505 *oti->oti_logcookies = oa->o_lcookie;
509 CDEBUG(D_HA, "transno: "LPD64"\n",
510 lustre_msg_get_transno(req->rq_repmsg));
512 ptlrpc_req_finished(req);
515 obd_free_memmd(exp, &lsm);
// Build and dispatch an asynchronous OST_PUNCH request for oinfo->oi_oa.
// The reply is handled by osc_setattr_interpret(), which calls
// upcall(cookie, rc).  The request goes to ptlrpcd when rqset is
// PTLRPCD_SET, or is added to rqset with ptlrpc_set_add_req().
// NOTE(review): error checks and the return value are not visible in
// this listing.
519 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
520 obd_enqueue_update_f upcall, void *cookie,
521 struct ptlrpc_request_set *rqset)
523 struct ptlrpc_request *req;
524 struct osc_setattr_args *sa;
525 struct ost_body *body;
529 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
533 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
534 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
536 ptlrpc_request_free(req);
539 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
/* The timeout is set again after the request portal was changed. */
540 ptlrpc_at_set_req_timeout(req);
542 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
544 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
545 osc_pack_capa(req, body, oinfo->oi_capa);
547 ptlrpc_request_set_replen(req);
550 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
/* The setattr args are stored in req->rq_async_args and must fit. */
551 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
552 sa = ptlrpc_req_async_args(req);
553 sa->sa_oa = oinfo->oi_oa;
554 sa->sa_upcall = upcall;
555 sa->sa_cookie = cookie;
556 if (rqset == PTLRPCD_SET)
557 ptlrpcd_add_req(req, PSCOPE_OTHER);
559 ptlrpc_set_add_req(rqset, req);
/* Punch entry point.  The extent to punch is carried in the obdo itself:
 * o_size is overloaded with the extent start and o_blocks with the extent
 * end (both taken from oinfo->oi_policy.l_extent), and the matching valid
 * bits are set before handing over to osc_punch_base() with
 * oinfo->oi_cb_up as the upcall and @oinfo as its cookie.
 * @oti is not referenced by any visible line. */
564 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
565 struct obd_trans_info *oti,
566 struct ptlrpc_request_set *rqset)
568 oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
569 oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
570 oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
571 return osc_punch_base(exp, oinfo,
572 oinfo->oi_cb_up, oinfo, rqset);
/* Reply interpreter for OST_SYNC (installed by osc_sync()).
 *
 * Copies the reply obdo into aa->aa_oi->oi_oa by plain structure
 * assignment, or produces -EPROTO after logging that the body could not
 * be unpacked, then passes the result through the oi_cb_up callback.
 *
 * NOTE(review): part of the parameter list and the branch conditions are
 * not visible in this listing. */
575 static int osc_sync_interpret(const struct lu_env *env,
576 struct ptlrpc_request *req,
579 struct osc_async_args *aa = arg;
580 struct ost_body *body;
586 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
588 CERROR ("can't unpack ost_body\n");
589 GOTO(out, rc = -EPROTO);
592 *aa->aa_oi->oi_oa = body->oa;
594 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an OST_SYNC request covering [@start, @end] on request set @set.
 *
 * The range travels in the request obdo: o_size carries @start and
 * o_blocks carries @end.  The reply is handled by osc_sync_interpret().
 *
 * NOTE(review): the NULL-oa test behind the "oa NULL" message, the
 * allocation/packing error checks, the async args assignment and the
 * return value are not visible in this listing. */
598 static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
599 obd_size start, obd_size end,
600 struct ptlrpc_request_set *set)
602 struct ptlrpc_request *req;
603 struct ost_body *body;
604 struct osc_async_args *aa;
609 CDEBUG(D_INFO, "oa NULL\n");
613 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
617 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
618 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
620 ptlrpc_request_free(req);
624 /* overload the size and blocks fields in the oa with start/end */
625 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
627 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
628 body->oa.o_size = start;
629 body->oa.o_blocks = end;
630 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
631 osc_pack_capa(req, body, oinfo->oi_capa);
633 ptlrpc_request_set_replen(req);
634 req->rq_interpret_reply = osc_sync_interpret;
/* The async args are stored in req->rq_async_args and must fit. */
636 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
637 aa = ptlrpc_req_async_args(req);
640 ptlrpc_set_add_req(set, req);
644 /* Find and locally cancel the locks matching @mode on the resource named
645 * by the object in @oa (o_id and o_seq). Cancelled locks are added to the
646 * @cancels list. Returns the number of locks added to the @cancels list.
 *
 * The resource is looked up in the namespace of the export's obd device
 * and its reference is dropped again before returning.
 *
 * NOTE(review): the handling of a failed resource lookup is not visible
 * in this listing. */
647 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
649 ldlm_mode_t mode, int lock_flags)
651 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
652 struct ldlm_res_id res_id;
653 struct ldlm_resource *res;
657 osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
658 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
662 LDLM_RESOURCE_ADDREF(res);
663 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
664 lock_flags, 0, NULL);
665 LDLM_RESOURCE_DELREF(res);
666 ldlm_resource_putref(res);
/* Reply interpreter for throttled OST_DESTROY requests (installed by
 * osc_destroy()): drops the in-flight destroy count of the client and
 * signals cl_destroy_waitq so that a sender waiting in osc_destroy() can
 * retry.
 * NOTE(review): the last parameter and the return value are not visible
 * in this listing. */
670 static int osc_destroy_interpret(const struct lu_env *env,
671 struct ptlrpc_request *req, void *data,
674 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
676 cfs_atomic_dec(&cli->cl_destroy_in_flight);
677 cfs_waitq_signal(&cli->cl_destroy_waitq);
/* Try to reserve a slot for one more in-flight destroy RPC.
 *
 * The in-flight counter is incremented optimistically; the slot is kept
 * when the new value does not exceed cl_max_rpcs_in_flight.  Otherwise
 * the increment is undone, and if the counter is then found below the
 * limit another waiter on cl_destroy_waitq is signalled.
 *
 * NOTE(review): the return statements are not visible in this listing;
 * callers use the result as a boolean. */
681 static int osc_can_send_destroy(struct client_obd *cli)
683 if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
684 cli->cl_max_rpcs_in_flight) {
685 /* The destroy request can be sent */
688 if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
689 cli->cl_max_rpcs_in_flight) {
691 * The counter has been modified between the two atomic
694 cfs_waitq_signal(&cli->cl_destroy_waitq);
699 /* Destroy requests can be async always on the client, and we don't even really
700 * care about the return code since the client cannot do anything at all about
 * it.
702 * When the MDS is unlinking a filename, it saves the file objects into a
703 * recovery llog, and these object records are cancelled when the OST reports
704 * they were destroyed and sync'd to disk (i.e. transaction committed).
705 * If the client dies, or the OST is down when the object should be destroyed,
706 * the records are not cancelled, and when the OST reconnects to the MDS next,
707 * it will retrieve the llog unlink logs and then sends the log cancellation
708 * cookies to the MDS after committing destroy transactions.
 *
 * The request is always handed to ptlrpcd without waiting for the reply.
 * Unless the import was connected with OBD_CONNECT_MDS, the number of
 * destroys in flight is throttled through osc_can_send_destroy(). */
709 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
710 struct lov_stripe_md *ea, struct obd_trans_info *oti,
711 struct obd_export *md_export, void *capa)
713 struct client_obd *cli = &exp->exp_obd->u.cli;
714 struct ptlrpc_request *req;
715 struct ost_body *body;
716 CFS_LIST_HEAD(cancels);
721 CDEBUG(D_INFO, "oa NULL\n");
/* Locally cancel unused PW locks on the object, discarding their data;
 * the collected locks end up on the local cancels list. */
725 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
726 LDLM_FL_DISCARD_DATA);
728 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* NOTE(review): presumably the allocation-failure path - the collected
 * locks are released again; the guarding test is not visible. */
730 ldlm_lock_list_put(&cancels, l_bl_ast, count);
734 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
735 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
738 ptlrpc_request_free(req);
742 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
743 ptlrpc_at_set_req_timeout(req);
/* Carry the first llog cookie of @oti in the obdo when the caller
 * flagged OBD_MD_FLCOOKIE. */
745 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
746 oa->o_lcookie = *oti->oti_logcookies;
747 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
749 lustre_set_wire_obdo(&body->oa, oa);
751 osc_pack_capa(req, body, (struct obd_capa *)capa);
752 ptlrpc_request_set_replen(req);
754 /* don't throttle destroy RPCs for the MDT */
755 if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
756 req->rq_interpret_reply = osc_destroy_interpret;
757 if (!osc_can_send_destroy(cli)) {
758 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
762 * Wait until the number of on-going destroy RPCs drops
763 * under max_rpc_in_flight
765 l_wait_event_exclusive(cli->cl_destroy_waitq,
766 osc_can_send_destroy(cli), &lwi);
770 /* Do not wait for response */
771 ptlrpcd_add_req(req, PSCOPE_OTHER);
/* Fill the cache/grant accounting fields of @oa from the client state.
 *
 * Under cl_loi_list_lock this records cl_dirty in o_dirty, computes
 * o_undirty, copies cl_avail_grant to o_grant and cl_lost_grant to
 * o_dropped, and then resets cl_lost_grant to 0 (the lost grant is
 * reported once).  @oa must not yet have OBD_MD_FLBLOCKS or
 * OBD_MD_FLGRANT set (asserted).
 *
 * Three inconsistency checks each log an error; otherwise o_undirty is
 * the larger of cl_dirty_max and the byte size of one more than
 * cl_max_rpcs_in_flight full RPCs.
 *
 * NOTE(review): the last parameter, the o_valid update and the o_undirty
 * value used on the error branches are not visible in this listing. */
775 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
778 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
780 LASSERT(!(oa->o_valid & bits));
783 client_obd_list_lock(&cli->cl_loi_list_lock);
784 oa->o_dirty = cli->cl_dirty;
785 if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
786 CERROR("dirty %lu - %lu > dirty_max %lu\n",
787 cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
789 } else if (cfs_atomic_read(&obd_dirty_pages) -
790 cfs_atomic_read(&obd_dirty_transit_pages) >
791 obd_max_dirty_pages + 1){
792 /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
793 * not covered by a lock thus they may safely race and trip
794 * this CERROR() unless we add in a small fudge factor (+1). */
795 CERROR("dirty %d - %d > system dirty_max %d\n",
796 cfs_atomic_read(&obd_dirty_pages),
797 cfs_atomic_read(&obd_dirty_transit_pages),
798 obd_max_dirty_pages);
800 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
801 CERROR("dirty %lu - dirty_max %lu too big???\n",
802 cli->cl_dirty, cli->cl_dirty_max);
/* Pages per RPC are converted to bytes with CFS_PAGE_SHIFT. */
805 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
806 (cli->cl_max_rpcs_in_flight + 1);
807 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
809 oa->o_grant = cli->cl_avail_grant;
810 oa->o_dropped = cli->cl_lost_grant;
811 cli->cl_lost_grant = 0;
812 client_obd_list_unlock(&cli->cl_loi_list_lock);
813 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
814 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Re-arm the grant-shrink deadline of @cli: cl_next_shrink_grant is set
 * to cfs_time_shift(cl_grant_shrink_interval) and the new value is
 * logged. */
818 static void osc_update_next_shrink(struct client_obd *cli)
820 cli->cl_next_shrink_grant =
821 cfs_time_shift(cli->cl_grant_shrink_interval);
822 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
823 cli->cl_next_shrink_grant);
826 /* caller must hold loi_list_lock
 *
 * Charge one page of write grant to @pga: the global dirty page count and
 * cl_dirty grow by one page, cl_avail_grant shrinks by CFS_PAGE_SIZE and
 * the page is tagged OBD_BRW_FROM_GRANT.  The page must not already carry
 * that flag, and the available grant must not go negative (both
 * asserted).  Using grant also pushes back the next grant shrink. */
827 static void osc_consume_write_grant(struct client_obd *cli,
828 struct brw_page *pga)
830 LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
831 LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
832 cfs_atomic_inc(&obd_dirty_pages);
833 cli->cl_dirty += CFS_PAGE_SIZE;
834 cli->cl_avail_grant -= CFS_PAGE_SIZE;
835 pga->flag |= OBD_BRW_FROM_GRANT;
836 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
837 CFS_PAGE_SIZE, pga, pga->pg);
838 LASSERT(cli->cl_avail_grant >= 0);
839 osc_update_next_shrink(cli);
842 /* the companion to osc_consume_write_grant, called when a brw has completed.
843 * must be called with the loi lock held.
 *
 * Undoes the dirty accounting for @pga: clears OBD_BRW_FROM_GRANT and
 * subtracts one page from the global and per-client dirty counters, and
 * from the transit counters too when the page was flagged
 * OBD_BRW_NOCACHE.  Pages that were not charged from grant are skipped.
 *
 * Grant that the server will not see used is added to cl_lost_grant:
 * a whole page on one branch, or the unused remainder of the page for a
 * short write when the OST block size differs from the page size.
 *
 * NOTE(review): the condition of the whole-page branch (presumably
 * !@sent) and the test guarding the end adjustment are not visible. */
844 static void osc_release_write_grant(struct client_obd *cli,
845 struct brw_page *pga, int sent)
/* Falls back to 4096 when the cached statfs block size is 0. */
847 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
850 LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
851 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
856 pga->flag &= ~OBD_BRW_FROM_GRANT;
857 cfs_atomic_dec(&obd_dirty_pages);
858 cli->cl_dirty -= CFS_PAGE_SIZE;
859 if (pga->flag & OBD_BRW_NOCACHE) {
860 pga->flag &= ~OBD_BRW_NOCACHE;
861 cfs_atomic_dec(&obd_dirty_transit_pages);
862 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
865 cli->cl_lost_grant += CFS_PAGE_SIZE;
866 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
867 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
868 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
869 /* For short writes we shouldn't count parts of pages that
870 * span a whole block on the OST side, or our accounting goes
871 * wrong. Should match the code in filter_grant_check. */
872 int offset = pga->off & ~CFS_PAGE_MASK;
873 int count = pga->count + (offset & (blocksize - 1));
874 int end = (offset + pga->count) & (blocksize - 1);
876 count += blocksize - end;
878 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
879 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
880 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
881 cli->cl_avail_grant, cli->cl_dirty);
/* Total number of RPCs in flight for @cli: read RPCs plus write RPCs. */
887 static unsigned long rpcs_in_flight(struct client_obd *cli)
889 return cli->cl_r_in_flight + cli->cl_w_in_flight;
892 /* caller must hold loi_list_lock
 *
 * Walk the cl_cache_waiters list and wake waiters for which progress is
 * possible.  Each woken waiter is unlinked from the list first; it either
 * gets -EDQUOT in ocw_rc when less than one page of grant is available
 * (so it falls back to sync IO), or has one page of grant consumed for
 * its page.
 *
 * NOTE(review): what happens after the two "cannot proceed" messages
 * (presumably the walk stops) and the else keyword before the grant
 * consumption are not visible in this listing. */
893 void osc_wake_cache_waiters(struct client_obd *cli)
896 struct osc_cache_waiter *ocw;
899 cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
900 /* if we can't dirty more, we must wait until some is written */
901 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
902 (cfs_atomic_read(&obd_dirty_pages) + 1 >
903 obd_max_dirty_pages)) {
904 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
905 "osc max %ld, sys max %d\n", cli->cl_dirty,
906 cli->cl_dirty_max, obd_max_dirty_pages);
910 /* if still dirty cache but no grant wait for pending RPCs that
911 * may yet return us some grant before doing sync writes */
912 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
913 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
914 cli->cl_w_in_flight);
918 ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
919 cfs_list_del_init(&ocw->ocw_entry);
920 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
921 /* no more RPCs in flight to return grant, do sync IO */
922 ocw->ocw_rc = -EDQUOT;
923 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
925 osc_consume_write_grant(cli,
926 &ocw->ocw_oap->oap_brw_page);
929 cfs_waitq_signal(&ocw->ocw_waitq);
/* Add @grant to the available grant of @cli.  Takes and releases
 * cl_loi_list_lock itself, so the caller must not already hold it. */
935 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
937 client_obd_list_lock(&cli->cl_loi_list_lock);
938 cli->cl_avail_grant += grant;
939 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Credit the grant carried in a reply body to @cli.  Nothing is done
 * unless the reply marks o_grant as valid with OBD_MD_FLGRANT. */
942 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
944 if (body->oa.o_valid & OBD_MD_FLGRANT) {
945 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
946 __osc_update_grant(cli, body->oa.o_grant);
/* Forward declaration: osc_shrink_grant_to_target() calls this to send
 * the KEY_GRANT_SHRINK request. */
950 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
951 void *key, obd_count vallen, void *val,
952 struct ptlrpc_request_set *set);
/* Reply interpreter for a grant-shrink request.
 *
 * One path returns the grant recorded in the request obdo (aa_oa) to the
 * client with __osc_update_grant(); another credits whatever grant the
 * reply body carries via osc_update_grant().
 *
 * NOTE(review): presumably the first path is taken when rc reports a
 * failure, so that the grant offered back is not lost - the test, the
 * remaining parameters and the cleanup are not visible in this listing. */
954 static int osc_shrink_grant_interpret(const struct lu_env *env,
955 struct ptlrpc_request *req,
958 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
959 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
960 struct ost_body *body;
963 __osc_update_grant(cli, oa->o_grant);
967 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
969 osc_update_grant(cli, body);
/* Give back a quarter of the available grant through @oa.
 *
 * Under cl_loi_list_lock, o_grant is set to cl_avail_grant / 4 and that
 * amount is removed from cl_avail_grant.  @oa is then flagged with
 * OBD_FL_SHRINK_GRANT (making o_flags valid if it was not) and the next
 * shrink deadline is re-armed.
 *
 * NOTE(review): o_flags is presumably zeroed when OBD_MD_FLFLAGS was not
 * yet set; that statement is not visible in this listing. */
975 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
977 client_obd_list_lock(&cli->cl_loi_list_lock);
978 oa->o_grant = cli->cl_avail_grant / 4;
979 cli->cl_avail_grant -= oa->o_grant;
980 client_obd_list_unlock(&cli->cl_loi_list_lock);
981 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
982 oa->o_valid |= OBD_MD_FLFLAGS;
985 oa->o_flags |= OBD_FL_SHRINK_GRANT;
986 osc_update_next_shrink(cli);
989 /* Shrink the current grant, either from some large amount to enough for a
990 * full set of in-flight RPCs, or if we have already shrunk to that limit
991 * then to enough for a single RPC. This avoids keeping more grant than
992 * needed, and avoids shrinking the grant piecemeal.
 *
 * NOTE(review): target is computed as a number of pages
 * ((cl_max_rpcs_in_flight + 1) * cl_max_pages_per_rpc) but is compared
 * with cl_avail_grant, which osc_consume_write_grant() adjusts in
 * CFS_PAGE_SIZE byte units - confirm the intended unit. */
993 static int osc_shrink_grant(struct client_obd *cli)
995 long target = (cli->cl_max_rpcs_in_flight + 1) *
996 cli->cl_max_pages_per_rpc;
998 client_obd_list_lock(&cli->cl_loi_list_lock);
999 if (cli->cl_avail_grant <= target)
1000 target = cli->cl_max_pages_per_rpc;
1001 client_obd_list_unlock(&cli->cl_loi_list_lock);
1003 return osc_shrink_grant_to_target(cli, target);
/* Shrink the available grant of @cli down to @target and tell the server
 * about the amount given back.
 *
 * @target is raised to cl_max_pages_per_rpc if it is smaller; nothing is
 * sent when @target is not below the current cl_avail_grant.  Otherwise
 * the difference is placed in o_grant of a freshly allocated ost_body,
 * cl_avail_grant is set to @target, and the body is sent with
 * osc_set_info_async() under KEY_GRANT_SHRINK.
 *
 * NOTE(review): @target is compared with cl_max_pages_per_rpc (a page
 * count) and with cl_avail_grant (adjusted in bytes elsewhere) - confirm
 * the intended unit.  The allocation failure check, the test guarding
 * the final grant restore (presumably rc != 0), the release of body and
 * the return value are not visible in this listing. */
1006 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
1009 struct ost_body *body;
1012 client_obd_list_lock(&cli->cl_loi_list_lock);
1013 /* Don't shrink if we are already above or below the desired limit
1014 * We don't want to shrink below a single RPC, as that will negatively
1015 * impact block allocation and long-term performance. */
1016 if (target < cli->cl_max_pages_per_rpc)
1017 target = cli->cl_max_pages_per_rpc;
1019 if (target >= cli->cl_avail_grant) {
1020 client_obd_list_unlock(&cli->cl_loi_list_lock);
1023 client_obd_list_unlock(&cli->cl_loi_list_lock);
1025 OBD_ALLOC_PTR(body);
/* Called with the list lock dropped: osc_announce_cached() takes
 * cl_loi_list_lock itself. */
1029 osc_announce_cached(cli, &body->oa, 0);
1031 client_obd_list_lock(&cli->cl_loi_list_lock);
1032 body->oa.o_grant = cli->cl_avail_grant - target;
1033 cli->cl_avail_grant = target;
1034 client_obd_list_unlock(&cli->cl_loi_list_lock);
1035 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
1036 body->oa.o_valid |= OBD_MD_FLFLAGS;
1037 body->oa.o_flags = 0;
1039 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
1040 osc_update_next_shrink(cli);
1042 rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
1043 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
1044 sizeof(*body), body, NULL);
/* Puts the amount offered to the server back into cl_avail_grant. */
1046 __osc_update_grant(cli, body->oa.o_grant);
/* Grant is only considered for shrinking while more than this much is
 * available (see osc_should_shrink_grant()). */
1051 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
/* Decide whether it is time to shrink the grant of @client.
 *
 * Requires the server to have negotiated OBD_CONNECT_GRANT_SHRINK.  Once
 * the current time is within 5 ticks of (or past) cl_next_shrink_grant,
 * the import must be in LUSTRE_IMP_FULL state and hold more than
 * GRANT_SHRINK_LIMIT of available grant; the deadline is re-armed on a
 * path whose condition is not shown.
 *
 * NOTE(review): the return statements are not visible in this listing;
 * the caller uses the result as a boolean. */
1052 static int osc_should_shrink_grant(struct client_obd *client)
1054 cfs_time_t time = cfs_time_current();
1055 cfs_time_t next_shrink = client->cl_next_shrink_grant;
1057 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
1058 OBD_CONNECT_GRANT_SHRINK) == 0)
1061 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1062 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1063 client->cl_avail_grant > GRANT_SHRINK_LIMIT)
1066 osc_update_next_shrink(client);
/* Timeout callback registered by osc_add_shrink_grant(): walks every
 * client linked on item->ti_obd_list (through cl_grant_shrink_list) and
 * shrinks the grant of those for which osc_should_shrink_grant() agrees.
 * @data is not referenced by any visible line; the return value is not
 * visible in this listing. */
1071 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1073 struct client_obd *client;
1075 cfs_list_for_each_entry(client, &item->ti_obd_list,
1076 cl_grant_shrink_list) {
1077 if (osc_should_shrink_grant(client))
1078 osc_shrink_grant(client);
/* Register @client for periodic grant shrinking: adds it as a timeout
 * client with interval cl_grant_shrink_interval and callback
 * osc_grant_shrink_grant_cb(), linked through cl_grant_shrink_list, and
 * arms the first shrink deadline.
 *
 * NOTE(review): one argument line of the registration call, the test
 * guarding the error message and the return value are not visible in
 * this listing. */
1083 static int osc_add_shrink_grant(struct client_obd *client)
1087 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1089 osc_grant_shrink_grant_cb, NULL,
1090 &client->cl_grant_shrink_list);
1092 CERROR("add grant client %s error %d\n",
1093 client->cl_import->imp_obd->obd_name, rc);
1096 CDEBUG(D_CACHE, "add grant client %s \n",
1097 client->cl_import->imp_obd->obd_name);
1098 osc_update_next_shrink(client);
/* Unregister @client from the grant-shrink timeout list; returns the
 * result of ptlrpc_del_timeout_client().
 * NOTE(review): the remaining call arguments are not visible in this
 * listing. */
1102 static int osc_del_shrink_grant(struct client_obd *client)
1104 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/* Initialise the available grant of @cli from the connect data @ocd.
 *
 * Under cl_loi_list_lock, cl_avail_grant becomes ocd_grant when the
 * import is in LUSTRE_IMP_EVICTED state, and ocd_grant minus cl_dirty on
 * the other visible assignment.  A negative result is logged and replaced
 * by ocd_grant.  If the server negotiated OBD_CONNECT_GRANT_SHRINK and
 * the client is not yet on a shrink list, it is registered for periodic
 * grant shrinking.
 *
 * NOTE(review): the else keyword between the two assignments is not
 * visible in this listing. */
1108 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1111 * ocd_grant is the total grant amount we're expect to hold: if we've
1112 * been evicted, it's the new avail_grant amount, cl_dirty will drop
1113 * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1115 * race is tolerable here: if we're evicted, but imp_state already
1116 * left EVICTED state, then cl_dirty must be 0 already.
1118 client_obd_list_lock(&cli->cl_loi_list_lock);
1119 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1120 cli->cl_avail_grant = ocd->ocd_grant;
1122 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1124 if (cli->cl_avail_grant < 0) {
1125 CWARN("%s: available grant < 0, the OSS is probably not running"
1126 " with patch from bug20278 (%ld) \n",
1127 cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
1128 /* workaround for 1.6 servers which do not have
1129 * the patch from bug20278 */
1130 cli->cl_avail_grant = ocd->ocd_grant;
1133 client_obd_list_unlock(&cli->cl_loi_list_lock);
1135 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1136 cli->cl_import->imp_obd->obd_name,
1137 cli->cl_avail_grant, cli->cl_lost_grant);
/* Register for shrinking only once: skipped when the client is already
 * linked on a shrink list. */
1139 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1140 cfs_list_empty(&cli->cl_grant_shrink_list))
1141 osc_add_shrink_grant(cli);
1144 /* We assume that the reason this OSC got a short read is because it read
1145 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1146 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1147 * this stripe never got written at or beyond this stripe offset yet.
 *
 * Zero-fills everything in @pga beyond the first @nob_read bytes: the
 * tail of the page in which the data ends, then every remaining page in
 * full.  Pages are mapped with cfs_kmap() for the duration of each
 * memset.
 *
 * NOTE(review): the updates of i and page_count inside the first loop
 * are not visible in this listing. */
1148 static void handle_short_read(int nob_read, obd_count page_count,
1149 struct brw_page **pga)
1154 /* skip bytes read OK */
1155 while (nob_read > 0) {
1156 LASSERT (page_count > 0);
1158 if (pga[i]->count > nob_read) {
1159 /* EOF inside this page */
1160 ptr = cfs_kmap(pga[i]->pg) +
1161 (pga[i]->off & ~CFS_PAGE_MASK);
1162 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1163 cfs_kunmap(pga[i]->pg);
1169 nob_read -= pga[i]->count;
1174 /* zero remaining pages */
1175 while (page_count-- > 0) {
1176 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1177 memset(ptr, 0, pga[i]->count);
1178 cfs_kunmap(pga[i]->pg);
/* Validate the per-niobuf return codes of a BRW_WRITE reply.
 *
 * Fetches the RMF_RCS vector from the reply, returns the first negative
 * entry as the error, logs any entry that is non-zero but not negative,
 * and finally compares the number of bytes actually transferred by the
 * bulk with @requested_nob.
 *
 * NOTE(review): the values returned on the missing-vector, invalid-rc and
 * byte-count-mismatch paths, and on success, are not visible in this
 * listing.  @page_count and @pga are not referenced by any visible
 * line. */
1183 static int check_write_rcs(struct ptlrpc_request *req,
1184 int requested_nob, int niocount,
1185 obd_count page_count, struct brw_page **pga)
1190 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1191 sizeof(*remote_rcs) *
1193 if (remote_rcs == NULL) {
1194 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1198 /* return error if any niobuf was in error */
1199 for (i = 0; i < niocount; i++) {
1200 if ((int)remote_rcs[i] < 0)
1201 return(remote_rcs[i]);
1203 if (remote_rcs[i] != 0) {
1204 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1205 i, remote_rcs[i], req);
1210 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1211 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1212 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Tell whether pages @p1 and @p2 can share one remote niobuf.
 *
 * The visible return requires @p2 to start exactly where @p1 ends.  When
 * the flags of the two pages differ, a warning is printed if they differ
 * in any bit outside the set known to be safe to combine (FROM_GRANT,
 * NOCACHE, SYNC, ASYNC, NOQUOTA).
 *
 * NOTE(review): what is returned when the flags differ is not visible in
 * this listing. */
1219 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1221 if (p1->flag != p2->flag) {
1222 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1223 OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1225 /* warn if we try to combine flags that we don't know to be
1226 * safe to combine */
1227 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1228 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1229 "report this at http://bugs.whamcloud.com/\n",
1230 p1->flag, p2->flag);
1235 return (p1->off + p1->count == p2->off);
/* Compute the checksum of the first @nob bytes described by @pga.
 *
 * Pages are mapped one at a time and folded into the running checksum
 * with compute_checksum() using @cksum_type; the last page contributes
 * only the bytes still outstanding.  @pg_count must be positive
 * (asserted).
 *
 * Two fault-injection hooks are visible: for OST_READ the first page can
 * be overwritten with "bad1" before checksumming, and for OST_WRITE the
 * result itself can be altered instead of the data.
 *
 * NOTE(review): the loop counter updates, the statement executed by the
 * OST_WRITE hook and the return are not visible in this listing. */
1238 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1239 struct brw_page **pga, int opc,
1240 cksum_type_t cksum_type)
1245 LASSERT (pg_count > 0);
1246 cksum = init_checksum(cksum_type);
1247 while (nob > 0 && pg_count > 0) {
1248 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1249 int off = pga[i]->off & ~CFS_PAGE_MASK;
1250 int count = pga[i]->count > nob ? nob : pga[i]->count;
1252 /* corrupt the data before we compute the checksum, to
1253 * simulate an OST->client data error */
1254 if (i == 0 && opc == OST_READ &&
1255 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1256 memcpy(ptr + off, "bad1", min(4, nob));
1257 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1258 cfs_kunmap(pga[i]->pg);
1259 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1262 nob -= pga[i]->count;
1266 /* For sending we only compute the wrong checksum instead
1267 * of corrupting the data so it is still correct on a redo */
1268 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/*
 * Build (but do not send) a bulk read/write request for page_count pages.
 *
 * Visible steps: honour two fail-injection points, allocate the request
 * (writes are allocated from the import's request pool), count the remote
 * niobufs needed by merging mergeable neighbouring pages, size and pack the
 * request capsule, attach a bulk descriptor, fill in the ost_body, ioobj and
 * niobuf array, optionally add a bulk checksum, and record the transfer
 * parameters in the request's async args.
 *
 * \param cmd         OBD_BRW_ command bits; OBD_BRW_WRITE selects a write
 * \param cli         client obd the request is built for
 * \param oa          object attributes; copied into the request body and, on
 *                    writes, updated with the checksum state
 * \param lsm         not referenced in the visible code
 * \param page_count  number of entries in pga; asserted to be > 0
 * \param pga         pages to transfer; asserted to be in increasing offset
 *                    order with no gap inside the array
 * \param reqp        NOTE(review): presumably receives the new request; the
 *                    assignment is on a line not visible here
 * \param ocapa       capability packed into the request
 * \param reserve     when non-zero and ocapa is set, aa_ocapa takes its own
 *                    reference through capa_get()
 *
 * NOTE(review): the last parameter of the signature, the error labels and
 * the return statements are on lines not visible here.
 */
1274 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1275 struct lov_stripe_md *lsm, obd_count page_count,
1276 struct brw_page **pga,
1277 struct ptlrpc_request **reqp,
1278 struct obd_capa *ocapa, int reserve,
1281 struct ptlrpc_request *req;
1282 struct ptlrpc_bulk_desc *desc;
1283 struct ost_body *body;
1284 struct obd_ioobj *ioobj;
1285 struct niobuf_remote *niobuf;
1286 int niocount, i, requested_nob, opc, rc;
1287 struct osc_brw_async_args *aa;
1288 struct req_capsule *pill;
1289 struct brw_page *pg_prev;
1292 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1293 RETURN(-ENOMEM); /* Recoverable */
1294 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1295 RETURN(-EINVAL); /* Fatal */
1297 if ((cmd & OBD_BRW_WRITE) != 0) {
1299 req = ptlrpc_request_alloc_pool(cli->cl_import,
1300 cli->cl_import->imp_rq_pool,
1301 &RQF_OST_BRW_WRITE);
1304 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* niocount starts at 1; NOTE(review): presumably it is bumped for every
 * neighbouring pair that cannot be merged (that line is not visible) */
1309 for (niocount = i = 1; i < page_count; i++) {
1310 if (!can_merge_pages(pga[i - 1], pga[i]))
1314 pill = &req->rq_pill;
1315 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1317 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1318 niocount * sizeof(*niobuf));
1319 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1321 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1323 ptlrpc_request_free(req);
1326 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1327 ptlrpc_at_set_req_timeout(req);
1328 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1330 req->rq_no_retry_einprogress = 1;
1332 if (opc == OST_WRITE)
1333 desc = ptlrpc_prep_bulk_imp(req, page_count,
1334 BULK_GET_SOURCE, OST_BULK_PORTAL);
1336 desc = ptlrpc_prep_bulk_imp(req, page_count,
1337 BULK_PUT_SINK, OST_BULK_PORTAL);
1340 GOTO(out, rc = -ENOMEM);
1341 /* NB request now owns desc and will free it when it gets freed */
1343 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1344 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1345 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1346 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1348 lustre_set_wire_obdo(&body->oa, oa);
1350 obdo_to_ioobj(oa, ioobj);
1351 ioobj->ioo_bufcnt = niocount;
1352 osc_pack_capa(req, body, ocapa);
1353 LASSERT (page_count > 0);
/* walk the pages: sanity-check layout and ordering, add each page to the
 * bulk descriptor, total up the bytes and fill in the niobuf array */
1355 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1356 struct brw_page *pg = pga[i];
1357 int poff = pg->off & ~CFS_PAGE_MASK;
1359 LASSERT(pg->count > 0);
1360 /* make sure there is no gap in the middle of page array */
1361 LASSERTF(page_count == 1 ||
1362 (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
1363 ergo(i > 0 && i < page_count - 1,
1364 poff == 0 && pg->count == CFS_PAGE_SIZE) &&
1365 ergo(i == page_count - 1, poff == 0)),
1366 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1367 i, page_count, pg, pg->off, pg->count);
1369 LASSERTF(i == 0 || pg->off > pg_prev->off,
1370 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1371 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1373 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1374 pg_prev->pg, page_private(pg_prev->pg),
1375 pg_prev->pg->index, pg_prev->off);
1377 LASSERTF(i == 0 || pg->off > pg_prev->off,
1378 "i %d p_c %u\n", i, page_count);
/* every page must agree with the first page on OBD_BRW_SRVLOCK */
1380 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1381 (pg->flag & OBD_BRW_SRVLOCK));
1383 ptlrpc_prep_bulk_page(desc, pg->pg, poff, pg->count);
1384 requested_nob += pg->count;
1386 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1388 niobuf->len += pg->count;
1390 niobuf->offset = pg->off;
1391 niobuf->len = pg->count;
1392 niobuf->flags = pg->flag;
/* niobuf must now sit exactly niocount entries past the start of the
 * niobuf array in the request */
1397 LASSERTF((void *)(niobuf - niocount) ==
1398 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1399 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1400 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1402 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1404 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1405 body->oa.o_valid |= OBD_MD_FLFLAGS;
1406 body->oa.o_flags = 0;
1408 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1411 if (osc_should_shrink_grant(cli))
1412 osc_shrink_grant_local(cli, &body->oa);
1414 /* size[REQ_REC_OFF] still sizeof (*body) */
1415 if (opc == OST_WRITE) {
1416 if (unlikely(cli->cl_checksum) &&
1417 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1418 /* store cl_cksum_type in a local variable since
1419 * it can be changed via lprocfs */
1420 cksum_type_t cksum_type = cli->cl_cksum_type;
1422 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1423 oa->o_flags &= OBD_FL_LOCAL_MASK;
1424 body->oa.o_flags = 0;
1426 body->oa.o_flags |= cksum_type_pack(cksum_type);
1427 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1428 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1432 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1434 /* save this in 'oa', too, for later checking */
1435 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1436 oa->o_flags |= cksum_type_pack(cksum_type);
1438 /* clear out the checksum flag, in case this is a
1439 * resend but cl_checksum is no longer set. b=11238 */
1440 oa->o_valid &= ~OBD_MD_FLCKSUM;
1442 oa->o_cksum = body->oa.o_cksum;
1443 /* 1 RC per niobuf */
1444 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1445 sizeof(__u32) * niocount);
1447 if (unlikely(cli->cl_checksum) &&
1448 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1449 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1450 body->oa.o_flags = 0;
1451 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1452 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1455 ptlrpc_request_set_replen(req);
/* remember the transfer parameters in the request's async args */
1457 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1458 aa = ptlrpc_req_async_args(req);
1460 aa->aa_requested_nob = requested_nob;
1461 aa->aa_nio_count = niocount;
1462 aa->aa_page_count = page_count;
1466 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1467 if (ocapa && reserve)
1468 aa->aa_ocapa = capa_get(ocapa);
1474 ptlrpc_req_finished(req);
/*
 * Diagnose a write whose client and server checksums may disagree.
 *
 * When the two checksums match, a debug message is logged.  Otherwise,
 * unless the obdo is flagged OBD_FL_MMAP, the checksum is recomputed over
 * the pages with the type taken from oa->o_flags (OBD_CKSUM_CRC32 when
 * OBD_MD_FLFLAGS is not set), one of four explanatory messages is chosen by
 * comparing the new value with the server and original client values, and
 * the mismatch is reported on the console and in the error log.
 *
 * \param oa                 obdo whose flags give the checksum type and the
 *                           mmap hint, and whose ids are printed
 * \param peer               peer whose nid is printed in the report
 * \param client_cksum       checksum the client computed for the request
 * \param server_cksum       checksum reported by the server
 * \param nob                number of bytes to re-checksum
 * \param page_count         number of entries in pga
 * \param pga                pages that were written
 * \param client_cksum_type  checksum type the client used originally
 *
 * NOTE(review): the return statements are on lines not visible here; the
 * caller in osc_brw_fini_request() uses the result as an if-condition next
 * to the other write checks, so presumably non-zero means a mismatch was
 * reported -- confirm.
 */
1478 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1479 __u32 client_cksum, __u32 server_cksum, int nob,
1480 obd_count page_count, struct brw_page **pga,
1481 cksum_type_t client_cksum_type)
1485 cksum_type_t cksum_type;
1487 if (server_cksum == client_cksum) {
1488 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1492 /* If this is mmaped file - it can be changed at any time */
1493 if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags & OBD_FL_MMAP)
1496 if (oa->o_valid & OBD_MD_FLFLAGS)
1497 cksum_type = cksum_type_unpack(oa->o_flags);
1499 cksum_type = OBD_CKSUM_CRC32;
/* checksum the pages again as they are now, to tell where the data changed */
1501 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1504 if (cksum_type != client_cksum_type)
1505 msg = "the server did not use the checksum type specified in "
1506 "the original request - likely a protocol problem";
1507 else if (new_cksum == server_cksum)
1508 msg = "changed on the client after we checksummed it - "
1509 "likely false positive due to mmap IO (bug 11742)";
1510 else if (new_cksum == client_cksum)
1511 msg = "changed in transit before arrival at OST";
1513 msg = "changed in transit AND doesn't match the original - "
1514 "likely false positive due to mmap IO (bug 11742)";
1516 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1517 " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1518 msg, libcfs_nid2str(peer->nid),
1519 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1520 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1521 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1523 oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1525 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1526 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1527 "client csum now %x\n", client_cksum, client_cksum_type,
1528 server_cksum, cksum_type, new_cksum);
1532 /* Note rc enters this function as number of bytes transferred */
/*
 * Post-process the reply of a bulk read/write request.
 *
 * Visible steps: bail out on a failed request (any negative rc other than
 * -EDQUOT), unpack the reply body, update quota flags (when built with
 * HAVE_QUOTA_SUPPORT) and grant from the reply, then branch on the opcode.
 *
 * Writes: unwrap the bulk, verify the write checksum when one was sent, and
 * take the result from check_write_rcs().
 *
 * Reads: unwrap the bulk, sanity-check the byte count against what was
 * requested and what was transferred, handle a short read, and verify the
 * server checksum when the reply carries one.  Finally the reply obdo is
 * copied back into aa->aa_oa.
 *
 * NOTE(review): the error returns, the `out` label and several statements
 * are on lines not visible here.
 */
1533 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1535 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1536 const lnet_process_id_t *peer =
1537 &req->rq_import->imp_connection->c_peer;
1538 struct client_obd *cli = aa->aa_cli;
1539 struct ost_body *body;
1540 __u32 client_cksum = 0;
1543 if (rc < 0 && rc != -EDQUOT) {
1544 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1548 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1549 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1551 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1555 #ifdef HAVE_QUOTA_SUPPORT
1556 /* set/clear over quota flag for a uid/gid */
1557 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1558 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1559 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1561 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1562 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1564 lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
1569 osc_update_grant(cli, body);
1574 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1575 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1577 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1579 CERROR("Unexpected +ve rc %d\n", rc);
1582 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1584 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
/* only verify when a non-zero checksum was recorded for this write */
1587 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1588 check_write_checksum(&body->oa, peer, client_cksum,
1589 body->oa.o_cksum, aa->aa_requested_nob,
1590 aa->aa_page_count, aa->aa_ppga,
1591 cksum_type_unpack(aa->aa_oa->o_flags)))
1594 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1595 aa->aa_page_count, aa->aa_ppga);
1599 /* The rest of this function executes only for OST_READs */
1601 /* if unwrap_bulk failed, return -EAGAIN to retry */
1602 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1604 GOTO(out, rc = -EAGAIN);
1606 if (rc > aa->aa_requested_nob) {
1607 CERROR("Unexpected rc %d (%d requested)\n", rc,
1608 aa->aa_requested_nob);
1612 if (rc != req->rq_bulk->bd_nob_transferred) {
1613 CERROR ("Unexpected rc %d (%d transferred)\n",
1614 rc, req->rq_bulk->bd_nob_transferred);
1618 if (rc < aa->aa_requested_nob)
1619 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1621 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1622 static int cksum_counter;
1623 __u32 server_cksum = body->oa.o_cksum;
1626 cksum_type_t cksum_type;
/* checksum type comes from the reply flags, defaulting to CRC32 */
1628 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1629 cksum_type = cksum_type_unpack(body->oa.o_flags);
1631 cksum_type = OBD_CKSUM_CRC32;
1632 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1633 aa->aa_ppga, OST_READ,
1636 if (peer->nid == req->rq_bulk->bd_sender) {
1640 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1643 if (server_cksum == ~0 && rc > 0) {
1644 CERROR("Protocol error: server %s set the 'checksum' "
1645 "bit, but didn't send a checksum. Not fatal, "
1646 "but please notify on http://bugs.whamcloud.com/\n",
1647 libcfs_nid2str(peer->nid));
1648 } else if (server_cksum != client_cksum) {
1649 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1650 "%s%s%s inode "DFID" object "
1651 LPU64"/"LPU64" extent "
1652 "["LPU64"-"LPU64"]\n",
1653 req->rq_import->imp_obd->obd_name,
1654 libcfs_nid2str(peer->nid),
1656 body->oa.o_valid & OBD_MD_FLFID ?
1657 body->oa.o_parent_seq : (__u64)0,
1658 body->oa.o_valid & OBD_MD_FLFID ?
1659 body->oa.o_parent_oid : 0,
1660 body->oa.o_valid & OBD_MD_FLFID ?
1661 body->oa.o_parent_ver : 0,
1663 body->oa.o_valid & OBD_MD_FLGROUP ?
1664 body->oa.o_seq : (__u64)0,
1665 aa->aa_ppga[0]->off,
1666 aa->aa_ppga[aa->aa_page_count-1]->off +
1667 aa->aa_ppga[aa->aa_page_count-1]->count -
1669 CERROR("client %x, server %x, cksum_type %x\n",
1670 client_cksum, server_cksum, cksum_type);
1672 aa->aa_oa->o_cksum = client_cksum;
1676 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1679 } else if (unlikely(client_cksum)) {
1680 static int cksum_missed;
/* true only when cksum_missed has at most one bit set, which rate-limits
 * the message as the counter grows */
1683 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1684 CERROR("Checksum %u requested from %s but not sent\n",
1685 cksum_missed, libcfs_nid2str(peer->nid));
1691 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
/*
 * Synchronous bulk read/write of one batch of pages, with resend handling.
 *
 * Visible flow: build the request with osc_brw_prep_request(), pin it to
 * the import generation sampled on entry, set rq_sent to `resends` seconds
 * past the current time, wait for completion with ptlrpc_queue_wait(), then
 * post-process the reply with osc_brw_fini_request().  For a recoverable
 * error the code logs an error when the resend limit is reached
 * (-EINPROGRESS is exempt from the limit), logs when the import generation
 * has changed, and otherwise waits `resends` seconds.
 *
 * NOTE(review): the retry jumps, the increment of `resends`, the statement
 * guarded by the final -EAGAIN/-EINPROGRESS test and the return are on lines
 * not visible here.
 */
1696 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1697 struct lov_stripe_md *lsm,
1698 obd_count page_count, struct brw_page **pga,
1699 struct obd_capa *ocapa)
1701 struct ptlrpc_request *req;
1704 int generation, resends = 0;
1705 struct l_wait_info lwi;
1709 cfs_waitq_init(&waitq);
/* remember the generation so a later eviction can be detected */
1710 generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1713 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1714 page_count, pga, &req, ocapa, 0, resends);
1719 req->rq_generation_set = 1;
1720 req->rq_import_generation = generation;
1721 req->rq_sent = cfs_time_current_sec() + resends;
1724 rc = ptlrpc_queue_wait(req);
1726 if (rc == -ETIMEDOUT && req->rq_resend) {
1727 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1728 ptlrpc_req_finished(req);
1732 rc = osc_brw_fini_request(req, rc);
1734 ptlrpc_req_finished(req);
1735 /* When server return -EINPROGRESS, client should always retry
1736 * regardless of the number of times the bulk was resent already.*/
1737 if (osc_recoverable_error(rc)) {
1739 if (rc != -EINPROGRESS &&
1740 !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1741 CERROR("%s: too many resend retries for object: "
1742 ""LPU64":"LPU64", rc = %d.\n",
1743 exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
1747 exp->exp_obd->u.cli.cl_import->imp_generation) {
1748 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1749 ""LPU64":"LPU64", rc = %d.\n",
1750 exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
/* back off for `resends` seconds before the next attempt */
1754 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1756 l_wait_event(waitq, 0, &lwi);
1761 if (rc == -EAGAIN || rc == -EINPROGRESS)
/*
 * Replace a bulk request that failed with a recoverable error by a new one.
 *
 * A new request is built for the same pages, obdo and capability.  Under
 * cl_loi_list_lock the new request is dropped again if any oap attached to
 * the old request was interrupted.  Otherwise it inherits the interpret
 * callback and async args, gets a send time delayed by aa_resends seconds
 * (capped at its own timeout), takes over the oap list, the per-oap request
 * references and the capability from the old request, and is added to the
 * old request's set.
 *
 * \param request  the request being redone
 * \param aa       async args of `request`
 * \param rc       the recoverable error that triggered the redo (logged)
 *
 * NOTE(review): the return statements and the update of the resend counter
 * are on lines not visible here.
 */
1766 static int osc_brw_redo_request(struct ptlrpc_request *request,
1767 struct osc_brw_async_args *aa, int rc)
1769 struct ptlrpc_request *new_req;
1770 struct ptlrpc_request_set *set = request->rq_set;
1771 struct osc_brw_async_args *new_aa;
1772 struct osc_async_page *oap;
1775 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1776 "redo for recoverable error %d", rc);
1778 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1779 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1780 aa->aa_cli, aa->aa_oa,
1781 NULL /* lsm unused by osc currently */,
1782 aa->aa_page_count, aa->aa_ppga,
1783 &new_req, aa->aa_ocapa, 0, 1);
1787 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
/* give up on the redo if a waiter has been interrupted in the meantime */
1789 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1790 if (oap->oap_request != NULL) {
1791 LASSERTF(request == oap->oap_request,
1792 "request %p != oap_request %p\n",
1793 request, oap->oap_request);
1794 if (oap->oap_interrupted) {
1795 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1796 ptlrpc_req_finished(new_req);
1801 /* New request takes over pga and oaps from old request.
1802 * Note that copying a list_head doesn't work, need to move it... */
1804 new_req->rq_interpret_reply = request->rq_interpret_reply;
1805 new_req->rq_async_args = request->rq_async_args;
1806 /* cap resend delay to the current request timeout, this is similar to
1807 * what ptlrpc does (see after_reply()) */
1808 if (aa->aa_resends > new_req->rq_timeout)
1809 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1811 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1812 new_req->rq_generation_set = 1;
1813 new_req->rq_import_generation = request->rq_import_generation;
1815 new_aa = ptlrpc_req_async_args(new_req);
/* the list head copied with rq_async_args is re-initialised here, then the
 * entries are moved over from the old request */
1817 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1818 cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1819 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
/* swap each oap's reference from the old request to the new one */
1821 cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1822 if (oap->oap_request) {
1823 ptlrpc_req_finished(oap->oap_request);
1824 oap->oap_request = ptlrpc_request_addref(new_req);
1828 new_aa->aa_ocapa = aa->aa_ocapa;
1829 aa->aa_ocapa = NULL;
1831 /* use ptlrpc_set_add_req is safe because interpret functions work
1832 * in check_set context. only one way exist with access to request
1833 * from different thread got -EINTR - this way protected with
1834 * cl_loi_list_lock */
1835 ptlrpc_set_add_req(set, new_req);
1837 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1839 DEBUG_REQ(D_INFO, new_req, "new request");
1844 * ugh, we want disk allocation on the target to happen in offset order. we'll
1845 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1846 * fine for our small page arrays and doesn't require allocation. it's an
1847 * insertion sort that swaps elements that are strides apart, shrinking the
1848 * stride down until it's '1' and the array is sorted.
/*
 * Sort an array of brw_page pointers in place by ascending ->off.
 *
 * The first loop grows the stride through 1, 4, 13, ... (stride * 3 + 1)
 * until it reaches num; the do/while then runs insertion passes over
 * elements `stride` apart until a pass with stride 1 has completed.
 *
 * NOTE(review): the statements that shrink the stride and store `tmp` back
 * into the array are on lines not visible here.
 */
1850 static void sort_brw_pages(struct brw_page **array, int num)
1853 struct brw_page *tmp;
1857 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1862 for (i = stride ; i < num ; i++) {
/* shift larger elements up by one stride to open a slot for tmp */
1865 while (j >= stride && array[j - stride]->off > tmp->off) {
1866 array[j] = array[j - stride];
1871 } while (stride > 1);
/*
 * Count how many leading entries of pg can be taken without a gap.
 *
 * Visible checks: the scan stops when the requested number of pages is
 * used up, when a page's data does not reach the end of its page, or when
 * the following page's data does not start at offset 0 within its page.
 *
 * NOTE(review): the loop construct, the counter updates and the final
 * return are on lines not visible here.
 */
1874 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1880 LASSERT (pages > 0);
1881 offset = pg[i]->off & ~CFS_PAGE_MASK;
1885 if (pages == 0) /* that's all */
1888 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1889 return count; /* doesn't end on page boundary */
1892 offset = pg[i]->off & ~CFS_PAGE_MASK;
1893 if (offset != 0) /* doesn't start on page boundary */
/*
 * Allocate an array of `count` brw_page pointers for the pages in pga.
 *
 * NOTE(review): the allocation-failure check, the loop body and the return
 * are on lines not visible here; presumably each slot is pointed at the
 * matching element of pga and the array is returned -- confirm.  The
 * matching release routine is osc_release_ppga().
 */
1900 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1902 struct brw_page **ppga;
1905 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1909 for (i = 0; i < count; i++)
/*
 * Free a page-pointer array of `count` entries.  ppga must not be NULL, and
 * `count` determines the size passed to OBD_FREE, so it has to match the
 * size the array was allocated with.
 */
1914 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1916 LASSERT(ppga != NULL);
1917 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * Synchronous bulk I/O entry point: split a page array into RPC-sized,
 * unfragmented batches and send each one with osc_brw_internal().
 *
 * With OBD_BRW_CHECK in cmd the visible code only inspects imp_invalid.
 * Otherwise a pointer array is built over pga, sorted by offset, and
 * consumed in chunks of at most cl_max_pages_per_rpc pages, each trimmed by
 * max_unfragmented_pages().  When more than one chunk is needed the obdo is
 * saved before the first call and restored before each later one, because
 * the brw call clobbers it.  The pointer array and the saved obdo are
 * released at the end.
 *
 * \param oti  not referenced in the visible code
 *
 * NOTE(review): the early returns, the error check after osc_brw_internal()
 * and the `out` label are on lines not visible here.
 */
1920 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1921 obd_count page_count, struct brw_page *pga,
1922 struct obd_trans_info *oti)
1924 struct obdo *saved_oa = NULL;
1925 struct brw_page **ppga, **orig;
1926 struct obd_import *imp = class_exp2cliimp(exp);
1927 struct client_obd *cli;
1928 int rc, page_count_orig;
1931 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1932 cli = &imp->imp_obd->u.cli;
1934 if (cmd & OBD_BRW_CHECK) {
1935 /* The caller just wants to know if there's a chance that this
1936 * I/O can succeed */
1938 if (imp->imp_invalid)
1943 /* test_brw with a failed create can trip this, maybe others. */
1944 LASSERT(cli->cl_max_pages_per_rpc);
/* keep the original pointer and count: ppga and page_count are advanced
 * by the loop but the release below needs the starting values */
1948 orig = ppga = osc_build_ppga(pga, page_count);
1951 page_count_orig = page_count;
1953 sort_brw_pages(ppga, page_count);
1954 while (page_count) {
1955 obd_count pages_per_brw;
1957 if (page_count > cli->cl_max_pages_per_rpc)
1958 pages_per_brw = cli->cl_max_pages_per_rpc;
1960 pages_per_brw = page_count;
1962 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1964 if (saved_oa != NULL) {
1965 /* restore previously saved oa */
1966 *oinfo->oi_oa = *saved_oa;
1967 } else if (page_count > pages_per_brw) {
1968 /* save a copy of oa (brw will clobber it) */
1969 OBDO_ALLOC(saved_oa);
1970 if (saved_oa == NULL)
1971 GOTO(out, rc = -ENOMEM);
1972 *saved_oa = *oinfo->oi_oa;
1975 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1976 pages_per_brw, ppga, oinfo->oi_capa);
1981 page_count -= pages_per_brw;
1982 ppga += pages_per_brw;
1986 osc_release_ppga(orig, page_count_orig);
1988 if (saved_oa != NULL)
1989 OBDO_FREE(saved_oa);
1994 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1995 * the dirty accounting. Writeback completes or truncate happens before
1996 * writing starts. Must be called with the loi lock held. */
/*
 * The visible body hands the oap's brw page and the `sent` indication to
 * osc_release_write_grant().
 * NOTE(review): the remainder of the signature is on a line not visible
 * here.
 */
1997 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
2000 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
2004 /* This maintains the lists of pending pages to read/write for a given object
2005 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
2006 * to quickly find objects that are ready to send an RPC. */
/*
 * Decide whether the pending pages of `lop` justify sending an RPC now.
 *
 * Visible conditions, in order: nothing pending; a missing or invalid
 * import; any page on the urgent list; for writes, any waiter on the
 * client's cache-waiter list; and finally the pending count reaching
 * `optimal`, which starts out as cl_max_pages_per_rpc.
 *
 * NOTE(review): the return statements and the adjustment described by the
 * "+16" comment are on lines not visible here.
 */
2007 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
2013 if (lop->lop_num_pending == 0)
2016 /* if we have an invalid import we want to drain the queued pages
2017 * by forcing them through rpcs that immediately fail and complete
2018 * the pages. recovery relies on this to empty the queued pages
2019 * before canceling the locks and evicting down the llite pages */
2020 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2023 /* stream rpcs in queue order as long as there is an urgent page
2024 * queued. this is our cheap solution for good batching in the case
2025 * where writepage marks some random page in the middle of the file
2026 * as urgent because of, say, memory pressure */
2027 if (!cfs_list_empty(&lop->lop_urgent)) {
2028 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
2031 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
2032 optimal = cli->cl_max_pages_per_rpc;
2033 if (cmd & OBD_BRW_WRITE) {
2034 /* trigger a write rpc stream as long as there are dirtiers
2035 * waiting for space. as they're waiting, they're not going to
2036 * create more pages to coalesce with what's waiting.. */
2037 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
2038 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
2041 /* +16 to avoid triggering rpcs that would want to include pages
2042 * that are being queued but which can't be made ready until
2043 * the queuer finishes with the page. this is a wart for
2044 * llite::commit_write() */
2047 if (lop->lop_num_pending >= optimal)
/*
 * Check whether `lop` should trigger a high-priority RPC.
 *
 * Only the first entry of the urgent list is examined: the visible code
 * tests whether that oap carries ASYNC_HP and logs when it does.
 * osc_oap_to_pending() adds ASYNC_HP pages at the head of this list.
 *
 * NOTE(review): the return statements are on lines not visible here.
 */
2053 static int lop_makes_hprpc(struct loi_oap_pages *lop)
2055 struct osc_async_page *oap;
2058 if (cfs_list_empty(&lop->lop_urgent))
2061 oap = cfs_list_entry(lop->lop_urgent.next,
2062 struct osc_async_page, oap_urgent_item);
2064 if (oap->oap_async_flags & ASYNC_HP) {
2065 CDEBUG(D_CACHE, "hp request forcing RPC\n");
/*
 * Make the list membership of `item` match `should_be_on`: append it to the
 * tail of `list` when it is currently unlinked but should be on, unlink and
 * re-initialise it when it is linked but should not be.  Nothing happens
 * when the state already matches.
 * NOTE(review): the declaration of should_be_on is on a signature line not
 * visible here.
 */
2072 static void on_list(cfs_list_t *item, cfs_list_t *list,
2075 if (cfs_list_empty(item) && should_be_on)
2076 cfs_list_add_tail(item, list);
2077 else if (!cfs_list_empty(item) && !should_be_on)
2078 cfs_list_del_init(item);
2081 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2082 * can find pages to build into rpcs quickly */
/*
 * When either the write or the read lop wants a high-priority RPC, the loi
 * is taken off the ready list and put on the hp-ready list.  Otherwise it
 * is taken off the hp-ready list and is on the ready list exactly when
 * lop_makes_rpc() says so for writes or reads.  Membership of the write and
 * read lists follows the respective pending counts being non-zero.
 */
2083 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
2085 if (lop_makes_hprpc(&loi->loi_write_lop) ||
2086 lop_makes_hprpc(&loi->loi_read_lop)) {
2088 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2089 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2091 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
2092 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2093 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
2094 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
2097 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2098 loi->loi_write_lop.lop_num_pending);
2100 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2101 loi->loi_read_lop.lop_num_pending);
/*
 * Adjust the pending-page accounting by `delta` (may be negative): the
 * per-object count in `lop`, and the client-wide write or read counter
 * selected by OBD_BRW_WRITE in `cmd`.
 */
2104 static void lop_update_pending(struct client_obd *cli,
2105 struct loi_oap_pages *lop, int cmd, int delta)
2107 lop->lop_num_pending += delta;
2108 if (cmd & OBD_BRW_WRITE)
2109 cli->cl_pending_w_pages += delta;
2111 cli->cl_pending_r_pages += delta;
2115 * this is called when a sync waiter receives an interruption. Its job is to
2116 * get the caller woken as soon as possible. If its page hasn't been put in an
2117 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
2118 * desiring interruption which will forcefully complete the rpc once the rpc
/*
 * Mark an oap as interrupted and get its waiter completed.
 *
 * The oap must not already be marked interrupted.  If it holds a request
 * reference, the request is marked interrupted, ptlrpcd is woken for it and
 * the reference is dropped.  If the oap is still on a pending list, it is
 * removed from the pending and urgent lists, the pending accounting is
 * decremented, list membership is refreshed and the caller's
 * ap_completion() is invoked with -EINTR.
 *
 * NOTE(review): the assignment of `loi`, the branch structure between the
 * two cases and the return are on lines not visible here.
 */
2121 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2123 struct loi_oap_pages *lop;
2124 struct lov_oinfo *loi;
2128 LASSERT(!oap->oap_interrupted);
2129 oap->oap_interrupted = 1;
2131 /* ok, it's been put in an rpc. only one oap gets a request reference */
2132 if (oap->oap_request != NULL) {
2133 ptlrpc_mark_interrupted(oap->oap_request);
2134 ptlrpcd_wake(oap->oap_request);
2135 ptlrpc_req_finished(oap->oap_request);
2136 oap->oap_request = NULL;
2140 * page completion may be called only if ->cpo_prep() method was
2141 * executed by osc_io_submit(), that also adds page the to pending list
2143 if (!cfs_list_empty(&oap->oap_pending_item)) {
2144 cfs_list_del_init(&oap->oap_pending_item);
2145 cfs_list_del_init(&oap->oap_urgent_item);
2148 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2149 &loi->loi_write_lop : &loi->loi_read_lop;
2150 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2151 loi_list_maint(oap->oap_cli, oap->oap_loi);
2152 rc = oap->oap_caller_ops->ap_completion(env,
2153 oap->oap_caller_data,
2154 oap->oap_cmd, NULL, -EINTR);
2160 /* this is trying to propagate async writeback errors back up to the
2161 * application. As an async write fails we record the error code for later if
2162 * the app does an fsync. As long as errors persist we force future rpcs to be
2163 * sync so that the app can get a sync error and break the cycle of queueing
2164 * pages for which writeback will fail. */
/*
 * Visible behaviour: forced-sync mode is switched on together with a fresh
 * sample of the next xid stored in ar_min_xid, and switched off again once
 * a request whose xid is at or beyond ar_min_xid is processed.
 * NOTE(review): the rest of the signature and the conditions guarding these
 * statements are on lines not visible here.
 */
2165 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2172 ar->ar_force_sync = 1;
2173 ar->ar_min_xid = ptlrpc_sample_next_xid();
2178 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2179 ar->ar_force_sync = 0;
/*
 * Queue an oap on its object's pending list for the matching direction.
 *
 * ASYNC_HP pages go to the head of the urgent list and ASYNC_URGENT pages
 * to its tail; every page is appended to the pending list and the pending
 * accounting is incremented by one.
 */
2182 void osc_oap_to_pending(struct osc_async_page *oap)
2184 struct loi_oap_pages *lop;
2186 if (oap->oap_cmd & OBD_BRW_WRITE)
2187 lop = &oap->oap_loi->loi_write_lop;
2189 lop = &oap->oap_loi->loi_read_lop;
2191 if (oap->oap_async_flags & ASYNC_HP)
2192 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2193 else if (oap->oap_async_flags & ASYNC_URGENT)
2194 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2195 cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2196 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2199 /* this must be called holding the loi list lock to give coverage to exit_cache,
2200 * async_flag maintenance, and oap_request */
/*
 * Finish one async page after its RPC has completed or failed.
 *
 * Visible steps: drop the oap's request reference (keeping its xid), clear
 * the async flags under oap_lock and the interrupted mark, feed the result
 * into the client-wide and per-object async-error state for writes, copy
 * blocks/mtime/atime/ctime from `oa` into the object's lvb on success, and
 * call the caller's ap_completion().  Afterwards the page is either put
 * back on the pending queue or leaves the cache accounting.
 *
 * \param oa    reply attributes, may be NULL
 * \param sent  passed through to osc_exit_cache()
 * \param rc    result of the I/O for this page
 *
 * NOTE(review): the test on ap_completion()'s result that selects between
 * re-queueing and osc_exit_cache() is on a line not visible here.
 */
2201 static void osc_ap_completion(const struct lu_env *env,
2202 struct client_obd *cli, struct obdo *oa,
2203 struct osc_async_page *oap, int sent, int rc)
2208 if (oap->oap_request != NULL) {
2209 xid = ptlrpc_req_xid(oap->oap_request);
2210 ptlrpc_req_finished(oap->oap_request);
2211 oap->oap_request = NULL;
2214 cfs_spin_lock(&oap->oap_lock);
2215 oap->oap_async_flags = 0;
2216 cfs_spin_unlock(&oap->oap_lock);
2217 oap->oap_interrupted = 0;
2219 if (oap->oap_cmd & OBD_BRW_WRITE) {
2220 osc_process_ar(&cli->cl_ar, xid, rc);
2221 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
/* on success, refresh the cached object attributes from the reply */
2224 if (rc == 0 && oa != NULL) {
2225 if (oa->o_valid & OBD_MD_FLBLOCKS)
2226 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2227 if (oa->o_valid & OBD_MD_FLMTIME)
2228 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2229 if (oa->o_valid & OBD_MD_FLATIME)
2230 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2231 if (oa->o_valid & OBD_MD_FLCTIME)
2232 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2235 rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2236 oap->oap_cmd, oa, rc);
2238 /* ll_ap_completion (from llite) drops PG_locked. so, a new
2239 * I/O on the page could start, but OSC calls it under lock
2240 * and thus we can add oap back to pending safely */
2242 /* upper layer wants to leave the page on pending queue */
2243 osc_oap_to_pending(oap);
2245 osc_exit_cache(cli, oap, sent);
/*
 * Reply-interpret callback for asynchronous bulk read/write requests.
 *
 * The reply is post-processed with osc_brw_fini_request().  A recoverable
 * error is handled in this order: an mmaped file that already had a resend
 * is not retried again on -EAGAIN; a request whose import generation is
 * stale is only logged; otherwise the request is redone when the error is
 * -EINPROGRESS or the resend limit allows it, and an error is logged when
 * the limit is exhausted.
 *
 * On completion the capability reference is dropped and, under
 * cl_loi_list_lock, the in-flight counter for the direction is decremented,
 * every oap on aa_oaps is completed (or, when the list is empty, the write
 * grant of every page is released), cache waiters are woken and further
 * RPCs are checked for.  Finally the cl_req is completed and the page
 * pointer array freed.
 *
 * \param data  the request's osc_brw_async_args
 * \param rc    result handed in by ptlrpc; see osc_brw_fini_request()
 *
 * NOTE(review): the assignments to rc in the retry branches, the early
 * return after a successful redo, the assignment of `cli` and the final
 * return are on lines not visible here.
 */
2249 static int brw_interpret(const struct lu_env *env,
2250 struct ptlrpc_request *req, void *data, int rc)
2252 struct osc_brw_async_args *aa = data;
2253 struct client_obd *cli;
2257 rc = osc_brw_fini_request(req, rc);
2258 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2259 /* When server return -EINPROGRESS, client should always retry
2260 * regardless of the number of times the bulk was resent already. */
2261 if (osc_recoverable_error(rc)) {
2262 /* Only retry once for mmaped files since the mmaped page
2263 * might be modified at anytime. We have to retry at least
2264 * once in case there WAS really a corruption of the page
2265 * on the network, that was not caused by mmap() modifying
2266 * the page. Bug11742 */
2267 if ((rc == -EAGAIN) && (aa->aa_resends > 0) &&
2268 aa->aa_oa->o_valid & OBD_MD_FLFLAGS &&
2269 aa->aa_oa->o_flags & OBD_FL_MMAP) {
2271 } else if (req->rq_import_generation !=
2272 req->rq_import->imp_generation) {
2273 CDEBUG(D_HA, "%s: resend cross eviction for object: "
2274 ""LPU64":"LPU64", rc = %d.\n",
2275 req->rq_import->imp_obd->obd_name,
2276 aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
2278 } else if (rc == -EINPROGRESS ||
2279 client_should_resend(aa->aa_resends, aa->aa_cli)) {
2280 rc = osc_brw_redo_request(req, aa, rc);
2284 CERROR("%s: too many resent retries for object: "
2285 ""LPU64":"LPU64", rc = %d.\n",
2286 req->rq_import->imp_obd->obd_name,
2287 aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
2293 capa_put(aa->aa_ocapa);
2294 aa->aa_ocapa = NULL;
2299 client_obd_list_lock(&cli->cl_loi_list_lock);
2301 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2302 * is called so we know whether to go to sync BRWs or wait for more
2303 * RPCs to complete */
2304 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2305 cli->cl_w_in_flight--;
2307 cli->cl_r_in_flight--;
/* note: `async` is non-zero when NO oaps are attached to the request */
2309 async = cfs_list_empty(&aa->aa_oaps);
2310 if (!async) { /* from osc_send_oap_rpc() */
2311 struct osc_async_page *oap, *tmp;
2312 /* the caller may re-use the oap after the completion call so
2313 * we need to clean it up a little */
2314 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
2316 cfs_list_del_init(&oap->oap_rpc_item);
2317 osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2319 OBDO_FREE(aa->aa_oa);
2320 } else { /* from async_internal() */
2322 for (i = 0; i < aa->aa_page_count; i++)
2323 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2325 osc_wake_cache_waiters(cli);
2326 osc_check_rpcs(env, cli);
2327 client_obd_list_unlock(&cli->cl_loi_list_lock);
2329 cl_req_completion(env, aa->aa_clerq, rc);
2330 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
/*
 * Build one bulk request from the async pages queued on rpc_list.
 *
 * Visible steps: optionally enter memory-pressure mode (OBD_BRW_MEMALLOC),
 * allocate a page-pointer array, walk rpc_list to fill it and to add each
 * page to a cl_req, set the request attributes, record the remote lock
 * handle in the obdo, prepare the cl_req, sort the pages by offset and
 * build the request with osc_brw_prep_request().  After the request exists
 * the timestamps are set once more, the oaps are moved from rpc_list to the
 * request's aa_oaps list and the cl_req is stored in the async args.
 *
 * On failure the result is an ERR_PTR: the page array is freed and, under
 * cl_loi_list_lock, every oap is taken off rpc_list and completed with the
 * error, and the cl_req (when one was created) is completed with it too.
 *
 * \param rpc_list    non-empty list of oaps linked through oap_rpc_item
 * \param page_count  number of pages on rpc_list, sizes the pga array
 * \param cmd         OBD_BRW_ command bits
 *
 * NOTE(review): the allocation of `oa`, several condition lines, the `out`
 * label and the return are on lines not visible here.
 */
2335 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2336 struct client_obd *cli,
2337 cfs_list_t *rpc_list,
2338 int page_count, int cmd)
2340 struct ptlrpc_request *req;
2341 struct brw_page **pga = NULL;
2342 struct osc_brw_async_args *aa;
2343 struct obdo *oa = NULL;
2344 const struct obd_async_page_ops *ops = NULL;
2345 void *caller_data = NULL;
2346 struct osc_async_page *oap;
2347 struct osc_async_page *tmp;
2348 struct ost_body *body;
2349 struct cl_req *clerq = NULL;
2350 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2351 struct ldlm_lock *lock = NULL;
2352 struct cl_req_attr crattr;
2353 int i, rc, mpflag = 0;
2356 LASSERT(!cfs_list_empty(rpc_list));
2358 if (cmd & OBD_BRW_MEMALLOC)
2359 mpflag = cfs_memory_pressure_get_and_set();
2361 memset(&crattr, 0, sizeof crattr);
2362 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2364 GOTO(out, req = ERR_PTR(-ENOMEM));
2368 GOTO(out, req = ERR_PTR(-ENOMEM));
2371 cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2372 struct cl_page *page = osc_oap2cl_page(oap);
2374 ops = oap->oap_caller_ops;
2375 caller_data = oap->oap_caller_data;
2377 clerq = cl_req_alloc(env, page, crt,
2378 1 /* only 1-object rpcs for
2381 GOTO(out, req = (void *)clerq);
2382 lock = oap->oap_ldlm_lock;
2384 pga[i] = &oap->oap_brw_page;
2385 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2386 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2387 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2389 cl_req_page_add(env, clerq, page);
2392 /* always get the data for the obdo for the rpc */
2393 LASSERT(ops != NULL);
2395 crattr.cra_capa = NULL;
2396 cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2398 oa->o_handle = lock->l_remote_handle;
2399 oa->o_valid |= OBD_MD_FLHANDLE;
2402 rc = cl_req_prep(env, clerq);
2404 CERROR("cl_req_prep failed: %d\n", rc);
2405 GOTO(out, req = ERR_PTR(rc));
/* osc_brw_prep_request() asserts that the pages are in increasing offset
 * order, so sort them first */
2408 sort_brw_pages(pga, page_count);
2409 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2410 pga, &req, crattr.cra_capa, 1, 0);
2412 CERROR("prep_req failed: %d\n", rc);
2413 GOTO(out, req = ERR_PTR(rc));
2416 if (cmd & OBD_BRW_MEMALLOC)
2417 req->rq_memalloc = 1;
2419 /* Need to update the timestamps after the request is built in case
2420 * we race with setattr (locally or in queue at OST). If OST gets
2421 * later setattr before earlier BRW (as determined by the request xid),
2422 * the OST will not use BRW timestamps. Sadly, there is no obvious
2423 * way to do this in a single call. bug 10150 */
2424 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2425 cl_req_attr_set(env, clerq, &crattr,
2426 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2428 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2429 aa = ptlrpc_req_async_args(req);
/* hand the oaps over to the request; rpc_list is left empty */
2430 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2431 cfs_list_splice(rpc_list, &aa->aa_oaps);
2432 CFS_INIT_LIST_HEAD(rpc_list);
2433 aa->aa_clerq = clerq;
2435 if (cmd & OBD_BRW_MEMALLOC)
2436 cfs_memory_pressure_restore(mpflag);
2438 capa_put(crattr.cra_capa);
/* failure cleanup: release the page array and complete every queued oap */
2443 OBD_FREE(pga, sizeof(*pga) * page_count);
2444 /* this should happen rarely and is pretty bad, it makes the
2445 * pending list not follow the dirty order */
2446 client_obd_list_lock(&cli->cl_loi_list_lock);
2447 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2448 cfs_list_del_init(&oap->oap_rpc_item);
2450 /* queued sync pages can be torn down while the pages
2451 * were between the pending list and the rpc */
2452 if (oap->oap_interrupted) {
2453 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2454 osc_ap_completion(env, cli, NULL, oap, 0,
2458 osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2460 if (clerq && !IS_ERR(clerq))
2461 cl_req_completion(env, clerq, PTR_ERR(req));
2467 * prepare pages for ASYNC io and put pages in send queue.
2469 * \param cmd OBD_BRW_* macros
2470 * \param lop pending pages
2472 * \return zero if no page added to send queue.
2473 * \return 1 if pages successfully added to send queue.
2474 * \return negative on errors.
/*
 * Gather pages from \a lop into one RPC and queue it with ptlrpcd.
 *
 * Urgent pages are first moved to the front of lop->lop_pending.  The
 * pending list is then walked and pages are collected on a local rpc_list
 * until one of the checks commented inside the loop ends the walk.  The
 * collected pages are turned into a request by osc_build_req(); the read or
 * write histograms and the in-flight counter of \a cli are updated and the
 * request is queued by ptlrpcd_add_req() with brw_interpret() as its reply
 * handler.
 *
 * cli->cl_loi_list_lock is dropped before the request is built and is taken
 * again before the in-flight accounting is updated.
 *
 * \param env  execution environment passed to the page callbacks
 * \param cli  client obd owning the lists, limits and counters
 * \param loi  object the pages belong to; passed to loi_list_maint()
 * \param cmd  OBD_BRW_READ or OBD_BRW_WRITE (see the callers in
 *             osc_check_rpcs())
 * \param lop  pending/urgent page lists that match \a cmd
 */
2477 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2478 struct lov_oinfo *loi,
2479 int cmd, struct loi_oap_pages *lop)
2481 struct ptlrpc_request *req;
2482 obd_count page_count = 0;
2483 struct osc_async_page *oap = NULL, *tmp;
2484 struct osc_brw_async_args *aa;
2485 const struct obd_async_page_ops *ops;
2486 CFS_LIST_HEAD(rpc_list);
2487 int srvlock = 0, mem_tight = 0;
2488 struct cl_object *clob = NULL;
2489 obd_off starting_offset = OBD_OBJECT_EOF;
2490 unsigned int ending_offset;
2491 int starting_page_off = 0;
2494 /* ASYNC_HP pages first. At present, when the lock the pages is
2495 * to be canceled, the pages covered by the lock will be sent out
2496 * with ASYNC_HP. We have to send out them as soon as possible. */
2497 cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2498 if (oap->oap_async_flags & ASYNC_HP)
2499 cfs_list_move(&oap->oap_pending_item, &rpc_list);
2500 else if (!(oap->oap_brw_flags & OBD_BRW_SYNC))
2501 /* only do this for writeback pages. */
2502 cfs_list_move_tail(&oap->oap_pending_item, &rpc_list);
2503 if (++page_count >= cli->cl_max_pages_per_rpc)
/* rpc_list was only used as a staging list here: its contents go back to
 * the head of the pending list and rpc_list is empty again */
2506 cfs_list_splice_init(&rpc_list, &lop->lop_pending);
2509 /* first we find the pages we're allowed to work with */
2510 cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2512 ops = oap->oap_caller_ops;
2514 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2515 "magic 0x%x\n", oap, oap->oap_magic);
2518 /* pin object in memory, so that completion call-backs
2519 * can be safely called under client_obd_list lock. */
2520 clob = osc_oap2cl_page(oap)->cp_obj;
2521 cl_object_get(clob);
/* all pages of one RPC must agree on OBD_BRW_SRVLOCK; srvlock holds the
 * setting of the first page collected */
2524 if (page_count != 0 &&
2525 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2526 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2527 " oap %p, page %p, srvlock %u\n",
2528 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2532 /* If there is a gap at the start of this page, it can't merge
2533 * with any previous page, so we'll hand the network a
2534 * "fragmented" page array that it can't transfer in 1 RDMA */
2535 if (oap->oap_obj_off < starting_offset) {
2536 if (starting_page_off != 0)
2539 starting_page_off = oap->oap_page_off;
2540 starting_offset = oap->oap_obj_off + starting_page_off;
2541 } else if (oap->oap_page_off != 0)
2544 /* in llite being 'ready' equates to the page being locked
2545 * until completion unlocks it. commit_write submits a page
2546 * as not ready because its unlock will happen unconditionally
2547 * as the call returns. if we race with commit_write giving
2548 * us that page we don't want to create a hole in the page
2549 * stream, so we stop and leave the rpc to be fired by
2550 * another dirtier or kupdated interval (the not ready page
2551 * will still be on the dirty list). we could call in
2552 * at the end of ll_file_write to process the queue again. */
2553 if (!(oap->oap_async_flags & ASYNC_READY)) {
2554 int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2557 CDEBUG(D_INODE, "oap %p page %p returned %d "
2558 "instead of ready\n", oap,
2562 /* llite is telling us that the page is still
2563 * in commit_write and that we should try
2564 * and put it in an rpc again later. we
2565 * break out of the loop so we don't create
2566 * a hole in the sequence of pages in the rpc
2571 /* the io isn't needed.. tell the checks
2572 * below to complete the rpc with EINTR */
2573 cfs_spin_lock(&oap->oap_lock);
2574 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2575 cfs_spin_unlock(&oap->oap_lock);
2576 oap->oap_count = -EINTR;
2579 cfs_spin_lock(&oap->oap_lock);
2580 oap->oap_async_flags |= ASYNC_READY;
2581 cfs_spin_unlock(&oap->oap_lock);
2584 LASSERTF(0, "oap %p page %p returned %d "
2585 "from make_ready\n", oap,
2593 * Page submitted for IO has to be locked. Either by
2594 * ->ap_make_ready() or by higher layers.
2596 #if defined(__KERNEL__) && defined(__linux__)
2598 struct cl_page *page;
2600 page = osc_oap2cl_page(oap);
2602 if (page->cp_type == CPT_CACHEABLE &&
2603 !(PageLocked(oap->oap_page) &&
2604 (CheckWriteback(oap->oap_page, cmd)))) {
2605 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2607 (long)oap->oap_page->flags,
2608 oap->oap_async_flags);
2614 /* take the page out of our book-keeping */
2615 cfs_list_del_init(&oap->oap_pending_item);
2616 lop_update_pending(cli, lop, cmd, -1);
2617 cfs_list_del_init(&oap->oap_urgent_item);
2619 /* ask the caller for the size of the io as the rpc leaves. */
2620 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2622 ops->ap_refresh_count(env, oap->oap_caller_data,
2624 LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
/* a non-positive count is completed right here, with the count passed as
 * the result code */
2626 if (oap->oap_count <= 0) {
2627 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2629 osc_ap_completion(env, cli, NULL,
2630 oap, 0, oap->oap_count);
2634 /* now put the page back in our accounting */
2635 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2636 if (page_count++ == 0)
2637 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2639 if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
2642 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2643 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2644 * have the same alignment as the initial writes that allocated
2645 * extents on the server. */
2646 ending_offset = oap->oap_obj_off + oap->oap_page_off +
2648 if (!(ending_offset & (PTLRPC_MAX_BRW_SIZE - 1)))
2651 if (page_count >= cli->cl_max_pages_per_rpc)
2654 /* If there is a gap at the end of this page, it can't merge
2655 * with any subsequent pages, so we'll hand the network a
2656 * "fragmented" page array that it can't transfer in 1 RDMA */
2657 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2661 osc_wake_cache_waiters(cli);
2663 loi_list_maint(cli, loi);
/* the request is built without cl_loi_list_lock held */
2665 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* drop the object reference taken by cl_object_get() in the loop */
2668 cl_object_put(env, clob);
/* nothing was collected: retake the lock before leaving */
2670 if (page_count == 0) {
2671 client_obd_list_lock(&cli->cl_loi_list_lock);
/* mem_tight adds OBD_BRW_MEMALLOC to the command given to osc_build_req() */
2675 req = osc_build_req(env, cli, &rpc_list, page_count,
2676 mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
2678 LASSERT(cfs_list_empty(&rpc_list));
2679 loi_list_maint(cli, loi);
2680 RETURN(PTR_ERR(req));
2683 aa = ptlrpc_req_async_args(req);
/* statistics: keep only the offset within a PTLRPC_MAX_BRW_SIZE unit and
 * tally page count, RPCs in flight and starting page offset */
2685 starting_offset &= PTLRPC_MAX_BRW_SIZE - 1;
2686 if (cmd == OBD_BRW_READ) {
2687 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2688 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2689 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2690 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2692 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2693 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2694 cli->cl_w_in_flight);
2695 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2696 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2698 ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2700 client_obd_list_lock(&cli->cl_loi_list_lock);
2702 if (cmd == OBD_BRW_READ)
2703 cli->cl_r_in_flight++;
2705 cli->cl_w_in_flight++;
2707 /* queued sync pages can be torn down while the pages
2708 * were between the pending list and the rpc */
2710 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2711 /* only one oap gets a request reference */
2714 if (oap->oap_interrupted && !req->rq_intr) {
2715 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2717 ptlrpc_mark_interrupted(req);
2721 tmp->oap_request = ptlrpc_request_addref(req);
2723 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2724 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
/* brw_interpret() handles the reply; ptlrpcd sends the request */
2726 req->rq_interpret_reply = brw_interpret;
2727 ptlrpcd_add_req(req, PSCOPE_BRW);
/*
 * D_INODE debug message describing the queue state of object \a LOI,
 * followed by the caller's own format \a STR.  The fixed part prints:
 *   - whether the object sits on the ready or the high-priority ready list,
 *   - the number of pending write pages and whether urgent writes exist,
 *   - the number of pending read pages and whether urgent reads exist.
 */
2731 #define LOI_DEBUG(LOI, STR, args...) \
2732 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2733 !cfs_list_empty(&(LOI)->loi_ready_item) || \
2734 !cfs_list_empty(&(LOI)->loi_hp_ready_item), \
2735 (LOI)->loi_write_lop.lop_num_pending, \
2736 !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2737 (LOI)->loi_read_lop.lop_num_pending, \
2738 !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2741 /* This is called by osc_check_rpcs() to find which objects have pages that
2742 * we could be sending. These lists are maintained by lop_makes_rpc(). */
/*
 * Selection order, first match wins:
 *   1. first object on cl_loi_hp_ready_list,
 *   2. first object on cl_loi_ready_list,
 *   3. first object on cl_loi_write_list when cache waiters exist,
 *   4. when the import is missing or invalid: first object on
 *      cl_loi_write_list, then first object on cl_loi_read_list.
 *
 * \param cli  client obd whose object lists are examined
 *
 * \return the selected object; the fall-through return is not visible in
 *         this listing (presumably NULL, since the caller loops until NULL)
 */
2743 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2747 /* First return objects that have blocked locks so that they
2748 * will be flushed quickly and other clients can get the lock,
2749 * then objects which have pages ready to be stuffed into RPCs */
2750 if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
2751 RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
2752 struct lov_oinfo, loi_hp_ready_item));
2753 if (!cfs_list_empty(&cli->cl_loi_ready_list))
2754 RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
2755 struct lov_oinfo, loi_ready_item));
2757 /* then if we have cache waiters, return all objects with queued
2758 * writes. This is especially important when many small files
2759 * have filled up the cache and not been fired into rpcs because
2760 * they don't pass the nr_pending/object threshold */
2761 if (!cfs_list_empty(&cli->cl_cache_waiters) &&
2762 !cfs_list_empty(&cli->cl_loi_write_list))
2763 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2764 struct lov_oinfo, loi_write_item));
2766 /* then return all queued objects when we have an invalid import
2767 * so that they get flushed */
2768 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2769 if (!cfs_list_empty(&cli->cl_loi_write_list))
2770 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2773 if (!cfs_list_empty(&cli->cl_loi_read_list))
2774 RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
2775 struct lov_oinfo, loi_read_item));
/*
 * Tell whether \a cli already has as many RPCs in flight as it may have.
 *
 * The limit is cl_max_rpcs_in_flight, raised by one when the first urgent
 * write page of \a loi - or, failing that, its first urgent read page -
 * carries ASYNC_HP.
 *
 * \param cli  client obd providing the limit and the in-flight count
 * \param loi  object whose urgent lists are inspected
 *
 * \return non-zero when rpcs_in_flight(cli) has reached the limit
 */
2780 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2782 struct osc_async_page *oap;
/* NOTE(review): the declaration of hprpc is not visible in this listing;
 * the code below only makes sense if it starts out as 0 - confirm */
2785 if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
2786 oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
2787 struct osc_async_page, oap_urgent_item);
2788 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
/* look at the read side only if no high-priority write was found */
2791 if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
2792 oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
2793 struct osc_async_page, oap_urgent_item);
2794 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2797 return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
/*
 * Send RPCs for the objects that have work queued.
 *
 * Objects are taken from osc_next_loi() one at a time and the in-flight
 * limit is checked with osc_max_rpc_in_flight().  For each object a write
 * RPC and then a read RPC is attempted through osc_send_oap_rpc() whenever
 * lop_makes_rpc() reports that the queue warrants one.  The object is then
 * taken off the ready, write and read lists and loi_list_maint() puts it
 * back where it belongs.  As the comment at the end of the loop explains,
 * race_counter keeps the loop from spinning when sends keep returning 0.
 *
 * \param env  execution environment passed to osc_send_oap_rpc()
 * \param cli  client obd whose object lists are scanned
 */
2800 /* called with the loi list lock held */
2801 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2803 struct lov_oinfo *loi;
2804 int rc = 0, race_counter = 0;
2807 while ((loi = osc_next_loi(cli)) != NULL) {
2808 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2810 if (osc_max_rpc_in_flight(cli, loi))
2813 /* attempt some read/write balancing by alternating between
2814 * reads and writes in an object. The makes_rpc checks here
2815 * would be redundant if we were getting read/write work items
2816 * instead of objects. we don't want send_oap_rpc to drain a
2817 * partial read pending queue when we're given this object to
2818 * do io on writes while there are cache waiters */
2819 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2820 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2821 &loi->loi_write_lop);
2823 CERROR("Write request failed with %d\n", rc);
2825 /* osc_send_oap_rpc failed, mostly because of
2828 * It can't break here, because if:
2829 * - a page was submitted by osc_io_submit, so
2831 * - no request in flight
2832 * - no subsequent request
2833 * The system will be in live-lock state,
2834 * because there is no chance to call
2835 * osc_io_unplug() and osc_check_rpcs() any
2836 * more. pdflush can't help in this case,
2837 * because it might be blocked at grabbing
2838 * the page lock as we mentioned.
2840 * Anyway, continue to drain pages. */
2849 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2850 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2851 &loi->loi_read_lop);
2853 CERROR("Read request failed with %d\n", rc);
2861 /* attempt some inter-object balancing by issuing rpcs
2862 * for each object in turn */
2863 if (!cfs_list_empty(&loi->loi_hp_ready_item))
2864 cfs_list_del_init(&loi->loi_hp_ready_item);
2865 if (!cfs_list_empty(&loi->loi_ready_item))
2866 cfs_list_del_init(&loi->loi_ready_item);
2867 if (!cfs_list_empty(&loi->loi_write_item))
2868 cfs_list_del_init(&loi->loi_write_item);
2869 if (!cfs_list_empty(&loi->loi_read_item))
2870 cfs_list_del_init(&loi->loi_read_item);
2872 loi_list_maint(cli, loi);
2874 /* send_oap_rpc fails with 0 when make_ready tells it to
2875 * back off. llite's make_ready does this when it tries
2876 * to lock a page queued for write that is already locked.
2877 * we want to try sending rpcs from many objects, but we
2878 * don't want to spin failing with 0. */
2879 if (race_counter == 10)
2885 /* we're trying to queue a page in the osc so we're subject to the
2886 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2887 * If the osc's queued pages are already at that limit, then we want to sleep
2888 * until there is space in the osc's queue for us. We also may be waiting for
2889 * write credits from the OST if there are RPCs in flight that may return some
2890 * before we fall back to sync writes.
2892 * We need this to know whether our allocation was granted in the presence of signals */
/*
 * Wait condition used by osc_enter_cache() in l_wait_event().
 *
 * Evaluated under cl_loi_list_lock: the result is true once \a ocw is no
 * longer linked on a list (ocw_entry is empty) or when \a cli has no RPCs
 * in flight.
 *
 * \param cli  client obd whose lock and in-flight count are used
 * \param ocw  the waiter being tested
 */
2893 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2897 client_obd_list_lock(&cli->cl_loi_list_lock);
2898 rc = cfs_list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2899 client_obd_list_unlock(&cli->cl_loi_list_lock);
2904 * Non-blocking version of osc_enter_cache() that consumes grant only when it
/*
 * Try to charge one page of write grant to \a oap without sleeping.
 *
 * Grant counts as available when cl_avail_grant covers CFS_PAGE_SIZE; the
 * grant is then consumed through osc_consume_write_grant().  The transit
 * accounting below (cl_dirty_transit, obd_dirty_transit_pages and the
 * OBD_BRW_NOCACHE flag) is presumably applied only for \a transient pages;
 * the guarding conditions are not visible in this listing.
 *
 * \param env       execution environment (not used in the visible lines)
 * \param cli       client obd holding the grant and dirty counters
 * \param loi       object the page belongs to (not used in the visible lines)
 * \param oap       page that receives the grant
 * \param transient non-zero for a transient page - TODO confirm semantics
 */
2907 int osc_enter_cache_try(const struct lu_env *env,
2908 struct client_obd *cli, struct lov_oinfo *loi,
2909 struct osc_async_page *oap, int transient)
2913 has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2915 osc_consume_write_grant(cli, &oap->oap_brw_page);
2917 cli->cl_dirty_transit += CFS_PAGE_SIZE;
2918 cfs_atomic_inc(&obd_dirty_transit_pages);
2919 oap->oap_brw_flags |= OBD_BRW_NOCACHE;
/*
 * Reserve cache space and write grant for one page before it is queued.
 *
 * Three cases are handled in order:
 *   - sync I/O is forced (fail-injection point, cl_dirty_max smaller than a
 *     page, or a force_sync flag on the client or the object);
 *   - space and grant are available, so osc_enter_cache_try() succeeds;
 *   - otherwise, if writes are in flight or grant is left, the caller is
 *     queued on cl_cache_waiters, RPCs are kicked off and the thread sleeps
 *     until ocw_granted() is true.
 *
 * \param env  execution environment passed on to the helpers
 * \param cli  client obd holding the dirty/grant accounting
 * \param loi  object the page belongs to
 * \param oap  page entering the cache
 *
 * The return statements are not visible in this listing.
 */
2925 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2926 * grant or cache space. */
2927 static int osc_enter_cache(const struct lu_env *env,
2928 struct client_obd *cli, struct lov_oinfo *loi,
2929 struct osc_async_page *oap)
2931 struct osc_cache_waiter ocw;
2932 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2936 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2937 "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
2938 cli->cl_dirty_max, obd_max_dirty_pages,
2939 cli->cl_lost_grant, cli->cl_avail_grant);
2941 /* force the caller to try sync io. this can jump the list
2942 * of queued writes and create a discontiguous rpc stream */
2943 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
2944 cli->cl_dirty_max < CFS_PAGE_SIZE ||
2945 cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync)
2948 /* Hopefully normal case - cache space and write credits available */
2949 if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2950 cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2951 osc_enter_cache_try(env, cli, loi, oap, 0))
2954 /* It is safe to block as a cache waiter as long as there is grant
2955 * space available or the hope of additional grant being returned
2956 * when an in flight write completes. Using the write back cache
2957 * if possible is preferable to sending the data synchronously
2958 * because write pages can then be merged in to large requests.
2959 * The addition of this cache waiter will causing pending write
2960 * pages to be sent immediately. */
2961 if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
2962 cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2963 cfs_waitq_init(&ocw.ocw_waitq);
2967 loi_list_maint(cli, loi);
2968 osc_check_rpcs(env, cli);
/* sleep without the list lock; ocw_granted() takes it for each check */
2969 client_obd_list_unlock(&cli->cl_loi_list_lock);
2971 CDEBUG(D_CACHE, "sleeping for cache space\n");
2972 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2974 client_obd_list_lock(&cli->cl_loi_list_lock);
/* still linked after the wait: unlink the on-stack waiter before leaving */
2975 if (!cfs_list_empty(&ocw.ocw_entry)) {
2976 cfs_list_del(&ocw.ocw_entry);
/*
 * Initialise an osc_async_page for \a page at object offset \a offset.
 *
 * The visible code stores the magic, owning client, caller callbacks and
 * data, page and offset, initialises the four list heads and the spinlock,
 * and sets OBD_BRW_NOQUOTA when the export is not remote and the caller has
 * CFS_CAP_SYS_RESOURCE.  \a offset must be page aligned (asserted).
 *
 * One path returns cfs_size_round(sizeof(*oap)) instead; presumably this is
 * a size query, but its condition is not visible in this listing.  Where
 * oap comes from (presumably \a res) is not visible either.
 *
 * \param exp     export whose obd provides the client_obd
 * \param lsm     not used in the visible lines
 * \param loi     not used in the visible lines
 * \param page    page described by the oap
 * \param offset  page-aligned offset of the page within the object
 * \param ops     caller callbacks stored in oap_caller_ops
 * \param data    caller cookie stored in oap_caller_data
 * \param res     not used in the visible lines
 * \param nocache not used in the visible lines
 * \param lockh   not used in the visible lines
 */
2986 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2987 struct lov_oinfo *loi, cfs_page_t *page,
2988 obd_off offset, const struct obd_async_page_ops *ops,
2989 void *data, void **res, int nocache,
2990 struct lustre_handle *lockh)
2992 struct osc_async_page *oap;
2997 return cfs_size_round(sizeof(*oap));
3000 oap->oap_magic = OAP_MAGIC;
3001 oap->oap_cli = &exp->exp_obd->u.cli;
3004 oap->oap_caller_ops = ops;
3005 oap->oap_caller_data = data;
3007 oap->oap_page = page;
3008 oap->oap_obj_off = offset;
3009 if (!client_is_remote(exp) &&
3010 cfs_capable(CFS_CAP_SYS_RESOURCE))
3011 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
/* the object offset must not have any bits below the page size set */
3013 LASSERT(!(offset & ~CFS_PAGE_MASK));
3015 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
3016 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
3017 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
3018 CFS_INIT_LIST_HEAD(&oap->oap_page_list);
3020 cfs_spin_lock_init(&oap->oap_lock);
3021 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
/*
 * Queue a prepared page for asynchronous read or write.
 *
 * The visible code checks the oap magic, the state of the import and that
 * the page is not already on a pending, urgent or rpc list.  For writes
 * issued without OBD_BRW_NOQUOTA the owner's uid/gid are read from the top
 * object and checked with lquota_chkdq().  Under cl_loi_list_lock the I/O
 * range and flags are stored in the oap, writes reserve cache space through
 * osc_enter_cache(), the page is placed on the pending list and
 * osc_check_rpcs() is given the chance to send it.
 *
 * \param env         execution environment
 * \param exp         export whose obd provides the client_obd
 * \param lsm         stripe md; supplies lsm_oinfo[0] as the object
 * \param loi         object to queue on (see the assignment from \a lsm)
 * \param oap         page prepared by osc_prep_async_page()
 * \param cmd         OBD_BRW_* command flags
 * \param off         offset of the I/O within the page
 * \param count       byte count; off + count may not exceed CFS_PAGE_SIZE
 * \param brw_flags   initial value of oap_brw_flags
 * \param async_flags initial value of oap_async_flags
 *
 * The return statements are not visible in this listing.
 */
3025 int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp,
3026 struct lov_stripe_md *lsm, struct lov_oinfo *loi,
3027 struct osc_async_page *oap, int cmd, int off,
3028 int count, obd_flag brw_flags, enum async_flags async_flags)
3030 struct client_obd *cli = &exp->exp_obd->u.cli;
3034 if (oap->oap_magic != OAP_MAGIC)
3037 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
/* the page must not already be queued anywhere */
3040 if (!cfs_list_empty(&oap->oap_pending_item) ||
3041 !cfs_list_empty(&oap->oap_urgent_item) ||
3042 !cfs_list_empty(&oap->oap_rpc_item))
3045 /* check if the file's owner/group is over quota */
3046 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
3047 struct cl_object *obj;
3048 struct cl_attr attr; /* XXX put attr into thread info */
3049 unsigned int qid[MAXQUOTAS];
3051 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
3053 cl_object_attr_lock(obj);
3054 rc = cl_object_attr_get(env, obj, &attr);
3055 cl_object_attr_unlock(obj);
3057 qid[USRQUOTA] = attr.cat_uid;
3058 qid[GRPQUOTA] = attr.cat_gid;
3060 lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
/* NOTE(review): presumably only done when the caller passed no loi; the
 * guarding condition is not visible in this listing */
3067 loi = lsm->lsm_oinfo[0];
3069 client_obd_list_lock(&cli->cl_loi_list_lock);
3071 LASSERT(off + count <= CFS_PAGE_SIZE);
3073 oap->oap_page_off = off;
3074 oap->oap_count = count;
3075 oap->oap_brw_flags = brw_flags;
3076 /* Give a hint to OST that requests are coming from kswapd - bug19529 */
3077 if (cfs_memory_pressure_get())
3078 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
3079 cfs_spin_lock(&oap->oap_lock);
3080 oap->oap_async_flags = async_flags;
3081 cfs_spin_unlock(&oap->oap_lock);
/* writes need cache space/grant first; this call may sleep */
3083 if (cmd & OBD_BRW_WRITE) {
3084 rc = osc_enter_cache(env, cli, loi, oap);
3086 client_obd_list_unlock(&cli->cl_loi_list_lock);
3091 osc_oap_to_pending(oap);
3092 loi_list_maint(cli, loi);
3094 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
3097 osc_check_rpcs(env, cli);
3098 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* True when \a flag is clear in \a was but set in \a now, i.e. the flag is
 * being newly set; this is (~was & now & flag) spelled out.  Every argument
 * is parenthesized so that expression arguments such as (a | b) expand with
 * the intended precedence.  \a flag is evaluated twice, so it must not have
 * side effects. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
/*
 * Add \a async_flags to a page that is already on a pending list.
 *
 * Only ASYNC_READY and ASYNC_URGENT are acted on in the visible code, and
 * only when they are newly set (see SETTING()).  A page that becomes urgent
 * while it is not on an rpc list is linked on lop_urgent - at the head if
 * it already has ASYNC_HP, at the tail otherwise - and loi_list_maint() is
 * called.  The collected flags are OR-ed into oap_async_flags under
 * oap_lock.
 *
 * \param cli         client obd passed to loi_list_maint()
 * \param loi         object the page belongs to
 * \param oap         page to update; must be on a pending list (asserted)
 * \param async_flags flags requested by the caller
 *
 * The declaration of the local `flags` and the return statements are not
 * visible in this listing.
 */
3106 int osc_set_async_flags_base(struct client_obd *cli,
3107 struct lov_oinfo *loi, struct osc_async_page *oap,
3108 obd_flag async_flags)
3110 struct loi_oap_pages *lop;
3114 LASSERT(!cfs_list_empty(&oap->oap_pending_item));
/* pick the write or read queues of the object according to the command */
3116 if (oap->oap_cmd & OBD_BRW_WRITE) {
3117 lop = &loi->loi_write_lop;
3119 lop = &loi->loi_read_lop;
/* every requested flag is already set */
3122 if ((oap->oap_async_flags & async_flags) == async_flags)
3125 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3126 flags |= ASYNC_READY;
3128 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3129 cfs_list_empty(&oap->oap_rpc_item)) {
3130 if (oap->oap_async_flags & ASYNC_HP)
3131 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
3133 cfs_list_add_tail(&oap->oap_urgent_item,
3135 flags |= ASYNC_URGENT;
3136 loi_list_maint(cli, loi);
3138 cfs_spin_lock(&oap->oap_lock);
3139 oap->oap_async_flags |= flags;
3140 cfs_spin_unlock(&oap->oap_lock);
3142 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3143 oap->oap_async_flags);
/*
 * Remove a page from the OSC queues.
 *
 * Under cl_loi_list_lock: a page that is on an rpc list cannot be torn down
 * and yields -EBUSY.  Otherwise the page leaves the cache accounting
 * (osc_exit_cache()), cache waiters are woken, and the page is unlinked
 * from the urgent list (clearing ASYNC_URGENT and ASYNC_HP) and from the
 * pending list (decrementing the pending count).
 *
 * \param exp  export whose obd provides the client_obd
 * \param lsm  stripe md; supplies lsm_oinfo[0] as the object
 * \param loi  object the page belongs to (see the assignment from \a lsm)
 * \param oap  page to tear down
 *
 * \return -EBUSY when the page is part of an RPC; the other return
 *         statements are not visible in this listing
 */
3147 int osc_teardown_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
3148 struct lov_oinfo *loi, struct osc_async_page *oap)
3150 struct client_obd *cli = &exp->exp_obd->u.cli;
3151 struct loi_oap_pages *lop;
3155 if (oap->oap_magic != OAP_MAGIC)
/* NOTE(review): presumably only done when the caller passed no loi; the
 * guarding condition is not visible in this listing */
3159 loi = lsm->lsm_oinfo[0];
3161 if (oap->oap_cmd & OBD_BRW_WRITE) {
3162 lop = &loi->loi_write_lop;
3164 lop = &loi->loi_read_lop;
3167 client_obd_list_lock(&cli->cl_loi_list_lock);
3169 if (!cfs_list_empty(&oap->oap_rpc_item))
3170 GOTO(out, rc = -EBUSY);
3172 osc_exit_cache(cli, oap, 0);
3173 osc_wake_cache_waiters(cli);
3175 if (!cfs_list_empty(&oap->oap_urgent_item)) {
3176 cfs_list_del_init(&oap->oap_urgent_item);
3177 cfs_spin_lock(&oap->oap_lock);
3178 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3179 cfs_spin_unlock(&oap->oap_lock);
3181 if (!cfs_list_empty(&oap->oap_pending_item)) {
3182 cfs_list_del_init(&oap->oap_pending_item);
3183 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3185 loi_list_maint(cli, loi);
3186 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3188 client_obd_list_unlock(&cli->cl_loi_list_lock);
/*
 * Attach the callback data of \a einfo to \a lock unless the lock already
 * carries different data.
 *
 * The lock's blocking, completion and glimpse callbacks and its resource
 * type must match \a einfo (asserted).  With the resource lock and
 * osc_ast_guard held, l_ast_data is set to einfo->ei_cbdata if it was NULL
 * and is then compared with that data.
 *
 * \param lock   lock to update; must not be NULL
 * \param einfo  enqueue info providing ei_cbdata and the expected callbacks
 *
 * The result variable and return statement are not visible in this listing;
 * callers treat a non-zero result as "data matches".
 */
3192 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
3193 struct ldlm_enqueue_info *einfo)
3195 void *data = einfo->ei_cbdata;
3198 LASSERT(lock != NULL);
3199 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3200 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3201 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3202 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3204 lock_res_and_lock(lock);
3205 cfs_spin_lock(&osc_ast_guard);
3207 if (lock->l_ast_data == NULL)
3208 lock->l_ast_data = data;
3209 if (lock->l_ast_data == data)
3212 cfs_spin_unlock(&osc_ast_guard);
3213 unlock_res_and_lock(lock);
/*
 * Handle-based wrapper around osc_set_lock_data_with_check().
 *
 * The lock is looked up from \a lockh, updated and released again with
 * LDLM_LOCK_PUT().  The CERROR below reports a handle that could not be
 * used ("client evicted?"); the condition selecting between the two paths
 * is not visible in this listing.
 *
 * \param lockh  handle of the lock to update
 * \param einfo  enqueue info providing the callback data
 */
3218 static int osc_set_data_with_check(struct lustre_handle *lockh,
3219 struct ldlm_enqueue_info *einfo)
3221 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3225 set = osc_set_lock_data_with_check(lock, einfo);
3226 LDLM_LOCK_PUT(lock);
3228 CERROR("lockh %p, data %p - client evicted?\n",
3229 lockh, einfo->ei_cbdata);
/*
 * Run the iterator \a replace over every lock of the object described by
 * \a lsm.
 *
 * The resource name is built from the object id and sequence of \a lsm and
 * the locks are visited with ldlm_resource_iterate() in the namespace of
 * the export's obd device.  The iterator's result is not used in the
 * visible lines.
 *
 * \param exp      export identifying the obd device / namespace
 * \param lsm      stripe md naming the object
 * \param replace  iterator callback applied to each lock
 * \param data     cookie handed to \a replace
 */
3233 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3234 ldlm_iterator_t replace, void *data)
3236 struct ldlm_res_id res_id;
3237 struct obd_device *obd = class_exp2obd(exp);
3239 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3240 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
/*
 * Iterate over the locks of the object named by \a lsm with the callback
 * \a replace and translate the iterator result.
 *
 * The resource name is built from the object id and sequence of \a lsm and
 * ldlm_resource_iterate() is run in the namespace of the export's obd
 * device.  LDLM_ITER_STOP and LDLM_ITER_CONTINUE are told apart below; the
 * values returned for each case are not visible in this listing.
 *
 * \param exp      export identifying the obd device / namespace
 * \param lsm      stripe md naming the object
 * \param replace  iterator callback applied to each lock
 * \param data     cookie handed to \a replace
 */
3244 /* find any ldlm lock of the inode in osc
3248 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3249 ldlm_iterator_t replace, void *data)
3251 struct ldlm_res_id res_id;
3252 struct obd_device *obd = class_exp2obd(exp);
3255 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3256 rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3257 if (rc == LDLM_ITER_STOP)
3259 if (rc == LDLM_ITER_CONTINUE)
// Finish an enqueue on the OSC side and notify the caller.
//
// For an aborted enqueue (ELDLM_LOCK_ABORTED) the server reply is read from
// the request and a non-zero lock_policy_res1 replaces rc.  When the
// enqueue succeeded, or was an aborted intent enqueue, LDLM_FL_LVB_READY is
// set in *flags and the LVB is logged.  The upcall is then invoked with the
// cookie and rc, and its return value becomes the result.
//
// req    - enqueue request whose reply is inspected
// lvb    - lock value block that was filled by the enqueue
// upcall - completion callback of the caller
// cookie - opaque argument for upcall
// The remaining parameters (flags and rc are used below) sit on signature
// lines that are not visible in this listing.
3264 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3265 obd_enqueue_update_f upcall, void *cookie,
3268 int intent = *flags & LDLM_FL_HAS_INTENT;
3272 /* The request was created before ldlm_cli_enqueue call. */
3273 if (rc == ELDLM_LOCK_ABORTED) {
3274 struct ldlm_reply *rep;
3275 rep = req_capsule_server_get(&req->rq_pill,
3278 LASSERT(rep != NULL);
3279 if (rep->lock_policy_res1)
3280 rc = rep->lock_policy_res1;
3284 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3285 *flags |= LDLM_FL_LVB_READY;
3286 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3287 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3290 /* Call the update callback. */
3291 rc = (*upcall)(cookie, rc);
/*
 * Reply handler for asynchronous lock enqueues; installed as
 * rq_interpret_reply by osc_enqueue_base().
 *
 * The lock handle and mode are copied out of \a aa first.  An extra lock
 * reference is taken around the two completion steps -
 * ldlm_cli_enqueue_fini() and then osc_enqueue_fini(), which runs the
 * caller's upcall - and dropped again at the end together with the lock
 * pointer obtained from the handle.
 *
 * \param env  execution environment (not used in the visible lines)
 * \param req  the completed enqueue request
 * \param aa   async arguments stored in the request by osc_enqueue_base()
 * \param rc   result of the RPC
 *
 * The return statement is not visible in this listing.
 */
3295 static int osc_enqueue_interpret(const struct lu_env *env,
3296 struct ptlrpc_request *req,
3297 struct osc_enqueue_args *aa, int rc)
3299 struct ldlm_lock *lock;
3300 struct lustre_handle handle;
3303 /* Make a local copy of a lock handle and a mode, because aa->oa_*
3304 * might be freed anytime after lock upcall has been called. */
3305 lustre_handle_copy(&handle, aa->oa_lockh);
3306 mode = aa->oa_ei->ei_mode;
3308 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3310 lock = ldlm_handle2lock(&handle);
3312 /* Take an additional reference so that a blocking AST that
3313 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3314 * to arrive after an upcall has been executed by
3315 * osc_enqueue_fini(). */
3316 ldlm_lock_addref(&handle, mode);
3318 /* Let CP AST to grant the lock first. */
3319 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
3321 /* Complete obtaining the lock procedure. */
3322 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3323 mode, aa->oa_flags, aa->oa_lvb,
3324 sizeof(*aa->oa_lvb), &handle, rc);
3325 /* Complete osc stuff. */
3326 rc = osc_enqueue_fini(req, aa->oa_lvb,
3327 aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
3329 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3331 /* Release the lock for async request. */
3332 if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3334 * Releases a reference taken by ldlm_cli_enqueue(), if it is
3335 * not already released by
3336 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3338 ldlm_lock_decref(&handle, mode);
3340 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3341 aa->oa_lockh, req, aa);
/* drop the additional reference taken above, then the lock pointer */
3342 ldlm_lock_decref(&handle, mode);
3343 LDLM_LOCK_PUT(lock);
/*
 * Update the cached lock value block and known minimum size (KMS) of
 * \a loi after an enqueue.
 *
 * On ELDLM_OK the LVB is copied into loi->loi_lvb and a candidate KMS is
 * derived from lvb_size, capped at the end of the lock's extent plus one.
 * The KMS is raised to that value when it is not smaller than the current
 * one; the lock is then made matchable with ldlm_lock_allow_match().  For
 * an aborted intent enqueue (a glimpse) only the LVB is copied.
 *
 * \param lov_lockhp  handle of the enqueued lock
 * \param loi         object whose LVB/KMS are updated
 * \param flags       enqueue flags; LDLM_FL_HAS_INTENT is checked
 * \param lvb         lock value block to install
 * \param mode        not used in the visible lines
 * \param rc          result of the enqueue
 */
3347 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3348 struct lov_oinfo *loi, int flags,
3349 struct ost_lvb *lvb, __u32 mode, int rc)
3351 if (rc == ELDLM_OK) {
3352 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3355 LASSERT(lock != NULL);
3356 loi->loi_lvb = *lvb;
3357 tmp = loi->loi_lvb.lvb_size;
3358 /* Extend KMS up to the end of this lock and no further
3359 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3360 if (tmp > lock->l_policy_data.l_extent.end)
3361 tmp = lock->l_policy_data.l_extent.end + 1;
3362 if (tmp >= loi->loi_kms) {
3363 LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3364 ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3365 loi_kms_set(loi, tmp);
3367 LDLM_DEBUG(lock, "lock acquired, setting rss="
3368 LPU64"; leaving kms="LPU64", end="LPU64,
3369 loi->loi_lvb.lvb_size, loi->loi_kms,
3370 lock->l_policy_data.l_extent.end);
3372 ldlm_lock_allow_match(lock);
3373 LDLM_LOCK_PUT(lock);
3374 } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3375 loi->loi_lvb = *lvb;
3376 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3377 " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3381 EXPORT_SYMBOL(osc_update_enqueue);
/* Sentinel request-set value, never dereferenced as a real set: when
 * osc_enqueue_base() is passed this pointer as rqset it queues the request
 * with ptlrpcd_add_req() instead of ptlrpc_set_add_req(). */
3383 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3385 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3386 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3387 * other synchronous requests, however keeping some locks and trying to obtain
3388 * others may take a considerable amount of time in a case of ost failure; and
3389 * when other sync requests do not get released lock from a client, the client
3390 * is excluded from the cluster -- such scenarios make life difficult, so
3391 * release locks just after they are obtained. */
/*
 * Obtain an extent lock, reusing a cached lock when one matches.
 *
 * The requested extent is first widened to page boundaries.  A matching
 * granted lock is looked up with ldlm_lock_match(); if one is found and the
 * caller's callback data can be attached to it, the upcall is invoked with
 * ELDLM_OK and no RPC is sent.  Otherwise a new lock is enqueued with
 * ldlm_cli_enqueue().  With a request set the reply is handled later by
 * osc_enqueue_interpret(); without one osc_enqueue_fini() is called
 * directly.
 *
 * \param exp       export to enqueue on
 * \param res_id    resource name of the object
 * \param flags     in/out LDLM_FL_* flags; LDLM_FL_BLOCK_GRANTED is cleared
 *                  before the enqueue
 * \param policy    extent wanted; modified in place (page alignment)
 * \param lvb       lock value block buffer
 * \param kms_valid see the note on kms validity in the body; its use is not
 *                  visible in this listing
 * \param upcall    completion callback
 * \param cookie    opaque argument for \a upcall
 * \param einfo     enqueue info (lock type, mode, callbacks, data)
 * \param lockh     receives the lock handle
 * \param rqset     request set for asynchronous operation, PTLRPCD_SET to
 *                  use ptlrpcd, or NULL
 * \param async     passed through to ldlm_cli_enqueue()
 *
 * The return statements are not visible in this listing.
 */
3392 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3393 int *flags, ldlm_policy_data_t *policy,
3394 struct ost_lvb *lvb, int kms_valid,
3395 obd_enqueue_update_f upcall, void *cookie,
3396 struct ldlm_enqueue_info *einfo,
3397 struct lustre_handle *lockh,
3398 struct ptlrpc_request_set *rqset, int async)
3400 struct obd_device *obd = exp->exp_obd;
3401 struct ptlrpc_request *req = NULL;
3402 int intent = *flags & LDLM_FL_HAS_INTENT;
3407 /* Filesystem lock extents are extended to page boundaries so that
3408 * dealing with the page cache is a little smoother. */
3409 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3410 policy->l_extent.end |= ~CFS_PAGE_MASK;
3413 * kms is not valid when either object is completely fresh (so that no
3414 * locks are cached), or object was evicted. In the latter case cached
3415 * lock cannot be used, because it would prime inode state with
3416 * potentially stale LVB.
3421 /* Next, search for already existing extent locks that will cover us */
3422 /* If we're trying to read, we also search for an existing PW lock. The
3423 * VFS and page cache already protect us locally, so lots of readers/
3424 * writers can share a single PW lock.
3426 * There are problems with conversion deadlocks, so instead of
3427 * converting a read lock to a write lock, we'll just enqueue a new
3430 * At some point we should cancel the read lock instead of making them
3431 * send us a blocking callback, but there are problems with canceling
3432 * locks out from other users right now, too. */
3433 mode = einfo->ei_mode;
3434 if (einfo->ei_mode == LCK_PR)
3436 mode = ldlm_lock_match(obd->obd_namespace,
3437 *flags | LDLM_FL_LVB_READY, res_id,
3438 einfo->ei_type, policy, mode, lockh, 0);
3440 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3442 if (osc_set_lock_data_with_check(matched, einfo)) {
3443 /* addref the lock only if not async requests and PW
3444 * lock is matched whereas we asked for PR. */
3445 if (!rqset && einfo->ei_mode != mode)
3446 ldlm_lock_addref(lockh, LCK_PR);
3448 /* I would like to be able to ASSERT here that
3449 * rss <= kms, but I can't, for reasons which
3450 * are explained in lov_enqueue() */
3453 /* We already have a lock, and it's referenced */
3454 (*upcall)(cookie, ELDLM_OK);
3456 /* For async requests, decref the lock. */
3457 if (einfo->ei_mode != mode)
3458 ldlm_lock_decref(lockh, LCK_PW);
3460 ldlm_lock_decref(lockh, einfo->ei_mode);
3461 LDLM_LOCK_PUT(matched);
/* the matched lock could not take our callback data: release it */
3464 ldlm_lock_decref(lockh, mode);
3465 LDLM_LOCK_PUT(matched);
/* prepare the enqueue request ourselves so that its reply has room for
 * the LVB */
3470 CFS_LIST_HEAD(cancels);
3471 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3472 &RQF_LDLM_ENQUEUE_LVB);
3476 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3478 ptlrpc_request_free(req);
3482 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3484 ptlrpc_request_set_replen(req);
3487 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3488 *flags &= ~LDLM_FL_BLOCK_GRANTED;
3490 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3491 sizeof(*lvb), lockh, async);
/* asynchronous case: stash what osc_enqueue_interpret() needs in the
 * request's async args and queue the request */
3494 struct osc_enqueue_args *aa;
3495 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3496 aa = ptlrpc_req_async_args(req);
3499 aa->oa_flags = flags;
3500 aa->oa_upcall = upcall;
3501 aa->oa_cookie = cookie;
3503 aa->oa_lockh = lockh;
3505 req->rq_interpret_reply =
3506 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3507 if (rqset == PTLRPCD_SET)
3508 ptlrpcd_add_req(req, PSCOPE_OTHER);
3510 ptlrpc_set_add_req(rqset, req);
3511 } else if (intent) {
3512 ptlrpc_req_finished(req);
/* synchronous case: complete the enqueue right away */
3517 rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3519 ptlrpc_req_finished(req);
/*
 * obd_ops ->o_enqueue entry point.
 *
 * Builds the LDLM resource name from the object id/seq stored in
 * oinfo->oi_md and forwards everything else to osc_enqueue_base(): the
 * lock flags, policy, upcall and lock handle come from @oinfo, the LVB and
 * kms-valid state come from lsm_oinfo[0], and @oinfo itself is passed along
 * next to the upcall.
 *
 * The last argument is true exactly when a request set is supplied.
 * NOTE(review): presumably this is the "async" switch of
 * osc_enqueue_base() - confirm against its prototype.
 * NOTE(review): only lsm_oinfo[0] is consulted - presumably an OSC object
 * always has exactly one stripe; confirm.
 */
3524 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3525 struct ldlm_enqueue_info *einfo,
3526 struct ptlrpc_request_set *rqset)
3528 struct ldlm_res_id res_id;
3532 osc_build_res_name(oinfo->oi_md->lsm_object_id,
3533 oinfo->oi_md->lsm_object_seq, &res_id);
3535 rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3536 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3537 oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3538 oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3539 rqset, rqset != NULL);
/*
 * Look for an existing LDLM lock in the export's namespace that covers
 * @policy on resource @res_id.
 *
 * The flags are sampled once from *flags into a local copy.  The extent in
 * @policy is widened to page boundaries IN PLACE, so the caller's policy is
 * modified.  The result of ldlm_lock_match() is stored in rc and later used
 * as a lock mode (it is passed to ldlm_lock_decref() and compared with
 * @mode).
 *
 * OBD_FAIL_OSC_MATCH is a fault-injection point checked before any work.
 */
3543 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3544 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3545 int *flags, void *data, struct lustre_handle *lockh,
3548 struct obd_device *obd = exp->exp_obd;
3549 int lflags = *flags;
3553 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3556 /* Filesystem lock extents are extended to page boundaries so that
3557 * dealing with the page cache is a little smoother */
3558 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3559 policy->l_extent.end |= ~CFS_PAGE_MASK;
3561 /* Next, search for already existing extent locks that will cover us */
3562 /* If we're trying to read, we also search for an existing PW lock. The
3563 * VFS and page cache already protect us locally, so lots of readers/
3564 * writers can share a single PW lock. */
3568 rc = ldlm_lock_match(obd->obd_namespace, lflags,
3569 res_id, type, policy, rc, lockh, unref);
/* The matched lock is rejected when @data cannot be attached to it; the
 * reference taken by the match is dropped unless this was only a
 * LDLM_FL_TEST_LOCK probe. */
3572 if (!osc_set_data_with_check(lockh, data)) {
3573 if (!(lflags & LDLM_FL_TEST_LOCK))
3574 ldlm_lock_decref(lockh, rc);
/* Matched mode differs from the requested one: swap the PW reference for
 * a PR reference (again skipped for LDLM_FL_TEST_LOCK probes). */
3578 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3579 ldlm_lock_addref(lockh, LCK_PR);
3580 ldlm_lock_decref(lockh, LCK_PW);
/*
 * Release one reference of @mode on the lock identified by @lockh.
 * Group locks (LCK_GROUP) are released through
 * ldlm_lock_decref_and_cancel(); ldlm_lock_decref() is the plain release.
 */
3587 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3591 if (unlikely(mode == LCK_GROUP))
3592 ldlm_lock_decref_and_cancel(lockh, mode);
3594 ldlm_lock_decref(lockh, mode);
/*
 * obd_ops ->o_cancel entry point: a thin wrapper that hands @lockh and
 * @mode to osc_cancel_base().  @exp and @md are not used by the visible
 * body.
 */
3599 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3600 __u32 mode, struct lustre_handle *lockh)
3603 RETURN(osc_cancel_base(lockh, mode));
/*
 * obd_ops ->o_cancel_unused entry point: cancel unused locks in the
 * namespace of the OBD device behind @exp via ldlm_cli_cancel_unused().
 *
 * resp starts out NULL and is pointed at the resource name built from
 * @lsm's object id/seq.
 * NOTE(review): presumably a NULL resource name means "the whole
 * namespace" to ldlm_cli_cancel_unused() - confirm.
 */
3606 static int osc_cancel_unused(struct obd_export *exp,
3607 struct lov_stripe_md *lsm,
3608 ldlm_cancel_flags_t flags,
3611 struct obd_device *obd = class_exp2obd(exp);
3612 struct ldlm_res_id res_id, *resp = NULL;
3615 resp = osc_build_res_name(lsm->lsm_object_id,
3616 lsm->lsm_object_seq, &res_id);
3619 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
/*
 * Reply interpreter for the asynchronous OST_STATFS request queued by
 * osc_statfs_async().
 *
 * Extracts the obd_statfs reply, refreshes the DEGRADED / RDONLY / NOSPC /
 * NOSPC_BLK bits of the object creator (cli->cl_oscc) under oscc_lock,
 * copies the reply into the caller's buffer (aa->aa_oi->oi_osfs) and
 * finally reports the result through the caller's completion callback
 * aa->aa_oi->oi_cb_up.
 *
 * A reply without the RMF_OBD_STATFS buffer is reported as -EPROTO.
 */
3622 static int osc_statfs_interpret(const struct lu_env *env,
3623 struct ptlrpc_request *req,
3624 struct osc_async_args *aa, int rc)
3626 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3627 struct obd_statfs *msfs;
3632 /* The request has in fact never been sent
3633 * due to issues at a higher level (LOV).
3634 * Exit immediately since the caller is
3635 * aware of the problem and takes care
3636 * of the clean up */
/* Connection-type failures get special treatment when the caller asked
 * for OBD_STATFS_NODELAY. */
3639 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3640 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3646 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3648 GOTO(out, rc = -EPROTO);
3651 /* Reinitialize the RDONLY and DEGRADED flags at the client
3652 * on each statfs, so they don't stay set permanently. */
3653 cfs_spin_lock(&cli->cl_oscc.oscc_lock);
3655 if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3656 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3657 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3658 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3660 if (unlikely(msfs->os_state & OS_STATE_READONLY))
3661 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3662 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3663 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3665 /* Add a bit of hysteresis so this flag isn't continually flapping,
3666 * and ensure that new files don't get extremely fragmented due to
3667 * only a small amount of available space in the filesystem.
3668 * We want to set the NOSPC flag when there is less than ~0.1% free
3669 * and clear it when there is at least ~0.2% free space, so:
3670 * avail < ~0.1% max max = avail + used
3671 * 1025 * avail < avail + used used = blocks - free
3672 * 1024 * avail < used
3673 * 1024 * avail < blocks - free
3674 * avail < ((blocks - free) >> 10)
3676 * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
3677 * lose that amount of space so in those cases we report no space left
3678 * if there is less than 1 GB left. */
/* NOTE(review): "used" is derived from os_blocks/os_bfree and compared
 * with os_bavail, so the 1 << 30 cap looks like a number of blocks rather
 * than bytes; that would not match the "1 GB" wording above - confirm the
 * units. */
3679 used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
/* Set NOSPC when fewer than 32 free objects or too few available blocks;
 * clear it (together with NOSPC_BLK) only above twice the threshold and
 * more than 64 free objects. */
3680 if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3681 ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3682 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3683 else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3684 (msfs->os_ffree > 64) &&
3685 (msfs->os_bavail > (used << 1)))) {
3686 cli->cl_oscc.oscc_flags &= ~(OSCC_FLAG_NOSPC |
3687 OSCC_FLAG_NOSPC_BLK);
/* NOSPC_BLK additionally records that the shortage is in blocks. */
3690 if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3691 (msfs->os_bavail < used)))
3692 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC_BLK;
3694 cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
3696 *aa->aa_oi->oi_osfs = *msfs;
3698 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * obd_ops ->o_statfs_async entry point: allocate and pack an OST_STATFS
 * request on the client import, route it to OST_CREATE_PORTAL, install
 * osc_statfs_interpret() as reply interpreter and add the request to
 * @rqset.  The request is freed again if packing fails.
 *
 * @max_age is not used by the visible lines (see the comment below on
 * why it is not sent to the server).
 */
3702 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3703 __u64 max_age, struct ptlrpc_request_set *rqset)
3705 struct ptlrpc_request *req;
3706 struct osc_async_args *aa;
3710 /* We could possibly pass max_age in the request (as an absolute
3711 * timestamp or a "seconds.usec ago") so the target can avoid doing
3712 * extra calls into the filesystem if that isn't necessary (e.g.
3713 * during mount that would help a bit). Having relative timestamps
3714 * is not so great if request processing is slow, while absolute
3715 * timestamps are not ideal because they need time synchronization. */
3716 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3720 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3722 ptlrpc_request_free(req);
3725 ptlrpc_request_set_replen(req);
3726 req->rq_request_portal = OST_CREATE_PORTAL;
3727 ptlrpc_at_set_req_timeout(req);
3729 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3730 /* requests from procfs must not block in statfs, to avoid a deadlock */
3731 req->rq_no_resend = 1;
3732 req->rq_no_delay = 1;
3735 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
/* The per-request argument area must be large enough for our args. */
3736 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3737 aa = ptlrpc_req_async_args(req);
3740 ptlrpc_set_add_req(rqset, req);
/*
 * obd_ops ->o_statfs entry point: synchronous OST_STATFS.
 *
 * Takes its own reference on the client import while holding cl_sem for
 * reading, so that a concurrent client_disconnect_export() cannot pull the
 * import away; that reference is dropped again with class_import_put()
 * once the request has been allocated.  The request is sent with
 * ptlrpc_queue_wait() and a reply lacking the RMF_OBD_STATFS buffer is
 * reported as -EPROTO.
 *
 * @max_age is not used by the visible lines.
 */
3744 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3745 __u64 max_age, __u32 flags)
3747 struct obd_statfs *msfs;
3748 struct ptlrpc_request *req;
3749 struct obd_import *imp = NULL;
3753 /*Since the request might also come from lprocfs, so we need
3754 *sync this with client_disconnect_export Bug15684*/
3755 cfs_down_read(&obd->u.cli.cl_sem);
3756 if (obd->u.cli.cl_import)
3757 imp = class_import_get(obd->u.cli.cl_import);
3758 cfs_up_read(&obd->u.cli.cl_sem);
3762 /* We could possibly pass max_age in the request (as an absolute
3763 * timestamp or a "seconds.usec ago") so the target can avoid doing
3764 * extra calls into the filesystem if that isn't necessary (e.g.
3765 * during mount that would help a bit). Having relative timestamps
3766 * is not so great if request processing is slow, while absolute
3767 * timestamps are not ideal because they need time synchronization. */
3768 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3770 class_import_put(imp);
3775 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3777 ptlrpc_request_free(req);
3780 ptlrpc_request_set_replen(req);
3781 req->rq_request_portal = OST_CREATE_PORTAL;
3782 ptlrpc_at_set_req_timeout(req);
3784 if (flags & OBD_STATFS_NODELAY) {
3785 /* requests from procfs must not block in statfs, to avoid a deadlock */
3786 req->rq_no_resend = 1;
3787 req->rq_no_delay = 1;
3790 rc = ptlrpc_queue_wait(req);
3794 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3796 GOTO(out, rc = -EPROTO);
3803 ptlrpc_req_finished(req);
3807 /* Retrieve object striping information.
3809 * @lump is a pointer to an in-core struct with lmm_ost_count indicating
3810 * the maximum number of OST indices which will fit in the user buffer.
3811 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3813 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3815 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3816 struct lov_user_md_v3 lum, *lumk;
3817 struct lov_user_ost_data_v1 *lmm_objects;
3818 int rc = 0, lum_size;
3824 /* we only need the header part from user space to get lmm_magic and
3825 * lmm_stripe_count, (the header part is common to v1 and v3) */
3826 lum_size = sizeof(struct lov_user_md_v1);
3827 if (cfs_copy_from_user(&lum, lump, lum_size))
/* Only the V1 and V3 user layouts are understood. */
3830 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3831 (lum.lmm_magic != LOV_USER_MAGIC_V3))
3834 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
/* NOTE(review): all three conditions are compile-time constants; the
 * file uses CLASSERT() for this kind of check elsewhere, which would catch
 * a mismatch at build time instead of at run time. */
3835 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3836 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3837 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3839 /* we can use lov_mds_md_size() to compute lum_size
3840 * because lov_user_md_vX and lov_mds_md_vX have the same size */
/* NOTE(review): lmm_stripe_count was just copied from user space and
 * directly sizes the allocation and the copy back to the user, while only
 * the first lmm_objects slot is filled in here - confirm that an upper
 * bound is enforced somewhere. */
3841 if (lum.lmm_stripe_count > 0) {
3842 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3843 OBD_ALLOC(lumk, lum_size);
/* The object array sits at a different offset in the V1 and V3 layouts. */
3847 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3848 lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3850 lmm_objects = &(lumk->lmm_objects[0]);
3851 lmm_objects->l_object_id = lsm->lsm_object_id;
3853 lum_size = lov_mds_md_size(0, lum.lmm_magic);
/* Report the object id/seq of @lsm as a single-stripe layout. */
3857 lumk->lmm_object_id = lsm->lsm_object_id;
3858 lumk->lmm_object_seq = lsm->lsm_object_seq;
3859 lumk->lmm_stripe_count = 1;
3861 if (cfs_copy_to_user(lump, lumk, lum_size))
3865 OBD_FREE(lumk, lum_size);
/*
 * obd_ops ->o_iocontrol entry point: ioctl dispatcher for an OSC device.
 *
 * The module is pinned with cfs_try_module_get() for the duration of the
 * call and released with cfs_module_put() on the common exit path.
 *
 * Commands handled by the visible cases:
 *  - OBD_IOC_LOV_GET_CONFIG:  fetch the ioctl buffer from @uarg, validate
 *    that inline buffer 1 can hold a lov_desc and inline buffer 2 a uuid,
 *    describe this OSC as a one-target "LOV" and copy the buffer back;
 *  - LL_IOC_LOV_SETSTRIPE:    obd_alloc_memmd() on @karg;
 *  - LL_IOC_LOV_GETSTRIPE:    osc_getstripe();
 *  - OBD_IOC_CLIENT_RECOVER:  ptlrpc_recover_import();
 *  - IOC_OSC_SET_ACTIVE:      ptlrpc_set_import_active();
 *  - OBD_IOC_POLL_QUOTACHECK: lquota_poll_check();
 *  - OBD_IOC_PING_TARGET:     ptlrpc_obd_ping();
 *  - anything else is logged and answered with -ENOTTY.
 */
3871 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3872 void *karg, void *uarg)
3874 struct obd_device *obd = exp->exp_obd;
3875 struct obd_ioctl_data *data = karg;
3879 if (!cfs_try_module_get(THIS_MODULE)) {
/* Console messages must be newline-terminated like every other CERROR in
 * this file. */
3880 CERROR("Can't get module. Is it alive?\n");
3884 case OBD_IOC_LOV_GET_CONFIG: {
3886 struct lov_desc *desc;
3887 struct obd_uuid uuid;
3891 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3892 GOTO(out, err = -EINVAL);
/* From here on "data" refers to the buffer fetched from @uarg, not to
 * @karg. */
3894 data = (struct obd_ioctl_data *)buf;
3896 if (sizeof(*desc) > data->ioc_inllen1) {
3897 obd_ioctl_freedata(buf, len);
3898 GOTO(out, err = -EINVAL);
3901 if (data->ioc_inllen2 < sizeof(uuid)) {
3902 obd_ioctl_freedata(buf, len);
3903 GOTO(out, err = -EINVAL);
3906 desc = (struct lov_desc *)data->ioc_inlbuf1;
3907 desc->ld_tgt_count = 1;
3908 desc->ld_active_tgt_count = 1;
3909 desc->ld_default_stripe_count = 1;
3910 desc->ld_default_stripe_size = 0;
3911 desc->ld_default_stripe_offset = 0;
3912 desc->ld_pattern = 0;
3913 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3915 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3917 err = cfs_copy_to_user((void *)uarg, buf, len);
3920 obd_ioctl_freedata(buf, len);
3923 case LL_IOC_LOV_SETSTRIPE:
3924 err = obd_alloc_memmd(exp, karg);
3928 case LL_IOC_LOV_GETSTRIPE:
3929 err = osc_getstripe(karg, uarg);
3931 case OBD_IOC_CLIENT_RECOVER:
3932 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3937 case IOC_OSC_SET_ACTIVE:
3938 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3941 case OBD_IOC_POLL_QUOTACHECK:
3942 err = lquota_poll_check(quota_interface, exp,
3943 (struct if_quotacheck *)karg);
3945 case OBD_IOC_PING_TARGET:
3946 err = ptlrpc_obd_ping(obd);
3949 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3950 cmd, cfs_curproc_comm());
3951 GOTO(out, err = -ENOTTY);
3954 cfs_module_put(THIS_MODULE);
/*
 * obd_ops ->o_get_info entry point.  Both @vallen and @val must be
 * non-NULL.  Keys handled by the visible branches:
 *
 *  - KEY_LOCK_TO_STRIPE: answered locally; *vallen is set to the size of
 *    a __u32 stripe number.
 *  - KEY_LAST_ID: synchronous OST_GET_INFO RPC (no delay, no resend) whose
 *    obd_id reply is stored through @val.
 *    NOTE(review): no check of *vallen against sizeof(obd_id) is visible
 *    on this path - confirm the caller guarantees it.
 *  - KEY_FIEMAP: synchronous OST_GET_INFO RPC; @val is sent as the request
 *    value and overwritten with *vallen bytes of the reply.
 *
 * A reply lacking the expected buffer is reported as -EPROTO.
 */
3958 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3959 void *key, __u32 *vallen, void *val,
3960 struct lov_stripe_md *lsm)
3963 if (!vallen || !val)
3966 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3967 __u32 *stripe = val;
3968 *vallen = sizeof(*stripe);
3971 } else if (KEY_IS(KEY_LAST_ID)) {
3972 struct ptlrpc_request *req;
3977 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3978 &RQF_OST_GET_INFO_LAST_ID);
3982 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3983 RCL_CLIENT, keylen);
3984 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3986 ptlrpc_request_free(req);
3990 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3991 memcpy(tmp, key, keylen);
3993 req->rq_no_delay = req->rq_no_resend = 1;
3994 ptlrpc_request_set_replen(req);
3995 rc = ptlrpc_queue_wait(req);
3999 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
4001 GOTO(out, rc = -EPROTO);
4003 *((obd_id *)val) = *reply;
4005 ptlrpc_req_finished(req);
4007 } else if (KEY_IS(KEY_FIEMAP)) {
4008 struct ptlrpc_request *req;
4009 struct ll_user_fiemap *reply;
4013 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
4014 &RQF_OST_GET_INFO_FIEMAP);
/* The fiemap value buffer has the same size in both directions. */
4018 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
4019 RCL_CLIENT, keylen);
4020 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
4021 RCL_CLIENT, *vallen);
4022 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
4023 RCL_SERVER, *vallen);
4025 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
4027 ptlrpc_request_free(req);
4031 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
4032 memcpy(tmp, key, keylen);
4033 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
4034 memcpy(tmp, val, *vallen);
4036 ptlrpc_request_set_replen(req);
4037 rc = ptlrpc_queue_wait(req);
4041 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
4043 GOTO(out1, rc = -EPROTO);
4045 memcpy(val, reply, *vallen);
4047 ptlrpc_req_finished(req);
/*
 * Finish the "MDS connected" handshake on import @imp: connect the
 * LLOG_MDS_OST_ORIG_CTXT llog initiator of the import's OBD device, then
 * mark the import as using the server timeout and as pingable (both under
 * imp_lock).
 */
4055 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
4057 struct llog_ctxt *ctxt;
4061 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
4063 rc = llog_initiator_connect(ctxt);
/* Drop the reference obtained by llog_get_context(). */
4064 llog_ctxt_put(ctxt);
4066 /* XXX return an error? skip setting below flags? */
4069 cfs_spin_lock(&imp->imp_lock);
4070 imp->imp_server_timeout = 1;
4071 imp->imp_pingable = 1;
4072 cfs_spin_unlock(&imp->imp_lock);
4073 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
/*
 * Reply interpreter installed by osc_set_info_async() for KEY_MDS_CONN:
 * hands the request's import to osc_setinfo_mds_connect_import().
 */
4078 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
4079 struct ptlrpc_request *req,
4086 RETURN(osc_setinfo_mds_connect_import(req->rq_import));
/*
 * obd_ops ->o_set_info_async entry point.
 *
 * Keys handled locally, without an RPC:
 *  - KEY_NEXT_ID:      raise oscc_next_id to *val + 1 if that is larger;
 *  - KEY_CHECKSUM:     set cl_checksum to 0 or 1;
 *  - KEY_SPTLRPC_CONF: sptlrpc_conf_client_adapt();
 *  - KEY_FLUSH_CTX:    sptlrpc_import_flush_my_ctx().
 *
 * Every other key is packed into an OST_SET_INFO request (key and value
 * copied verbatim).  KEY_MDS_CONN records the sequence from @val in the
 * object creator and installs osc_setinfo_mds_conn_interpret();
 * KEY_GRANT_SHRINK installs osc_shrink_grant_interpret() and is queued to
 * ptlrpcd, while all other keys are added to @set, which must then be
 * non-NULL.
 *
 * NOTE(review): KEY_MDS_CONN reads @val as a __u32 with no visible check
 * of @vallen - confirm the caller guarantees the size.
 */
4089 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
4090 void *key, obd_count vallen, void *val,
4091 struct ptlrpc_request_set *set)
4093 struct ptlrpc_request *req;
4094 struct obd_device *obd = exp->exp_obd;
4095 struct obd_import *imp = class_exp2cliimp(exp);
/* Fault-injection hook. */
4100 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
4102 if (KEY_IS(KEY_NEXT_ID)) {
4104 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4106 if (vallen != sizeof(obd_id))
/* NOTE(review): the same vallen test appears twice in this branch; the
 * second occurrence looks redundant - confirm and drop one of them. */
4111 if (vallen != sizeof(obd_id))
4114 /* avoid race between allocate new object and set next id
4115 * from ll_sync thread */
4116 cfs_spin_lock(&oscc->oscc_lock);
4117 new_val = *((obd_id*)val) + 1;
4118 if (new_val > oscc->oscc_next_id)
4119 oscc->oscc_next_id = new_val;
4120 cfs_spin_unlock(&oscc->oscc_lock);
/* NOTE(review): oscc_next_id is read here after oscc_lock was released,
 * so the logged value may already be stale. */
4121 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
4122 exp->exp_obd->obd_name,
4123 obd->u.cli.cl_oscc.oscc_next_id);
4128 if (KEY_IS(KEY_CHECKSUM)) {
4129 if (vallen != sizeof(int))
4131 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
4135 if (KEY_IS(KEY_SPTLRPC_CONF)) {
4136 sptlrpc_conf_client_adapt(obd);
4140 if (KEY_IS(KEY_FLUSH_CTX)) {
4141 sptlrpc_import_flush_my_ctx(imp);
/* Only KEY_GRANT_SHRINK may be issued without a request set. */
4145 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
4148 /* We pass all other commands directly to OST. Since nobody calls osc
4149 methods directly and everybody is supposed to go through LOV, we
4150 assume lov checked invalid values for us.
4151 The only recognised values so far are evict_by_nid and mds_conn.
4152 Even if something bad goes through, we'd get a -EINVAL from OST
4155 if (KEY_IS(KEY_GRANT_SHRINK))
4156 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
4158 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
4163 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
4164 RCL_CLIENT, keylen);
4165 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
4166 RCL_CLIENT, vallen);
4167 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
4169 ptlrpc_request_free(req);
4173 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
4174 memcpy(tmp, key, keylen);
4175 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
4176 memcpy(tmp, val, vallen);
4178 if (KEY_IS(KEY_MDS_CONN)) {
4179 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4181 oscc->oscc_oa.o_seq = (*(__u32 *)val);
4182 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
4183 LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
4184 req->rq_no_delay = req->rq_no_resend = 1;
4185 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4186 } else if (KEY_IS(KEY_GRANT_SHRINK)) {
4187 struct osc_grant_args *aa;
4190 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4191 aa = ptlrpc_req_async_args(req);
4194 ptlrpc_req_finished(req);
4197 *oa = ((struct ost_body *)val)->oa;
4199 req->rq_interpret_reply = osc_shrink_grant_interpret;
4202 ptlrpc_request_set_replen(req);
4203 if (!KEY_IS(KEY_GRANT_SHRINK)) {
4204 LASSERT(set != NULL);
4205 ptlrpc_set_add_req(set, req);
4206 ptlrpc_check_set(NULL, set);
4208 ptlrpcd_add_req(req, PSCOPE_OTHER);
/*
 * llog operations registered for LLOG_SIZE_REPL_CTXT by __osc_llog_init();
 * only the cancel method is provided.  Written with the standard C99
 * designated-initializer syntax, matching osc_obd_ops in this file, instead
 * of the obsolete GNU "field:" form.
 */
4214 static struct llog_operations osc_size_repl_logops = {
4215 .lop_cancel = llog_obd_repl_cancel
/*
 * llog operations registered for LLOG_MDS_OST_ORIG_CTXT by
 * __osc_llog_init().  Filled in at module load: osc_init() copies
 * llog_lvfs_ops into it and then overrides the setup, cleanup, add and
 * connect methods.
 */
4218 static struct llog_operations osc_mds_ost_orig_logops;
/*
 * Set up the two llog contexts of an OSC device in obd->obd_olg:
 * LLOG_MDS_OST_ORIG_CTXT (backed by the catalog id in @catid) and
 * LLOG_SIZE_REPL_CTXT.  Failures are logged together with the device
 * names and the catalog log id.
 *
 * @olg is not used by the visible lines; the contexts always go into
 * &obd->obd_olg (the caller osc_llog_init() asserts that both are the
 * same object).
 */
4220 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4221 struct obd_device *tgt, struct llog_catid *catid)
4226 rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
4227 &catid->lci_logid, &osc_mds_ost_orig_logops);
4229 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4233 rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
4234 NULL, &osc_size_repl_logops);
/* The first context is looked up again on this path.
 * NOTE(review): presumably to undo its setup when the second one failed -
 * confirm. */
4236 struct llog_ctxt *ctxt =
4237 llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4240 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4245 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
4246 obd->obd_name, tgt->obd_name, catid, rc);
4247 CERROR("logid "LPX64":0x%x\n",
4248 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
/*
 * obd_ops ->o_llog_init entry point.
 *
 * With olg_cat_processing held, reads the catalog id for slot *index from
 * the CATLIST file on @disk_obd, initialises the llog contexts through
 * __osc_llog_init() and writes the catalog id back with
 * llog_put_cat_list().  @olg must be the device's own obd_olg.
 */
4253 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4254 struct obd_device *disk_obd, int *index)
4256 struct llog_catid catid;
4257 static char name[32] = CATLIST;
4261 LASSERT(olg == &obd->obd_olg);
4263 cfs_mutex_down(&olg->olg_cat_processing);
4264 rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
4266 CERROR("rc: %d\n", rc);
4270 CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
4271 obd->obd_name, *index, catid.lci_logid.lgl_oid,
4272 catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen);
4274 rc = __osc_llog_init(obd, olg, disk_obd, &catid);
4276 CERROR("rc: %d\n", rc);
4280 rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
4282 CERROR("rc: %d\n", rc);
4287 cfs_mutex_up(&olg->olg_cat_processing);
/*
 * obd_ops ->o_llog_finish entry point: clean up both llog contexts
 * (LLOG_MDS_OST_ORIG_CTXT and LLOG_SIZE_REPL_CTXT), keeping the two
 * results in rc and rc2.  @count is not used by the visible lines.
 */
4292 static int osc_llog_finish(struct obd_device *obd, int count)
4294 struct llog_ctxt *ctxt;
4295 int rc = 0, rc2 = 0;
4298 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4300 rc = llog_cleanup(ctxt);
4302 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4304 rc2 = llog_cleanup(ctxt);
/*
 * obd_ops ->o_reconnect entry point.
 *
 * When the connect data asks for OBD_CONNECT_GRANT, request a grant equal
 * to what the client currently holds (available grant plus dirty bytes),
 * or, if that sum is zero, two full RPCs worth of pages.  The lost-grant
 * counter is sampled for the debug message and reset to zero, all under
 * cl_loi_list_lock.
 */
4311 static int osc_reconnect(const struct lu_env *env,
4312 struct obd_export *exp, struct obd_device *obd,
4313 struct obd_uuid *cluuid,
4314 struct obd_connect_data *data,
4317 struct client_obd *cli = &obd->u.cli;
4319 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4322 client_obd_list_lock(&cli->cl_loi_list_lock);
/* GNU "?:" - use the sum when non-zero, else the default on the right. */
4323 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
4324 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4325 lost_grant = cli->cl_lost_grant;
4326 cli->cl_lost_grant = 0;
4327 client_obd_list_unlock(&cli->cl_loi_list_lock);
4329 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4330 "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
4331 cli->cl_avail_grant, cli->cl_dirty, lost_grant);
4332 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4333 " ocd_grant: %d\n", data->ocd_connect_flags,
4334 data->ocd_version, data->ocd_grant);
/*
 * obd_ops ->o_disconnect entry point.
 *
 * If the LLOG_SIZE_REPL_CTXT context exists and this is the last
 * connection (cl_conn_count == 1), the llog is synced before the context
 * reference is dropped.  The export is then disconnected with
 * client_disconnect_export(), and only once the import is gone
 * (cl_import == NULL) is the client removed from the grant-shrink list;
 * the long comment in the body explains that ordering.
 */
4340 static int osc_disconnect(struct obd_export *exp)
4342 struct obd_device *obd = class_exp2obd(exp);
4343 struct llog_ctxt *ctxt;
4346 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4348 if (obd->u.cli.cl_conn_count == 1) {
4349 /* Flush any remaining cancel messages out to the
4351 llog_sync(ctxt, exp, 0);
4353 llog_ctxt_put(ctxt);
4355 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4359 rc = client_disconnect_export(exp);
4361 * Initially we put del_shrink_grant before disconnect_export, but it
4362 * causes the following problem if setup (connect) and cleanup
4363 * (disconnect) are tangled together.
4364 * connect p1 disconnect p2
4365 * ptlrpc_connect_import
4366 * ............... class_manual_cleanup
4369 * ptlrpc_connect_interrupt
4371 * add this client to shrink list
4373 * Bang! pinger trigger the shrink.
4374 * So the osc should be disconnected from the shrink list, after we
4375 * are sure the import has been destroyed. BUG18662
4377 if (obd->u.cli.cl_import == NULL)
4378 osc_del_shrink_grant(&obd->u.cli);
/*
 * obd_ops ->o_import_event entry point: react to state changes of the
 * import @imp, which must belong to @obd.
 *
 *  - IMP_EVENT_DISCON:     on imports flagged imp_server_timeout, mark the
 *                          object creator as recovering; the available and
 *                          lost grant counters are reset to zero.
 *  - IMP_EVENT_INACTIVE:   notify the observer (OBD_NOTIFY_INACTIVE).
 *  - IMP_EVENT_INVALIDATE: run osc_check_rpcs() under cl_loi_list_lock,
 *                          then clean the LDLM namespace locally.
 *  - IMP_EVENT_ACTIVE:     on imports flagged imp_server_timeout, clear the
 *                          NOSPC / NOSPC_BLK creator flags; notify the
 *                          observer (OBD_NOTIFY_ACTIVE).
 *  - IMP_EVENT_OCD:        initialise the grant if the server granted
 *                          OBD_CONNECT_GRANT, switch the request portal if
 *                          OBD_CONNECT_REQPORTAL is set, notify the
 *                          observer (OBD_NOTIFY_OCD).
 *  - IMP_EVENT_DEACTIVATE / IMP_EVENT_ACTIVATE: notify the observer.
 *  - anything else is logged as an unknown event.
 */
4382 static int osc_import_event(struct obd_device *obd,
4383 struct obd_import *imp,
4384 enum obd_import_event event)
4386 struct client_obd *cli;
4390 LASSERT(imp->imp_obd == obd);
4393 case IMP_EVENT_DISCON: {
4394 /* Only do this on the MDS OSC's */
4395 if (imp->imp_server_timeout) {
4396 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4398 cfs_spin_lock(&oscc->oscc_lock);
4399 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4400 cfs_spin_unlock(&oscc->oscc_lock);
4403 client_obd_list_lock(&cli->cl_loi_list_lock);
4404 cli->cl_avail_grant = 0;
4405 cli->cl_lost_grant = 0;
4406 client_obd_list_unlock(&cli->cl_loi_list_lock);
4409 case IMP_EVENT_INACTIVE: {
4410 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4413 case IMP_EVENT_INVALIDATE: {
4414 struct ldlm_namespace *ns = obd->obd_namespace;
4418 env = cl_env_get(&refcheck);
4422 client_obd_list_lock(&cli->cl_loi_list_lock);
4423 /* all pages go to failing rpcs due to the invalid
4425 osc_check_rpcs(env, cli);
4426 client_obd_list_unlock(&cli->cl_loi_list_lock);
4428 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4429 cl_env_put(env, &refcheck);
4434 case IMP_EVENT_ACTIVE: {
4435 /* Only do this on the MDS OSC's */
4436 if (imp->imp_server_timeout) {
4437 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4439 cfs_spin_lock(&oscc->oscc_lock);
4440 oscc->oscc_flags &= ~(OSCC_FLAG_NOSPC |
4441 OSCC_FLAG_NOSPC_BLK);
4442 cfs_spin_unlock(&oscc->oscc_lock);
4444 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4447 case IMP_EVENT_OCD: {
4448 struct obd_connect_data *ocd = &imp->imp_connect_data;
4450 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4451 osc_init_grant(&obd->u.cli, ocd);
4454 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4455 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
4457 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4460 case IMP_EVENT_DEACTIVATE: {
4461 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
4464 case IMP_EVENT_ACTIVATE: {
4465 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
4469 CERROR("Unknown import event %d\n", event);
4476 * Determine whether the lock can be canceled before replaying the lock
4477 * during recovery, see bug16774 for detailed information.
4479 * \retval zero the lock can't be canceled
4480 * \retval other ok to cancel
/*
 * Namespace cancel callback (registered by osc_setup() through
 * ns_register_cancel()).  The resource of @lock must already be locked by
 * the caller.  The test below selects extent locks granted in PR or CR
 * mode that have no page references (osc_dlm_lock_pageref() == 0).
 */
4482 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
4484 check_res_locked(lock->l_resource);
4487 * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
4489 * XXX as a future improvement, we can also cancel unused write lock
4490 * if it doesn't have dirty data and active mmaps.
4492 if (lock->l_resource->lr_type == LDLM_EXTENT &&
4493 (lock->l_granted_mode == LCK_PR ||
4494 lock->l_granted_mode == LCK_CR) &&
4495 (osc_dlm_lock_pageref(lock) == 0))
/*
 * obd_ops ->o_setup entry point for an OSC device.
 *
 * Takes a ptlrpcd reference, runs the generic client_obd_setup() and the
 * quota setup, and on success: sets the grant-shrink interval, registers
 * the procfs entries, creates the import's request pool, initialises the
 * grant-shrink list and semaphore, and registers
 * osc_cancel_for_recovery() as the namespace's cancel callback.
 */
4501 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4507 rc = ptlrpcd_addref();
4511 rc = client_obd_setup(obd, lcfg);
4514 } else if ((rc = lquota_setup(quota_interface, obd)) == 0) {
4515 struct lprocfs_static_vars lvars = { 0 };
4516 struct client_obd *cli = &obd->u.cli;
4518 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4519 lprocfs_osc_init_vars(&lvars);
/* The extra procfs attachments are made only if the base procfs setup
 * succeeded. */
4520 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4521 lproc_osc_attach_seqstat(obd);
4522 sptlrpc_lprocfs_cliobd_attach(obd);
4523 ptlrpc_lprocfs_register_obd(obd);
4527 /* We need to allocate a few requests more, because
4528 brw_interpret tries to create new requests before freeing
4529 previous ones. Ideally we want to have 2x max_rpcs_in_flight
4530 reserved, but I afraid that might be too much wasted RAM
4531 in fact, so 2 is just my guess and still should work. */
/* NOTE(review): the pool pointer is stored without a visible NULL check -
 * confirm that users of imp_rq_pool cope with a failed allocation. */
4532 cli->cl_import->imp_rq_pool =
4533 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4535 ptlrpc_add_rqs_to_pool);
4537 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4538 cfs_sema_init(&cli->cl_grant_sem, 1);
4540 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
/*
 * obd_ops ->o_precleanup entry point, called once per cleanup stage.
 *
 *  - OBD_CLEANUP_EARLY:   deactivate the import and clear imp_pingable
 *                         (under imp_lock).
 *  - OBD_CLEANUP_EXPORTS: wait for zombie exports to be culled, release
 *                         the client import, unregister the procfs
 *                         entries and finish the llog subsystem.
 */
4546 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4552 case OBD_CLEANUP_EARLY: {
4553 struct obd_import *imp;
4554 imp = obd->u.cli.cl_import;
4555 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4556 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4557 ptlrpc_deactivate_import(imp);
4558 cfs_spin_lock(&imp->imp_lock);
4559 imp->imp_pingable = 0;
4560 cfs_spin_unlock(&imp->imp_lock);
4563 case OBD_CLEANUP_EXPORTS: {
4565 * for echo client, export may be on zombie list, wait for
4566 * zombie thread to cull it, because cli.cl_import will be
4567 * cleared in client_disconnect_export():
4568 * class_export_destroy() -> obd_cleanup() ->
4569 * echo_device_free() -> echo_client_cleanup() ->
4570 * obd_disconnect() -> osc_disconnect() ->
4571 * client_disconnect_export()
4573 obd_zombie_barrier();
4574 obd_cleanup_client_import(obd);
4575 ptlrpc_lprocfs_unregister_obd(obd);
4576 lprocfs_obd_cleanup(obd);
4577 rc = obd_llog_finish(obd, 0);
4579 CERROR("failed to cleanup llogging subsystems\n");
/*
 * obd_ops ->o_cleanup entry point: release the quota cache of the device,
 * then run the generic client_obd_cleanup().
 */
4586 int osc_cleanup(struct obd_device *obd)
4592 /* free memory of osc quota cache */
4593 lquota_cleanup(quota_interface, obd);
4595 rc = client_obd_cleanup(obd);
/*
 * Apply a configuration record to an OSC device: dispatches on
 * lcfg->lcfg_command and feeds the record to class_process_proc_param()
 * with the PARAM_OSC prefix and the OSC procfs variable table.
 */
4601 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4603 struct lprocfs_static_vars lvars = { 0 };
4606 lprocfs_osc_init_vars(&lvars);
4608 switch (lcfg->lcfg_command) {
4610 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
/*
 * obd_ops ->o_process_config entry point: @buf is handed to
 * osc_process_config_base() as the configuration record; @len is not
 * used.
 */
4620 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4622 return osc_process_config_base(obd, buf);
/*
 * OBD method table of the OSC device type; registered with the class
 * layer by osc_init().  Connection add/del/connect use the generic client
 * import helpers, everything else is implemented by osc_* functions.
 */
4625 struct obd_ops osc_obd_ops = {
4626 .o_owner = THIS_MODULE,
4627 .o_setup = osc_setup,
4628 .o_precleanup = osc_precleanup,
4629 .o_cleanup = osc_cleanup,
4630 .o_add_conn = client_import_add_conn,
4631 .o_del_conn = client_import_del_conn,
4632 .o_connect = client_connect_import,
4633 .o_reconnect = osc_reconnect,
4634 .o_disconnect = osc_disconnect,
4635 .o_statfs = osc_statfs,
4636 .o_statfs_async = osc_statfs_async,
4637 .o_packmd = osc_packmd,
4638 .o_unpackmd = osc_unpackmd,
4639 .o_precreate = osc_precreate,
4640 .o_create = osc_create,
4641 .o_create_async = osc_create_async,
4642 .o_destroy = osc_destroy,
4643 .o_getattr = osc_getattr,
4644 .o_getattr_async = osc_getattr_async,
4645 .o_setattr = osc_setattr,
4646 .o_setattr_async = osc_setattr_async,
4648 .o_punch = osc_punch,
4650 .o_enqueue = osc_enqueue,
4651 .o_change_cbdata = osc_change_cbdata,
4652 .o_find_cbdata = osc_find_cbdata,
4653 .o_cancel = osc_cancel,
4654 .o_cancel_unused = osc_cancel_unused,
4655 .o_iocontrol = osc_iocontrol,
4656 .o_get_info = osc_get_info,
4657 .o_set_info_async = osc_set_info_async,
4658 .o_import_event = osc_import_event,
4659 .o_llog_init = osc_llog_init,
4660 .o_llog_finish = osc_llog_finish,
4661 .o_process_config = osc_process_config,
/* Objects defined elsewhere in the OSC module and used by osc_init() /
 * osc_exit() below: the slab cache descriptors and the AST guard spinlock
 * together with its lockdep class. */
4664 extern struct lu_kmem_descr osc_caches[];
4665 extern cfs_spinlock_t osc_ast_guard;
4666 extern cfs_lock_class_key_t osc_ast_guard_class;
/*
 * Module initialisation.
 *
 * Creates the OSC slab caches, loads and binds the quota interface,
 * registers the OSC device type with the class layer (undoing the quota
 * symbol reference and the caches if registration fails), initialises the
 * AST guard spinlock and finally builds osc_mds_ost_orig_logops from
 * llog_lvfs_ops with the origin-specific setup/cleanup/add/connect
 * methods.
 */
4668 int __init osc_init(void)
4670 struct lprocfs_static_vars lvars = { 0 };
4674 /* print an address of _any_ initialized kernel symbol from this
4675 * module, to allow debugging with gdb that doesn't support data
4676 * symbols from modules.*/
4677 CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4679 rc = lu_kmem_init(osc_caches);
4681 lprocfs_osc_init_vars(&lvars);
/* NOTE(review): quota_interface is tested for NULL on the error path
 * below but is handed to lquota_init() / init_obd_quota_ops()
 * unconditionally - presumably those helpers tolerate NULL; confirm. */
4683 cfs_request_module("lquota");
4684 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4685 lquota_init(quota_interface);
4686 init_obd_quota_ops(quota_interface, &osc_obd_ops);
4688 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4689 LUSTRE_OSC_NAME, &osc_device_type);
4691 if (quota_interface)
4692 PORTAL_SYMBOL_PUT(osc_quota_interface);
4693 lu_kmem_fini(osc_caches);
4697 cfs_spin_lock_init(&osc_ast_guard);
4698 cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
/* Start from the lvfs llog operations and override the origin hooks. */
4700 osc_mds_ost_orig_logops = llog_lvfs_ops;
4701 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4702 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4703 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4704 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
/*
 * Module teardown: finalise the lu device type, shut down the quota
 * interface and drop its symbol reference (if one was obtained),
 * unregister the OSC device type and destroy the slab caches.
 */
4710 static void /*__exit*/ osc_exit(void)
4712 lu_device_type_fini(&osc_device_type);
4714 lquota_exit(quota_interface);
4715 if (quota_interface)
4716 PORTAL_SYMBOL_PUT(osc_quota_interface);
4718 class_unregister_type(LUSTRE_OSC_NAME);
4719 lu_kmem_fini(osc_caches);
/* Kernel module metadata, and registration of osc_init() / osc_exit() as
 * the entry points of the "osc" module. */
4722 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4723 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4724 MODULE_LICENSE("GPL");
4726 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);