1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_OSC
42 #include <libcfs/libcfs.h>
45 # include <liblustre.h>
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include "osc_internal.h"
66 static quota_interface_t *quota_interface = NULL;
67 extern quota_interface_t osc_quota_interface;
69 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
70 static int brw_interpret(const struct lu_env *env,
71 struct ptlrpc_request *req, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
74 /* Pack OSC object metadata for disk storage (LE byte order). */
75 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
76 struct lov_stripe_md *lsm)
81 lmm_size = sizeof(**lmmp);
86 OBD_FREE(*lmmp, lmm_size);
92 OBD_ALLOC(*lmmp, lmm_size);
98 LASSERT(lsm->lsm_object_id);
99 LASSERT_MDS_GROUP(lsm->lsm_object_gr);
100 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
101 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
107 /* Unpack OSC object metadata from disk storage (LE byte order). */
108 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
109 struct lov_mds_md *lmm, int lmm_bytes)
115 if (lmm_bytes < sizeof (*lmm)) {
116 CERROR("lov_mds_md too small: %d, need %d\n",
117 lmm_bytes, (int)sizeof(*lmm));
120 /* XXX LOV_MAGIC etc check? */
122 if (lmm->lmm_object_id == 0) {
123 CERROR("lov_mds_md: zero lmm_object_id\n");
128 lsm_size = lov_stripe_md_size(1);
132 if (*lsmp != NULL && lmm == NULL) {
133 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
134 OBD_FREE(*lsmp, lsm_size);
140 OBD_ALLOC(*lsmp, lsm_size);
143 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
144 if ((*lsmp)->lsm_oinfo[0] == NULL) {
145 OBD_FREE(*lsmp, lsm_size);
148 loi_init((*lsmp)->lsm_oinfo[0]);
152 /* XXX zero *lsmp? */
153 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
154 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
155 LASSERT((*lsmp)->lsm_object_id);
156 LASSERT_MDS_GROUP((*lsmp)->lsm_object_gr);
159 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165 struct ost_body *body, void *capa)
167 struct obd_capa *oc = (struct obd_capa *)capa;
168 struct lustre_capa *c;
173 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
176 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177 DEBUG_CAPA(D_SEC, c, "pack");
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181 struct obd_info *oinfo)
183 struct ost_body *body;
185 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
188 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
189 osc_pack_capa(req, body, oinfo->oi_capa);
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193 const struct req_msg_field *field,
197 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
199 /* it is already calculated as sizeof struct obd_capa */
203 static int osc_getattr_interpret(const struct lu_env *env,
204 struct ptlrpc_request *req,
205 struct osc_async_args *aa, int rc)
207 struct ost_body *body;
213 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
215 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
216 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
218 /* This should really be sent by the OST */
219 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
220 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
222 CDEBUG(D_INFO, "can't unpack ost_body\n");
224 aa->aa_oi->oi_oa->o_valid = 0;
227 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
231 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
232 struct ptlrpc_request_set *set)
234 struct ptlrpc_request *req;
235 struct osc_async_args *aa;
239 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
243 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
244 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
246 ptlrpc_request_free(req);
250 osc_pack_req_body(req, oinfo);
252 ptlrpc_request_set_replen(req);
253 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
255 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
256 aa = ptlrpc_req_async_args(req);
259 ptlrpc_set_add_req(set, req);
263 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
265 struct ptlrpc_request *req;
266 struct ost_body *body;
270 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
274 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
275 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
277 ptlrpc_request_free(req);
281 osc_pack_req_body(req, oinfo);
283 ptlrpc_request_set_replen(req);
285 rc = ptlrpc_queue_wait(req);
289 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
291 GOTO(out, rc = -EPROTO);
293 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
294 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
296 /* This should really be sent by the OST */
297 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
298 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
302 ptlrpc_req_finished(req);
306 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
307 struct obd_trans_info *oti)
309 struct ptlrpc_request *req;
310 struct ost_body *body;
314 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
316 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
320 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
321 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
323 ptlrpc_request_free(req);
327 osc_pack_req_body(req, oinfo);
329 ptlrpc_request_set_replen(req);
331 rc = ptlrpc_queue_wait(req);
335 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
337 GOTO(out, rc = -EPROTO);
339 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
343 ptlrpc_req_finished(req);
347 static int osc_setattr_interpret(const struct lu_env *env,
348 struct ptlrpc_request *req,
349 struct osc_async_args *aa, int rc)
351 struct ost_body *body;
357 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
359 GOTO(out, rc = -EPROTO);
361 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
363 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
367 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
368 struct obd_trans_info *oti,
369 struct ptlrpc_request_set *rqset)
371 struct ptlrpc_request *req;
372 struct osc_async_args *aa;
376 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
380 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
381 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
383 ptlrpc_request_free(req);
387 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
388 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
390 osc_pack_req_body(req, oinfo);
392 ptlrpc_request_set_replen(req);
394 /* do mds to ost setattr asynchronously */
396 /* Do not wait for response. */
397 ptlrpcd_add_req(req, PSCOPE_OTHER);
399 req->rq_interpret_reply =
400 (ptlrpc_interpterer_t)osc_setattr_interpret;
402 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
403 aa = ptlrpc_req_async_args(req);
406 ptlrpc_set_add_req(rqset, req);
412 int osc_real_create(struct obd_export *exp, struct obdo *oa,
413 struct lov_stripe_md **ea, struct obd_trans_info *oti)
415 struct ptlrpc_request *req;
416 struct ost_body *body;
417 struct lov_stripe_md *lsm;
426 rc = obd_alloc_memmd(exp, &lsm);
431 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
433 GOTO(out, rc = -ENOMEM);
435 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
437 ptlrpc_request_free(req);
441 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
443 lustre_set_wire_obdo(&body->oa, oa);
445 ptlrpc_request_set_replen(req);
447 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
448 oa->o_flags == OBD_FL_DELORPHAN) {
450 "delorphan from OST integration");
451 /* Don't resend the delorphan req */
452 req->rq_no_resend = req->rq_no_delay = 1;
455 rc = ptlrpc_queue_wait(req);
459 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
461 GOTO(out_req, rc = -EPROTO);
463 lustre_get_wire_obdo(oa, &body->oa);
465 /* This should really be sent by the OST */
466 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
467 oa->o_valid |= OBD_MD_FLBLKSZ;
469 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
470 * have valid lsm_oinfo data structs, so don't go touching that.
471 * This needs to be fixed in a big way.
473 lsm->lsm_object_id = oa->o_id;
474 lsm->lsm_object_gr = oa->o_gr;
478 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
480 if (oa->o_valid & OBD_MD_FLCOOKIE) {
481 if (!oti->oti_logcookies)
482 oti_alloc_cookies(oti, 1);
483 *oti->oti_logcookies = oa->o_lcookie;
487 CDEBUG(D_HA, "transno: "LPD64"\n",
488 lustre_msg_get_transno(req->rq_repmsg));
490 ptlrpc_req_finished(req);
493 obd_free_memmd(exp, &lsm);
497 static int osc_punch_interpret(const struct lu_env *env,
498 struct ptlrpc_request *req,
499 struct osc_punch_args *aa, int rc)
501 struct ost_body *body;
507 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
509 GOTO(out, rc = -EPROTO);
511 lustre_get_wire_obdo(aa->pa_oa, &body->oa);
513 rc = aa->pa_upcall(aa->pa_cookie, rc);
517 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
518 struct obd_capa *capa,
519 obd_enqueue_update_f upcall, void *cookie,
520 struct ptlrpc_request_set *rqset)
522 struct ptlrpc_request *req;
523 struct osc_punch_args *aa;
524 struct ost_body *body;
528 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
532 osc_set_capa_size(req, &RMF_CAPA1, capa);
533 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
535 ptlrpc_request_free(req);
538 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
539 ptlrpc_at_set_req_timeout(req);
541 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
543 lustre_set_wire_obdo(&body->oa, oa);
544 osc_pack_capa(req, body, capa);
546 ptlrpc_request_set_replen(req);
549 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
550 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
551 aa = ptlrpc_req_async_args(req);
553 aa->pa_upcall = upcall;
554 aa->pa_cookie = cookie;
555 if (rqset == PTLRPCD_SET)
556 ptlrpcd_add_req(req, PSCOPE_OTHER);
558 ptlrpc_set_add_req(rqset, req);
563 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
564 struct obd_trans_info *oti,
565 struct ptlrpc_request_set *rqset)
567 oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
568 oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
569 oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
570 return osc_punch_base(exp, oinfo->oi_oa, oinfo->oi_capa,
571 oinfo->oi_cb_up, oinfo, rqset);
574 static int osc_sync(struct obd_export *exp, struct obdo *oa,
575 struct lov_stripe_md *md, obd_size start, obd_size end,
578 struct ptlrpc_request *req;
579 struct ost_body *body;
584 CDEBUG(D_INFO, "oa NULL\n");
588 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
592 osc_set_capa_size(req, &RMF_CAPA1, capa);
593 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
595 ptlrpc_request_free(req);
599 /* overload the size and blocks fields in the oa with start/end */
600 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
602 lustre_set_wire_obdo(&body->oa, oa);
603 body->oa.o_size = start;
604 body->oa.o_blocks = end;
605 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
606 osc_pack_capa(req, body, capa);
608 ptlrpc_request_set_replen(req);
610 rc = ptlrpc_queue_wait(req);
614 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
616 GOTO(out, rc = -EPROTO);
618 lustre_get_wire_obdo(oa, &body->oa);
622 ptlrpc_req_finished(req);
626 /* Find and cancel locally locks matched by @mode in the resource found by
627 * @objid. Found locks are added into @cancel list. Returns the amount of
628 * locks added to @cancels list. */
629 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
631 ldlm_mode_t mode, int lock_flags)
633 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
634 struct ldlm_res_id res_id;
635 struct ldlm_resource *res;
639 osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
640 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
644 LDLM_RESOURCE_ADDREF(res);
645 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
646 lock_flags, 0, NULL);
647 LDLM_RESOURCE_DELREF(res);
648 ldlm_resource_putref(res);
652 static int osc_destroy_interpret(const struct lu_env *env,
653 struct ptlrpc_request *req, void *data,
656 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
658 cfs_atomic_dec(&cli->cl_destroy_in_flight);
659 cfs_waitq_signal(&cli->cl_destroy_waitq);
663 static int osc_can_send_destroy(struct client_obd *cli)
665 if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
666 cli->cl_max_rpcs_in_flight) {
667 /* The destroy request can be sent */
670 if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
671 cli->cl_max_rpcs_in_flight) {
673 * The counter has been modified between the two atomic
676 cfs_waitq_signal(&cli->cl_destroy_waitq);
681 /* Destroy requests can be async always on the client, and we don't even really
682 * care about the return code since the client cannot do anything at all about
684 * When the MDS is unlinking a filename, it saves the file objects into a
685 * recovery llog, and these object records are cancelled when the OST reports
686 * they were destroyed and sync'd to disk (i.e. transaction committed).
687 * If the client dies, or the OST is down when the object should be destroyed,
688 * the records are not cancelled, and when the OST reconnects to the MDS next,
689 * it will retrieve the llog unlink logs and then sends the log cancellation
690 * cookies to the MDS after committing destroy transactions. */
691 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
692 struct lov_stripe_md *ea, struct obd_trans_info *oti,
693 struct obd_export *md_export, void *capa)
695 struct client_obd *cli = &exp->exp_obd->u.cli;
696 struct ptlrpc_request *req;
697 struct ost_body *body;
698 CFS_LIST_HEAD(cancels);
703 CDEBUG(D_INFO, "oa NULL\n");
707 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
708 LDLM_FL_DISCARD_DATA);
710 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
712 ldlm_lock_list_put(&cancels, l_bl_ast, count);
716 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
717 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
720 ptlrpc_request_free(req);
724 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
725 ptlrpc_at_set_req_timeout(req);
727 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
728 oa->o_lcookie = *oti->oti_logcookies;
729 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
731 lustre_set_wire_obdo(&body->oa, oa);
733 osc_pack_capa(req, body, (struct obd_capa *)capa);
734 ptlrpc_request_set_replen(req);
736 /* don't throttle destroy RPCs for the MDT */
737 if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
738 req->rq_interpret_reply = osc_destroy_interpret;
739 if (!osc_can_send_destroy(cli)) {
740 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
744 * Wait until the number of on-going destroy RPCs drops
745 * under max_rpc_in_flight
747 l_wait_event_exclusive(cli->cl_destroy_waitq,
748 osc_can_send_destroy(cli), &lwi);
752 /* Do not wait for response */
753 ptlrpcd_add_req(req, PSCOPE_OTHER);
757 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
760 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
762 LASSERT(!(oa->o_valid & bits));
765 client_obd_list_lock(&cli->cl_loi_list_lock);
766 oa->o_dirty = cli->cl_dirty;
767 if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
768 CERROR("dirty %lu - %lu > dirty_max %lu\n",
769 cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
771 } else if (cfs_atomic_read(&obd_dirty_pages) -
772 cfs_atomic_read(&obd_dirty_transit_pages) >
773 obd_max_dirty_pages + 1){
774 /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
775 * not covered by a lock thus they may safely race and trip
776 * this CERROR() unless we add in a small fudge factor (+1). */
777 CERROR("dirty %d - %d > system dirty_max %d\n",
778 cfs_atomic_read(&obd_dirty_pages),
779 cfs_atomic_read(&obd_dirty_transit_pages),
780 obd_max_dirty_pages);
782 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
783 CERROR("dirty %lu - dirty_max %lu too big???\n",
784 cli->cl_dirty, cli->cl_dirty_max);
787 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
788 (cli->cl_max_rpcs_in_flight + 1);
789 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
791 oa->o_grant = cli->cl_avail_grant;
792 oa->o_dropped = cli->cl_lost_grant;
793 cli->cl_lost_grant = 0;
794 client_obd_list_unlock(&cli->cl_loi_list_lock);
795 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
796 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
800 static void osc_update_next_shrink(struct client_obd *cli)
802 cli->cl_next_shrink_grant =
803 cfs_time_shift(cli->cl_grant_shrink_interval);
804 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
805 cli->cl_next_shrink_grant);
808 /* caller must hold loi_list_lock */
809 static void osc_consume_write_grant(struct client_obd *cli,
810 struct brw_page *pga)
812 LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
813 LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
814 cfs_atomic_inc(&obd_dirty_pages);
815 cli->cl_dirty += CFS_PAGE_SIZE;
816 cli->cl_avail_grant -= CFS_PAGE_SIZE;
817 pga->flag |= OBD_BRW_FROM_GRANT;
818 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
819 CFS_PAGE_SIZE, pga, pga->pg);
820 LASSERT(cli->cl_avail_grant >= 0);
821 osc_update_next_shrink(cli);
824 /* the companion to osc_consume_write_grant, called when a brw has completed.
825 * must be called with the loi lock held. */
826 static void osc_release_write_grant(struct client_obd *cli,
827 struct brw_page *pga, int sent)
829 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
832 LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
833 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
838 pga->flag &= ~OBD_BRW_FROM_GRANT;
839 cfs_atomic_dec(&obd_dirty_pages);
840 cli->cl_dirty -= CFS_PAGE_SIZE;
841 if (pga->flag & OBD_BRW_NOCACHE) {
842 pga->flag &= ~OBD_BRW_NOCACHE;
843 cfs_atomic_dec(&obd_dirty_transit_pages);
844 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
847 cli->cl_lost_grant += CFS_PAGE_SIZE;
848 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
849 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
850 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
851 /* For short writes we shouldn't count parts of pages that
852 * span a whole block on the OST side, or our accounting goes
853 * wrong. Should match the code in filter_grant_check. */
854 int offset = pga->off & ~CFS_PAGE_MASK;
855 int count = pga->count + (offset & (blocksize - 1));
856 int end = (offset + pga->count) & (blocksize - 1);
858 count += blocksize - end;
860 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
861 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
862 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
863 cli->cl_avail_grant, cli->cl_dirty);
869 static unsigned long rpcs_in_flight(struct client_obd *cli)
871 return cli->cl_r_in_flight + cli->cl_w_in_flight;
874 /* caller must hold loi_list_lock */
875 void osc_wake_cache_waiters(struct client_obd *cli)
878 struct osc_cache_waiter *ocw;
881 cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
882 /* if we can't dirty more, we must wait until some is written */
883 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
884 (cfs_atomic_read(&obd_dirty_pages) + 1 >
885 obd_max_dirty_pages)) {
886 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
887 "osc max %ld, sys max %d\n", cli->cl_dirty,
888 cli->cl_dirty_max, obd_max_dirty_pages);
892 /* if still dirty cache but no grant wait for pending RPCs that
893 * may yet return us some grant before doing sync writes */
894 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
895 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
896 cli->cl_w_in_flight);
900 ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
901 cfs_list_del_init(&ocw->ocw_entry);
902 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
903 /* no more RPCs in flight to return grant, do sync IO */
904 ocw->ocw_rc = -EDQUOT;
905 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
907 osc_consume_write_grant(cli,
908 &ocw->ocw_oap->oap_brw_page);
911 cfs_waitq_signal(&ocw->ocw_waitq);
917 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
919 client_obd_list_lock(&cli->cl_loi_list_lock);
920 cli->cl_avail_grant += grant;
921 client_obd_list_unlock(&cli->cl_loi_list_lock);
924 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
926 if (body->oa.o_valid & OBD_MD_FLGRANT) {
927 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
928 __osc_update_grant(cli, body->oa.o_grant);
932 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
933 void *key, obd_count vallen, void *val,
934 struct ptlrpc_request_set *set);
936 static int osc_shrink_grant_interpret(const struct lu_env *env,
937 struct ptlrpc_request *req,
940 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
941 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
942 struct ost_body *body;
945 __osc_update_grant(cli, oa->o_grant);
949 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
951 osc_update_grant(cli, body);
957 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
959 client_obd_list_lock(&cli->cl_loi_list_lock);
960 oa->o_grant = cli->cl_avail_grant / 4;
961 cli->cl_avail_grant -= oa->o_grant;
962 client_obd_list_unlock(&cli->cl_loi_list_lock);
963 oa->o_flags |= OBD_FL_SHRINK_GRANT;
964 osc_update_next_shrink(cli);
967 /* Shrink the current grant, either from some large amount to enough for a
968 * full set of in-flight RPCs, or if we have already shrunk to that limit
969 * then to enough for a single RPC. This avoids keeping more grant than
970 * needed, and avoids shrinking the grant piecemeal. */
971 static int osc_shrink_grant(struct client_obd *cli)
973 long target = (cli->cl_max_rpcs_in_flight + 1) *
974 cli->cl_max_pages_per_rpc;
976 client_obd_list_lock(&cli->cl_loi_list_lock);
977 if (cli->cl_avail_grant <= target)
978 target = cli->cl_max_pages_per_rpc;
979 client_obd_list_unlock(&cli->cl_loi_list_lock);
981 return osc_shrink_grant_to_target(cli, target);
984 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
987 struct ost_body *body;
990 client_obd_list_lock(&cli->cl_loi_list_lock);
991 /* Don't shrink if we are already above or below the desired limit
992 * We don't want to shrink below a single RPC, as that will negatively
993 * impact block allocation and long-term performance. */
994 if (target < cli->cl_max_pages_per_rpc)
995 target = cli->cl_max_pages_per_rpc;
997 if (target >= cli->cl_avail_grant) {
998 client_obd_list_unlock(&cli->cl_loi_list_lock);
1001 client_obd_list_unlock(&cli->cl_loi_list_lock);
1003 OBD_ALLOC_PTR(body);
1007 osc_announce_cached(cli, &body->oa, 0);
1009 client_obd_list_lock(&cli->cl_loi_list_lock);
1010 body->oa.o_grant = cli->cl_avail_grant - target;
1011 cli->cl_avail_grant = target;
1012 client_obd_list_unlock(&cli->cl_loi_list_lock);
1013 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
1014 osc_update_next_shrink(cli);
1016 rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
1017 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
1018 sizeof(*body), body, NULL);
1020 __osc_update_grant(cli, body->oa.o_grant);
1025 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
1026 static int osc_should_shrink_grant(struct client_obd *client)
1028 cfs_time_t time = cfs_time_current();
1029 cfs_time_t next_shrink = client->cl_next_shrink_grant;
1030 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1031 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1032 client->cl_avail_grant > GRANT_SHRINK_LIMIT)
1035 osc_update_next_shrink(client);
1040 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1042 struct client_obd *client;
1044 cfs_list_for_each_entry(client, &item->ti_obd_list,
1045 cl_grant_shrink_list) {
1046 if (osc_should_shrink_grant(client))
1047 osc_shrink_grant(client);
1052 static int osc_add_shrink_grant(struct client_obd *client)
1056 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1058 osc_grant_shrink_grant_cb, NULL,
1059 &client->cl_grant_shrink_list);
1061 CERROR("add grant client %s error %d\n",
1062 client->cl_import->imp_obd->obd_name, rc);
1065 CDEBUG(D_CACHE, "add grant client %s \n",
1066 client->cl_import->imp_obd->obd_name);
1067 osc_update_next_shrink(client);
1071 static int osc_del_shrink_grant(struct client_obd *client)
1073 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1077 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1080 * ocd_grant is the total grant amount we're expect to hold: if we've
1081 * been evicted, it's the new avail_grant amount, cl_dirty will drop
1082 * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1084 * race is tolerable here: if we're evicted, but imp_state already
1085 * left EVICTED state, then cl_dirty must be 0 already.
1087 client_obd_list_lock(&cli->cl_loi_list_lock);
1088 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1089 cli->cl_avail_grant = ocd->ocd_grant;
1091 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1092 client_obd_list_unlock(&cli->cl_loi_list_lock);
1094 CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1095 cli->cl_avail_grant, cli->cl_lost_grant);
1096 LASSERT(cli->cl_avail_grant >= 0);
1098 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1099 cfs_list_empty(&cli->cl_grant_shrink_list))
1100 osc_add_shrink_grant(cli);
1103 /* We assume that the reason this OSC got a short read is because it read
1104 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1105 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1106 * this stripe never got written at or beyond this stripe offset yet. */
1107 static void handle_short_read(int nob_read, obd_count page_count,
1108 struct brw_page **pga)
1113 /* skip bytes read OK */
1114 while (nob_read > 0) {
1115 LASSERT (page_count > 0);
1117 if (pga[i]->count > nob_read) {
1118 /* EOF inside this page */
1119 ptr = cfs_kmap(pga[i]->pg) +
1120 (pga[i]->off & ~CFS_PAGE_MASK);
1121 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1122 cfs_kunmap(pga[i]->pg);
1128 nob_read -= pga[i]->count;
1133 /* zero remaining pages */
1134 while (page_count-- > 0) {
1135 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1136 memset(ptr, 0, pga[i]->count);
1137 cfs_kunmap(pga[i]->pg);
1142 static int check_write_rcs(struct ptlrpc_request *req,
1143 int requested_nob, int niocount,
1144 obd_count page_count, struct brw_page **pga)
1149 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1150 sizeof(*remote_rcs) *
1152 if (remote_rcs == NULL) {
1153 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1157 /* return error if any niobuf was in error */
1158 for (i = 0; i < niocount; i++) {
1159 if (remote_rcs[i] < 0)
1160 return(remote_rcs[i]);
1162 if (remote_rcs[i] != 0) {
1163 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1164 i, remote_rcs[i], req);
1169 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1170 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1171 req->rq_bulk->bd_nob_transferred, requested_nob);
1178 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1180 if (p1->flag != p2->flag) {
1181 unsigned mask = ~(OBD_BRW_FROM_GRANT|
1182 OBD_BRW_NOCACHE|OBD_BRW_SYNC);
1184 /* warn if we try to combine flags that we don't know to be
1185 * safe to combine */
1186 if ((p1->flag & mask) != (p2->flag & mask))
1187 CERROR("is it ok to have flags 0x%x and 0x%x in the "
1188 "same brw?\n", p1->flag, p2->flag);
1192 return (p1->off + p1->count == p2->off);
1195 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1196 struct brw_page **pga, int opc,
1197 cksum_type_t cksum_type)
1202 LASSERT (pg_count > 0);
1203 cksum = init_checksum(cksum_type);
1204 while (nob > 0 && pg_count > 0) {
1205 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1206 int off = pga[i]->off & ~CFS_PAGE_MASK;
1207 int count = pga[i]->count > nob ? nob : pga[i]->count;
1209 /* corrupt the data before we compute the checksum, to
1210 * simulate an OST->client data error */
1211 if (i == 0 && opc == OST_READ &&
1212 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1213 memcpy(ptr + off, "bad1", min(4, nob));
1214 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1215 cfs_kunmap(pga[i]->pg);
1216 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1219 nob -= pga[i]->count;
1223 /* For sending we only compute the wrong checksum instead
1224 * of corrupting the data so it is still correct on a redo */
1225 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/* Build and pack a single BRW (bulk read/write) RPC for @page_count pages.
 *
 * Visible behavior (NOTE(review): this listing is gapped — some original
 * lines are missing, so comments describe only the visible code):
 *  - writes allocate the request from the import's emergency pool
 *    (ptlrpc_request_alloc_pool), reads use plain ptlrpc_request_alloc;
 *  - adjacent pages that can_merge_pages() share one niobuf_remote, so
 *    niocount <= page_count;
 *  - for OST_WRITE with cli->cl_checksum set (and no sptlrpc bulk flavor),
 *    a client-side bulk checksum is computed and stored in both the wire
 *    body and the caller's @oa for later verification on reply;
 *  - async args (aa) stash requested_nob/niocount/page_count for the
 *    reply-side bookkeeping in osc_brw_fini_request().
 * @reserve != 0 takes an extra reference on @ocapa via capa_get(). */
1231 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1232 struct lov_stripe_md *lsm, obd_count page_count,
1233 struct brw_page **pga,
1234 struct ptlrpc_request **reqp,
1235 struct obd_capa *ocapa, int reserve)
1237 struct ptlrpc_request *req;
1238 struct ptlrpc_bulk_desc *desc;
1239 struct ost_body *body;
1240 struct obd_ioobj *ioobj;
1241 struct niobuf_remote *niobuf;
1242 int niocount, i, requested_nob, opc, rc;
1243 struct osc_brw_async_args *aa;
1244 struct req_capsule *pill;
1245 struct brw_page *pg_prev;
/* fault-injection points for testing the resend paths */
1248 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1249 RETURN(-ENOMEM); /* Recoverable */
1250 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1251 RETURN(-EINVAL); /* Fatal */
1253 if ((cmd & OBD_BRW_WRITE) != 0) {
1255 req = ptlrpc_request_alloc_pool(cli->cl_import,
1256 cli->cl_import->imp_rq_pool,
1260 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
/* count distinct niobufs: mergeable neighbors collapse into one */
1265 for (niocount = i = 1; i < page_count; i++) {
1266 if (!can_merge_pages(pga[i - 1], pga[i]))
1270 pill = &req->rq_pill;
1271 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1273 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1274 niocount * sizeof(*niobuf));
1275 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1277 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1279 ptlrpc_request_free(req);
1282 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1283 ptlrpc_at_set_req_timeout(req);
/* writes are a bulk GET by the server, reads a bulk PUT to us */
1285 if (opc == OST_WRITE)
1286 desc = ptlrpc_prep_bulk_imp(req, page_count,
1287 BULK_GET_SOURCE, OST_BULK_PORTAL);
1289 desc = ptlrpc_prep_bulk_imp(req, page_count,
1290 BULK_PUT_SINK, OST_BULK_PORTAL);
1293 GOTO(out, rc = -ENOMEM);
1294 /* NB request now owns desc and will free it when it gets freed */
1296 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1297 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1298 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1299 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1301 lustre_set_wire_obdo(&body->oa, oa);
1303 obdo_to_ioobj(oa, ioobj);
1304 ioobj->ioo_bufcnt = niocount;
1305 osc_pack_capa(req, body, ocapa);
1306 LASSERT (page_count > 0);
/* fill bulk pages and niobufs; pga must already be sorted by offset
 * (asserted below) and all pages must agree on OBD_BRW_SRVLOCK */
1308 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1309 struct brw_page *pg = pga[i];
1311 LASSERT(pg->count > 0);
1312 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1313 "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1314 pg->off, pg->count);
1316 LASSERTF(i == 0 || pg->off > pg_prev->off,
1317 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1318 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1320 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1321 pg_prev->pg, page_private(pg_prev->pg),
1322 pg_prev->pg->index, pg_prev->off);
1324 LASSERTF(i == 0 || pg->off > pg_prev->off,
1325 "i %d p_c %u\n", i, page_count);
1327 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1328 (pg->flag & OBD_BRW_SRVLOCK));
1330 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1332 requested_nob += pg->count;
1334 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1336 niobuf->len += pg->count;
1338 niobuf->offset = pg->off;
1339 niobuf->len = pg->count;
1340 niobuf->flags = pg->flag;
/* sanity: we must have consumed exactly niocount niobuf slots */
1345 LASSERTF((void *)(niobuf - niocount) ==
1346 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1347 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1348 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1350 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1351 if (osc_should_shrink_grant(cli))
1352 osc_shrink_grant_local(cli, &body->oa);
1354 /* size[REQ_REC_OFF] still sizeof (*body) */
1355 if (opc == OST_WRITE) {
1356 if (unlikely(cli->cl_checksum) &&
1357 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1358 /* store cl_cksum_type in a local variable since
1359 * it can be changed via lprocfs */
1360 cksum_type_t cksum_type = cli->cl_cksum_type;
1362 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1363 oa->o_flags &= OBD_FL_LOCAL_MASK;
1364 body->oa.o_flags = 0;
1366 body->oa.o_flags |= cksum_type_pack(cksum_type);
1367 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1368 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1372 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1374 /* save this in 'oa', too, for later checking */
1375 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1376 oa->o_flags |= cksum_type_pack(cksum_type);
1378 /* clear out the checksum flag, in case this is a
1379 * resend but cl_checksum is no longer set. b=11238 */
1380 oa->o_valid &= ~OBD_MD_FLCKSUM;
1382 oa->o_cksum = body->oa.o_cksum;
1383 /* 1 RC per niobuf */
1384 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1385 sizeof(__u32) * niocount);
/* read path: only request a checksum; data is not touched here */
1387 if (unlikely(cli->cl_checksum) &&
1388 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1389 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1390 body->oa.o_flags = 0;
1391 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1392 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1394 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER, 0);
1395 /* 1 RC for the whole I/O */
1397 ptlrpc_request_set_replen(req);
1399 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1400 aa = ptlrpc_req_async_args(req);
1402 aa->aa_requested_nob = requested_nob;
1403 aa->aa_nio_count = niocount;
1404 aa->aa_page_count = page_count;
1408 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1409 if (ocapa && reserve)
1410 aa->aa_ocapa = capa_get(ocapa);
/* error path: drop the partially-built request */
1416 ptlrpc_req_finished(req);
/* Diagnose a write-checksum mismatch reported by the OST.
 * Recomputes the bulk checksum over @pga (as the OST would for an
 * OST_WRITE) and compares against both the original @client_cksum and the
 * OST's @server_cksum to classify where the corruption happened; the
 * verdict is logged via LCONSOLE_ERROR_MSG/CERROR.
 * (NOTE(review): listing is gapped — the return value and some lines are
 * not visible here; comments cover only the visible logic.) */
1420 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1421 __u32 client_cksum, __u32 server_cksum, int nob,
1422 obd_count page_count, struct brw_page **pga,
1423 cksum_type_t client_cksum_type)
1427 cksum_type_t cksum_type;
/* fast path: server agrees, nothing to diagnose */
1429 if (server_cksum == client_cksum) {
1430 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* use the checksum type the server actually replied with, defaulting
 * to CRC32 when the reply carries no flags */
1434 if (oa->o_valid & OBD_MD_FLFLAGS)
1435 cksum_type = cksum_type_unpack(oa->o_flags);
1437 cksum_type = OBD_CKSUM_CRC32;
1439 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
/* classify the failure by comparing recomputed vs original vs server */
1442 if (cksum_type != client_cksum_type)
1443 msg = "the server did not use the checksum type specified in "
1444 "the original request - likely a protocol problem";
1445 else if (new_cksum == server_cksum)
1446 msg = "changed on the client after we checksummed it - "
1447 "likely false positive due to mmap IO (bug 11742)";
1448 else if (new_cksum == client_cksum)
1449 msg = "changed in transit before arrival at OST";
1451 msg = "changed in transit AND doesn't match the original - "
1452 "likely false positive due to mmap IO (bug 11742)";
1454 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1455 LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1456 "["LPU64"-"LPU64"]\n",
1457 msg, libcfs_nid2str(peer->nid),
1458 oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1459 oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1462 oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1464 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1465 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1466 "client csum now %x\n", client_cksum, client_cksum_type,
1467 server_cksum, cksum_type, new_cksum);
1471 /* Note rc enters this function as number of bytes transferred */
/* Post-reply processing for a BRW RPC: unpacks the reply body, updates
 * quota flags and grant, verifies the write checksum (via
 * check_write_checksum) or, for reads, handles short reads and verifies
 * the read checksum against a locally recomputed one. Finally copies the
 * wire obdo back into aa->aa_oa.
 * (NOTE(review): gapped listing — error-return lines between the visible
 * statements are missing; comments describe only visible code.) */
1472 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1474 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1475 const lnet_process_id_t *peer =
1476 &req->rq_import->imp_connection->c_peer;
1477 struct client_obd *cli = aa->aa_cli;
1478 struct ost_body *body;
1479 __u32 client_cksum = 0;
/* -EDQUOT still carries a usable reply (quota flags below) */
1482 if (rc < 0 && rc != -EDQUOT)
1485 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1486 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1488 CDEBUG(D_INFO, "Can't unpack body\n");
1492 /* set/clear over quota flag for a uid/gid */
1493 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1494 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1495 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1497 lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
1504 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1505 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1507 osc_update_grant(cli, body);
1509 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1511 CERROR("Unexpected +ve rc %d\n", rc);
1514 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1516 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
/* verify write checksum only if we computed one at send time */
1519 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1520 check_write_checksum(&body->oa, peer, client_cksum,
1521 body->oa.o_cksum, aa->aa_requested_nob,
1522 aa->aa_page_count, aa->aa_ppga,
1523 cksum_type_unpack(aa->aa_oa->o_flags)))
1526 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1527 aa->aa_page_count, aa->aa_ppga);
1531 /* The rest of this function executes only for OST_READs */
1533 /* if unwrap_bulk failed, return -EAGAIN to retry */
1534 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1536 GOTO(out, rc = -EAGAIN);
1538 if (rc > aa->aa_requested_nob) {
1539 CERROR("Unexpected rc %d (%d requested)\n", rc,
1540 aa->aa_requested_nob);
1544 if (rc != req->rq_bulk->bd_nob_transferred) {
1545 CERROR ("Unexpected rc %d (%d transferred)\n",
1546 rc, req->rq_bulk->bd_nob_transferred);
1550 if (rc < aa->aa_requested_nob)
1551 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1553 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1554 static int cksum_counter;
1555 __u32 server_cksum = body->oa.o_cksum;
1558 cksum_type_t cksum_type;
1560 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1561 cksum_type = cksum_type_unpack(body->oa.o_flags);
1563 cksum_type = OBD_CKSUM_CRC32;
/* recompute over only the bytes actually transferred (rc) */
1564 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1565 aa->aa_ppga, OST_READ,
1568 if (peer->nid == req->rq_bulk->bd_sender) {
1572 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1575 if (server_cksum == ~0 && rc > 0) {
1576 CERROR("Protocol error: server %s set the 'checksum' "
1577 "bit, but didn't send a checksum. Not fatal, "
1578 "but please notify on http://bugzilla.lustre.org/\n",
1579 libcfs_nid2str(peer->nid));
1580 } else if (server_cksum != client_cksum) {
1581 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1582 "%s%s%s inum "LPU64"/"LPU64" object "
1583 LPU64"/"LPU64" extent "
1584 "["LPU64"-"LPU64"]\n",
1585 req->rq_import->imp_obd->obd_name,
1586 libcfs_nid2str(peer->nid),
1588 body->oa.o_valid & OBD_MD_FLFID ?
1589 body->oa.o_fid : (__u64)0,
1590 body->oa.o_valid & OBD_MD_FLFID ?
1591 body->oa.o_generation :(__u64)0,
1593 body->oa.o_valid & OBD_MD_FLGROUP ?
1594 body->oa.o_gr : (__u64)0,
1595 aa->aa_ppga[0]->off,
1596 aa->aa_ppga[aa->aa_page_count-1]->off +
1597 aa->aa_ppga[aa->aa_page_count-1]->count -
1599 CERROR("client %x, server %x, cksum_type %x\n",
1600 client_cksum, server_cksum, cksum_type);
1602 aa->aa_oa->o_cksum = client_cksum;
1606 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1609 } else if (unlikely(client_cksum)) {
1610 static int cksum_missed;
/* rate-limit: log only when cksum_missed is a power of two */
1613 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1614 CERROR("Checksum %u requested from %s but not sent\n",
1615 cksum_missed, libcfs_nid2str(peer->nid));
1621 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
/* Synchronous BRW: build the request, queue it and wait, then finish it.
 * Recoverable errors (including bulk timeouts with rq_resend set) loop
 * back and resend, sleeping `resends` seconds between attempts, until
 * osc_should_resend() says to give up.
 * (NOTE(review): gapped listing — the retry label and loop structure are
 * partly missing; comments cover only visible code.) */
1626 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1627 struct lov_stripe_md *lsm,
1628 obd_count page_count, struct brw_page **pga,
1629 struct obd_capa *ocapa)
1631 struct ptlrpc_request *req;
1635 struct l_wait_info lwi;
1639 cfs_waitq_init(&waitq);
1642 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1643 page_count, pga, &req, ocapa, 0);
1647 rc = ptlrpc_queue_wait(req);
/* a bulk timeout with resend pending: drop this req and rebuild */
1649 if (rc == -ETIMEDOUT && req->rq_resend) {
1650 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1651 ptlrpc_req_finished(req);
1655 rc = osc_brw_fini_request(req, rc);
1657 ptlrpc_req_finished(req);
1658 if (osc_recoverable_error(rc)) {
1660 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1661 CERROR("too many resend retries, returning error\n");
/* back off: sleep `resends` seconds before the next attempt */
1665 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1666 l_wait_event(waitq, 0, &lwi);
/* Rebuild a failed async BRW as a fresh request and add it to the same
 * request set. The new request inherits the interpret callback, async
 * args, oaps list and capa from the old one; each oap's request reference
 * is moved to the new request. Called with / taking cl_loi_list_lock (see
 * locking comment near the bottom).
 * (NOTE(review): gapped listing — error returns between visible lines are
 * missing.) */
1674 int osc_brw_redo_request(struct ptlrpc_request *request,
1675 struct osc_brw_async_args *aa)
1677 struct ptlrpc_request *new_req;
1678 struct ptlrpc_request_set *set = request->rq_set;
1679 struct osc_brw_async_args *new_aa;
1680 struct osc_async_page *oap;
1684 if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1685 CERROR("too many resend retries, returning error\n");
1689 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1691 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1692 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1693 aa->aa_cli, aa->aa_oa,
1694 NULL /* lsm unused by osc currently */,
1695 aa->aa_page_count, aa->aa_ppga,
1696 &new_req, aa->aa_ocapa, 0);
1700 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
/* abort the redo if any oap was interrupted while we were resending */
1702 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1703 if (oap->oap_request != NULL) {
1704 LASSERTF(request == oap->oap_request,
1705 "request %p != oap_request %p\n",
1706 request, oap->oap_request);
1707 if (oap->oap_interrupted) {
1708 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1709 ptlrpc_req_finished(new_req);
1714 /* New request takes over pga and oaps from old request.
1715 * Note that copying a list_head doesn't work, need to move it... */
1717 new_req->rq_interpret_reply = request->rq_interpret_reply;
1718 new_req->rq_async_args = request->rq_async_args;
/* delay the resend: later rq_sent defers processing by aa_resends secs */
1719 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1721 new_aa = ptlrpc_req_async_args(new_req);
1723 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1724 cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1725 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
/* swap each oap's request reference from the old req to the new one */
1727 cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1728 if (oap->oap_request) {
1729 ptlrpc_req_finished(oap->oap_request);
1730 oap->oap_request = ptlrpc_request_addref(new_req);
1734 new_aa->aa_ocapa = aa->aa_ocapa;
1735 aa->aa_ocapa = NULL;
1737 /* use ptlrpc_set_add_req is safe because interpret functions work
1738 * in check_set context. only one way exist with access to request
1739 * from different thread got -EINTR - this way protected with
1740 * cl_loi_list_lock */
1741 ptlrpc_set_add_req(set, new_req);
1743 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1745 DEBUG_REQ(D_INFO, new_req, "new request");
1750 * ugh, we want disk allocation on the target to happen in offset order. we'll
1751 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1752 * fine for our small page arrays and doesn't require allocation. its an
1753 * insertion sort that swaps elements that are strides apart, shrinking the
1754 * stride down until its '1' and the array is sorted.
/* Shellsort @array of brw_page pointers in-place, ascending by ->off.
 * Stride sequence is Knuth's 3h+1 (1, 4, 13, 40, ...).
 * (NOTE(review): gapped listing — stride shrink and inner assignments are
 * partly missing.) */
1756 static void sort_brw_pages(struct brw_page **array, int num)
1759 struct brw_page *tmp;
/* grow stride to the largest 3h+1 value below num */
1763 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1768 for (i = stride ; i < num ; i++) {
/* insertion step: shift stride-apart elements up while out of order */
1771 while (j >= stride && array[j - stride]->off > tmp->off) {
1772 array[j] = array[j - stride];
1777 } while (stride > 1);
/* Return how many leading entries of @pg can go into one RPC without
 * fragmenting the bulk transfer: the run ends at the first page that does
 * not end on a page boundary, or before one that does not start on a page
 * boundary. (NOTE(review): gapped listing — loop scaffolding missing.) */
1780 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1786 LASSERT (pages > 0);
1787 offset = pg[i]->off & ~CFS_PAGE_MASK;
1791 if (pages == 0) /* that's all */
1794 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1795 return count; /* doesn't end on page boundary */
1798 offset = pg[i]->off & ~CFS_PAGE_MASK;
1799 if (offset != 0) /* doesn't start on page boundary */
/* Allocate and fill an array of pointers into the flat @pga array, so the
 * pointer array can be sorted/split without moving the brw_page structs.
 * Freed by osc_release_ppga(). (Gapped listing — NULL check and loop body
 * not visible.) */
1806 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1808 struct brw_page **ppga;
1811 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1815 for (i = 0; i < count; i++)
/* Free a pointer array built by osc_build_ppga(); @count must match the
 * count it was allocated with (OBD_FREE needs the original size). */
1820 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1822 LASSERT(ppga != NULL);
1823 OBD_FREE(ppga, sizeof(*ppga) * count);
/* Top-level synchronous BRW entry: sorts the pages by offset and splits
 * the I/O into chunks of at most cl_max_pages_per_rpc unfragmented pages,
 * issuing one osc_brw_internal() call per chunk. Because each BRW clobbers
 * the caller's oa, a copy is saved before the first multi-chunk call and
 * restored before each subsequent chunk.
 * OBD_BRW_CHECK only answers "could this I/O possibly succeed".
 * (NOTE(review): gapped listing — some early returns / closing braces are
 * missing.) */
1826 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1827 obd_count page_count, struct brw_page *pga,
1828 struct obd_trans_info *oti)
1830 struct obdo *saved_oa = NULL;
1831 struct brw_page **ppga, **orig;
1832 struct obd_import *imp = class_exp2cliimp(exp);
1833 struct client_obd *cli;
1834 int rc, page_count_orig;
1837 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1838 cli = &imp->imp_obd->u.cli;
1840 if (cmd & OBD_BRW_CHECK) {
1841 /* The caller just wants to know if there's a chance that this
1842 * I/O can succeed */
1844 if (imp->imp_invalid)
1849 /* test_brw with a failed create can trip this, maybe others. */
1850 LASSERT(cli->cl_max_pages_per_rpc);
1854 orig = ppga = osc_build_ppga(pga, page_count);
/* remember the original count: ppga is advanced per chunk below but the
 * whole array must be released at the end */
1857 page_count_orig = page_count;
1859 sort_brw_pages(ppga, page_count);
1860 while (page_count) {
1861 obd_count pages_per_brw;
1863 if (page_count > cli->cl_max_pages_per_rpc)
1864 pages_per_brw = cli->cl_max_pages_per_rpc;
1866 pages_per_brw = page_count;
1868 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1870 if (saved_oa != NULL) {
1871 /* restore previously saved oa */
1872 *oinfo->oi_oa = *saved_oa;
1873 } else if (page_count > pages_per_brw) {
1874 /* save a copy of oa (brw will clobber it) */
1875 OBDO_ALLOC(saved_oa);
1876 if (saved_oa == NULL)
1877 GOTO(out, rc = -ENOMEM);
1878 *saved_oa = *oinfo->oi_oa;
1881 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1882 pages_per_brw, ppga, oinfo->oi_capa);
1887 page_count -= pages_per_brw;
1888 ppga += pages_per_brw;
1892 osc_release_ppga(orig, page_count_orig);
1894 if (saved_oa != NULL)
1895 OBDO_FREE(saved_oa);
1900 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1901 * the dirty accounting. Writeback completes or truncate happens before
1902 * writing starts. Must be called with the loi lock held. */
1903 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
/* releases the write grant that osc_enter_cache() charged for this page */
1906 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1910 /* This maintains the lists of pending pages to read/write for a given object
1911 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1912 * to quickly find objects that are ready to send an RPC. */
/* Decide whether @lop has enough/urgent-enough pending pages to justify
 * firing an RPC now. Visible triggers: nothing pending -> no; invalid
 * import -> yes (drain); any urgent page -> yes; write with cache waiters
 * -> yes; otherwise pending count vs. the 'optimal' threshold.
 * (Gapped listing — the return statements themselves are missing.) */
1913 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1919 if (lop->lop_num_pending == 0)
1922 /* if we have an invalid import we want to drain the queued pages
1923 * by forcing them through rpcs that immediately fail and complete
1924 * the pages. recovery relies on this to empty the queued pages
1925 * before canceling the locks and evicting down the llite pages */
1926 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1929 /* stream rpcs in queue order as long as as there is an urgent page
1930 * queued. this is our cheap solution for good batching in the case
1931 * where writepage marks some random page in the middle of the file
1932 * as urgent because of, say, memory pressure */
1933 if (!cfs_list_empty(&lop->lop_urgent)) {
1934 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1937 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1938 optimal = cli->cl_max_pages_per_rpc;
1939 if (cmd & OBD_BRW_WRITE) {
1940 /* trigger a write rpc stream as long as there are dirtiers
1941 * waiting for space. as they're waiting, they're not going to
1942 * create more pages to coallesce with what's waiting.. */
1943 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
1944 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1947 /* +16 to avoid triggering rpcs that would want to include pages
1948 * that are being queued but which can't be made ready until
1949 * the queuer finishes with the page. this is a wart for
1950 * llite::commit_write() */
1953 if (lop->lop_num_pending >= optimal)
/* Return whether the first urgent page on @lop is flagged ASYNC_HP, i.e.
 * whether a high-priority RPC should be fired immediately.
 * (Gapped listing — explicit return statements not visible.) */
1959 static int lop_makes_hprpc(struct loi_oap_pages *lop)
1961 struct osc_async_page *oap;
1964 if (cfs_list_empty(&lop->lop_urgent))
/* only the head of the urgent list is checked; HP pages are added to
 * the front in osc_oap_to_pending() */
1967 oap = cfs_list_entry(lop->lop_urgent.next,
1968 struct osc_async_page, oap_urgent_item);
1970 if (oap->oap_async_flags & ASYNC_HP) {
1971 CDEBUG(D_CACHE, "hp request forcing RPC\n");
/* Idempotently make @item's membership of @list match @should_be_on:
 * add to the tail if it should be listed but isn't, remove if it is
 * listed but shouldn't be. No-op when already in the desired state. */
1978 static void on_list(cfs_list_t *item, cfs_list_t *list,
1981 if (cfs_list_empty(item) && should_be_on)
1982 cfs_list_add_tail(item, list);
1983 else if (!cfs_list_empty(item) && !should_be_on)
1984 cfs_list_del_init(item);
1987 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1988 * can find pages to build into rpcs quickly */
/* HP-ready and ready membership are mutually exclusive: an loi with a
 * high-priority urgent page goes on cl_loi_hp_ready_list only; otherwise
 * readiness is decided by lop_makes_rpc() for either direction. The
 * write/read lists simply track whether any pages are pending. */
1989 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1991 if (lop_makes_hprpc(&loi->loi_write_lop) ||
1992 lop_makes_hprpc(&loi->loi_read_lop)) {
1994 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
1995 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
1997 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
1998 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
1999 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
2000 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
2003 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2004 loi->loi_write_lop.lop_num_pending);
2006 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2007 loi->loi_read_lop.lop_num_pending);
/* Adjust the pending-page counters by @delta (may be negative): both the
 * per-lop count and the client-wide pending read or write page count,
 * chosen by @cmd's OBD_BRW_WRITE bit. */
2010 static void lop_update_pending(struct client_obd *cli,
2011 struct loi_oap_pages *lop, int cmd, int delta)
2013 lop->lop_num_pending += delta;
2014 if (cmd & OBD_BRW_WRITE)
2015 cli->cl_pending_w_pages += delta;
2017 cli->cl_pending_r_pages += delta;
2021 * this is called when a sync waiter receives an interruption. Its job is to
2022 * get the caller woken as soon as possible. If its page hasn't been put in an
2023 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
2024 * desiring interruption which will forcefully complete the rpc once the rpc
/* (Gapped listing — some lines, e.g. the lock/loi lookup and returns, are
 * not visible here.) */
2027 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2029 struct loi_oap_pages *lop;
2030 struct lov_oinfo *loi;
2034 LASSERT(!oap->oap_interrupted);
2035 oap->oap_interrupted = 1;
2037 /* ok, it's been put in an rpc. only one oap gets a request reference */
2038 if (oap->oap_request != NULL) {
/* mark + wake ptlrpcd so the in-flight rpc is aborted promptly, then
 * drop this oap's reference on the request */
2039 ptlrpc_mark_interrupted(oap->oap_request);
2040 ptlrpcd_wake(oap->oap_request);
2041 ptlrpc_req_finished(oap->oap_request);
2042 oap->oap_request = NULL;
2046 * page completion may be called only if ->cpo_prep() method was
2047 * executed by osc_io_submit(), that also adds page the to pending list
2049 if (!cfs_list_empty(&oap->oap_pending_item)) {
2050 cfs_list_del_init(&oap->oap_pending_item);
2051 cfs_list_del_init(&oap->oap_urgent_item);
2054 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2055 &loi->loi_write_lop : &loi->loi_read_lop;
2056 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2057 loi_list_maint(oap->oap_cli, oap->oap_loi);
/* complete the page back to the caller with -EINTR */
2058 rc = oap->oap_caller_ops->ap_completion(env,
2059 oap->oap_caller_data,
2060 oap->oap_cmd, NULL, -EINTR);
2066 /* this is trying to propagate async writeback errors back up to the
2067 * application. As an async write fails we record the error code for later if
2068 * the app does an fsync. As long as errors persist we force future rpcs to be
2069 * sync so that the app can get a sync error and break the cycle of queueing
2070 * pages for which writeback will fail. */
/* On failure (rc path, partly missing from this gapped listing): force
 * sync mode and remember the next xid so we know when the error window
 * ends. On success with xid past ar_min_xid: clear the force-sync flag. */
2071 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2078 ar->ar_force_sync = 1;
2079 ar->ar_min_xid = ptlrpc_sample_next_xid();
2084 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2085 ar->ar_force_sync = 0;
2088 void osc_oap_to_pending(struct osc_async_page *oap)
2090 struct loi_oap_pages *lop;
2092 if (oap->oap_cmd & OBD_BRW_WRITE)
2093 lop = &oap->oap_loi->loi_write_lop;
2095 lop = &oap->oap_loi->loi_read_lop;
2097 if (oap->oap_async_flags & ASYNC_HP)
2098 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2099 else if (oap->oap_async_flags & ASYNC_URGENT)
2100 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2101 cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2102 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2105 /* this must be called holding the loi list lock to give coverage to exit_cache,
2106 * async_flag maintenance, and oap_request */
/* Finish one async page: drop its request reference, reset flags, record
 * write errors via osc_process_ar(), propagate server size/time attrs
 * from @oa into the loi's lvb on success, then call the caller's
 * ap_completion and release the write grant.
 * (Gapped listing — a few lines between visible statements are missing.) */
2107 static void osc_ap_completion(const struct lu_env *env,
2108 struct client_obd *cli, struct obdo *oa,
2109 struct osc_async_page *oap, int sent, int rc)
2114 if (oap->oap_request != NULL) {
/* xid is captured before dropping the ref: osc_process_ar() below
 * needs it to bound the error window */
2115 xid = ptlrpc_req_xid(oap->oap_request);
2116 ptlrpc_req_finished(oap->oap_request);
2117 oap->oap_request = NULL;
2120 cfs_spin_lock(&oap->oap_lock);
2121 oap->oap_async_flags = 0;
2122 cfs_spin_unlock(&oap->oap_lock);
2123 oap->oap_interrupted = 0;
2125 if (oap->oap_cmd & OBD_BRW_WRITE) {
2126 osc_process_ar(&cli->cl_ar, xid, rc);
2127 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2130 if (rc == 0 && oa != NULL) {
2131 if (oa->o_valid & OBD_MD_FLBLOCKS)
2132 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2133 if (oa->o_valid & OBD_MD_FLMTIME)
2134 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2135 if (oa->o_valid & OBD_MD_FLATIME)
2136 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2137 if (oa->o_valid & OBD_MD_FLCTIME)
2138 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2141 rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2142 oap->oap_cmd, oa, rc);
2144 /* ll_ap_completion (from llite) drops PG_locked. so, a new
2145 * I/O on the page could start, but OSC calls it under lock
2146 * and thus we can add oap back to pending safely */
2148 /* upper layer wants to leave the page on pending queue */
2149 osc_oap_to_pending(oap);
2151 osc_exit_cache(cli, oap, sent);
/* ptlrpc interpret callback for async BRW RPCs. Finishes the reply
 * (osc_brw_fini_request), retries recoverable errors via
 * osc_brw_redo_request, then under cl_loi_list_lock decrements the
 * in-flight counter, completes (or releases grants for) the pages, wakes
 * cache waiters and kicks the RPC engine again.
 * (NOTE(review): gapped listing — some lines, e.g. the async OBDO_ALLOC
 * path around line 2200, are missing; comments describe visible code.) */
2155 static int brw_interpret(const struct lu_env *env,
2156 struct ptlrpc_request *req, void *data, int rc)
2158 struct osc_brw_async_args *aa = data;
2159 struct client_obd *cli;
2163 rc = osc_brw_fini_request(req, rc);
2164 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2165 if (osc_recoverable_error(rc)) {
2166 rc = osc_brw_redo_request(req, aa);
2172 capa_put(aa->aa_ocapa);
2173 aa->aa_ocapa = NULL;
2178 client_obd_list_lock(&cli->cl_loi_list_lock);
2180 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2181 * is called so we know whether to go to sync BRWs or wait for more
2182 * RPCs to complete */
2183 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2184 cli->cl_w_in_flight--;
2186 cli->cl_r_in_flight--;
/* an empty aa_oaps list means this request came from async_internal()
 * rather than osc_send_oap_rpc() */
2188 async = cfs_list_empty(&aa->aa_oaps);
2189 if (!async) { /* from osc_send_oap_rpc() */
2190 struct osc_async_page *oap, *tmp;
2191 /* the caller may re-use the oap after the completion call so
2192 * we need to clean it up a little */
2193 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
2195 cfs_list_del_init(&oap->oap_rpc_item);
2196 osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2198 OBDO_FREE(aa->aa_oa);
2199 } else { /* from async_internal() */
2201 for (i = 0; i < aa->aa_page_count; i++)
2202 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2204 if (aa->aa_oa->o_flags & OBD_FL_TEMPORARY)
2205 OBDO_FREE(aa->aa_oa);
2207 osc_wake_cache_waiters(cli);
2208 osc_check_rpcs(env, cli);
2209 client_obd_list_unlock(&cli->cl_loi_list_lock);
2211 cl_req_completion(env, aa->aa_clerq, rc);
2212 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
/* Assemble a BRW request from a list of queued osc_async_pages: builds a
 * pga pointer array, allocates a cl_req, collects per-page attributes,
 * sorts the pages and calls osc_brw_prep_request(). On success the oaps
 * are moved from @rpc_list onto the request's async args. On failure
 * (out: path) every oap is completed with the error and ERR_PTR is
 * returned. (NOTE(review): gapped listing — the OBDO_ALLOC of oa, some
 * error branches and the final RETURN are not visible.) */
2216 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2217 struct client_obd *cli,
2218 cfs_list_t *rpc_list,
2219 int page_count, int cmd)
2221 struct ptlrpc_request *req;
2222 struct brw_page **pga = NULL;
2223 struct osc_brw_async_args *aa;
2224 struct obdo *oa = NULL;
2225 const struct obd_async_page_ops *ops = NULL;
2226 void *caller_data = NULL;
2227 struct osc_async_page *oap;
2228 struct osc_async_page *tmp;
2229 struct ost_body *body;
2230 struct cl_req *clerq = NULL;
2231 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2232 struct ldlm_lock *lock = NULL;
2233 struct cl_req_attr crattr;
2237 LASSERT(!cfs_list_empty(rpc_list));
2239 memset(&crattr, 0, sizeof crattr);
2240 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2242 GOTO(out, req = ERR_PTR(-ENOMEM));
2246 GOTO(out, req = ERR_PTR(-ENOMEM));
/* first pass: hook each page into the cl_req and fill the pga array;
 * page offsets are object offset + in-page offset */
2249 cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2250 struct cl_page *page = osc_oap2cl_page(oap);
2252 ops = oap->oap_caller_ops;
2253 caller_data = oap->oap_caller_data;
2255 clerq = cl_req_alloc(env, page, crt,
2256 1 /* only 1-object rpcs for
2259 GOTO(out, req = (void *)clerq);
2260 lock = oap->oap_ldlm_lock;
2262 pga[i] = &oap->oap_brw_page;
2263 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2264 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2265 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2267 cl_req_page_add(env, clerq, page);
2270 /* always get the data for the obdo for the rpc */
2271 LASSERT(ops != NULL);
2273 crattr.cra_capa = NULL;
2274 cl_req_attr_set(env, clerq, &crattr, ~0ULL);
/* pass the dlm lock handle so the OST can match the client lock */
2276 oa->o_handle = lock->l_remote_handle;
2277 oa->o_valid |= OBD_MD_FLHANDLE;
2280 rc = cl_req_prep(env, clerq);
2282 CERROR("cl_req_prep failed: %d\n", rc);
2283 GOTO(out, req = ERR_PTR(rc));
2286 sort_brw_pages(pga, page_count);
2287 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2288 pga, &req, crattr.cra_capa, 1);
2290 CERROR("prep_req failed: %d\n", rc);
2291 GOTO(out, req = ERR_PTR(rc));
2294 /* Need to update the timestamps after the request is built in case
2295 * we race with setattr (locally or in queue at OST). If OST gets
2296 * later setattr before earlier BRW (as determined by the request xid),
2297 * the OST will not use BRW timestamps. Sadly, there is no obvious
2298 * way to do this in a single call. bug 10150 */
2299 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2300 cl_req_attr_set(env, clerq, &crattr,
2301 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2303 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2304 aa = ptlrpc_req_async_args(req);
/* hand ownership of the oaps to the request's async args */
2305 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2306 cfs_list_splice(rpc_list, &aa->aa_oaps);
2307 CFS_INIT_LIST_HEAD(rpc_list);
2308 aa->aa_clerq = clerq;
2310 capa_put(crattr.cra_capa);
/* error cleanup path: free pga and fail every queued oap */
2315 OBD_FREE(pga, sizeof(*pga) * page_count);
2316 /* this should happen rarely and is pretty bad, it makes the
2317 * pending list not follow the dirty order */
2318 client_obd_list_lock(&cli->cl_loi_list_lock);
2319 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2320 cfs_list_del_init(&oap->oap_rpc_item);
2322 /* queued sync pages can be torn down while the pages
2323 * were between the pending list and the rpc */
2324 if (oap->oap_interrupted) {
2325 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2326 osc_ap_completion(env, cli, NULL, oap, 0,
2330 osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2332 if (clerq && !IS_ERR(clerq))
2333 cl_req_completion(env, clerq, PTR_ERR(req));
2339 * prepare pages for ASYNC io and put pages in send queue.
2341 * \param cmd OBD_BRW_* macroses
2342 * \param lop pending pages
2344 * \return zero if pages successfully add to send queue.
2345 * \return not zere if error occurring.
2348 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2349 struct lov_oinfo *loi,
2350 int cmd, struct loi_oap_pages *lop)
2352 struct ptlrpc_request *req;
2353 obd_count page_count = 0;
2354 struct osc_async_page *oap = NULL, *tmp;
2355 struct osc_brw_async_args *aa;
2356 const struct obd_async_page_ops *ops;
2357 CFS_LIST_HEAD(rpc_list);
2358 CFS_LIST_HEAD(tmp_list);
2359 unsigned int ending_offset;
2360 unsigned starting_offset = 0;
2362 struct cl_object *clob = NULL;
2365 /* ASYNC_HP pages first. At present, when the lock the pages is
2366 * to be canceled, the pages covered by the lock will be sent out
2367 * with ASYNC_HP. We have to send out them as soon as possible. */
2368 cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2369 if (oap->oap_async_flags & ASYNC_HP)
2370 cfs_list_move(&oap->oap_pending_item, &tmp_list);
2372 cfs_list_move_tail(&oap->oap_pending_item, &tmp_list);
2373 if (++page_count >= cli->cl_max_pages_per_rpc)
2377 cfs_list_splice(&tmp_list, &lop->lop_pending);
2380 /* first we find the pages we're allowed to work with */
2381 cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2383 ops = oap->oap_caller_ops;
2385 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2386 "magic 0x%x\n", oap, oap->oap_magic);
2389 /* pin object in memory, so that completion call-backs
2390 * can be safely called under client_obd_list lock. */
2391 clob = osc_oap2cl_page(oap)->cp_obj;
2392 cl_object_get(clob);
2395 if (page_count != 0 &&
2396 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2397 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2398 " oap %p, page %p, srvlock %u\n",
2399 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2403 /* If there is a gap at the start of this page, it can't merge
2404 * with any previous page, so we'll hand the network a
2405 * "fragmented" page array that it can't transfer in 1 RDMA */
2406 if (page_count != 0 && oap->oap_page_off != 0)
2409 /* in llite being 'ready' equates to the page being locked
2410 * until completion unlocks it. commit_write submits a page
2411 * as not ready because its unlock will happen unconditionally
2412 * as the call returns. if we race with commit_write giving
2413 * us that page we dont' want to create a hole in the page
2414 * stream, so we stop and leave the rpc to be fired by
2415 * another dirtier or kupdated interval (the not ready page
2416 * will still be on the dirty list). we could call in
2417 * at the end of ll_file_write to process the queue again. */
2418 if (!(oap->oap_async_flags & ASYNC_READY)) {
2419 int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2422 CDEBUG(D_INODE, "oap %p page %p returned %d "
2423 "instead of ready\n", oap,
2427 /* llite is telling us that the page is still
2428 * in commit_write and that we should try
2429 * and put it in an rpc again later. we
2430 * break out of the loop so we don't create
2431 * a hole in the sequence of pages in the rpc
2436 /* the io isn't needed.. tell the checks
2437 * below to complete the rpc with EINTR */
2438 cfs_spin_lock(&oap->oap_lock);
2439 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2440 cfs_spin_unlock(&oap->oap_lock);
2441 oap->oap_count = -EINTR;
2444 cfs_spin_lock(&oap->oap_lock);
2445 oap->oap_async_flags |= ASYNC_READY;
2446 cfs_spin_unlock(&oap->oap_lock);
2449 LASSERTF(0, "oap %p page %p returned %d "
2450 "from make_ready\n", oap,
2458 * Page submitted for IO has to be locked. Either by
2459 * ->ap_make_ready() or by higher layers.
2461 #if defined(__KERNEL__) && defined(__linux__)
2463 struct cl_page *page;
2465 page = osc_oap2cl_page(oap);
2467 if (page->cp_type == CPT_CACHEABLE &&
2468 !(PageLocked(oap->oap_page) &&
2469 (CheckWriteback(oap->oap_page, cmd)))) {
2470 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2472 (long)oap->oap_page->flags,
2473 oap->oap_async_flags);
2479 /* take the page out of our book-keeping */
2480 cfs_list_del_init(&oap->oap_pending_item);
2481 lop_update_pending(cli, lop, cmd, -1);
2482 cfs_list_del_init(&oap->oap_urgent_item);
2484 if (page_count == 0)
2485 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2486 (PTLRPC_MAX_BRW_SIZE - 1);
2488 /* ask the caller for the size of the io as the rpc leaves. */
2489 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2491 ops->ap_refresh_count(env, oap->oap_caller_data,
2493 LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2495 if (oap->oap_count <= 0) {
2496 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2498 osc_ap_completion(env, cli, NULL,
2499 oap, 0, oap->oap_count);
2503 /* now put the page back in our accounting */
2504 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2505 if (page_count == 0)
2506 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2507 if (++page_count >= cli->cl_max_pages_per_rpc)
2510 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2511 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2512 * have the same alignment as the initial writes that allocated
2513 * extents on the server. */
2514 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2515 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2516 if (ending_offset == 0)
2519 /* If there is a gap at the end of this page, it can't merge
2520 * with any subsequent pages, so we'll hand the network a
2521 * "fragmented" page array that it can't transfer in 1 RDMA */
2522 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2526 osc_wake_cache_waiters(cli);
2528 loi_list_maint(cli, loi);
2530 client_obd_list_unlock(&cli->cl_loi_list_lock);
2533 cl_object_put(env, clob);
2535 if (page_count == 0) {
2536 client_obd_list_lock(&cli->cl_loi_list_lock);
2540 req = osc_build_req(env, cli, &rpc_list, page_count, cmd);
2542 LASSERT(cfs_list_empty(&rpc_list));
2543 loi_list_maint(cli, loi);
2544 RETURN(PTR_ERR(req));
2547 aa = ptlrpc_req_async_args(req);
2549 if (cmd == OBD_BRW_READ) {
2550 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2551 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2552 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2553 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2555 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2556 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2557 cli->cl_w_in_flight);
2558 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2559 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2561 ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2563 client_obd_list_lock(&cli->cl_loi_list_lock);
2565 if (cmd == OBD_BRW_READ)
2566 cli->cl_r_in_flight++;
2568 cli->cl_w_in_flight++;
2570 /* queued sync pages can be torn down while the pages
2571 * were between the pending list and the rpc */
2573 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2574 /* only one oap gets a request reference */
2577 if (oap->oap_interrupted && !req->rq_intr) {
2578 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2580 ptlrpc_mark_interrupted(req);
2584 tmp->oap_request = ptlrpc_request_addref(req);
2586 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2587 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2589 req->rq_interpret_reply = brw_interpret;
2590 ptlrpcd_add_req(req, PSCOPE_BRW);
/* Debug helper: log an loi's ready state and its pending/urgent
 * read and write page counts.
 * NOTE(review): the macro's trailing format-argument line is elided
 * in this excerpt; do not append anything after the last visible
 * backslash-continued line. */
2594 #define LOI_DEBUG(LOI, STR, args...) \
2595 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2596 !cfs_list_empty(&(LOI)->loi_ready_item) || \
2597 !cfs_list_empty(&(LOI)->loi_hp_ready_item), \
2598 (LOI)->loi_write_lop.lop_num_pending, \
2599 !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2600 (LOI)->loi_read_lop.lop_num_pending, \
2601 !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2604 /* This is called by osc_check_rpcs() to find which objects have pages that
2605 * we could be sending. These lists are maintained by lop_makes_rpc(). */
/* Selection order (visible branches): objects with blocked (hp) locks
 * first, then ready objects, then all queued writers when there are
 * cache waiters, then everything when the import is invalid.
 * NOTE(review): braces and the final RETURN(NULL) path are elided in
 * this excerpt. */
2606 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2610 /* First return objects that have blocked locks so that they
2611 * will be flushed quickly and other clients can get the lock,
2612 * then objects which have pages ready to be stuffed into RPCs */
2613 if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
2614 RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
2615 struct lov_oinfo, loi_hp_ready_item));
2616 if (!cfs_list_empty(&cli->cl_loi_ready_list))
2617 RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
2618 struct lov_oinfo, loi_ready_item));
2620 /* then if we have cache waiters, return all objects with queued
2621 * writes. This is especially important when many small files
2622 * have filled up the cache and not been fired into rpcs because
2623 * they don't pass the nr_pending/object threshhold */
2624 if (!cfs_list_empty(&cli->cl_cache_waiters) &&
2625 !cfs_list_empty(&cli->cl_loi_write_list))
2626 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2627 struct lov_oinfo, loi_write_item));
2629 /* then return all queued objects when we have an invalid import
2630 * so that they get flushed */
2631 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2632 if (!cfs_list_empty(&cli->cl_loi_write_list))
2633 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2636 if (!cfs_list_empty(&cli->cl_loi_read_list))
2637 RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
2638 struct lov_oinfo, loi_read_item));
/* Return non-zero when no further RPC may be started for this object:
 * compares rpcs_in_flight(cli) against cl_max_rpcs_in_flight, allowing
 * one extra slot (hprpc) when the first urgent oap on either the write
 * or the read lop carries ASYNC_HP. */
2643 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2645 struct osc_async_page *oap;
2648 if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
2649 oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
2650 struct osc_async_page, oap_urgent_item);
2651 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2654 if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
2655 oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
2656 struct osc_async_page, oap_urgent_item);
2657 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2660 return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2663 /* called with the loi list lock held */
/* Walk the objects returned by osc_next_loi() and attempt to fire
 * write then read RPCs for each (alternating for balance), removing the
 * object from its hp-ready/ready/write/read lists afterwards.  A
 * race_counter guards against spinning when osc_send_oap_rpc() keeps
 * returning 0 (make_ready back-off).
 * NOTE(review): several lines (braces, break/continue statements) are
 * elided in this excerpt. */
2664 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2666 struct lov_oinfo *loi;
2667 int rc = 0, race_counter = 0;
2670 while ((loi = osc_next_loi(cli)) != NULL) {
2671 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2673 if (osc_max_rpc_in_flight(cli, loi))
2676 /* attempt some read/write balancing by alternating between
2677 * reads and writes in an object. The makes_rpc checks here
2678 * would be redundant if we were getting read/write work items
2679 * instead of objects. we don't want send_oap_rpc to drain a
2680 * partial read pending queue when we're given this object to
2681 * do io on writes while there are cache waiters */
2682 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2683 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2684 &loi->loi_write_lop);
2686 CERROR("Write request failed with %d\n", rc);
2688 /* osc_send_oap_rpc failed, mostly because of
2691 * It can't break here, because if:
2692 * - a page was submitted by osc_io_submit, so
2694 * - no request in flight
2695 * - no subsequent request
2696 * The system will be in live-lock state,
2697 * because there is no chance to call
2698 * osc_io_unplug() and osc_check_rpcs() any
2699 * more. pdflush can't help in this case,
2700 * because it might be blocked at grabbing
2701 * the page lock as we mentioned.
2703 * Anyway, continue to drain pages. */
2712 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2713 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2714 &loi->loi_read_lop);
2716 CERROR("Read request failed with %d\n", rc);
2724 /* attempt some inter-object balancing by issueing rpcs
2725 * for each object in turn */
2726 if (!cfs_list_empty(&loi->loi_hp_ready_item))
2727 cfs_list_del_init(&loi->loi_hp_ready_item);
2728 if (!cfs_list_empty(&loi->loi_ready_item))
2729 cfs_list_del_init(&loi->loi_ready_item);
2730 if (!cfs_list_empty(&loi->loi_write_item))
2731 cfs_list_del_init(&loi->loi_write_item);
2732 if (!cfs_list_empty(&loi->loi_read_item))
2733 cfs_list_del_init(&loi->loi_read_item);
2735 loi_list_maint(cli, loi);
2737 /* send_oap_rpc fails with 0 when make_ready tells it to
2738 * back off. llite's make_ready does this when it tries
2739 * to lock a page queued for write that is already locked.
2740 * we want to try sending rpcs from many objects, but we
2741 * don't want to spin failing with 0. */
2742 if (race_counter == 10)
2748 /* we're trying to queue a page in the osc so we're subject to the
2749 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2750 * If the osc's queued pages are already at that limit, then we want to sleep
2751 * until there is space in the osc's queue for us. We also may be waiting for
2752 * write credits from the OST if there are RPCs in flight that may return some
2753 * before we fall back to sync writes.
2755 * We need this know our allocation was granted in the presence of signals */
/* l_wait_event() predicate: true once the waiter has been taken off
 * the cache-waiter list (granted) or there are no RPCs in flight left
 * that could free up space.  Checked under cl_loi_list_lock. */
2756 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2760 client_obd_list_lock(&cli->cl_loi_list_lock);
2761 rc = cfs_list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2762 client_obd_list_unlock(&cli->cl_loi_list_lock);
2767 * Non-blocking version of osc_enter_cache() that consumes grant only when it
/* NOTE(review): the guard/return lines of this function are elided in
 * this excerpt.  Visible behaviour: when at least one full page of
 * grant is available it is consumed via osc_consume_write_grant();
 * the transient branch additionally bumps cl_dirty_transit /
 * obd_dirty_transit_pages and tags the oap OBD_BRW_NOCACHE. */
2770 int osc_enter_cache_try(const struct lu_env *env,
2771 struct client_obd *cli, struct lov_oinfo *loi,
2772 struct osc_async_page *oap, int transient)
2776 has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2778 osc_consume_write_grant(cli, &oap->oap_brw_page);
2780 cli->cl_dirty_transit += CFS_PAGE_SIZE;
2781 cfs_atomic_inc(&obd_dirty_transit_pages);
2782 oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2788 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2789 * grant or cache space. */
/* Account one page against cl_dirty_max/obd_max_dirty_pages, sleeping
 * on a cache waiter (ocw) while write RPCs in flight may return grant;
 * forces the caller to sync i/o when dirty_max is tiny or ar_force_sync
 * is set.  NOTE(review): the tail of the function (retry/return paths)
 * is elided in this excerpt. */
2790 static int osc_enter_cache(const struct lu_env *env,
2791 struct client_obd *cli, struct lov_oinfo *loi,
2792 struct osc_async_page *oap)
2794 struct osc_cache_waiter ocw;
2795 struct l_wait_info lwi = { 0 };
2799 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2800 "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
2801 cli->cl_dirty_max, obd_max_dirty_pages,
2802 cli->cl_lost_grant, cli->cl_avail_grant);
2804 /* force the caller to try sync io. this can jump the list
2805 * of queued writes and create a discontiguous rpc stream */
2806 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2807 loi->loi_ar.ar_force_sync)
2810 /* Hopefully normal case - cache space and write credits available */
2811 if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2812 cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2813 osc_enter_cache_try(env, cli, loi, oap, 0))
2816 /* Make sure that there are write rpcs in flight to wait for. This
2817 * is a little silly as this object may not have any pending but
2818 * other objects sure might. */
2819 if (cli->cl_w_in_flight) {
2820 cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2821 cfs_waitq_init(&ocw.ocw_waitq);
2825 loi_list_maint(cli, loi);
2826 osc_check_rpcs(env, cli);
2827 client_obd_list_unlock(&cli->cl_loi_list_lock);
2829 CDEBUG(D_CACHE, "sleeping for cache space\n");
2830 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2832 client_obd_list_lock(&cli->cl_loi_list_lock);
2833 if (!cfs_list_empty(&ocw.ocw_entry)) {
2834 cfs_list_del(&ocw.ocw_entry);
/* Initialise the osc_async_page that lives in the caller-provided
 * cookie area: magic, client, caller ops/data, page and object offset,
 * empty list heads and the oap spinlock.  Root-capable local clients
 * get OBD_BRW_NOQUOTA.  The early `return cfs_size_round(sizeof(*oap))`
 * presumably answers a size query -- its guard line is elided here;
 * TODO(review) confirm against the full source. */
2844 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2845 struct lov_oinfo *loi, cfs_page_t *page,
2846 obd_off offset, const struct obd_async_page_ops *ops,
2847 void *data, void **res, int nocache,
2848 struct lustre_handle *lockh)
2850 struct osc_async_page *oap;
2855 return cfs_size_round(sizeof(*oap));
2858 oap->oap_magic = OAP_MAGIC;
2859 oap->oap_cli = &exp->exp_obd->u.cli;
2862 oap->oap_caller_ops = ops;
2863 oap->oap_caller_data = data;
2865 oap->oap_page = page;
2866 oap->oap_obj_off = offset;
2867 if (!client_is_remote(exp) &&
2868 cfs_capable(CFS_CAP_SYS_RESOURCE))
2869 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2871 LASSERT(!(offset & ~CFS_PAGE_MASK));
2873 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2874 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2875 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2876 CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2878 cfs_spin_lock_init(&oap->oap_lock);
2879 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
/* Validate an opaque cookie and convert it back to its osc_async_page;
 * returns ERR_PTR(-EINVAL) when the magic does not match. */
2883 struct osc_async_page *oap_from_cookie(void *cookie)
2885 struct osc_async_page *oap = cookie;
2886 if (oap->oap_magic != OAP_MAGIC)
2887 return ERR_PTR(-EINVAL);
/* Queue one page for asynchronous i/o: validate the oap cookie, reject
 * when the import is invalid or the oap is already on a list, check
 * the file owner/group quota for cached writes, reserve cache space
 * via osc_enter_cache() for writes, then move the oap to the pending
 * list and kick osc_check_rpcs().  All list manipulation happens under
 * cl_loi_list_lock.  NOTE(review): some return/brace lines are elided
 * in this excerpt. */
2891 int osc_queue_async_io(const struct lu_env *env,
2892 struct obd_export *exp, struct lov_stripe_md *lsm,
2893 struct lov_oinfo *loi, void *cookie,
2894 int cmd, obd_off off, int count,
2895 obd_flag brw_flags, enum async_flags async_flags)
2897 struct client_obd *cli = &exp->exp_obd->u.cli;
2898 struct osc_async_page *oap;
2902 oap = oap_from_cookie(cookie);
2904 RETURN(PTR_ERR(oap));
2906 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2909 if (!cfs_list_empty(&oap->oap_pending_item) ||
2910 !cfs_list_empty(&oap->oap_urgent_item) ||
2911 !cfs_list_empty(&oap->oap_rpc_item))
2914 /* check if the file's owner/group is over quota */
2915 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2916 struct cl_object *obj;
2917 struct cl_attr attr; /* XXX put attr into thread info */
2918 unsigned int qid[MAXQUOTAS];
2920 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
2922 cl_object_attr_lock(obj);
2923 rc = cl_object_attr_get(env, obj, &attr);
2924 cl_object_attr_unlock(obj);
2926 qid[USRQUOTA] = attr.cat_uid;
2927 qid[GRPQUOTA] = attr.cat_gid;
2929 lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
2936 loi = lsm->lsm_oinfo[0];
2938 client_obd_list_lock(&cli->cl_loi_list_lock);
2940 LASSERT(off + count <= CFS_PAGE_SIZE);
2942 oap->oap_page_off = off;
2943 oap->oap_count = count;
2944 oap->oap_brw_flags = brw_flags;
2945 /* Give a hint to OST that requests are coming from kswapd - bug19529 */
2946 if (libcfs_memory_pressure_get())
2947 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2948 cfs_spin_lock(&oap->oap_lock);
2949 oap->oap_async_flags = async_flags;
2950 cfs_spin_unlock(&oap->oap_lock);
2952 if (cmd & OBD_BRW_WRITE) {
2953 rc = osc_enter_cache(env, cli, loi, oap);
2955 client_obd_list_unlock(&cli->cl_loi_list_lock);
2960 osc_oap_to_pending(oap);
2961 loi_list_maint(cli, loi);
2963 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2966 osc_check_rpcs(env, cli);
2967 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* True iff `flag` is being newly set: clear in `was`, set in `now`
 * (aka (~was & now & flag), but this is more clear :).
 * Arguments are fully parenthesised so that callers passing compound
 * expressions (e.g. `a | b`) cannot be bitten by operator precedence. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
/* Merge additional async_flags into an oap that is already on a
 * pending list (asserted).  An oap newly becoming ASYNC_URGENT is
 * queued on the matching lop's urgent list -- at the head when it is
 * ASYNC_HP, at the tail otherwise.  The flag word itself is updated
 * under oap_lock.  NOTE(review): some brace/return lines are elided in
 * this excerpt. */
2975 int osc_set_async_flags_base(struct client_obd *cli,
2976 struct lov_oinfo *loi, struct osc_async_page *oap,
2977 obd_flag async_flags)
2979 struct loi_oap_pages *lop;
2983 LASSERT(!cfs_list_empty(&oap->oap_pending_item));
2985 if (oap->oap_cmd & OBD_BRW_WRITE) {
2986 lop = &loi->loi_write_lop;
2988 lop = &loi->loi_read_lop;
2991 if ((oap->oap_async_flags & async_flags) == async_flags)
2994 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2995 flags |= ASYNC_READY;
2997 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
2998 cfs_list_empty(&oap->oap_rpc_item)) {
2999 if (oap->oap_async_flags & ASYNC_HP)
3000 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
3002 cfs_list_add_tail(&oap->oap_urgent_item,
3004 flags |= ASYNC_URGENT;
3005 loi_list_maint(cli, loi);
3007 cfs_spin_lock(&oap->oap_lock);
3008 oap->oap_async_flags |= flags;
3009 cfs_spin_unlock(&oap->oap_lock);
3011 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3012 oap->oap_async_flags);
/* Remove an async page from osc bookkeeping: release its cache grant,
 * wake cache waiters, and drop it from the urgent and pending lists
 * (clearing ASYNC_URGENT|ASYNC_HP under oap_lock).  Fails with -EBUSY
 * while the oap is part of an in-flight RPC (oap_rpc_item non-empty).
 * NOTE(review): some brace/return lines are elided in this excerpt. */
3016 int osc_teardown_async_page(struct obd_export *exp,
3017 struct lov_stripe_md *lsm,
3018 struct lov_oinfo *loi, void *cookie)
3020 struct client_obd *cli = &exp->exp_obd->u.cli;
3021 struct loi_oap_pages *lop;
3022 struct osc_async_page *oap;
3026 oap = oap_from_cookie(cookie);
3028 RETURN(PTR_ERR(oap));
3031 loi = lsm->lsm_oinfo[0];
3033 if (oap->oap_cmd & OBD_BRW_WRITE) {
3034 lop = &loi->loi_write_lop;
3036 lop = &loi->loi_read_lop;
3039 client_obd_list_lock(&cli->cl_loi_list_lock);
3041 if (!cfs_list_empty(&oap->oap_rpc_item))
3042 GOTO(out, rc = -EBUSY);
3044 osc_exit_cache(cli, oap, 0);
3045 osc_wake_cache_waiters(cli);
3047 if (!cfs_list_empty(&oap->oap_urgent_item)) {
3048 cfs_list_del_init(&oap->oap_urgent_item);
3049 cfs_spin_lock(&oap->oap_lock);
3050 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3051 cfs_spin_unlock(&oap->oap_lock);
3053 if (!cfs_list_empty(&oap->oap_pending_item)) {
3054 cfs_list_del_init(&oap->oap_pending_item);
3055 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3057 loi_list_maint(cli, loi);
3058 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3060 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Attach caller data (einfo->ei_cbdata) to a lock's l_ast_data under
 * lock resource + osc_ast_guard, asserting first that the lock's
 * blocking/completion/glimpse callbacks and resource type match einfo,
 * and that l_ast_data is either unset or already equal to data. */
3064 static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
3065 struct ldlm_enqueue_info *einfo,
3068 void *data = einfo->ei_cbdata;
3070 LASSERT(lock != NULL);
3071 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3072 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3073 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3074 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3076 lock_res_and_lock(lock);
3077 cfs_spin_lock(&osc_ast_guard);
3078 LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
3079 lock->l_ast_data = data;
3080 cfs_spin_unlock(&osc_ast_guard);
3081 unlock_res_and_lock(lock);
/* Handle-based wrapper around osc_set_lock_data_with_check(); when the
 * handle no longer resolves to a lock (likely eviction) it only logs
 * an error. */
3084 static void osc_set_data_with_check(struct lustre_handle *lockh,
3085 struct ldlm_enqueue_info *einfo,
3088 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3091 osc_set_lock_data_with_check(lock, einfo, flags);
3092 LDLM_LOCK_PUT(lock);
3094 CERROR("lockh %p, data %p - client evicted?\n",
3095 lockh, einfo->ei_cbdata);
/* Iterate every lock on the object's resource (built from the lsm's
 * id/group), applying `replace` with `data` -- used to swap the
 * l_ast_data of cached locks. */
3098 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3099 ldlm_iterator_t replace, void *data)
3101 struct ldlm_res_id res_id;
3102 struct obd_device *obd = class_exp2obd(exp);
3104 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3105 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
/* Common post-enqueue processing: for an intent enqueue aborted by the
 * server, pull the real result from the server's lock_policy_res1;
 * on success (or an aborted intent) mark the LVB ready and log its
 * size/blocks/mtime; finally forward rc to the caller's upcall. */
3109 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3110 obd_enqueue_update_f upcall, void *cookie,
3113 int intent = *flags & LDLM_FL_HAS_INTENT;
3117 /* The request was created before ldlm_cli_enqueue call. */
3118 if (rc == ELDLM_LOCK_ABORTED) {
3119 struct ldlm_reply *rep;
3120 rep = req_capsule_server_get(&req->rq_pill,
3123 LASSERT(rep != NULL);
3124 if (rep->lock_policy_res1)
3125 rc = rep->lock_policy_res1;
3129 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3130 *flags |= LDLM_FL_LVB_READY;
3131 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3132 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3135 /* Call the update callback. */
3136 rc = (*upcall)(cookie, rc);
/* ptlrpc interpret callback for an asynchronous lock enqueue.  Copies
 * the lock handle and mode locally (aa->oa_* may be freed once the
 * upcall runs), takes an extra lock reference so a blocking AST posted
 * by ldlm_cli_enqueue_fini() for a failed lock cannot arrive before
 * the upcall, completes the enqueue, then drops the references. */
3140 static int osc_enqueue_interpret(const struct lu_env *env,
3141 struct ptlrpc_request *req,
3142 struct osc_enqueue_args *aa, int rc)
3144 struct ldlm_lock *lock;
3145 struct lustre_handle handle;
3148 /* Make a local copy of a lock handle and a mode, because aa->oa_*
3149 * might be freed anytime after lock upcall has been called. */
3150 lustre_handle_copy(&handle, aa->oa_lockh);
3151 mode = aa->oa_ei->ei_mode;
3153 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3155 lock = ldlm_handle2lock(&handle);
3157 /* Take an additional reference so that a blocking AST that
3158 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3159 * to arrive after an upcall has been executed by
3160 * osc_enqueue_fini(). */
3161 ldlm_lock_addref(&handle, mode);
3163 /* Complete obtaining the lock procedure. */
3164 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3165 mode, aa->oa_flags, aa->oa_lvb,
3166 sizeof(*aa->oa_lvb), &handle, rc);
3167 /* Complete osc stuff. */
3168 rc = osc_enqueue_fini(req, aa->oa_lvb,
3169 aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
3171 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3173 /* Release the lock for async request. */
3174 if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3176 * Releases a reference taken by ldlm_cli_enqueue(), if it is
3177 * not already released by
3178 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3180 ldlm_lock_decref(&handle, mode);
3182 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3183 aa->oa_lockh, req, aa);
3184 ldlm_lock_decref(&handle, mode);
3185 LDLM_LOCK_PUT(lock);
/* Update the loi's LVB and KMS after an enqueue completes: on ELDLM_OK
 * copy the LVB and extend kms up to the end of the granted extent + 1
 * (a lock on [x,y] covers y + 1 bytes) but no further; on an intent
 * enqueue aborted by the server (glimpse) record the LVB only. */
3189 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3190 struct lov_oinfo *loi, int flags,
3191 struct ost_lvb *lvb, __u32 mode, int rc)
3193 if (rc == ELDLM_OK) {
3194 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3197 LASSERT(lock != NULL);
3198 loi->loi_lvb = *lvb;
3199 tmp = loi->loi_lvb.lvb_size;
3200 /* Extend KMS up to the end of this lock and no further
3201 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3202 if (tmp > lock->l_policy_data.l_extent.end)
3203 tmp = lock->l_policy_data.l_extent.end + 1;
3204 if (tmp >= loi->loi_kms) {
3205 LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3206 ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3207 loi_kms_set(loi, tmp);
3209 LDLM_DEBUG(lock, "lock acquired, setting rss="
3210 LPU64"; leaving kms="LPU64", end="LPU64,
3211 loi->loi_lvb.lvb_size, loi->loi_kms,
3212 lock->l_policy_data.l_extent.end);
3214 ldlm_lock_allow_match(lock);
3215 LDLM_LOCK_PUT(lock);
3216 } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3217 loi->loi_lvb = *lvb;
3218 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3219 " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3223 EXPORT_SYMBOL(osc_update_enqueue);
/* Sentinel request-set pointer: callers pass PTLRPCD_SET instead of a
 * real set to mean "hand the request to ptlrpcd" (see the
 * `rqset == PTLRPCD_SET` check in osc_enqueue_base()). */
3225 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3227 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3228 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3229 * other synchronous requests, however keeping some locks and trying to obtain
3230 * others may take a considerable amount of time in a case of ost failure; and
3231 * when other sync requests do not get released lock from a client, the client
3232 * is excluded from the cluster -- such scenarious make the life difficult, so
3233 * release locks just after they are obtained. */
/* Flow: round the lock extent to page boundaries, try to match an
 * already-cached lock (a PR request may match PW), otherwise build an
 * LDLM_ENQUEUE_LVB request and call ldlm_cli_enqueue() -- completing
 * via osc_enqueue_interpret() when `async`.  NOTE(review): many lines
 * (braces, GOTO/RETURN paths, some aa assignments) are elided in this
 * excerpt. */
3234 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3235 int *flags, ldlm_policy_data_t *policy,
3236 struct ost_lvb *lvb, int kms_valid,
3237 obd_enqueue_update_f upcall, void *cookie,
3238 struct ldlm_enqueue_info *einfo,
3239 struct lustre_handle *lockh,
3240 struct ptlrpc_request_set *rqset, int async)
3242 struct obd_device *obd = exp->exp_obd;
3243 struct ptlrpc_request *req = NULL;
3244 int intent = *flags & LDLM_FL_HAS_INTENT;
3249 /* Filesystem lock extents are extended to page boundaries so that
3250 * dealing with the page cache is a little smoother. */
3251 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3252 policy->l_extent.end |= ~CFS_PAGE_MASK;
3255 * kms is not valid when either object is completely fresh (so that no
3256 * locks are cached), or object was evicted. In the latter case cached
3257 * lock cannot be used, because it would prime inode state with
3258 * potentially stale LVB.
3263 /* Next, search for already existing extent locks that will cover us */
3264 /* If we're trying to read, we also search for an existing PW lock. The
3265 * VFS and page cache already protect us locally, so lots of readers/
3266 * writers can share a single PW lock.
3268 * There are problems with conversion deadlocks, so instead of
3269 * converting a read lock to a write lock, we'll just enqueue a new
3272 * At some point we should cancel the read lock instead of making them
3273 * send us a blocking callback, but there are problems with canceling
3274 * locks out from other users right now, too. */
3275 mode = einfo->ei_mode;
3276 if (einfo->ei_mode == LCK_PR)
3278 mode = ldlm_lock_match(obd->obd_namespace,
3279 *flags | LDLM_FL_LVB_READY, res_id,
3280 einfo->ei_type, policy, mode, lockh, 0);
3282 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3284 if (matched->l_ast_data == NULL ||
3285 matched->l_ast_data == einfo->ei_cbdata) {
3286 /* addref the lock only if not async requests and PW
3287 * lock is matched whereas we asked for PR. */
3288 if (!rqset && einfo->ei_mode != mode)
3289 ldlm_lock_addref(lockh, LCK_PR);
3290 osc_set_lock_data_with_check(matched, einfo, *flags);
3292 /* I would like to be able to ASSERT here that
3293 * rss <= kms, but I can't, for reasons which
3294 * are explained in lov_enqueue() */
3297 /* We already have a lock, and it's referenced */
3298 (*upcall)(cookie, ELDLM_OK);
3300 /* For async requests, decref the lock. */
3301 if (einfo->ei_mode != mode)
3302 ldlm_lock_decref(lockh, LCK_PW);
3304 ldlm_lock_decref(lockh, einfo->ei_mode);
3305 LDLM_LOCK_PUT(matched);
3308 ldlm_lock_decref(lockh, mode);
3309 LDLM_LOCK_PUT(matched);
3314 CFS_LIST_HEAD(cancels);
3315 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3316 &RQF_LDLM_ENQUEUE_LVB);
3320 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3324 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3326 ptlrpc_request_set_replen(req);
3329 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3330 *flags &= ~LDLM_FL_BLOCK_GRANTED;
3332 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3333 sizeof(*lvb), lockh, async);
3336 struct osc_enqueue_args *aa;
3337 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3338 aa = ptlrpc_req_async_args(req);
3341 aa->oa_flags = flags;
3342 aa->oa_upcall = upcall;
3343 aa->oa_cookie = cookie;
3345 aa->oa_lockh = lockh;
3347 req->rq_interpret_reply =
3348 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3349 if (rqset == PTLRPCD_SET)
3350 ptlrpcd_add_req(req, PSCOPE_OTHER);
3352 ptlrpc_set_add_req(rqset, req);
3353 } else if (intent) {
3354 ptlrpc_req_finished(req);
3359 rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3361 ptlrpc_req_finished(req);
/* obd_ops enqueue entry point: build the resource name from the lsm's
 * object id/group and delegate to osc_enqueue_base(), passing the
 * stripe's LVB and KMS validity; the call is asynchronous iff a
 * request set was supplied (rqset != NULL). */
3366 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3367 struct ldlm_enqueue_info *einfo,
3368 struct ptlrpc_request_set *rqset)
3370 struct ldlm_res_id res_id;
3374 osc_build_res_name(oinfo->oi_md->lsm_object_id,
3375 oinfo->oi_md->lsm_object_gr, &res_id);
3377 rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3378 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3379 oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3380 oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3381 rqset, rqset != NULL);
/* Match an existing lock covering the (page-aligned) extent.  A read
 * request may also match a PW lock; when one is found and
 * LDLM_FL_TEST_LOCK is not set, a PR reference is added and the PW
 * reference dropped so the caller always holds the mode it asked for.
 * NOTE(review): the lines choosing the search mode (`rc` before the
 * ldlm_lock_match call) are elided in this excerpt. */
3385 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3386 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3387 int *flags, void *data, struct lustre_handle *lockh,
3390 struct obd_device *obd = exp->exp_obd;
3391 int lflags = *flags;
3395 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3398 /* Filesystem lock extents are extended to page boundaries so that
3399 * dealing with the page cache is a little smoother */
3400 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3401 policy->l_extent.end |= ~CFS_PAGE_MASK;
3403 /* Next, search for already existing extent locks that will cover us */
3404 /* If we're trying to read, we also search for an existing PW lock. The
3405 * VFS and page cache already protect us locally, so lots of readers/
3406 * writers can share a single PW lock. */
3410 rc = ldlm_lock_match(obd->obd_namespace, lflags,
3411 res_id, type, policy, rc, lockh, unref);
3414 osc_set_data_with_check(lockh, data, lflags);
3415 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3416 ldlm_lock_addref(lockh, LCK_PR);
3417 ldlm_lock_decref(lockh, LCK_PW);
/* Drop one reference on a lock; group locks are cancelled outright
 * (decref-and-cancel) since they are never kept cached. */
3424 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3428 if (unlikely(mode == LCK_GROUP))
3429 ldlm_lock_decref_and_cancel(lockh, mode);
3431 ldlm_lock_decref(lockh, mode);
/* obd_ops cancel entry point; thin wrapper over osc_cancel_base(). */
3436 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3437 __u32 mode, struct lustre_handle *lockh)
3440 RETURN(osc_cancel_base(lockh, mode));
/* Cancel unused cached locks; when an lsm is supplied the cancel is
 * restricted to that object's resource (resp points at res_id),
 * otherwise resp stays NULL and all resources are covered.
 * NOTE(review): the condition guarding the res-name build is elided in
 * this excerpt. */
3443 static int osc_cancel_unused(struct obd_export *exp,
3444 struct lov_stripe_md *lsm, int flags,
3447 struct obd_device *obd = class_exp2obd(exp);
3448 struct ldlm_res_id res_id, *resp = NULL;
3451 resp = osc_build_res_name(lsm->lsm_object_id,
3452 lsm->lsm_object_gr, &res_id);
3455 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
/* Interpret callback for asynchronous OST_STATFS.  Tolerates -ENOTCONN
 * /-EAGAIN for NODELAY requests, refreshes the object-creator
 * DEGRADED/RDONLY flags from the returned os_state on every statfs
 * (so they never stick), applies hysteresis to the NOSPC flag
 * (~0.1% free to set, ~0.2% to clear, capped at 1 GB), then copies the
 * statfs result to the caller and invokes oi_cb_up. */
3458 static int osc_statfs_interpret(const struct lu_env *env,
3459 struct ptlrpc_request *req,
3460 struct osc_async_args *aa, int rc)
3462 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3463 struct obd_statfs *msfs;
3468 /* The request has in fact never been sent
3469 * due to issues at a higher level (LOV).
3470 * Exit immediately since the caller is
3471 * aware of the problem and takes care
3472 * of the clean up */
3475 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3476 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3482 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3484 GOTO(out, rc = -EPROTO);
3487 /* Reinitialize the RDONLY and DEGRADED flags at the client
3488 * on each statfs, so they don't stay set permanently. */
3489 cfs_spin_lock(&cli->cl_oscc.oscc_lock);
3491 if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3492 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3493 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3494 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3496 if (unlikely(msfs->os_state & OS_STATE_READONLY))
3497 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3498 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3499 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3501 /* Add a bit of hysteresis so this flag isn't continually flapping,
3502 * and ensure that new files don't get extremely fragmented due to
3503 * only a small amount of available space in the filesystem.
3504 * We want to set the NOSPC flag when there is less than ~0.1% free
3505 * and clear it when there is at least ~0.2% free space, so:
3506 * avail < ~0.1% max max = avail + used
3507 * 1025 * avail < avail + used used = blocks - free
3508 * 1024 * avail < used
3509 * 1024 * avail < blocks - free
3510 * avail < ((blocks - free) >> 10)
3512 * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
3513 * lose that amount of space so in those cases we report no space left
3514 * if their is less than 1 GB left. */
3515 used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
3516 if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3517 ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3518 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3519 else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3520 (msfs->os_ffree > 64) && (msfs->os_bavail > (used << 1))))
3521 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_NOSPC;
3523 cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
3525 *aa->aa_oi->oi_osfs = *msfs;
3527 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Asynchronously fetch filesystem statistics from the OST.
 * Builds an OST_STATFS request, marks it no-resend/no-delay for
 * OBD_STATFS_NODELAY callers (procfs path), and queues it on @rqset;
 * osc_statfs_interpret handles the reply.
 * NOTE(review): this view is an excerpt — allocation-failure checks and
 * RETURN statements are elided here; do not assume they are absent. */
3531 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3532 __u64 max_age, struct ptlrpc_request_set *rqset)
3534 struct ptlrpc_request *req;
3535 struct osc_async_args *aa;
3539 /* We could possibly pass max_age in the request (as an absolute
3540 * timestamp or a "seconds.usec ago") so the target can avoid doing
3541 * extra calls into the filesystem if that isn't necessary (e.g.
3542 * during mount that would help a bit). Having relative timestamps
3543 * is not so great if request processing is slow, while absolute
3544 * timestamps are not ideal because they need time synchronization. */
3545 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3549 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3551 ptlrpc_request_free(req);
3554 ptlrpc_request_set_replen(req);
/* statfs is sent to the CREATE portal, presumably so it is not queued
 * behind bulk I/O requests — TODO confirm against portal assignments. */
3555 req->rq_request_portal = OST_CREATE_PORTAL;
3556 ptlrpc_at_set_req_timeout(req);
3558 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3559 /* procfs requests must not wait for recovery, to avoid deadlock */
3560 req->rq_no_resend = 1;
3561 req->rq_no_delay = 1;
3564 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3565 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3566 aa = ptlrpc_req_async_args(req);
3569 ptlrpc_set_add_req(rqset, req);
/* Synchronous statfs: send OST_STATFS and copy the server's reply
 * into @osfs.  Unlike the async variant this takes its own reference
 * on the import, because the call may arrive from lprocfs while the
 * client is disconnecting.
 * NOTE(review): excerpt — error checks / GOTO targets are elided. */
3573 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3574 __u64 max_age, __u32 flags)
3576 struct obd_statfs *msfs;
3577 struct ptlrpc_request *req;
3578 struct obd_import *imp = NULL;
3582 /* The request might also come from lprocfs, so take a private import
3583 * reference to synchronize with client_disconnect_export (bug 15684). */
3584 cfs_down_read(&obd->u.cli.cl_sem);
3585 if (obd->u.cli.cl_import)
3586 imp = class_import_get(obd->u.cli.cl_import);
3587 cfs_up_read(&obd->u.cli.cl_sem);
3591 /* We could possibly pass max_age in the request (as an absolute
3592 * timestamp or a "seconds.usec ago") so the target can avoid doing
3593 * extra calls into the filesystem if that isn't necessary (e.g.
3594 * during mount that would help a bit). Having relative timestamps
3595 * is not so great if request processing is slow, while absolute
3596 * timestamps are not ideal because they need time synchronization. */
3597 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
/* the import reference was only needed for request allocation */
3599 class_import_put(imp);
3604 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3606 ptlrpc_request_free(req);
3609 ptlrpc_request_set_replen(req);
3610 req->rq_request_portal = OST_CREATE_PORTAL;
3611 ptlrpc_at_set_req_timeout(req);
3613 if (flags & OBD_STATFS_NODELAY) {
3614 /* procfs requests must not wait for recovery, to avoid deadlock */
3615 req->rq_no_resend = 1;
3616 req->rq_no_delay = 1;
3619 rc = ptlrpc_queue_wait(req);
3623 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
/* a NULL reply buffer means the server sent a malformed reply */
3625 GOTO(out, rc = -EPROTO);
3632 ptlrpc_req_finished(req);
3636 /* Retrieve object striping information.
3638 * @lump is a pointer to an in-core struct with lmm_ost_count indicating
3639 * the maximum number of OST indices which will fit in the user buffer.
3640 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3642 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3644 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3645 struct lov_user_md_v3 lum, *lumk;
3646 struct lov_user_ost_data_v1 *lmm_objects;
3647 int rc = 0, lum_size;
3653 /* we only need the header part from user space to get lmm_magic and
3654 * lmm_stripe_count, (the header part is common to v1 and v3) */
3655 lum_size = sizeof(struct lov_user_md_v1);
3656 if (cfs_copy_from_user(&lum, lump, lum_size))
/* only v1 and v3 user layouts are understood here */
3659 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3660 (lum.lmm_magic != LOV_USER_MAGIC_V3))
3663 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3664 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3665 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3666 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3668 /* we can use lov_mds_md_size() to compute lum_size
3669 * because lov_user_md_vX and lov_mds_md_vX have the same size */
3670 if (lum.lmm_stripe_count > 0) {
3671 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3672 OBD_ALLOC(lumk, lum_size);
/* v1 and v3 place lmm_objects[] at different offsets; pick the right one */
3676 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3677 lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3679 lmm_objects = &(lumk->lmm_objects[0]);
3680 lmm_objects->l_object_id = lsm->lsm_object_id;
/* caller has no room for objects: copy only the header back */
3682 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3686 lumk->lmm_object_id = lsm->lsm_object_id;
3687 lumk->lmm_object_gr = lsm->lsm_object_gr;
3688 lumk->lmm_stripe_count = 1;
3690 if (cfs_copy_to_user(lump, lumk, lum_size))
3694 OBD_FREE(lumk, lum_size);
/* ioctl entry point for the OSC device.
 * Dispatches the handful of ioctls an OSC answers directly; everything
 * else returns -ENOTTY.  A module reference is held for the duration of
 * the call so the module cannot be unloaded mid-ioctl.
 * NOTE(review): excerpt — the switch(cmd) line and break statements are
 * elided from this view. */
3700 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3701 void *karg, void *uarg)
3703 struct obd_device *obd = exp->exp_obd;
3704 struct obd_ioctl_data *data = karg;
3708 if (!cfs_try_module_get(THIS_MODULE)) {
3709 CERROR("Can't get module. Is it alive?");
3713 case OBD_IOC_LOV_GET_CONFIG: {
3715 struct lov_desc *desc;
3716 struct obd_uuid uuid;
3720 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3721 GOTO(out, err = -EINVAL);
3723 data = (struct obd_ioctl_data *)buf;
/* validate user-supplied buffer sizes before filling them in */
3725 if (sizeof(*desc) > data->ioc_inllen1) {
3726 obd_ioctl_freedata(buf, len);
3727 GOTO(out, err = -EINVAL);
3730 if (data->ioc_inllen2 < sizeof(uuid)) {
3731 obd_ioctl_freedata(buf, len);
3732 GOTO(out, err = -EINVAL);
/* an OSC reports itself as a single-target, single-stripe "LOV" */
3735 desc = (struct lov_desc *)data->ioc_inlbuf1;
3736 desc->ld_tgt_count = 1;
3737 desc->ld_active_tgt_count = 1;
3738 desc->ld_default_stripe_count = 1;
3739 desc->ld_default_stripe_size = 0;
3740 desc->ld_default_stripe_offset = 0;
3741 desc->ld_pattern = 0;
3742 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3744 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3746 err = cfs_copy_to_user((void *)uarg, buf, len);
3749 obd_ioctl_freedata(buf, len);
3752 case LL_IOC_LOV_SETSTRIPE:
3753 err = obd_alloc_memmd(exp, karg);
3757 case LL_IOC_LOV_GETSTRIPE:
3758 err = osc_getstripe(karg, uarg);
3760 case OBD_IOC_CLIENT_RECOVER:
3761 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3766 case IOC_OSC_SET_ACTIVE:
3767 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3770 case OBD_IOC_POLL_QUOTACHECK:
3771 err = lquota_poll_check(quota_interface, exp,
3772 (struct if_quotacheck *)karg);
3774 case OBD_IOC_PING_TARGET:
3775 err = ptlrpc_obd_ping(obd);
3778 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3779 cmd, cfs_curproc_comm());
3780 GOTO(out, err = -ENOTTY);
/* drop the module reference taken on entry */
3783 cfs_module_put(THIS_MODULE);
/* obd_get_info handler for the OSC.
 * Supported keys visible here:
 *   KEY_LOCK_TO_STRIPE - answered locally (an OSC has a single stripe);
 *   KEY_LAST_ID        - synchronous OST_GET_INFO RPC returning an obd_id;
 *   KEY_FIEMAP         - synchronous OST_GET_INFO RPC carrying a fiemap
 *                        buffer in both directions.
 * NOTE(review): excerpt — RETURN statements and some declarations are
 * elided from this view. */
3787 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3788 void *key, __u32 *vallen, void *val,
3789 struct lov_stripe_md *lsm)
3792 if (!vallen || !val)
3795 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3796 __u32 *stripe = val;
3797 *vallen = sizeof(*stripe);
3800 } else if (KEY_IS(KEY_LAST_ID)) {
3801 struct ptlrpc_request *req;
3806 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3807 &RQF_OST_GET_INFO_LAST_ID);
3811 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3812 RCL_CLIENT, keylen);
3813 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3815 ptlrpc_request_free(req);
3819 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3820 memcpy(tmp, key, keylen);
/* don't block or retry in recovery; caller can re-ask later */
3822 req->rq_no_delay = req->rq_no_resend = 1;
3823 ptlrpc_request_set_replen(req);
3824 rc = ptlrpc_queue_wait(req);
3828 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3830 GOTO(out, rc = -EPROTO);
3832 *((obd_id *)val) = *reply;
3834 ptlrpc_req_finished(req);
3836 } else if (KEY_IS(KEY_FIEMAP)) {
3837 struct ptlrpc_request *req;
3838 struct ll_user_fiemap *reply;
3842 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3843 &RQF_OST_GET_INFO_FIEMAP);
/* fiemap value buffer travels in both directions, sized by caller */
3847 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3848 RCL_CLIENT, keylen);
3849 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3850 RCL_CLIENT, *vallen);
3851 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3852 RCL_SERVER, *vallen);
3854 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3856 ptlrpc_request_free(req);
3860 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3861 memcpy(tmp, key, keylen);
3862 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3863 memcpy(tmp, val, *vallen);
3865 ptlrpc_request_set_replen(req);
3866 rc = ptlrpc_queue_wait(req);
3870 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3872 GOTO(out1, rc = -EPROTO);
3874 memcpy(val, reply, *vallen);
3876 ptlrpc_req_finished(req);
/* Hook an MDS-side OSC import up for llog and pinging.
 * Connects the llog initiator for the MDS_OST originator context, then
 * marks the import as server-timeout and pingable so the pinger keeps
 * the OST connection alive. */
3884 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
3886 struct llog_ctxt *ctxt;
3890 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3892 rc = llog_initiator_connect(ctxt);
3893 llog_ctxt_put(ctxt);
3895 /* XXX return an error? skip setting below flags? */
3898 cfs_spin_lock(&imp->imp_lock);
3899 imp->imp_server_timeout = 1;
3900 imp->imp_pingable = 1;
3901 cfs_spin_unlock(&imp->imp_lock);
3902 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
/* Reply interpreter for the KEY_MDS_CONN set_info RPC: once the OST has
 * acknowledged, finish MDS-connect setup on this import. */
3907 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3908 struct ptlrpc_request *req,
3915 RETURN(osc_setinfo_mds_connect_import(req->rq_import));
/* obd_set_info_async handler for the OSC.
 * Several keys are handled locally (NEXT_ID, INIT_RECOV, CHECKSUM,
 * SPTLRPC_CONF, FLUSH_CTX); everything else is forwarded to the OST as
 * an OST_SET_INFO RPC.  KEY_MDS_CONN and KEY_GRANT_SHRINK get special
 * request setup; GRANT_SHRINK requests go via ptlrpcd instead of the
 * caller's @set.
 * NOTE(review): excerpt — RETURNs and some declarations are elided. */
3918 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3919 void *key, obd_count vallen, void *val,
3920 struct ptlrpc_request_set *set)
3922 struct ptlrpc_request *req;
3923 struct obd_device *obd = exp->exp_obd;
3924 struct obd_import *imp = class_exp2cliimp(exp);
3929 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3931 if (KEY_IS(KEY_NEXT_ID)) {
3933 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
/* NOTE(review): the size check appears twice below — looks redundant
 * but is harmless; confirm against the full source before removing. */
3935 if (vallen != sizeof(obd_id))
3940 if (vallen != sizeof(obd_id))
3943 /* avoid race between allocate new object and set next id
3944 * from ll_sync thread */
3945 cfs_spin_lock(&oscc->oscc_lock);
3946 new_val = *((obd_id*)val) + 1;
/* never move oscc_next_id backwards */
3947 if (new_val > oscc->oscc_next_id)
3948 oscc->oscc_next_id = new_val;
3949 cfs_spin_unlock(&oscc->oscc_lock);
3950 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3951 exp->exp_obd->obd_name,
3952 obd->u.cli.cl_oscc.oscc_next_id);
3957 if (KEY_IS(KEY_INIT_RECOV)) {
3958 if (vallen != sizeof(int))
3960 cfs_spin_lock(&imp->imp_lock);
3961 imp->imp_initial_recov = *(int *)val;
3962 cfs_spin_unlock(&imp->imp_lock);
3963 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3964 exp->exp_obd->obd_name,
3965 imp->imp_initial_recov);
3969 if (KEY_IS(KEY_CHECKSUM)) {
3970 if (vallen != sizeof(int))
3972 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3976 if (KEY_IS(KEY_SPTLRPC_CONF)) {
3977 sptlrpc_conf_client_adapt(obd);
3981 if (KEY_IS(KEY_FLUSH_CTX)) {
3982 sptlrpc_import_flush_my_ctx(imp);
/* all remaining keys need a request set, except GRANT_SHRINK which
 * is dispatched through ptlrpcd below */
3986 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3989 /* We pass all other commands directly to OST. Since nobody calls osc
3990 methods directly and everybody is supposed to go through LOV, we
3991 assume lov checked invalid values for us.
3992 The only recognised values so far are evict_by_nid and mds_conn.
3993 Even if something bad goes through, we'd get a -EINVAL from OST
3996 if (KEY_IS(KEY_GRANT_SHRINK))
3997 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
3999 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
4004 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
4005 RCL_CLIENT, keylen);
4006 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
4007 RCL_CLIENT, vallen);
4008 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
4010 ptlrpc_request_free(req);
4014 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
4015 memcpy(tmp, key, keylen);
4016 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
4017 memcpy(tmp, val, vallen);
4019 if (KEY_IS(KEY_MDS_CONN)) {
4020 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4022 oscc->oscc_oa.o_gr = (*(__u32 *)val);
4023 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
4024 LASSERT_MDS_GROUP(oscc->oscc_oa.o_gr);
4025 req->rq_no_delay = req->rq_no_resend = 1;
4026 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4027 } else if (KEY_IS(KEY_GRANT_SHRINK)) {
4028 struct osc_grant_args *aa;
4031 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4032 aa = ptlrpc_req_async_args(req);
4035 ptlrpc_req_finished(req);
/* stash a copy of the obdo so the interpreter can inspect it */
4038 *oa = ((struct ost_body *)val)->oa;
4040 req->rq_interpret_reply = osc_shrink_grant_interpret;
4043 ptlrpc_request_set_replen(req);
4044 if (!KEY_IS(KEY_GRANT_SHRINK)) {
4045 LASSERT(set != NULL);
4046 ptlrpc_set_add_req(set, req);
4047 ptlrpc_check_set(NULL, set);
/* GRANT_SHRINK is fire-and-forget via the ptlrpcd daemon */
4049 ptlrpcd_add_req(req, PSCOPE_OTHER);
/* llog ops for the size-replication context: only cancel is needed on
 * the client side. */
4055 static struct llog_operations osc_size_repl_logops = {
4056 lop_cancel: llog_obd_repl_cancel
/* originator ops are filled in at module init (see osc_init) */
4059 static struct llog_operations osc_mds_ost_orig_logops;
/* Set up the two llog contexts this OSC uses: the MDS->OST originator
 * context (catalog id from @catid) and the size-replication context.
 * On failure of the second, the first context's reference is dropped
 * (cleanup lines elided from this view). */
4061 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4062 struct obd_device *tgt, struct llog_catid *catid)
4067 rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
4068 &catid->lci_logid, &osc_mds_ost_orig_logops);
4070 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4074 rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
4075 NULL, &osc_size_repl_logops);
4077 struct llog_ctxt *ctxt =
4078 llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4081 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4086 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
4087 obd->obd_name, tgt->obd_name, catid, rc);
4088 CERROR("logid "LPX64":0x%x\n",
4089 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
/* Public llog-init entry: read this OSC's catalog id from CATLIST on
 * @disk_obd, initialize the contexts via __osc_llog_init, then write
 * the (possibly updated) catalog id back.  Serialized per-group by
 * olg_cat_processing. */
4094 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4095 struct obd_device *disk_obd, int *index)
4097 struct llog_catid catid;
4098 static char name[32] = CATLIST;
4102 LASSERT(olg == &obd->obd_olg);
4104 cfs_mutex_down(&olg->olg_cat_processing);
4105 rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
4107 CERROR("rc: %d\n", rc);
4111 CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
4112 obd->obd_name, *index, catid.lci_logid.lgl_oid,
4113 catid.lci_logid.lgl_ogr, catid.lci_logid.lgl_ogen);
4115 rc = __osc_llog_init(obd, olg, disk_obd, &catid);
4117 CERROR("rc: %d\n", rc);
4121 rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
4123 CERROR("rc: %d\n", rc);
4128 cfs_mutex_up(&olg->olg_cat_processing);
/* Tear down both llog contexts set up by __osc_llog_init.  Both
 * cleanups are attempted even if the first fails; the combination of
 * rc/rc2 decides the return value (elided from this view). */
4133 static int osc_llog_finish(struct obd_device *obd, int count)
4135 struct llog_ctxt *ctxt;
4136 int rc = 0, rc2 = 0;
4139 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4141 rc = llog_cleanup(ctxt);
4143 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4145 rc2 = llog_cleanup(ctxt);
/* obd reconnect hook: renegotiate the grant with the OST.
 * Requests either the grant we already account for (avail + dirty) or,
 * if that is zero, two RPCs' worth of pages; lost grant is reported
 * once and then cleared under cl_loi_list_lock. */
4152 static int osc_reconnect(const struct lu_env *env,
4153 struct obd_export *exp, struct obd_device *obd,
4154 struct obd_uuid *cluuid,
4155 struct obd_connect_data *data,
4158 struct client_obd *cli = &obd->u.cli;
4160 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4163 client_obd_list_lock(&cli->cl_loi_list_lock);
4164 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
4165 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4166 lost_grant = cli->cl_lost_grant;
4167 cli->cl_lost_grant = 0;
4168 client_obd_list_unlock(&cli->cl_loi_list_lock);
4170 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4171 "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
4172 cli->cl_avail_grant, cli->cl_dirty, lost_grant);
4173 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4174 " ocd_grant: %d\n", data->ocd_connect_flags,
4175 data->ocd_version, data->ocd_grant);
/* obd disconnect hook: flush outstanding size-replication llog cancels
 * on the last reference, disconnect the export, and finally remove the
 * client from the grant-shrink list (only after the import is gone,
 * see BUG18662 rationale below). */
4181 static int osc_disconnect(struct obd_export *exp)
4183 struct obd_device *obd = class_exp2obd(exp);
4184 struct llog_ctxt *ctxt;
4187 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4189 if (obd->u.cli.cl_conn_count == 1) {
4190 /* Flush any remaining cancel messages out to the
4192 llog_sync(ctxt, exp);
4194 llog_ctxt_put(ctxt);
4196 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4200 rc = client_disconnect_export(exp);
4202 * Initially we put del_shrink_grant before disconnect_export, but it
4203 * causes the following problem if setup (connect) and cleanup
4204 * (disconnect) are tangled together.
4205 * connect p1 disconnect p2
4206 * ptlrpc_connect_import
4207 * ............... class_manual_cleanup
4210 * ptlrpc_connect_interrupt
4212 * add this client to shrink list
4214 * Bang! pinger trigger the shrink.
4215 * So the osc should be disconnected from the shrink list, after we
4216 * are sure the import has been destroyed. BUG18662
4218 if (obd->u.cli.cl_import == NULL)
4219 osc_del_shrink_grant(&obd->u.cli);
/* Import state-change callback for the OSC.
 * DISCON: mark the creator recovering (MDS OSCs only) and zero grants.
 * INACTIVE/ACTIVE: notify the observer; ACTIVE also clears NOSPC.
 * INVALIDATE: fail queued pages and wipe local DLM locks.
 * OCD: apply negotiated connect data (grant, request portal).
 * NOTE(review): excerpt — the switch(event) line and break statements
 * are elided from this view. */
4223 static int osc_import_event(struct obd_device *obd,
4224 struct obd_import *imp,
4225 enum obd_import_event event)
4227 struct client_obd *cli;
4231 LASSERT(imp->imp_obd == obd);
4234 case IMP_EVENT_DISCON: {
4235 /* Only do this on the MDS OSC's */
4236 if (imp->imp_server_timeout) {
4237 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4239 cfs_spin_lock(&oscc->oscc_lock);
4240 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4241 cfs_spin_unlock(&oscc->oscc_lock);
/* grant is meaningless across a disconnect; renegotiated on reconnect */
4244 client_obd_list_lock(&cli->cl_loi_list_lock);
4245 cli->cl_avail_grant = 0;
4246 cli->cl_lost_grant = 0;
4247 client_obd_list_unlock(&cli->cl_loi_list_lock);
4250 case IMP_EVENT_INACTIVE: {
4251 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4254 case IMP_EVENT_INVALIDATE: {
4255 struct ldlm_namespace *ns = obd->obd_namespace;
4259 env = cl_env_get(&refcheck);
4263 client_obd_list_lock(&cli->cl_loi_list_lock);
4264 /* all pages go to failing rpcs due to the invalid
4266 osc_check_rpcs(env, cli);
4267 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* drop every local lock; the server no longer knows about them */
4269 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4270 cl_env_put(env, &refcheck);
4275 case IMP_EVENT_ACTIVE: {
4276 /* Only do this on the MDS OSC's */
4277 if (imp->imp_server_timeout) {
4278 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4280 cfs_spin_lock(&oscc->oscc_lock);
4281 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
4282 cfs_spin_unlock(&oscc->oscc_lock);
4284 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4287 case IMP_EVENT_OCD: {
4288 struct obd_connect_data *ocd = &imp->imp_connect_data;
4290 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4291 osc_init_grant(&obd->u.cli, ocd);
4294 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4295 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
4297 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4301 CERROR("Unknown import event %d\n", event);
/* obd setup hook: take a ptlrpcd reference, do common client setup,
 * register lprocfs entries, pre-allocate a small request pool for
 * brw_interpret, and initialize grant-shrink state.
 * NOTE(review): excerpt — error paths and RETURNs are elided. */
4307 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4313 rc = ptlrpcd_addref();
4317 rc = client_obd_setup(obd, lcfg);
4321 struct lprocfs_static_vars lvars = { 0 };
4322 struct client_obd *cli = &obd->u.cli;
4324 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4325 lprocfs_osc_init_vars(&lvars);
4326 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4327 lproc_osc_attach_seqstat(obd);
4328 sptlrpc_lprocfs_cliobd_attach(obd);
4329 ptlrpc_lprocfs_register_obd(obd);
4333 /* We need to allocate a few requests more, because
4334 brw_interpret tries to create new requests before freeing
4335 previous ones. Ideally we want to have 2x max_rpcs_in_flight
4336 reserved, but I afraid that might be too much wasted RAM
4337 in fact, so 2 is just my guess and still should work. */
4338 cli->cl_import->imp_rq_pool =
4339 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4341 ptlrpc_add_rqs_to_pool);
4343 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4344 cfs_sema_init(&cli->cl_grant_sem, 1);
/* Staged pre-cleanup hook.
 * EARLY: deactivate the import and stop pinging it.
 * EXPORTS: if the import never connected, invalidate and destroy it
 * (including its request pool) under cl_sem, then tear down llogs.
 * NOTE(review): excerpt — the switch(stage) line and breaks are elided. */
4350 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4356 case OBD_CLEANUP_EARLY: {
4357 struct obd_import *imp;
4358 imp = obd->u.cli.cl_import;
4359 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4360 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4361 ptlrpc_deactivate_import(imp);
4362 cfs_spin_lock(&imp->imp_lock);
4363 imp->imp_pingable = 0;
4364 cfs_spin_unlock(&imp->imp_lock);
4367 case OBD_CLEANUP_EXPORTS: {
4368 /* If we set up but never connected, the
4369 client import will not have been cleaned. */
4370 if (obd->u.cli.cl_import) {
4371 struct obd_import *imp;
/* write-lock cl_sem: we are about to free and NULL the import */
4372 cfs_down_write(&obd->u.cli.cl_sem);
4373 imp = obd->u.cli.cl_import;
4374 CDEBUG(D_CONFIG, "%s: client import never connected\n",
4376 ptlrpc_invalidate_import(imp);
4377 if (imp->imp_rq_pool) {
4378 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4379 imp->imp_rq_pool = NULL;
4381 class_destroy_import(imp);
4382 cfs_up_write(&obd->u.cli.cl_sem);
4383 obd->u.cli.cl_import = NULL;
4385 rc = obd_llog_finish(obd, 0);
4387 CERROR("failed to cleanup llogging subsystems\n");
/* Final obd cleanup: unregister lprocfs, flush the quota cache, and
 * run common client teardown (a matching ptlrpcd_decref is elided
 * from this view). */
4394 int osc_cleanup(struct obd_device *obd)
4399 ptlrpc_lprocfs_unregister_obd(obd);
4400 lprocfs_obd_cleanup(obd);
4402 /* free memory of osc quota cache */
4403 lquota_cleanup(quota_interface, obd);
4405 rc = client_obd_cleanup(obd);
/* Apply a configuration record to this OSC; currently only proc
 * parameter updates (PARAM_OSC.*) are handled here. */
4411 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4413 struct lprocfs_static_vars lvars = { 0 };
4416 lprocfs_osc_init_vars(&lvars);
4418 switch (lcfg->lcfg_command) {
4420 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
/* obd_ops wrapper: adapts the generic (len, buf) signature to
 * osc_process_config_base's lustre_cfg argument. */
4430 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4432 return osc_process_config_base(obd, buf);
/* Method table exported by the OSC obd type: connection management is
 * delegated to the generic client_* helpers, everything else maps to
 * the osc_* implementations in this file. */
4435 struct obd_ops osc_obd_ops = {
4436 .o_owner = THIS_MODULE,
4437 .o_setup = osc_setup,
4438 .o_precleanup = osc_precleanup,
4439 .o_cleanup = osc_cleanup,
4440 .o_add_conn = client_import_add_conn,
4441 .o_del_conn = client_import_del_conn,
4442 .o_connect = client_connect_import,
4443 .o_reconnect = osc_reconnect,
4444 .o_disconnect = osc_disconnect,
4445 .o_statfs = osc_statfs,
4446 .o_statfs_async = osc_statfs_async,
4447 .o_packmd = osc_packmd,
4448 .o_unpackmd = osc_unpackmd,
4449 .o_precreate = osc_precreate,
4450 .o_create = osc_create,
4451 .o_create_async = osc_create_async,
4452 .o_destroy = osc_destroy,
4453 .o_getattr = osc_getattr,
4454 .o_getattr_async = osc_getattr_async,
4455 .o_setattr = osc_setattr,
4456 .o_setattr_async = osc_setattr_async,
4458 .o_punch = osc_punch,
4460 .o_enqueue = osc_enqueue,
4461 .o_change_cbdata = osc_change_cbdata,
4462 .o_cancel = osc_cancel,
4463 .o_cancel_unused = osc_cancel_unused,
4464 .o_iocontrol = osc_iocontrol,
4465 .o_get_info = osc_get_info,
4466 .o_set_info_async = osc_set_info_async,
4467 .o_import_event = osc_import_event,
4468 .o_llog_init = osc_llog_init,
4469 .o_llog_finish = osc_llog_finish,
4470 .o_process_config = osc_process_config,
4473 extern struct lu_kmem_descr osc_caches[];
4474 extern cfs_spinlock_t osc_ast_guard;
4475 extern cfs_lock_class_key_t osc_ast_guard_class;
/* Module init: set up slab caches, hook up the quota interface,
 * register the OSC obd type, and prepare the originator llog ops
 * (lvfs ops with setup/cleanup/add/connect overridden). */
4477 int __init osc_init(void)
4479 struct lprocfs_static_vars lvars = { 0 };
4483 /* print an address of _any_ initialized kernel symbol from this
4484 * module, to allow debugging with gdb that doesn't support data
4485 * symbols from modules.*/
4486 CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4488 rc = lu_kmem_init(osc_caches);
4490 lprocfs_osc_init_vars(&lvars);
/* quota support is optional: quota_interface stays NULL if the
 * lquota module is unavailable */
4492 cfs_request_module("lquota");
4493 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4494 lquota_init(quota_interface);
4495 init_obd_quota_ops(quota_interface, &osc_obd_ops);
4497 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4498 LUSTRE_OSC_NAME, &osc_device_type);
/* registration failed: undo the quota ref and the kmem caches */
4500 if (quota_interface)
4501 PORTAL_SYMBOL_PUT(osc_quota_interface);
4502 lu_kmem_fini(osc_caches);
4506 cfs_spin_lock_init(&osc_ast_guard);
4507 cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
/* originator llog ops = lvfs defaults with origin-specific hooks */
4509 osc_mds_ost_orig_logops = llog_lvfs_ops;
4510 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4511 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4512 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4513 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
/* Module exit: mirror of osc_init — tear down the device type, release
 * the quota interface reference, unregister the obd type, and free the
 * slab caches. */
4519 static void /*__exit*/ osc_exit(void)
4521 lu_device_type_fini(&osc_device_type);
4523 lquota_exit(quota_interface);
4524 if (quota_interface)
4525 PORTAL_SYMBOL_PUT(osc_quota_interface);
4527 class_unregister_type(LUSTRE_OSC_NAME);
4528 lu_kmem_fini(osc_caches);
/* Standard kernel module metadata and entry/exit registration. */
4531 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4532 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4533 MODULE_LICENSE("GPL");
4535 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);