1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_OSC
42 #include <libcfs/libcfs.h>
45 # include <liblustre.h>
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include "osc_internal.h"
/* File-scope state and forward declarations for the OSC module. */
66 static quota_interface_t *quota_interface = NULL;
67 extern quota_interface_t osc_quota_interface;
/* Forward declarations; definitions appear later in this file. */
69 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
70 static int brw_interpret(const struct lu_env *env,
71 struct ptlrpc_request *req, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
74 /* Pack OSC object metadata for disk storage (LE byte order). */
/* NOTE(review): this dump elides interior lines (braces, locals, branch
 * conditions and returns between the embedded line numbers). The visible
 * statements show: a size computation, a free path, an alloc path, and the
 * id/group fields converted to little-endian into *lmmp. */
75 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
76 struct lov_stripe_md *lsm)
81 lmm_size = sizeof(**lmmp);
/* presumably the free path when the caller asks to release *lmmp — confirm */
86 OBD_FREE(*lmmp, lmm_size);
/* presumably the alloc path when *lmmp is NULL — confirm against full source */
92 OBD_ALLOC(*lmmp, lmm_size);
98 LASSERT(lsm->lsm_object_id);
99 LASSERT_MDS_GROUP(lsm->lsm_object_gr);
100 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
101 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
107 /* Unpack OSC object metadata from disk storage (LE byte order). */
/* NOTE(review): interior lines elided in this dump; returns/braces between
 * the visible statements are not shown. Validates the wire buffer, then
 * frees or allocates the single-stripe lsm and fills it from lmm. */
108 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
109 struct lov_mds_md *lmm, int lmm_bytes)
/* NOTE(review): int vs sizeof() compares signed with unsigned; a negative
 * lmm_bytes would wrap and pass this check — confirm callers never pass <0 */
115 if (lmm_bytes < sizeof (*lmm)) {
116 CERROR("lov_mds_md too small: %d, need %d\n",
117 lmm_bytes, (int)sizeof(*lmm));
120 /* XXX LOV_MAGIC etc check? */
122 if (lmm->lmm_object_id == 0) {
123 CERROR("lov_mds_md: zero lmm_object_id\n");
/* single stripe from the OSC's point of view */
128 lsm_size = lov_stripe_md_size(1);
/* free path: caller passes lmm == NULL to release an existing *lsmp */
132 if (*lsmp != NULL && lmm == NULL) {
133 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
134 OBD_FREE(*lsmp, lsm_size);
/* alloc path: build a fresh lsm plus its one oinfo slot */
140 OBD_ALLOC(*lsmp, lsm_size);
143 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
144 if ((*lsmp)->lsm_oinfo[0] == NULL) {
/* unwind the lsm allocation on oinfo alloc failure */
145 OBD_FREE(*lsmp, lsm_size);
148 loi_init((*lsmp)->lsm_oinfo[0]);
152 /* XXX zero *lsmp? */
153 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
154 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
155 LASSERT((*lsmp)->lsm_object_id);
156 LASSERT_MDS_GROUP((*lsmp)->lsm_object_gr);
159 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/* Copy a capability (if any) into the request capsule's RMF_CAPA1 field and
 * flag its presence in the body's o_valid mask.
 * NOTE(review): interior lines elided; the NULL-capa early return and the
 * actual capa copy are not visible in this dump. */
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165 struct ost_body *body, void *capa)
167 struct obd_capa *oc = (struct obd_capa *)capa;
168 struct lustre_capa *c;
173 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
176 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177 DEBUG_CAPA(D_SEC, c, "pack");
/* Fill the OST_BODY field of an outgoing request from @oinfo: copy the obdo
 * to wire format and append the capability. Elided lines presumably include
 * an assertion that body != NULL — confirm against full source. */
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181 struct obd_info *oinfo)
183 struct ost_body *body;
185 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
188 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
189 osc_pack_capa(req, body, oinfo->oi_capa);
/* Size the capability field of a request capsule before packing: zero when
 * no capability is supplied (visible path). NOTE(review): the third
 * parameter and the non-NULL branch are elided in this dump. */
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193 const struct req_msg_field *field,
197 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
199 /* it is already calculated as sizeof struct obd_capa */
/* Reply interpreter for async getattr: unpack the OST body into the caller's
 * obdo and invoke the up-call. NOTE(review): the rc checks / braces between
 * the success path and the failure path are elided in this dump. */
203 static int osc_getattr_interpret(const struct lu_env *env,
204 struct ptlrpc_request *req,
205 struct osc_async_args *aa, int rc)
207 struct ost_body *body;
213 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
215 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
216 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
218 /* This should really be sent by the OST */
219 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
220 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
/* failure path: reply body could not be unpacked */
222 CDEBUG(D_INFO, "can't unpack ost_body\n");
224 aa->aa_oi->oi_oa->o_valid = 0;
/* hand the (possibly overridden) rc to the caller's completion callback */
227 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an asynchronous OST_GETATTR RPC on @set; completion is handled by
 * osc_getattr_interpret(). NOTE(review): NULL checks after alloc and the
 * rc check after pack are elided in this dump. */
231 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
232 struct ptlrpc_request_set *set)
234 struct ptlrpc_request *req;
235 struct osc_async_args *aa;
239 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
243 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
244 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
/* pack failed: release the request */
246 ptlrpc_request_free(req);
250 osc_pack_req_body(req, oinfo);
252 ptlrpc_request_set_replen(req);
253 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
/* async args live inside the request; assert they fit */
255 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
256 aa = ptlrpc_req_async_args(req);
259 ptlrpc_set_add_req(set, req);
/* Synchronous OST_GETATTR: build, send and wait for the RPC, then copy the
 * returned attributes into oinfo->oi_oa. NOTE(review): error checks between
 * the visible statements are elided in this dump. */
263 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
265 struct ptlrpc_request *req;
266 struct ost_body *body;
270 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
274 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
275 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
277 ptlrpc_request_free(req);
281 osc_pack_req_body(req, oinfo);
283 ptlrpc_request_set_replen(req);
/* blocking send */
285 rc = ptlrpc_queue_wait(req);
289 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
291 GOTO(out, rc = -EPROTO);
293 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
294 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
296 /* This should really be sent by the OST */
297 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
298 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
/* out: common cleanup */
302 ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR: push the attributes in oinfo->oi_oa to the OST
 * and read back the server's view. NOTE(review): interior error checks are
 * elided in this dump. */
306 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
307 struct obd_trans_info *oti)
309 struct ptlrpc_request *req;
310 struct ost_body *body;
/* the group must already be valid in the obdo */
314 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
316 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
320 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
321 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
323 ptlrpc_request_free(req);
327 osc_pack_req_body(req, oinfo);
329 ptlrpc_request_set_replen(req);
331 rc = ptlrpc_queue_wait(req);
335 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
337 GOTO(out, rc = -EPROTO);
/* copy the authoritative attributes back to the caller */
339 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
343 ptlrpc_req_finished(req);
/* Reply interpreter for async setattr: unpack the server obdo and fire the
 * completion up-call. NOTE(review): rc checks elided in this dump. */
347 static int osc_setattr_interpret(const struct lu_env *env,
348 struct ptlrpc_request *req,
349 struct osc_async_args *aa, int rc)
351 struct ost_body *body;
357 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
359 GOTO(out, rc = -EPROTO);
361 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
363 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Asynchronous OST_SETATTR: if @rqset is NULL the request is handed straight
 * to ptlrpcd fire-and-forget; otherwise it joins @rqset with
 * osc_setattr_interpret() as the completion handler.
 * NOTE(review): the rqset NULL test and alloc-failure checks are elided. */
367 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
368 struct obd_trans_info *oti,
369 struct ptlrpc_request_set *rqset)
371 struct ptlrpc_request *req;
372 struct osc_async_args *aa;
376 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
380 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
381 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
383 ptlrpc_request_free(req);
/* carry the llog cookie along when the caller provided one */
387 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
388 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
390 osc_pack_req_body(req, oinfo);
392 ptlrpc_request_set_replen(req);
394 /* do mds to ost setattr asynchronously */
396 /* Do not wait for response. */
397 ptlrpcd_add_req(req, PSCOPE_OTHER);
399 req->rq_interpret_reply =
400 (ptlrpc_interpterer_t)osc_setattr_interpret;
402 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
403 aa = ptlrpc_req_async_args(req);
406 ptlrpc_set_add_req(rqset, req);
/* Create an object on the OST (synchronous OST_CREATE). Allocates a
 * temporary lsm when the caller did not supply one, sends the create,
 * copies the returned id/group into *ea, and records transno/cookies in
 * @oti. NOTE(review): several error checks and the *ea handling between
 * the visible statements are elided in this dump. */
412 int osc_real_create(struct obd_export *exp, struct obdo *oa,
413 struct lov_stripe_md **ea, struct obd_trans_info *oti)
415 struct ptlrpc_request *req;
416 struct ost_body *body;
417 struct lov_stripe_md *lsm;
426 rc = obd_alloc_memmd(exp, &lsm);
431 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
433 GOTO(out, rc = -ENOMEM);
435 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
437 ptlrpc_request_free(req);
441 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
443 lustre_set_wire_obdo(&body->oa, oa);
445 ptlrpc_request_set_replen(req);
/* orphan cleanup requests must not be replayed or delayed */
447 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
448 oa->o_flags == OBD_FL_DELORPHAN) {
450 "delorphan from OST integration");
451 /* Don't resend the delorphan req */
452 req->rq_no_resend = req->rq_no_delay = 1;
455 rc = ptlrpc_queue_wait(req);
459 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
461 GOTO(out_req, rc = -EPROTO);
463 lustre_get_wire_obdo(oa, &body->oa);
465 /* This should really be sent by the OST */
466 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
467 oa->o_valid |= OBD_MD_FLBLKSZ;
469 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
470 * have valid lsm_oinfo data structs, so don't go touching that.
471 * This needs to be fixed in a big way.
473 lsm->lsm_object_id = oa->o_id;
474 lsm->lsm_object_gr = oa->o_gr;
478 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
480 if (oa->o_valid & OBD_MD_FLCOOKIE) {
481 if (!oti->oti_logcookies)
482 oti_alloc_cookies(oti, 1);
483 *oti->oti_logcookies = oa->o_lcookie;
487 CDEBUG(D_HA, "transno: "LPD64"\n",
488 lustre_msg_get_transno(req->rq_repmsg));
/* out_req: drop the RPC; out: free the locally allocated lsm on error */
490 ptlrpc_req_finished(req);
493 obd_free_memmd(exp, &lsm);
/* Reply interpreter for punch (truncate): unpack the server obdo and invoke
 * the recorded up-call with its cookie. NOTE(review): rc checks elided. */
497 static int osc_punch_interpret(const struct lu_env *env,
498 struct ptlrpc_request *req,
499 struct osc_punch_args *aa, int rc)
501 struct ost_body *body;
507 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
509 GOTO(out, rc = -EPROTO);
511 lustre_get_wire_obdo(aa->pa_oa, &body->oa);
513 rc = aa->pa_upcall(aa->pa_cookie, rc);
/* Build and queue an OST_PUNCH RPC; @upcall(@cookie, rc) runs on completion
 * via osc_punch_interpret(). With rqset == PTLRPCD_SET the request goes to
 * ptlrpcd, otherwise it joins the caller's set.
 * NOTE(review): alloc-failure and pack-failure branches are elided. */
517 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
518 struct obd_capa *capa,
519 obd_enqueue_update_f upcall, void *cookie,
520 struct ptlrpc_request_set *rqset)
522 struct ptlrpc_request *req;
523 struct osc_punch_args *aa;
524 struct ost_body *body;
528 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
532 osc_set_capa_size(req, &RMF_CAPA1, capa);
533 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
535 ptlrpc_request_free(req);
538 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
539 ptlrpc_at_set_req_timeout(req);
541 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
543 lustre_set_wire_obdo(&body->oa, oa);
544 osc_pack_capa(req, body, capa);
546 ptlrpc_request_set_replen(req);
549 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
550 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
551 aa = ptlrpc_req_async_args(req);
553 aa->pa_upcall = upcall;
554 aa->pa_cookie = cookie;
555 if (rqset == PTLRPCD_SET)
556 ptlrpcd_add_req(req, PSCOPE_OTHER);
558 ptlrpc_set_add_req(rqset, req);
563 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
564 struct obd_trans_info *oti,
565 struct ptlrpc_request_set *rqset)
567 oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
568 oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
569 oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
570 return osc_punch_base(exp, oinfo->oi_oa, oinfo->oi_capa,
571 oinfo->oi_cb_up, oinfo, rqset);
/* Synchronous OST_SYNC over the byte range [start, end]; the range is
 * carried in the obdo's size/blocks fields (protocol overload, see the
 * original comment below). NOTE(review): the oa == NULL early return and
 * other error checks are elided in this dump. */
574 static int osc_sync(struct obd_export *exp, struct obdo *oa,
575 struct lov_stripe_md *md, obd_size start, obd_size end,
578 struct ptlrpc_request *req;
579 struct ost_body *body;
584 CDEBUG(D_INFO, "oa NULL\n");
588 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
592 osc_set_capa_size(req, &RMF_CAPA1, capa);
593 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
595 ptlrpc_request_free(req);
599 /* overload the size and blocks fields in the oa with start/end */
600 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
602 lustre_set_wire_obdo(&body->oa, oa);
603 body->oa.o_size = start;
604 body->oa.o_blocks = end;
605 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
606 osc_pack_capa(req, body, capa);
608 ptlrpc_request_set_replen(req);
610 rc = ptlrpc_queue_wait(req);
614 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
616 GOTO(out, rc = -EPROTO);
618 lustre_get_wire_obdo(oa, &body->oa);
622 ptlrpc_req_finished(req);
626 /* Find and cancel locally locks matched by @mode in the resource found by
627 * @objid. Found locks are added into @cancel list. Returns the amount of
628 * locks added to @cancels list. */
/* NOTE(review): resource-not-found early return and the final return of
 * count are elided in this dump. */
629 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
631 ldlm_mode_t mode, int lock_flags)
633 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
634 struct ldlm_res_id res_id;
635 struct ldlm_resource *res;
/* resource name is derived from the object id/group */
639 osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
640 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
/* hold a debug ref across the local cancel scan */
644 LDLM_RESOURCE_ADDREF(res);
645 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
646 lock_flags, 0, NULL);
647 LDLM_RESOURCE_DELREF(res);
648 ldlm_resource_putref(res);
/* Completion handler for throttled destroy RPCs: drop the in-flight count
 * and wake any sender blocked in osc_destroy() waiting for a slot. */
652 static int osc_destroy_interpret(const struct lu_env *env,
653 struct ptlrpc_request *req, void *data,
656 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
658 cfs_atomic_dec(&cli->cl_destroy_in_flight);
659 cfs_waitq_signal(&cli->cl_destroy_waitq);
/* Try to reserve a destroy-RPC slot: optimistically bump the in-flight
 * counter and keep the slot if still within cl_max_rpcs_in_flight;
 * otherwise undo the bump. NOTE(review): the return statements between
 * the visible branches are elided in this dump. */
663 static int osc_can_send_destroy(struct client_obd *cli)
665 if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
666 cli->cl_max_rpcs_in_flight) {
667 /* The destroy request can be sent */
/* over the limit: release our reservation ... */
670 if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
671 cli->cl_max_rpcs_in_flight) {
673 * The counter has been modified between the two atomic
/* ... and re-wake waiters in case a slot freed up concurrently */
676 cfs_waitq_signal(&cli->cl_destroy_waitq);
681 /* Destroy requests can be async always on the client, and we don't even really
682 * care about the return code since the client cannot do anything at all about
684 * When the MDS is unlinking a filename, it saves the file objects into a
685 * recovery llog, and these object records are cancelled when the OST reports
686 * they were destroyed and sync'd to disk (i.e. transaction committed).
687 * If the client dies, or the OST is down when the object should be destroyed,
688 * the records are not cancelled, and when the OST reconnects to the MDS next,
689 * it will retrieve the llog unlink logs and then sends the log cancellation
690 * cookies to the MDS after committing destroy transactions. */
/* NOTE(review): the oa == NULL early return, alloc-failure checks, and the
 * final return are elided in this dump. */
691 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
692 struct lov_stripe_md *ea, struct obd_trans_info *oti,
693 struct obd_export *md_export, void *capa)
695 struct client_obd *cli = &exp->exp_obd->u.cli;
696 struct ptlrpc_request *req;
697 struct ost_body *body;
698 CFS_LIST_HEAD(cancels);
703 CDEBUG(D_INFO, "oa NULL\n");
/* cancel cached locks on the object up front; data is being discarded */
707 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
708 LDLM_FL_DISCARD_DATA);
710 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* alloc failed: drop the locks collected for early cancel */
712 ldlm_lock_list_put(&cancels, l_bl_ast, count);
716 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
/* pack the early-lock-cancel list into the request */
717 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
720 ptlrpc_request_free(req);
724 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
725 ptlrpc_at_set_req_timeout(req);
727 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
728 oa->o_lcookie = *oti->oti_logcookies;
729 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
731 lustre_set_wire_obdo(&body->oa, oa);
733 osc_pack_capa(req, body, (struct obd_capa *)capa);
734 ptlrpc_request_set_replen(req);
736 /* don't throttle destroy RPCs for the MDT */
737 if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
738 req->rq_interpret_reply = osc_destroy_interpret;
739 if (!osc_can_send_destroy(cli)) {
740 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
744 * Wait until the number of on-going destroy RPCs drops
745 * under max_rpc_in_flight
747 l_wait_event_exclusive(cli->cl_destroy_waitq,
748 osc_can_send_destroy(cli), &lwi);
752 /* Do not wait for response */
753 ptlrpcd_add_req(req, PSCOPE_OTHER);
/* Fill the obdo with this client's cache/grant accounting (dirty bytes,
 * undirty headroom, available grant, lost grant) so the OST can track grant
 * state. Sanity-checks are CERRORs, not failures. NOTE(review): the o_valid
 * bit-setting lines and some branch closers are elided in this dump. */
757 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
760 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
/* caller must not have set these bits already */
762 LASSERT(!(oa->o_valid & bits));
765 client_obd_list_lock(&cli->cl_loi_list_lock);
766 oa->o_dirty = cli->cl_dirty;
767 if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
768 CERROR("dirty %lu - %lu > dirty_max %lu\n",
769 cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
771 } else if (cfs_atomic_read(&obd_dirty_pages) -
772 cfs_atomic_read(&obd_dirty_transit_pages) >
773 obd_max_dirty_pages + 1){
774 /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
775 * not covered by a lock thus they may safely race and trip
776 * this CERROR() unless we add in a small fudge factor (+1). */
777 CERROR("dirty %d - %d > system dirty_max %d\n",
778 cfs_atomic_read(&obd_dirty_pages),
779 cfs_atomic_read(&obd_dirty_transit_pages),
780 obd_max_dirty_pages);
782 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
783 CERROR("dirty %lu - dirty_max %lu too big???\n",
784 cli->cl_dirty, cli->cl_dirty_max);
/* normal path: advertise how much more this client could dirty */
787 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
788 (cli->cl_max_rpcs_in_flight + 1);
789 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
791 oa->o_grant = cli->cl_avail_grant;
792 oa->o_dropped = cli->cl_lost_grant;
/* lost grant is reported once, then reset */
793 cli->cl_lost_grant = 0;
794 client_obd_list_unlock(&cli->cl_loi_list_lock);
795 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
796 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
800 static void osc_update_next_shrink(struct client_obd *cli)
802 cli->cl_next_shrink_grant =
803 cfs_time_shift(cli->cl_grant_shrink_interval);
804 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
805 cli->cl_next_shrink_grant);
808 /* caller must hold loi_list_lock */
/* Account one page of dirty cache against the client's write grant: bump
 * the global and per-client dirty counters, consume one page of available
 * grant, and mark the page as grant-backed. */
809 static void osc_consume_write_grant(struct client_obd *cli,
810 struct brw_page *pga)
812 LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
/* page must not already be charged against grant */
813 LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
814 cfs_atomic_inc(&obd_dirty_pages);
815 cli->cl_dirty += CFS_PAGE_SIZE;
816 cli->cl_avail_grant -= CFS_PAGE_SIZE;
817 pga->flag |= OBD_BRW_FROM_GRANT;
818 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
819 CFS_PAGE_SIZE, pga, pga->pg);
/* grant must never go negative; callers check availability first */
820 LASSERT(cli->cl_avail_grant >= 0);
821 osc_update_next_shrink(cli);
824 /* the companion to osc_consume_write_grant, called when a brw has completed.
825 * must be called with the loi lock held. */
/* NOTE(review): several branch closers and the unsent-page path condition
 * are elided in this dump; the !sent handling is inferred from the visible
 * cl_lost_grant accounting — confirm against full source. */
826 static void osc_release_write_grant(struct client_obd *cli,
827 struct brw_page *pga, int sent)
829 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
832 LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
/* pages that never consumed grant have nothing to release */
833 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
838 pga->flag &= ~OBD_BRW_FROM_GRANT;
839 cfs_atomic_dec(&obd_dirty_pages);
840 cli->cl_dirty -= CFS_PAGE_SIZE;
/* transit (nocache) pages carry extra accounting to undo */
841 if (pga->flag & OBD_BRW_NOCACHE) {
842 pga->flag &= ~OBD_BRW_NOCACHE;
843 cfs_atomic_dec(&obd_dirty_transit_pages);
844 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
847 cli->cl_lost_grant += CFS_PAGE_SIZE;
848 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
849 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
850 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
851 /* For short writes we shouldn't count parts of pages that
852 * span a whole block on the OST side, or our accounting goes
853 * wrong. Should match the code in filter_grant_check. */
854 int offset = pga->off & ~CFS_PAGE_MASK;
855 int count = pga->count + (offset & (blocksize - 1));
856 int end = (offset + pga->count) & (blocksize - 1);
858 count += blocksize - end;
860 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
861 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
862 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
863 cli->cl_avail_grant, cli->cl_dirty);
869 static unsigned long rpcs_in_flight(struct client_obd *cli)
871 return cli->cl_r_in_flight + cli->cl_w_in_flight;
874 /* caller must hold loi_list_lock */
/* Walk the cache-waiter list and wake waiters that can now proceed: either
 * grant is available (consume it for the waiter's page) or no write RPCs
 * remain to return grant, in which case the waiter is told to fall back to
 * sync IO (-EDQUOT). NOTE(review): loop variable declarations and some
 * branch closers are elided in this dump. */
875 void osc_wake_cache_waiters(struct client_obd *cli)
878 struct osc_cache_waiter *ocw;
881 cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
882 /* if we can't dirty more, we must wait until some is written */
883 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
884 (cfs_atomic_read(&obd_dirty_pages) + 1 >
885 obd_max_dirty_pages)) {
886 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
887 "osc max %ld, sys max %d\n", cli->cl_dirty,
888 cli->cl_dirty_max, obd_max_dirty_pages);
892 /* if still dirty cache but no grant wait for pending RPCs that
893 * may yet return us some grant before doing sync writes */
894 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
895 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
896 cli->cl_w_in_flight);
900 ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
901 cfs_list_del_init(&ocw->ocw_entry);
902 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
903 /* no more RPCs in flight to return grant, do sync IO */
904 ocw->ocw_rc = -EDQUOT;
905 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
/* grant available: charge it to the waiter's page before waking */
907 osc_consume_write_grant(cli,
908 &ocw->ocw_oap->oap_brw_page);
911 cfs_waitq_signal(&ocw->ocw_waitq);
917 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
919 client_obd_list_lock(&cli->cl_loi_list_lock);
920 cli->cl_avail_grant += grant;
921 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Apply any grant carried in an OST reply body to the client's available
 * grant (no-op when OBD_MD_FLGRANT is not set). */
924 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
926 if (body->oa.o_valid & OBD_MD_FLGRANT) {
927 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
928 __osc_update_grant(cli, body->oa.o_grant);
/* Forward declaration: defined later in this file, used by the grant-shrink
 * path below to send the shrink request. */
932 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
933 void *key, obd_count vallen, void *val,
934 struct ptlrpc_request_set *set);
/* Completion handler for a grant-shrink request. On failure the grant we
 * tried to give back is re-credited locally; on success the reply body's
 * grant is applied. NOTE(review): the rc check and oa cleanup between the
 * visible statements are elided in this dump. */
936 static int osc_shrink_grant_interpret(const struct lu_env *env,
937 struct ptlrpc_request *req,
940 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
941 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
942 struct ost_body *body;
/* failure: take the grant back */
945 __osc_update_grant(cli, oa->o_grant);
949 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
951 osc_update_grant(cli, body);
/* Give back a quarter of the locally available grant: move it from
 * cl_avail_grant into the obdo (to be shipped to the OST), flag the obdo
 * as a shrink, and re-arm the shrink timer. */
957 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
959 client_obd_list_lock(&cli->cl_loi_list_lock);
960 oa->o_grant = cli->cl_avail_grant / 4;
961 cli->cl_avail_grant -= oa->o_grant;
962 client_obd_list_unlock(&cli->cl_loi_list_lock);
963 oa->o_flags |= OBD_FL_SHRINK_GRANT;
964 osc_update_next_shrink(cli);
967 /* Shrink the current grant, either from some large amount to enough for a
968 * full set of in-flight RPCs, or if we have already shrunk to that limit
969 * then to enough for a single RPC. This avoids keeping more grant than
970 * needed, and avoids shrinking the grant piecemeal. */
971 static int osc_shrink_grant(struct client_obd *cli)
/* target in pages: enough for max_rpcs_in_flight + 1 full RPCs */
973 long target = (cli->cl_max_rpcs_in_flight + 1) *
974 cli->cl_max_pages_per_rpc;
976 client_obd_list_lock(&cli->cl_loi_list_lock);
/* already at/below the full-set level: shrink to a single RPC's worth */
977 if (cli->cl_avail_grant <= target)
978 target = cli->cl_max_pages_per_rpc;
979 client_obd_list_unlock(&cli->cl_loi_list_lock);
981 return osc_shrink_grant_to_target(cli, target);
/* Shrink the client's grant down to @target, sending the surplus back to
 * the OST via a KEY_GRANT_SHRINK set_info RPC. On send failure the grant
 * is re-credited locally. NOTE(review): the early RETURN after unlock, the
 * alloc-failure check, and the body free are elided in this dump. */
984 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
987 struct ost_body *body;
990 client_obd_list_lock(&cli->cl_loi_list_lock);
991 /* Don't shrink if we are already above or below the desired limit
992 * We don't want to shrink below a single RPC, as that will negatively
993 * impact block allocation and long-term performance. */
994 if (target < cli->cl_max_pages_per_rpc)
995 target = cli->cl_max_pages_per_rpc;
997 if (target >= cli->cl_avail_grant) {
998 client_obd_list_unlock(&cli->cl_loi_list_lock);
1001 client_obd_list_unlock(&cli->cl_loi_list_lock);
1003 OBD_ALLOC_PTR(body);
1007 osc_announce_cached(cli, &body->oa, 0);
/* re-check under the lock and commit the new target */
1009 client_obd_list_lock(&cli->cl_loi_list_lock);
1010 body->oa.o_grant = cli->cl_avail_grant - target;
1011 cli->cl_avail_grant = target;
1012 client_obd_list_unlock(&cli->cl_loi_list_lock);
1013 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
1014 osc_update_next_shrink(cli);
1016 rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
1017 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
1018 sizeof(*body), body, NULL);
/* send failed: take the surplus grant back */
1020 __osc_update_grant(cli, body->oa.o_grant);
1025 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
/* Decide whether this client should shrink its grant now: the shrink
 * interval must have (nearly) elapsed, the import must be FULL, and the
 * available grant must exceed GRANT_SHRINK_LIMIT. NOTE(review): the return
 * statements between branches are elided in this dump. */
1026 static int osc_should_shrink_grant(struct client_obd *client)
1028 cfs_time_t time = cfs_time_current();
1029 cfs_time_t next_shrink = client->cl_next_shrink_grant;
/* allow the check to fire slightly (5 ticks) early */
1030 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1031 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1032 client->cl_avail_grant > GRANT_SHRINK_LIMIT)
/* not eligible now: push the timer forward */
1035 osc_update_next_shrink(client);
/* Timeout callback: walk every client on the shrink list and shrink the
 * grant of those that are due. */
1040 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1042 struct client_obd *client;
1044 cfs_list_for_each_entry(client, &item->ti_obd_list,
1045 cl_grant_shrink_list) {
1046 if (osc_should_shrink_grant(client))
1047 osc_shrink_grant(client);
/* Register this client with the periodic grant-shrink timeout machinery and
 * arm its first shrink deadline. NOTE(review): the rc check / RETURN between
 * the error log and the success log are elided in this dump. */
1052 static int osc_add_shrink_grant(struct client_obd *client)
1056 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1058 osc_grant_shrink_grant_cb, NULL,
1059 &client->cl_grant_shrink_list);
1061 CERROR("add grant client %s error %d\n",
1062 client->cl_import->imp_obd->obd_name, rc);
1065 CDEBUG(D_CACHE, "add grant client %s \n",
1066 client->cl_import->imp_obd->obd_name);
1067 osc_update_next_shrink(client);
/* Unregister this client from the grant-shrink timeout list.
 * NOTE(review): the second argument to ptlrpc_del_timeout_client() is on
 * an elided line in this dump. */
1071 static int osc_del_shrink_grant(struct client_obd *client)
1073 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/* Initialize cl_avail_grant from the server's connect data. After an
 * eviction the whole ocd_grant is ours; otherwise dirty pages already hold
 * part of it. Also enrolls the client in grant shrinking when the server
 * supports it. */
1077 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1080 * ocd_grant is the total grant amount we're expect to hold: if we've
1081 * been evicted, it's the new avail_grant amount, cl_dirty will drop
1082 * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1084 * race is tolerable here: if we're evicted, but imp_state already
1085 * left EVICTED state, then cl_dirty must be 0 already.
1087 client_obd_list_lock(&cli->cl_loi_list_lock);
1088 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1089 cli->cl_avail_grant = ocd->ocd_grant;
1091 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1092 client_obd_list_unlock(&cli->cl_loi_list_lock);
1094 CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1095 cli->cl_avail_grant, cli->cl_lost_grant);
1096 LASSERT(cli->cl_avail_grant >= 0);
/* opt into periodic shrinking if the server negotiated it */
1098 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1099 cfs_list_empty(&cli->cl_grant_shrink_list))
1100 osc_add_shrink_grant(cli);
1103 /* We assume that the reason this OSC got a short read is because it read
1104 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1105 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1106 * this stripe never got written at or beyond this stripe offset yet. */
/* NOTE(review): declarations of i/ptr, loop increments, and some braces are
 * elided in this dump. Zero-fills the tail of a short read: the partial
 * page at EOF and every page after it. */
1107 static void handle_short_read(int nob_read, obd_count page_count,
1108 struct brw_page **pga)
1113 /* skip bytes read OK */
1114 while (nob_read > 0) {
1115 LASSERT (page_count > 0);
1117 if (pga[i]->count > nob_read) {
1118 /* EOF inside this page */
1119 ptr = cfs_kmap(pga[i]->pg) +
1120 (pga[i]->off & ~CFS_PAGE_MASK);
1121 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1122 cfs_kunmap(pga[i]->pg);
1128 nob_read -= pga[i]->count;
1133 /* zero remaining pages */
1134 while (page_count-- > 0) {
1135 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1136 memset(ptr, 0, pga[i]->count);
1137 cfs_kunmap(pga[i]->pg);
/* Validate the per-niobuf return-code vector of a BRW_WRITE reply: fail if
 * the vector is missing/short, if any rc is negative (propagated) or
 * nonzero-positive (protocol error), or if the bulk byte count disagrees
 * with what was requested. NOTE(review): some returns and the remote_rcs
 * declaration are elided in this dump. */
1142 static int check_write_rcs(struct ptlrpc_request *req,
1143 int requested_nob, int niocount,
1144 obd_count page_count, struct brw_page **pga)
1149 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1150 sizeof(*remote_rcs) *
1152 if (remote_rcs == NULL) {
1153 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1157 /* return error if any niobuf was in error */
1158 for (i = 0; i < niocount; i++) {
1159 if (remote_rcs[i] < 0)
1160 return(remote_rcs[i]);
1162 if (remote_rcs[i] != 0) {
1163 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1164 i, remote_rcs[i], req);
/* cross-check the bulk transfer size against what we asked for */
1169 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1170 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1171 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Two brw pages can be merged into one niobuf iff p2 starts exactly where
 * p1 ends. Differing flags only produce a warning for unknown-combination
 * bits. NOTE(review): the return-0-on-flag-mismatch line appears elided in
 * this dump — confirm against full source. */
1178 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1180 if (p1->flag != p2->flag) {
/* these flag bits are known-safe to differ between merged pages */
1181 unsigned mask = ~(OBD_BRW_FROM_GRANT|
1182 OBD_BRW_NOCACHE|OBD_BRW_SYNC);
1184 /* warn if we try to combine flags that we don't know to be
1185 * safe to combine */
1186 if ((p1->flag & mask) != (p2->flag & mask))
1187 CERROR("is it ok to have flags 0x%x and 0x%x in the "
1188 "same brw?\n", p1->flag, p2->flag);
1192 return (p1->off + p1->count == p2->off);
/* Compute the bulk checksum over up to @nob bytes spread across @pga pages,
 * honoring the fault-injection hooks that deliberately corrupt read data or
 * the computed send checksum. NOTE(review): declarations of i/cksum, the
 * loop increment, and the final return are elided in this dump. */
1195 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1196 struct brw_page **pga, int opc,
1197 cksum_type_t cksum_type)
1202 LASSERT (pg_count > 0);
1203 cksum = init_checksum(cksum_type);
1204 while (nob > 0 && pg_count > 0) {
1205 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1206 int off = pga[i]->off & ~CFS_PAGE_MASK;
/* clamp the last page's contribution to the remaining byte count */
1207 int count = pga[i]->count > nob ? nob : pga[i]->count;
1209 /* corrupt the data before we compute the checksum, to
1210 * simulate an OST->client data error */
1211 if (i == 0 && opc == OST_READ &&
1212 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1213 memcpy(ptr + off, "bad1", min(4, nob));
1214 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1215 cfs_kunmap(pga[i]->pg);
1216 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1219 nob -= pga[i]->count;
1223 /* For sending we only compute the wrong checksum instead
1224 * of corrupting the data so it is still correct on a redo */
1225 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/* Build a bulk read/write (BRW) RPC for @page_count pages in @pga.
 * Writes are allocated from the import's pre-allocated request pool so dirty
 * data can still be flushed under memory pressure; reads use a normal
 * allocation.  Contiguous pages are merged into single niobuf_remote entries
 * (niocount), a bulk descriptor is attached, and for writes a bulk checksum
 * is computed when cl_checksum is enabled.  On success the prepared request
 * is returned through @reqp; @ocapa/@reserve control capability handling.
 * Returns 0 or a negative errno. */
1231 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1232 struct lov_stripe_md *lsm, obd_count page_count,
1233 struct brw_page **pga,
1234 struct ptlrpc_request **reqp,
1235 struct obd_capa *ocapa, int reserve)
1237 struct ptlrpc_request *req;
1238 struct ptlrpc_bulk_desc *desc;
1239 struct ost_body *body;
1240 struct obd_ioobj *ioobj;
1241 struct niobuf_remote *niobuf;
1242 int niocount, i, requested_nob, opc, rc;
1243 struct osc_brw_async_args *aa;
1244 struct req_capsule *pill;
1245 struct brw_page *pg_prev;
/* fault-injection points used by error-path/recovery testing */
1248 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1249 RETURN(-ENOMEM); /* Recoverable */
1250 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1251 RETURN(-EINVAL); /* Fatalal -> no retry */
1253 if ((cmd & OBD_BRW_WRITE) != 0) {
/* writes come from the pool so cache flush works when memory is short */
1255 req = ptlrpc_request_alloc_pool(cli->cl_import,
1256 cli->cl_import->imp_rq_pool,
1257 &RQF_OST_BRW_WRITE);
1260 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* count the niobufs needed: adjacent mergeable pages share one niobuf */
1265 for (niocount = i = 1; i < page_count; i++) {
1266 if (!can_merge_pages(pga[i - 1], pga[i]))
1270 pill = &req->rq_pill;
1271 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1273 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1274 niocount * sizeof(*niobuf));
1275 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1277 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1279 ptlrpc_request_free(req);
1282 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1283 ptlrpc_at_set_req_timeout(req);
/* writes: server GETs the data from us; reads: server PUTs into our sink */
1285 if (opc == OST_WRITE)
1286 desc = ptlrpc_prep_bulk_imp(req, page_count,
1287 BULK_GET_SOURCE, OST_BULK_PORTAL);
1289 desc = ptlrpc_prep_bulk_imp(req, page_count,
1290 BULK_PUT_SINK, OST_BULK_PORTAL);
1293 GOTO(out, rc = -ENOMEM);
1294 /* NB request now owns desc and will free it when it gets freed */
1296 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1297 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1298 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1299 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1301 lustre_set_wire_obdo(&body->oa, oa);
1303 obdo_to_ioobj(oa, ioobj);
1304 ioobj->ioo_bufcnt = niocount;
1305 osc_pack_capa(req, body, ocapa);
1306 LASSERT (page_count > 0);
/* fill the bulk descriptor and the on-wire niobuf array; pages must be
 * sorted by strictly increasing offset (checked by the LASSERTFs below) */
1308 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1309 struct brw_page *pg = pga[i];
1311 LASSERT(pg->count > 0);
1312 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1313 "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1314 pg->off, pg->count);
1316 LASSERTF(i == 0 || pg->off > pg_prev->off,
1317 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1318 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1320 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1321 pg_prev->pg, page_private(pg_prev->pg),
1322 pg_prev->pg->index, pg_prev->off);
1324 LASSERTF(i == 0 || pg->off > pg_prev->off,
1325 "i %d p_c %u\n", i, page_count);
/* SRVLOCK must be uniform across the whole RPC */
1327 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1328 (pg->flag & OBD_BRW_SRVLOCK));
1330 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1332 requested_nob += pg->count;
/* extend the previous niobuf if this page is contiguous with it,
 * otherwise start a new one */
1334 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1336 niobuf->len += pg->count;
1338 niobuf->offset = pg->off;
1339 niobuf->len = pg->count;
1340 niobuf->flags = pg->flag;
/* sanity: we wrote exactly niocount niobufs into the capsule buffer */
1345 LASSERTF((void *)(niobuf - niocount) ==
1346 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1347 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1348 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1350 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1351 if (osc_should_shrink_grant(cli))
1352 osc_shrink_grant_local(cli, &body->oa);
1354 /* size[REQ_REC_OFF] still sizeof (*body) */
1355 if (opc == OST_WRITE) {
/* checksum the outgoing data unless the sptlrpc flavor already
 * provides bulk integrity */
1356 if (unlikely(cli->cl_checksum) &&
1357 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1358 /* store cl_cksum_type in a local variable since
1359 * it can be changed via lprocfs */
1360 cksum_type_t cksum_type = cli->cl_cksum_type;
1362 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1363 oa->o_flags &= OBD_FL_LOCAL_MASK;
1364 body->oa.o_flags = 0;
1366 body->oa.o_flags |= cksum_type_pack(cksum_type);
1367 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1368 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1372 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1374 /* save this in 'oa', too, for later checking */
1375 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1376 oa->o_flags |= cksum_type_pack(cksum_type);
1378 /* clear out the checksum flag, in case this is a
1379 * resend but cl_checksum is no longer set. b=11238 */
1380 oa->o_valid &= ~OBD_MD_FLCKSUM;
1382 oa->o_cksum = body->oa.o_cksum;
1383 /* 1 RC per niobuf */
1384 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1385 sizeof(__u32) * niocount);
/* read path: just request a checksum from the server */
1387 if (unlikely(cli->cl_checksum) &&
1388 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1389 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1390 body->oa.o_flags = 0;
1391 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1392 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1395 ptlrpc_request_set_replen(req);
/* stash per-RPC state in the request's async-args area for the
 * reply interpreter */
1397 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1398 aa = ptlrpc_req_async_args(req);
1400 aa->aa_requested_nob = requested_nob;
1401 aa->aa_nio_count = niocount;
1402 aa->aa_page_count = page_count;
1406 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1407 if (ocapa && reserve)
1408 aa->aa_ocapa = capa_get(ocapa);
/* error path: drop the request (which also frees the bulk desc) */
1414 ptlrpc_req_finished(req);
/* Classify a write-checksum mismatch reported by the OST.  The client's
 * pages are still cached, so re-checksum them now and compare against both
 * the original client checksum and the server's value to decide where the
 * data changed (client after checksumming, in transit, or both), then log
 * a console error with object/extent details.  Returns early (checksum
 * confirmed) when server and client checksums actually agree. */
1418 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1419 __u32 client_cksum, __u32 server_cksum, int nob,
1420 obd_count page_count, struct brw_page **pga,
1421 cksum_type_t client_cksum_type)
1425 cksum_type_t cksum_type;
1427 if (server_cksum == client_cksum) {
1428 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* re-checksum with the type the server said it used (falls back to
 * CRC32 when the reply carried no flags) */
1432 if (oa->o_valid & OBD_MD_FLFLAGS)
1433 cksum_type = cksum_type_unpack(oa->o_flags);
1435 cksum_type = OBD_CKSUM_CRC32;
1437 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
/* decide which of the three checksums disagree to pick a diagnosis */
1440 if (cksum_type != client_cksum_type)
1441 msg = "the server did not use the checksum type specified in "
1442 "the original request - likely a protocol problem";
1443 else if (new_cksum == server_cksum)
1444 msg = "changed on the client after we checksummed it - "
1445 "likely false positive due to mmap IO (bug 11742)";
1446 else if (new_cksum == client_cksum)
1447 msg = "changed in transit before arrival at OST";
1449 msg = "changed in transit AND doesn't match the original - "
1450 "likely false positive due to mmap IO (bug 11742)";
1452 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1453 LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1454 "["LPU64"-"LPU64"]\n",
1455 msg, libcfs_nid2str(peer->nid),
1456 oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1457 oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1460 oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1462 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1463 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1464 "client csum now %x\n", client_cksum, client_cksum_type,
1465 server_cksum, cksum_type, new_cksum);
/* Finish a BRW request: unpack and validate the reply, update grant and
 * quota state, and verify bulk checksums.  For writes it validates the
 * per-niobuf RCs and the write checksum; for reads it checks the number of
 * bytes transferred, handles short reads, and recomputes/compares the read
 * checksum when the server sent one. */
1469 /* Note rc enters this function as number of bytes transferred */
1470 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1472 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1473 const lnet_process_id_t *peer =
1474 &req->rq_import->imp_connection->c_peer;
1475 struct client_obd *cli = aa->aa_cli;
1476 struct ost_body *body;
1477 __u32 client_cksum = 0;
/* -EDQUOT replies still carry a body we must process (quota flags) */
1480 if (rc < 0 && rc != -EDQUOT)
1483 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1484 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1486 CDEBUG(D_INFO, "Can't unpack body\n");
1490 /* set/clear over quota flag for a uid/gid */
1491 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1492 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1493 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1495 lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
1502 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1503 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1505 osc_update_grant(cli, body);
1507 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
/* for writes, a positive rc here is unexpected (bytes moved is
 * tracked by the bulk descriptor, not rc) */
1509 CERROR("Unexpected +ve rc %d\n", rc);
1512 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1514 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
/* verify the server's write checksum against the one we stored
 * in oa when the request was built */
1517 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1518 check_write_checksum(&body->oa, peer, client_cksum,
1519 body->oa.o_cksum, aa->aa_requested_nob,
1520 aa->aa_page_count, aa->aa_ppga,
1521 cksum_type_unpack(aa->aa_oa->o_flags)))
1524 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1525 aa->aa_page_count, aa->aa_ppga);
1529 /* The rest of this function executes only for OST_READs */
1531 /* if unwrap_bulk failed, return -EAGAIN to retry */
1532 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1534 GOTO(out, rc = -EAGAIN);
/* rc is bytes read; it can never exceed what we asked for and must
 * match what the bulk layer says actually arrived */
1536 if (rc > aa->aa_requested_nob) {
1537 CERROR("Unexpected rc %d (%d requested)\n", rc,
1538 aa->aa_requested_nob);
1542 if (rc != req->rq_bulk->bd_nob_transferred) {
1543 CERROR ("Unexpected rc %d (%d transferred)\n",
1544 rc, req->rq_bulk->bd_nob_transferred);
1548 if (rc < aa->aa_requested_nob)
1549 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1551 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1552 static int cksum_counter;
1553 __u32 server_cksum = body->oa.o_cksum;
1556 cksum_type_t cksum_type;
1558 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1559 cksum_type = cksum_type_unpack(body->oa.o_flags);
1561 cksum_type = OBD_CKSUM_CRC32;
1562 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1563 aa->aa_ppga, OST_READ,
/* note whether the data came via a router (for the error report) */
1566 if (peer->nid == req->rq_bulk->bd_sender) {
1570 router = libcfs_nid2str(req->rq_bulk->bd_sender);
/* ~0 means "checksum bit set but no checksum sent" - a protocol
 * bug on the server side, but not data corruption */
1573 if (server_cksum == ~0 && rc > 0) {
1574 CERROR("Protocol error: server %s set the 'checksum' "
1575 "bit, but didn't send a checksum. Not fatal, "
1576 "but please notify on http://bugzilla.lustre.org/\n",
1577 libcfs_nid2str(peer->nid));
1578 } else if (server_cksum != client_cksum) {
1579 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1580 "%s%s%s inum "LPU64"/"LPU64" object "
1581 LPU64"/"LPU64" extent "
1582 "["LPU64"-"LPU64"]\n",
1583 req->rq_import->imp_obd->obd_name,
1584 libcfs_nid2str(peer->nid),
1586 body->oa.o_valid & OBD_MD_FLFID ?
1587 body->oa.o_fid : (__u64)0,
1588 body->oa.o_valid & OBD_MD_FLFID ?
1589 body->oa.o_generation :(__u64)0,
1591 body->oa.o_valid & OBD_MD_FLGROUP ?
1592 body->oa.o_gr : (__u64)0,
1593 aa->aa_ppga[0]->off,
1594 aa->aa_ppga[aa->aa_page_count-1]->off +
1595 aa->aa_ppga[aa->aa_page_count-1]->count -
1597 CERROR("client %x, server %x, cksum_type %x\n",
1598 client_cksum, server_cksum, cksum_type);
1600 aa->aa_oa->o_cksum = client_cksum;
1604 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1607 } else if (unlikely(client_cksum)) {
/* we asked for a checksum but the server ignored us; complain
 * with exponential backoff (power-of-two occurrences) */
1608 static int cksum_missed;
1611 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1612 CERROR("Checksum %u requested from %s but not sent\n",
1613 cksum_missed, libcfs_nid2str(peer->nid));
/* copy the (possibly updated) obdo back to the caller's copy */
1619 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
/* Synchronous BRW: build the request, queue it and wait for completion.
 * Bulk timeouts that marked the request for resend, and other recoverable
 * errors, are retried after a backoff sleep, bounded by osc_should_resend().
 * Returns 0 or a negative errno from the final attempt. */
1624 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1625 struct lov_stripe_md *lsm,
1626 obd_count page_count, struct brw_page **pga,
1627 struct obd_capa *ocapa)
1629 struct ptlrpc_request *req;
1633 struct l_wait_info lwi;
1637 cfs_waitq_init(&waitq);
1640 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1641 page_count, pga, &req, ocapa, 0);
1645 rc = ptlrpc_queue_wait(req);
/* bulk timeout with resend flag: drop this request and rebuild */
1647 if (rc == -ETIMEDOUT && req->rq_resend) {
1648 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1649 ptlrpc_req_finished(req);
1653 rc = osc_brw_fini_request(req, rc);
1655 ptlrpc_req_finished(req);
1656 if (osc_recoverable_error(rc)) {
1658 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1659 CERROR("too many resend retries, returning error\n");
/* back off proportionally to the number of resends so far */
1663 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1664 l_wait_event(waitq, 0, &lwi);
/* Rebuild and resend a BRW request that failed with a recoverable error.
 * A fresh request is prepared from the old request's async args; it then
 * takes over the old request's page array and async-page (oap) list, and
 * each oap's request reference is moved to the new request.  All list and
 * reference manipulation happens under cl_loi_list_lock.  The new request
 * is added to the same request set as the original. */
1672 int osc_brw_redo_request(struct ptlrpc_request *request,
1673 struct osc_brw_async_args *aa)
1675 struct ptlrpc_request *new_req;
1676 struct ptlrpc_request_set *set = request->rq_set;
1677 struct osc_brw_async_args *new_aa;
1678 struct osc_async_page *oap;
1682 if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1683 CERROR("too many resend retries, returning error\n");
1687 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1689 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1690 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1691 aa->aa_cli, aa->aa_oa,
1692 NULL /* lsm unused by osc currently */,
1693 aa->aa_page_count, aa->aa_ppga,
1694 &new_req, aa->aa_ocapa, 0);
1698 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
/* if any oap was interrupted while we were off the lock, abandon the
 * redo: drop the new request and let the interrupt path complete */
1700 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1701 if (oap->oap_request != NULL) {
1702 LASSERTF(request == oap->oap_request,
1703 "request %p != oap_request %p\n",
1704 request, oap->oap_request);
1705 if (oap->oap_interrupted) {
1706 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1707 ptlrpc_req_finished(new_req);
1712 /* New request takes over pga and oaps from old request.
1713 * Note that copying a list_head doesn't work, need to move it... */
1715 new_req->rq_interpret_reply = request->rq_interpret_reply;
1716 new_req->rq_async_args = request->rq_async_args;
/* delay the resend by aa_resends seconds */
1717 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1719 new_aa = ptlrpc_req_async_args(new_req);
1721 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1722 cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1723 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
/* retarget each oap's request reference at the new request */
1725 cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1726 if (oap->oap_request) {
1727 ptlrpc_req_finished(oap->oap_request);
1728 oap->oap_request = ptlrpc_request_addref(new_req);
/* capability ownership also moves to the new request */
1732 new_aa->aa_ocapa = aa->aa_ocapa;
1733 aa->aa_ocapa = NULL;
1735 /* use ptlrpc_set_add_req is safe because interpret functions work
1736 * in check_set context. only one way exist with access to request
1737 * from different thread got -EINTR - this way protected with
1738 * cl_loi_list_lock */
1739 ptlrpc_set_add_req(set, new_req);
1741 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1743 DEBUG_REQ(D_INFO, new_req, "new request");
1748 * ugh, we want disk allocation on the target to happen in offset order. we'll
1749 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1750 * fine for our small page arrays and doesn't require allocation. its an
1751 * insertion sort that swaps elements that are strides apart, shrinking the
1752 * stride down until its '1' and the array is sorted.
/* Sort @array of @num brw_page pointers in place by ascending ->off
 * (file offset), using shellsort with the 1, 4, 13, 40, ... stride
 * sequence built by the loop below. */
1754 static void sort_brw_pages(struct brw_page **array, int num)
1757 struct brw_page *tmp;
/* grow the stride past num, then shrink it back per pass */
1761 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
/* stride-spaced insertion sort */
1766 for (i = stride ; i < num ; i++) {
1769 while (j >= stride && array[j - stride]->off > tmp->off) {
1770 array[j] = array[j - stride];
1775 } while (stride > 1);
/* Return how many leading pages of @pg can go into a single unfragmented
 * network transfer: stop counting as soon as a page does not end on a page
 * boundary, or a subsequent page does not start on one. */
1778 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1784 LASSERT (pages > 0);
1785 offset = pg[i]->off & ~CFS_PAGE_MASK;
1789 if (pages == 0) /* that's all */
1792 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1793 return count; /* doesn't end on page boundary */
1796 offset = pg[i]->off & ~CFS_PAGE_MASK;
1797 if (offset != 0) /* doesn't start on page boundary */
/* Build an array of @count pointers into the contiguous brw_page array
 * @pga, so the pages can be sorted/split without moving the pages
 * themselves.  Caller frees the result with osc_release_ppga(). */
1804 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1806 struct brw_page **ppga;
1809 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1813 for (i = 0; i < count; i++)
/* Free a pointer array previously built by osc_build_ppga(). */
1818 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1820 LASSERT(ppga != NULL);
1821 OBD_FREE(ppga, sizeof(*ppga) * count);
/* Top-level synchronous brw entry point: sort the pages by offset, split
 * them into chunks no bigger than cl_max_pages_per_rpc (and no more
 * fragmented than one RDMA allows), and issue one osc_brw_internal() call
 * per chunk.  The obdo is saved/restored around each chunk because the brw
 * clobbers it.  OBD_BRW_CHECK only probes whether I/O could succeed. */
1824 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1825 obd_count page_count, struct brw_page *pga,
1826 struct obd_trans_info *oti)
1828 struct obdo *saved_oa = NULL;
1829 struct brw_page **ppga, **orig;
1830 struct obd_import *imp = class_exp2cliimp(exp);
1831 struct client_obd *cli;
1832 int rc, page_count_orig;
1835 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1836 cli = &imp->imp_obd->u.cli;
1838 if (cmd & OBD_BRW_CHECK) {
1839 /* The caller just wants to know if there's a chance that this
1840 * I/O can succeed */
1842 if (imp->imp_invalid)
1847 /* test_brw with a failed create can trip this, maybe others. */
1848 LASSERT(cli->cl_max_pages_per_rpc);
1852 orig = ppga = osc_build_ppga(pga, page_count);
/* remember the original count: page_count is consumed by the loop */
1855 page_count_orig = page_count;
1857 sort_brw_pages(ppga, page_count);
1858 while (page_count) {
1859 obd_count pages_per_brw;
1861 if (page_count > cli->cl_max_pages_per_rpc)
1862 pages_per_brw = cli->cl_max_pages_per_rpc;
1864 pages_per_brw = page_count;
1866 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1868 if (saved_oa != NULL) {
1869 /* restore previously saved oa */
1870 *oinfo->oi_oa = *saved_oa;
1871 } else if (page_count > pages_per_brw) {
1872 /* save a copy of oa (brw will clobber it) */
1873 OBDO_ALLOC(saved_oa);
1874 if (saved_oa == NULL)
1875 GOTO(out, rc = -ENOMEM);
1876 *saved_oa = *oinfo->oi_oa;
1879 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1880 pages_per_brw, ppga, oinfo->oi_capa);
/* advance past the chunk just sent */
1885 page_count -= pages_per_brw;
1886 ppga += pages_per_brw;
1890 osc_release_ppga(orig, page_count_orig);
1892 if (saved_oa != NULL)
1893 OBDO_FREE(saved_oa);
1898 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1899 * the dirty accounting. Writeback completes or truncate happens before
1900 * writing starts. Must be called with the loi lock held. */
1901 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
/* returns the page's write grant back to the client_obd accounting */
1904 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1908 /* This maintains the lists of pending pages to read/write for a given object
1909 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1910 * to quickly find objects that are ready to send an RPC. */
/* Decide whether @lop has enough/urgent-enough pending pages to justify
 * sending an RPC now for @cmd (read or write).  Returns non-zero when an
 * RPC should be fired (exact return values are in the elided branches). */
1911 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1917 if (lop->lop_num_pending == 0)
1920 /* if we have an invalid import we want to drain the queued pages
1921 * by forcing them through rpcs that immediately fail and complete
1922 * the pages. recovery relies on this to empty the queued pages
1923 * before canceling the locks and evicting down the llite pages */
1924 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1927 /* stream rpcs in queue order as long as as there is an urgent page
1928 * queued. this is our cheap solution for good batching in the case
1929 * where writepage marks some random page in the middle of the file
1930 * as urgent because of, say, memory pressure */
1931 if (!cfs_list_empty(&lop->lop_urgent)) {
1932 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1935 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1936 optimal = cli->cl_max_pages_per_rpc;
1937 if (cmd & OBD_BRW_WRITE) {
1938 /* trigger a write rpc stream as long as there are dirtiers
1939 * waiting for space. as they're waiting, they're not going to
1940 * create more pages to coallesce with what's waiting.. */
1941 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
1942 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1945 /* +16 to avoid triggering rpcs that would want to include pages
1946 * that are being queued but which can't be made ready until
1947 * the queuer finishes with the page. this is a wart for
1948 * llite::commit_write() */
1951 if (lop->lop_num_pending >= optimal)
/* Return non-zero when the head of @lop's urgent list is a high-priority
 * (ASYNC_HP) page, i.e. an RPC should be fired immediately. */
1957 static int lop_makes_hprpc(struct loi_oap_pages *lop)
1959 struct osc_async_page *oap;
1962 if (cfs_list_empty(&lop->lop_urgent))
/* only the first urgent entry needs checking: HP pages are always
 * queued at the head of lop_urgent (see osc_oap_to_pending) */
1965 oap = cfs_list_entry(lop->lop_urgent.next,
1966 struct osc_async_page, oap_urgent_item);
1968 if (oap->oap_async_flags & ASYNC_HP) {
1969 CDEBUG(D_CACHE, "hp request forcing RPC\n");
/* Idempotently add @item to @list or remove it, so that its membership
 * matches @should_be_on. */
1976 static void on_list(cfs_list_t *item, cfs_list_t *list,
1979 if (cfs_list_empty(item) && should_be_on)
1980 cfs_list_add_tail(item, list);
1981 else if (!cfs_list_empty(item) && !should_be_on)
1982 cfs_list_del_init(item);
1985 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1986 * can find pages to build into rpcs quickly */
1987 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
/* high-priority pages trump everything: move the loi to the hp-ready
 * list and off the ordinary ready list (and vice versa below) */
1989 if (lop_makes_hprpc(&loi->loi_write_lop) ||
1990 lop_makes_hprpc(&loi->loi_read_lop)) {
1992 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
1993 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
1995 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
1996 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
1997 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
1998 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
/* membership on the write/read lists just tracks "has pending pages" */
2001 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2002 loi->loi_write_lop.lop_num_pending);
2004 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2005 loi->loi_read_lop.lop_num_pending);
/* Adjust the per-lop pending-page count by @delta and mirror the change
 * in the client_obd's global pending read/write page counters. */
2008 static void lop_update_pending(struct client_obd *cli,
2009 struct loi_oap_pages *lop, int cmd, int delta)
2011 lop->lop_num_pending += delta;
2012 if (cmd & OBD_BRW_WRITE)
2013 cli->cl_pending_w_pages += delta;
2015 cli->cl_pending_r_pages += delta;
2019 * this is called when a sync waiter receives an interruption. Its job is to
2020 * get the caller woken as soon as possible. If its page hasn't been put in an
2021 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
2022 * desiring interruption which will forcefully complete the rpc once the rpc
2025 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2027 struct loi_oap_pages *lop;
2028 struct lov_oinfo *loi;
2032 LASSERT(!oap->oap_interrupted);
2033 oap->oap_interrupted = 1;
2035 /* ok, it's been put in an rpc. only one oap gets a request reference */
2036 if (oap->oap_request != NULL) {
/* mark the rpc interrupted, kick ptlrpcd, and drop our ref */
2037 ptlrpc_mark_interrupted(oap->oap_request);
2038 ptlrpcd_wake(oap->oap_request);
2039 ptlrpc_req_finished(oap->oap_request);
2040 oap->oap_request = NULL;
2044 * page completion may be called only if ->cpo_prep() method was
2045 * executed by osc_io_submit(), that also adds page the to pending list
2047 if (!cfs_list_empty(&oap->oap_pending_item)) {
/* still queued: dequeue immediately and complete with -EINTR */
2048 cfs_list_del_init(&oap->oap_pending_item);
2049 cfs_list_del_init(&oap->oap_urgent_item);
2052 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2053 &loi->loi_write_lop : &loi->loi_read_lop;
2054 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2055 loi_list_maint(oap->oap_cli, oap->oap_loi);
2056 rc = oap->oap_caller_ops->ap_completion(env,
2057 oap->oap_caller_data,
2058 oap->oap_cmd, NULL, -EINTR);
2064 /* this is trying to propogate async writeback errors back up to the
2065 * application. As an async write fails we record the error code for later if
2066 * the app does an fsync. As long as errors persist we force future rpcs to be
2067 * sync so that the app can get a sync error and break the cycle of queueing
2068 * pages for which writeback will fail. */
2069 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
/* (error branch, lines elided above) remember that writes must now be
 * synchronous, and the first xid from which a clean completion may
 * clear the condition again */
2076 ar->ar_force_sync = 1;
2077 ar->ar_min_xid = ptlrpc_sample_next_xid();
/* success at or past ar_min_xid ends the forced-sync regime */
2082 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2083 ar->ar_force_sync = 0;
/* Queue @oap on its loi's pending list (and urgent list when flagged):
 * ASYNC_HP pages go to the head of the urgent list so lop_makes_hprpc()
 * sees them first, ASYNC_URGENT pages to its tail; the pending-page
 * accounting is bumped either way. */
2086 void osc_oap_to_pending(struct osc_async_page *oap)
2088 struct loi_oap_pages *lop;
2090 if (oap->oap_cmd & OBD_BRW_WRITE)
2091 lop = &oap->oap_loi->loi_write_lop;
2093 lop = &oap->oap_loi->loi_read_lop;
2095 if (oap->oap_async_flags & ASYNC_HP)
2096 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2097 else if (oap->oap_async_flags & ASYNC_URGENT)
2098 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2099 cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2100 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2103 /* this must be called holding the loi list lock to give coverage to exit_cache,
2104 * async_flag maintenance, and oap_request */
/* Complete one async page: drop its request reference, clear its flags,
 * feed write errors into the async-rc machinery, copy size/time attributes
 * from the reply obdo into the loi's lvb, and invoke the caller's
 * ap_completion hook.  May re-queue the page (pending) or release its
 * cache grant depending on the hook's verdict. */
2105 static void osc_ap_completion(const struct lu_env *env,
2106 struct client_obd *cli, struct obdo *oa,
2107 struct osc_async_page *oap, int sent, int rc)
2112 if (oap->oap_request != NULL) {
/* remember the xid for osc_process_ar before dropping the ref */
2113 xid = ptlrpc_req_xid(oap->oap_request);
2114 ptlrpc_req_finished(oap->oap_request);
2115 oap->oap_request = NULL;
2118 cfs_spin_lock(&oap->oap_lock);
2119 oap->oap_async_flags = 0;
2120 cfs_spin_unlock(&oap->oap_lock);
2121 oap->oap_interrupted = 0;
2123 if (oap->oap_cmd & OBD_BRW_WRITE) {
/* track write errors both globally and per object */
2124 osc_process_ar(&cli->cl_ar, xid, rc);
2125 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2128 if (rc == 0 && oa != NULL) {
/* mirror server-returned attributes into the cached lvb */
2129 if (oa->o_valid & OBD_MD_FLBLOCKS)
2130 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2131 if (oa->o_valid & OBD_MD_FLMTIME)
2132 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2133 if (oa->o_valid & OBD_MD_FLATIME)
2134 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2135 if (oa->o_valid & OBD_MD_FLCTIME)
2136 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2139 rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2140 oap->oap_cmd, oa, rc);
2142 /* ll_ap_completion (from llite) drops PG_locked. so, a new
2143 * I/O on the page could start, but OSC calls it under lock
2144 * and thus we can add oap back to pending safely */
2146 /* upper layer wants to leave the page on pending queue */
2147 osc_oap_to_pending(oap);
2149 osc_exit_cache(cli, oap, sent);
/* Reply interpreter for async BRW RPCs.  Finishes the request, retries it
 * via osc_brw_redo_request() on recoverable errors, then under
 * cl_loi_list_lock updates the in-flight counters, completes every attached
 * async page (or, for async_internal requests, releases the write grants),
 * wakes cache waiters and re-drives the RPC engine.  Finally completes the
 * cl_req and frees the page-pointer array. */
2153 static int brw_interpret(const struct lu_env *env,
2154 struct ptlrpc_request *req, void *data, int rc)
2156 struct osc_brw_async_args *aa = data;
2157 struct client_obd *cli;
2161 rc = osc_brw_fini_request(req, rc);
2162 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2163 if (osc_recoverable_error(rc)) {
2164 rc = osc_brw_redo_request(req, aa);
2170 capa_put(aa->aa_ocapa);
2171 aa->aa_ocapa = NULL;
2176 client_obd_list_lock(&cli->cl_loi_list_lock);
2178 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2179 * is called so we know whether to go to sync BRWs or wait for more
2180 * RPCs to complete */
2181 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2182 cli->cl_w_in_flight--;
2184 cli->cl_r_in_flight--;
/* requests built by osc_send_oap_rpc() carry oaps; async_internal
 * requests have an empty aa_oaps list and just hold write grants */
2186 async = cfs_list_empty(&aa->aa_oaps);
2187 if (!async) { /* from osc_send_oap_rpc() */
2188 struct osc_async_page *oap, *tmp;
2189 /* the caller may re-use the oap after the completion call so
2190 * we need to clean it up a little */
2191 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
2193 cfs_list_del_init(&oap->oap_rpc_item);
2194 osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2196 OBDO_FREE(aa->aa_oa);
2197 } else { /* from async_internal() */
2199 for (i = 0; i < aa->aa_page_count; i++)
2200 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2202 if (aa->aa_oa->o_flags & OBD_FL_TEMPORARY)
2203 OBDO_FREE(aa->aa_oa);
2205 osc_wake_cache_waiters(cli);
2206 osc_check_rpcs(env, cli);
2207 client_obd_list_unlock(&cli->cl_loi_list_lock);
2209 cl_req_completion(env, aa->aa_clerq, rc);
2210 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
/* Turn a list of async pages (@rpc_list) into a prepared BRW request.
 * Builds a page-pointer array and a cl_req from the oaps, fills the obdo
 * attributes via cl_req_attr_set (including the DLM lock handle), sorts the
 * pages, and calls osc_brw_prep_request().  Timestamps are set again after
 * the request is built (see the bug 10150 note below).  On success the oaps
 * are moved onto the request's async args; on failure every oap is
 * completed with the error and ERR_PTR(rc) is returned. */
2214 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2215 struct client_obd *cli,
2216 cfs_list_t *rpc_list,
2217 int page_count, int cmd)
2219 struct ptlrpc_request *req;
2220 struct brw_page **pga = NULL;
2221 struct osc_brw_async_args *aa;
2222 struct obdo *oa = NULL;
2223 const struct obd_async_page_ops *ops = NULL;
2224 void *caller_data = NULL;
2225 struct osc_async_page *oap;
2226 struct osc_async_page *tmp;
2227 struct ost_body *body;
2228 struct cl_req *clerq = NULL;
2229 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2230 struct ldlm_lock *lock = NULL;
2231 struct cl_req_attr crattr;
2235 LASSERT(!cfs_list_empty(rpc_list));
2237 memset(&crattr, 0, sizeof crattr);
2238 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2240 GOTO(out, req = ERR_PTR(-ENOMEM));
2244 GOTO(out, req = ERR_PTR(-ENOMEM));
/* collect the pages into pga[] and add each to the cl_req; the cl_req
 * itself is allocated lazily from the first page */
2247 cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2248 struct cl_page *page = osc_oap2cl_page(oap);
2250 ops = oap->oap_caller_ops;
2251 caller_data = oap->oap_caller_data;
2253 clerq = cl_req_alloc(env, page, crt,
2254 1 /* only 1-object rpcs for
2257 GOTO(out, req = (void *)clerq);
2258 lock = oap->oap_ldlm_lock;
2260 pga[i] = &oap->oap_brw_page;
2261 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2262 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2263 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2265 cl_req_page_add(env, clerq, page);
2268 /* always get the data for the obdo for the rpc */
2269 LASSERT(ops != NULL);
2271 crattr.cra_capa = NULL;
2272 cl_req_attr_set(env, clerq, &crattr, ~0ULL);
/* pass the DLM lock handle so the server can match the client lock */
2274 oa->o_handle = lock->l_remote_handle;
2275 oa->o_valid |= OBD_MD_FLHANDLE;
2278 rc = cl_req_prep(env, clerq);
2280 CERROR("cl_req_prep failed: %d\n", rc);
2281 GOTO(out, req = ERR_PTR(rc));
2284 sort_brw_pages(pga, page_count);
2285 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2286 pga, &req, crattr.cra_capa, 1);
2288 CERROR("prep_req failed: %d\n", rc);
2289 GOTO(out, req = ERR_PTR(rc));
2292 /* Need to update the timestamps after the request is built in case
2293 * we race with setattr (locally or in queue at OST). If OST gets
2294 * later setattr before earlier BRW (as determined by the request xid),
2295 * the OST will not use BRW timestamps. Sadly, there is no obvious
2296 * way to do this in a single call. bug 10150 */
2297 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2298 cl_req_attr_set(env, clerq, &crattr,
2299 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
/* hand the oap list over to the request's async args */
2301 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2302 aa = ptlrpc_req_async_args(req);
2303 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2304 cfs_list_splice(rpc_list, &aa->aa_oaps);
2305 CFS_INIT_LIST_HEAD(rpc_list);
2306 aa->aa_clerq = clerq;
2308 capa_put(crattr.cra_capa);
/* error path: undo allocations and complete every queued oap with
 * the failure so callers are not left waiting */
2313 OBD_FREE(pga, sizeof(*pga) * page_count);
2314 /* this should happen rarely and is pretty bad, it makes the
2315 * pending list not follow the dirty order */
2316 client_obd_list_lock(&cli->cl_loi_list_lock);
2317 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2318 cfs_list_del_init(&oap->oap_rpc_item);
2320 /* queued sync pages can be torn down while the pages
2321 * were between the pending list and the rpc */
2322 if (oap->oap_interrupted) {
2323 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2324 osc_ap_completion(env, cli, NULL, oap, 0,
2328 osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2330 if (clerq && !IS_ERR(clerq))
2331 cl_req_completion(env, clerq, PTR_ERR(req));
2337 * prepare pages for ASYNC io and put pages in send queue.
2339 * \param cmd OBD_BRW_* macroses
2340 * \param lop pending pages
2342 * \return zero if pages successfully add to send queue.
2343 * \return not zere if error occurring.
2346 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2347 struct lov_oinfo *loi,
2348 int cmd, struct loi_oap_pages *lop)
2350 struct ptlrpc_request *req;
2351 obd_count page_count = 0;
2352 struct osc_async_page *oap = NULL, *tmp;
2353 struct osc_brw_async_args *aa;
2354 const struct obd_async_page_ops *ops;
2355 CFS_LIST_HEAD(rpc_list);
2356 CFS_LIST_HEAD(tmp_list);
2357 unsigned int ending_offset;
2358 unsigned starting_offset = 0;
2360 struct cl_object *clob = NULL;
2363 /* ASYNC_HP pages first. At present, when the lock the pages is
2364 * to be canceled, the pages covered by the lock will be sent out
2365 * with ASYNC_HP. We have to send out them as soon as possible. */
2366 cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2367 if (oap->oap_async_flags & ASYNC_HP)
2368 cfs_list_move(&oap->oap_pending_item, &tmp_list);
2370 cfs_list_move_tail(&oap->oap_pending_item, &tmp_list);
2371 if (++page_count >= cli->cl_max_pages_per_rpc)
2375 cfs_list_splice(&tmp_list, &lop->lop_pending);
2378 /* first we find the pages we're allowed to work with */
2379 cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2381 ops = oap->oap_caller_ops;
2383 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2384 "magic 0x%x\n", oap, oap->oap_magic);
2387 /* pin object in memory, so that completion call-backs
2388 * can be safely called under client_obd_list lock. */
2389 clob = osc_oap2cl_page(oap)->cp_obj;
2390 cl_object_get(clob);
2393 if (page_count != 0 &&
2394 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2395 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2396 " oap %p, page %p, srvlock %u\n",
2397 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2401 /* If there is a gap at the start of this page, it can't merge
2402 * with any previous page, so we'll hand the network a
2403 * "fragmented" page array that it can't transfer in 1 RDMA */
2404 if (page_count != 0 && oap->oap_page_off != 0)
2407 /* in llite being 'ready' equates to the page being locked
2408 * until completion unlocks it. commit_write submits a page
2409 * as not ready because its unlock will happen unconditionally
2410 * as the call returns. if we race with commit_write giving
2411 * us that page we dont' want to create a hole in the page
2412 * stream, so we stop and leave the rpc to be fired by
2413 * another dirtier or kupdated interval (the not ready page
2414 * will still be on the dirty list). we could call in
2415 * at the end of ll_file_write to process the queue again. */
2416 if (!(oap->oap_async_flags & ASYNC_READY)) {
2417 int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2420 CDEBUG(D_INODE, "oap %p page %p returned %d "
2421 "instead of ready\n", oap,
2425 /* llite is telling us that the page is still
2426 * in commit_write and that we should try
2427 * and put it in an rpc again later. we
2428 * break out of the loop so we don't create
2429 * a hole in the sequence of pages in the rpc
2434 /* the io isn't needed.. tell the checks
2435 * below to complete the rpc with EINTR */
2436 cfs_spin_lock(&oap->oap_lock);
2437 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2438 cfs_spin_unlock(&oap->oap_lock);
2439 oap->oap_count = -EINTR;
2442 cfs_spin_lock(&oap->oap_lock);
2443 oap->oap_async_flags |= ASYNC_READY;
2444 cfs_spin_unlock(&oap->oap_lock);
2447 LASSERTF(0, "oap %p page %p returned %d "
2448 "from make_ready\n", oap,
2456 * Page submitted for IO has to be locked. Either by
2457 * ->ap_make_ready() or by higher layers.
2459 #if defined(__KERNEL__) && defined(__linux__)
2461 struct cl_page *page;
2463 page = osc_oap2cl_page(oap);
2465 if (page->cp_type == CPT_CACHEABLE &&
2466 !(PageLocked(oap->oap_page) &&
2467 (CheckWriteback(oap->oap_page, cmd)))) {
2468 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2470 (long)oap->oap_page->flags,
2471 oap->oap_async_flags);
2477 /* take the page out of our book-keeping */
2478 cfs_list_del_init(&oap->oap_pending_item);
2479 lop_update_pending(cli, lop, cmd, -1);
2480 cfs_list_del_init(&oap->oap_urgent_item);
2482 if (page_count == 0)
2483 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2484 (PTLRPC_MAX_BRW_SIZE - 1);
2486 /* ask the caller for the size of the io as the rpc leaves. */
2487 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2489 ops->ap_refresh_count(env, oap->oap_caller_data,
2491 LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2493 if (oap->oap_count <= 0) {
2494 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2496 osc_ap_completion(env, cli, NULL,
2497 oap, 0, oap->oap_count);
2501 /* now put the page back in our accounting */
2502 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2503 if (page_count == 0)
2504 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2505 if (++page_count >= cli->cl_max_pages_per_rpc)
2508 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2509 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2510 * have the same alignment as the initial writes that allocated
2511 * extents on the server. */
2512 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2513 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2514 if (ending_offset == 0)
2517 /* If there is a gap at the end of this page, it can't merge
2518 * with any subsequent pages, so we'll hand the network a
2519 * "fragmented" page array that it can't transfer in 1 RDMA */
2520 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2524 osc_wake_cache_waiters(cli);
2526 loi_list_maint(cli, loi);
2528 client_obd_list_unlock(&cli->cl_loi_list_lock);
2531 cl_object_put(env, clob);
2533 if (page_count == 0) {
2534 client_obd_list_lock(&cli->cl_loi_list_lock);
2538 req = osc_build_req(env, cli, &rpc_list, page_count, cmd);
2540 LASSERT(cfs_list_empty(&rpc_list));
2541 loi_list_maint(cli, loi);
2542 RETURN(PTR_ERR(req));
2545 aa = ptlrpc_req_async_args(req);
2547 if (cmd == OBD_BRW_READ) {
2548 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2549 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2550 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2551 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2553 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2554 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2555 cli->cl_w_in_flight);
2556 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2557 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2559 ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2561 client_obd_list_lock(&cli->cl_loi_list_lock);
2563 if (cmd == OBD_BRW_READ)
2564 cli->cl_r_in_flight++;
2566 cli->cl_w_in_flight++;
2568 /* queued sync pages can be torn down while the pages
2569 * were between the pending list and the rpc */
2571 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2572 /* only one oap gets a request reference */
2575 if (oap->oap_interrupted && !req->rq_intr) {
2576 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2578 ptlrpc_mark_interrupted(req);
2582 tmp->oap_request = ptlrpc_request_addref(req);
2584 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2585 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2587 req->rq_interpret_reply = brw_interpret;
2588 ptlrpcd_add_req(req, PSCOPE_BRW);
/* Debugging helper: dump one lov_oinfo's state — whether it is on a ready
 * list, and the pending/urgent counts for its write and read page queues.
 * (Trailing argument lines of the macro are not visible here; each line of
 * the definition must keep its continuation backslash.) */
2592 #define LOI_DEBUG(LOI, STR, args...) \
2593 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2594 !cfs_list_empty(&(LOI)->loi_ready_item) || \
2595 !cfs_list_empty(&(LOI)->loi_hp_ready_item), \
2596 (LOI)->loi_write_lop.lop_num_pending, \
2597 !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2598 (LOI)->loi_read_lop.lop_num_pending, \
2599 !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2602 /* This is called by osc_check_rpcs() to find which objects have pages that
2603 * we could be sending. These lists are maintained by lop_makes_rpc(). */
2604 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
/* Select the next object to build RPCs for, in strict priority order:
 * 1) objects with high-priority (blocked-lock) pages, 2) objects on the
 * ready list, 3) any queued writer when cache waiters exist, 4) all queued
 * writers/readers when the import is invalid (flush everything).
 * Presumably returns NULL when no object is eligible — the tail of this
 * function is not visible here; confirm against the full file.
 * NOTE(review): looks like this expects cl_loi_list_lock held — verify
 * against callers. */
2608 /* First return objects that have blocked locks so that they
2609 * will be flushed quickly and other clients can get the lock,
2610 * then objects which have pages ready to be stuffed into RPCs */
2611 if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
2612 RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
2613 struct lov_oinfo, loi_hp_ready_item));
2614 if (!cfs_list_empty(&cli->cl_loi_ready_list))
2615 RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
2616 struct lov_oinfo, loi_ready_item));
2618 /* then if we have cache waiters, return all objects with queued
2619 * writes. This is especially important when many small files
2620 * have filled up the cache and not been fired into rpcs because
2621 * they don't pass the nr_pending/object threshold */
2622 if (!cfs_list_empty(&cli->cl_cache_waiters) &&
2623 !cfs_list_empty(&cli->cl_loi_write_list))
2624 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2625 struct lov_oinfo, loi_write_item));
2627 /* then return all queued objects when we have an invalid import
2628 * so that they get flushed */
2629 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2630 if (!cfs_list_empty(&cli->cl_loi_write_list))
2631 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2634 if (!cfs_list_empty(&cli->cl_loi_read_list))
2635 RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
2636 struct lov_oinfo, loi_read_item));
2641 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
/* Return non-zero when the client is already at its RPC-in-flight limit
 * for this object.  If the first urgent page on either the write or read
 * queue is flagged ASYNC_HP, the limit is raised by one so high-priority
 * I/O can still get a slot past cl_max_rpcs_in_flight. */
2643 struct osc_async_page *oap;
2646 if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
2647 oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
2648 struct osc_async_page, oap_urgent_item);
2649 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2652 if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
2653 oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
2654 struct osc_async_page, oap_urgent_item);
2655 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2658 return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2661 /* called with the loi list lock held */
2662 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
/* Walk eligible objects (osc_next_loi) and fire read/write BRW RPCs for
 * each until the in-flight limit is hit or no object remains.  After
 * servicing an object it is removed from the ready/write/read lists so
 * other objects get a turn (inter-object fairness), and race_counter
 * bounds spinning when make_ready keeps backing off with rc == 0. */
2664 struct lov_oinfo *loi;
2665 int rc = 0, race_counter = 0;
2668 while ((loi = osc_next_loi(cli)) != NULL) {
2669 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2671 if (osc_max_rpc_in_flight(cli, loi))
2674 /* attempt some read/write balancing by alternating between
2675 * reads and writes in an object. The makes_rpc checks here
2676 * would be redundant if we were getting read/write work items
2677 * instead of objects. we don't want send_oap_rpc to drain a
2678 * partial read pending queue when we're given this object to
2679 * do io on writes while there are cache waiters */
2680 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2681 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2682 &loi->loi_write_lop);
2684 CERROR("Write request failed with %d\n", rc);
2686 /* osc_send_oap_rpc failed, mostly because of
2689 * It can't break here, because if:
2690 * - a page was submitted by osc_io_submit, so
2692 * - no request in flight
2693 * - no subsequent request
2694 * The system will be in live-lock state,
2695 * because there is no chance to call
2696 * osc_io_unplug() and osc_check_rpcs() any
2697 * more. pdflush can't help in this case,
2698 * because it might be blocked at grabbing
2699 * the page lock as we mentioned.
2701 * Anyway, continue to drain pages. */
2710 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2711 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2712 &loi->loi_read_lop);
2714 CERROR("Read request failed with %d\n", rc);
2722 /* attempt some inter-object balancing by issuing rpcs
2723 * for each object in turn */
2724 if (!cfs_list_empty(&loi->loi_hp_ready_item))
2725 cfs_list_del_init(&loi->loi_hp_ready_item);
2726 if (!cfs_list_empty(&loi->loi_ready_item))
2727 cfs_list_del_init(&loi->loi_ready_item);
2728 if (!cfs_list_empty(&loi->loi_write_item))
2729 cfs_list_del_init(&loi->loi_write_item);
2730 if (!cfs_list_empty(&loi->loi_read_item))
2731 cfs_list_del_init(&loi->loi_read_item);
2733 loi_list_maint(cli, loi);
2735 /* send_oap_rpc fails with 0 when make_ready tells it to
2736 * back off. llite's make_ready does this when it tries
2737 * to lock a page queued for write that is already locked.
2738 * we want to try sending rpcs from many objects, but we
2739 * don't want to spin failing with 0. */
2740 if (race_counter == 10)
2746 /* we're trying to queue a page in the osc so we're subject to the
2747 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2748 * If the osc's queued pages are already at that limit, then we want to sleep
2749 * until there is space in the osc's queue for us. We also may be waiting for
2750 * write credits from the OST if there are RPCs in flight that may return some
2751 * before we fall back to sync writes.
2753 * We need this to know whether our allocation was granted in the presence of signals */
2754 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
/* Wait predicate for osc_enter_cache(): true once the waiter has been
 * removed from the cache-waiter list (grant arrived) or no RPCs remain in
 * flight (nothing left to wait for).  Takes and drops cl_loi_list_lock so
 * the list check is consistent. */
2758 client_obd_list_lock(&cli->cl_loi_list_lock);
2759 rc = cfs_list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2760 client_obd_list_unlock(&cli->cl_loi_list_lock);
2765 * Non-blocking version of osc_enter_cache() that consumes grant only when it
2768 int osc_enter_cache_try(const struct lu_env *env,
2769 struct client_obd *cli, struct lov_oinfo *loi,
2770 struct osc_async_page *oap, int transient)
/* Succeeds only when a full page of server grant is available; on success
 * the grant is consumed for this page.  In the transient case the page is
 * additionally counted in the dirty-transit accounting and marked
 * OBD_BRW_NOCACHE.  (Return-value lines are elided from this view.) */
2774 has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2776 osc_consume_write_grant(cli, &oap->oap_brw_page);
2778 cli->cl_dirty_transit += CFS_PAGE_SIZE;
2779 cfs_atomic_inc(&obd_dirty_transit_pages);
2780 oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2786 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2787 * grant or cache space. */
2788 static int osc_enter_cache(const struct lu_env *env,
2789 struct client_obd *cli, struct lov_oinfo *loi,
2790 struct osc_async_page *oap)
/* Admit one write page into the OSC dirty cache, blocking when the
 * per-OSC dirty limit or global dirty-page limit is hit.  Caller holds
 * cl_loi_list_lock (see comment above); the lock is dropped around the
 * l_wait_event() and retaken afterwards.  Forces the caller to sync I/O
 * when caching is disabled or force_sync is set. */
2792 struct osc_cache_waiter ocw;
2793 struct l_wait_info lwi = { 0 };
2797 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2798 "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
2799 cli->cl_dirty_max, obd_max_dirty_pages,
2800 cli->cl_lost_grant, cli->cl_avail_grant);
2802 /* force the caller to try sync io. this can jump the list
2803 * of queued writes and create a discontiguous rpc stream */
2804 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2805 loi->loi_ar.ar_force_sync)
2808 /* Hopefully normal case - cache space and write credits available */
2809 if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2810 cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2811 osc_enter_cache_try(env, cli, loi, oap, 0))
2814 /* Make sure that there are write rpcs in flight to wait for. This
2815 * is a little silly as this object may not have any pending but
2816 * other objects sure might. */
2817 if (cli->cl_w_in_flight) {
2818 cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2819 cfs_waitq_init(&ocw.ocw_waitq);
2823 loi_list_maint(cli, loi);
2824 osc_check_rpcs(env, cli);
2825 client_obd_list_unlock(&cli->cl_loi_list_lock);
2827 CDEBUG(D_CACHE, "sleeping for cache space\n");
2828 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2830 client_obd_list_lock(&cli->cl_loi_list_lock);
2831 if (!cfs_list_empty(&ocw.ocw_entry)) {
/* still on the waiter list: we were woken without being granted */
2832 cfs_list_del(&ocw.ocw_entry);
2842 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2843 struct lov_oinfo *loi, cfs_page_t *page,
2844 obd_off offset, const struct obd_async_page_ops *ops,
2845 void *data, void **res, int nocache,
2846 struct lustre_handle *lockh)
/* Initialize an osc_async_page descriptor for @page at object offset
 * @offset.  When called with page == NULL (size-query path, elided here)
 * it returns the rounded size of the descriptor so callers can embed it.
 * Local clients with CAP_SYS_RESOURCE bypass quota via OBD_BRW_NOQUOTA. */
2848 struct osc_async_page *oap;
2853 return cfs_size_round(sizeof(*oap));
2856 oap->oap_magic = OAP_MAGIC;
2857 oap->oap_cli = &exp->exp_obd->u.cli;
2860 oap->oap_caller_ops = ops;
2861 oap->oap_caller_data = data;
2863 oap->oap_page = page;
2864 oap->oap_obj_off = offset;
2865 if (!client_is_remote(exp) &&
2866 cfs_capable(CFS_CAP_SYS_RESOURCE))
2867 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
/* object offset must be page-aligned; the in-page offset comes later */
2869 LASSERT(!(offset & ~CFS_PAGE_MASK));
2871 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2872 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2873 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2874 CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2876 cfs_spin_lock_init(&oap->oap_lock);
2877 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2881 struct osc_async_page *oap_from_cookie(void *cookie)
/* Recover the osc_async_page from an opaque caller cookie, validating the
 * magic to catch stale or corrupt cookies; ERR_PTR(-EINVAL) on mismatch. */
2883 struct osc_async_page *oap = cookie;
2884 if (oap->oap_magic != OAP_MAGIC)
2885 return ERR_PTR(-EINVAL);
2889 int osc_queue_async_io(const struct lu_env *env,
2890 struct obd_export *exp, struct lov_stripe_md *lsm,
2891 struct lov_oinfo *loi, void *cookie,
2892 int cmd, obd_off off, int count,
2893 obd_flag brw_flags, enum async_flags async_flags)
/* Queue one async page (identified by @cookie) for read or write.
 * Rejects pages already on a pending/urgent/rpc list and I/O through an
 * invalid import; non-NOQUOTA writes are checked against the owner's and
 * group's quota first.  Writes must also get cache space via
 * osc_enter_cache() before being moved to the pending list; finally
 * osc_check_rpcs() is kicked to try sending RPCs.  Runs mostly under
 * cl_loi_list_lock. */
2895 struct client_obd *cli = &exp->exp_obd->u.cli;
2896 struct osc_async_page *oap;
2900 oap = oap_from_cookie(cookie);
2902 RETURN(PTR_ERR(oap));
2904 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2907 if (!cfs_list_empty(&oap->oap_pending_item) ||
2908 !cfs_list_empty(&oap->oap_urgent_item) ||
2909 !cfs_list_empty(&oap->oap_rpc_item))
2912 /* check if the file's owner/group is over quota */
2913 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2914 struct cl_object *obj;
2915 struct cl_attr attr; /* XXX put attr into thread info */
2916 unsigned int qid[MAXQUOTAS];
2918 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
2920 cl_object_attr_lock(obj);
2921 rc = cl_object_attr_get(env, obj, &attr);
2922 cl_object_attr_unlock(obj);
2924 qid[USRQUOTA] = attr.cat_uid;
2925 qid[GRPQUOTA] = attr.cat_gid;
2927 lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
2934 loi = lsm->lsm_oinfo[0];
2936 client_obd_list_lock(&cli->cl_loi_list_lock);
/* the I/O must fit entirely within one page */
2938 LASSERT(off + count <= CFS_PAGE_SIZE);
2940 oap->oap_page_off = off;
2941 oap->oap_count = count;
2942 oap->oap_brw_flags = brw_flags;
2943 /* Give a hint to OST that requests are coming from kswapd - bug19529 */
2944 if (libcfs_memory_pressure_get())
2945 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2946 cfs_spin_lock(&oap->oap_lock);
2947 oap->oap_async_flags = async_flags;
2948 cfs_spin_unlock(&oap->oap_lock);
2950 if (cmd & OBD_BRW_WRITE) {
2951 rc = osc_enter_cache(env, cli, loi, oap);
2953 client_obd_list_unlock(&cli->cl_loi_list_lock);
2958 osc_oap_to_pending(oap);
2959 loi_list_maint(cli, loi);
2961 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2964 osc_check_rpcs(env, cli);
2965 client_obd_list_unlock(&cli->cl_loi_list_lock);
2970 /* aka (~was & now & flag), but this is more clear :) */
2971 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
2973 int osc_set_async_flags_base(struct client_obd *cli,
2974 struct lov_oinfo *loi, struct osc_async_page *oap,
2975 obd_flag async_flags)
/* Merge @async_flags into an already-pending page's flags.  Only newly-set
 * bits matter (SETTING macro above): ASYNC_READY is recorded, and a page
 * newly becoming ASYNC_URGENT is put on the urgent list — at the head if
 * it is high-priority (ASYNC_HP), at the tail otherwise.  Flag updates
 * happen under oap_lock. */
2977 struct loi_oap_pages *lop;
2981 LASSERT(!cfs_list_empty(&oap->oap_pending_item));
2983 if (oap->oap_cmd & OBD_BRW_WRITE) {
2984 lop = &loi->loi_write_lop;
2986 lop = &loi->loi_read_lop;
/* nothing to do if every requested flag is already set */
2989 if ((oap->oap_async_flags & async_flags) == async_flags)
2992 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2993 flags |= ASYNC_READY;
2995 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
2996 cfs_list_empty(&oap->oap_rpc_item)) {
2997 if (oap->oap_async_flags & ASYNC_HP)
2998 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
3000 cfs_list_add_tail(&oap->oap_urgent_item,
3002 flags |= ASYNC_URGENT;
3003 loi_list_maint(cli, loi);
3005 cfs_spin_lock(&oap->oap_lock);
3006 oap->oap_async_flags |= flags;
3007 cfs_spin_unlock(&oap->oap_lock);
3009 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3010 oap->oap_async_flags);
3014 int osc_teardown_async_page(struct obd_export *exp,
3015 struct lov_stripe_md *lsm,
3016 struct lov_oinfo *loi, void *cookie)
/* Remove a queued async page from OSC book-keeping before it is sent:
 * fails with -EBUSY if the page is already part of an RPC; otherwise
 * releases its cache/grant accounting, wakes cache waiters, and unlinks
 * it from the urgent and pending lists (clearing URGENT/HP flags and
 * decrementing lop_num_pending).  Runs under cl_loi_list_lock. */
3018 struct client_obd *cli = &exp->exp_obd->u.cli;
3019 struct loi_oap_pages *lop;
3020 struct osc_async_page *oap;
3024 oap = oap_from_cookie(cookie);
3026 RETURN(PTR_ERR(oap));
3029 loi = lsm->lsm_oinfo[0];
3031 if (oap->oap_cmd & OBD_BRW_WRITE) {
3032 lop = &loi->loi_write_lop;
3034 lop = &loi->loi_read_lop;
3037 client_obd_list_lock(&cli->cl_loi_list_lock);
3039 if (!cfs_list_empty(&oap->oap_rpc_item))
3040 GOTO(out, rc = -EBUSY);
3042 osc_exit_cache(cli, oap, 0);
3043 osc_wake_cache_waiters(cli);
3045 if (!cfs_list_empty(&oap->oap_urgent_item)) {
3046 cfs_list_del_init(&oap->oap_urgent_item);
3047 cfs_spin_lock(&oap->oap_lock);
3048 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3049 cfs_spin_unlock(&oap->oap_lock);
3051 if (!cfs_list_empty(&oap->oap_pending_item)) {
3052 cfs_list_del_init(&oap->oap_pending_item);
3053 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3055 loi_list_maint(cli, loi);
3056 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3058 client_obd_list_unlock(&cli->cl_loi_list_lock);
3062 static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
3063 struct ldlm_enqueue_info *einfo,
/* Attach einfo->ei_cbdata to @lock as its AST data, first asserting that
 * the lock's type and blocking/completion/glimpse callbacks match the
 * enqueue info (i.e. the lock really belongs to this user).  The update is
 * done under both the resource lock and osc_ast_guard, and only a NULL or
 * identical l_ast_data may be overwritten. */
3066 void *data = einfo->ei_cbdata;
3068 LASSERT(lock != NULL);
3069 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3070 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3071 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3072 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3074 lock_res_and_lock(lock);
3075 cfs_spin_lock(&osc_ast_guard);
3076 LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
3077 lock->l_ast_data = data;
3078 cfs_spin_unlock(&osc_ast_guard);
3079 unlock_res_and_lock(lock);
3082 static void osc_set_data_with_check(struct lustre_handle *lockh,
3083 struct ldlm_enqueue_info *einfo,
/* Handle-based wrapper: resolve @lockh to a lock, delegate to
 * osc_set_lock_data_with_check(), and drop the reference.  A stale handle
 * (lookup failure, e.g. after eviction) is only logged. */
3086 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3089 osc_set_lock_data_with_check(lock, einfo, flags);
3090 LDLM_LOCK_PUT(lock);
3092 CERROR("lockh %p, data %p - client evicted?\n",
3093 lockh, einfo->ei_cbdata);
3096 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3097 ldlm_iterator_t replace, void *data)
/* Apply the @replace iterator (with @data) to every DLM lock on the
 * resource named by this stripe's object id/group — used to swap the AST
 * data of cached locks for an object. */
3099 struct ldlm_res_id res_id;
3100 struct obd_device *obd = class_exp2obd(exp);
3102 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3103 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3107 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3108 obd_enqueue_update_f upcall, void *cookie,
/* Common completion for an OSC lock enqueue: for an intent enqueue that
 * was aborted, substitute the server's disposition from the ldlm reply;
 * on success (or glimpse-style abort) mark the LVB valid and log it; then
 * invoke the caller's @upcall with the final rc. */
3111 int intent = *flags & LDLM_FL_HAS_INTENT;
3115 /* The request was created before ldlm_cli_enqueue call. */
3116 if (rc == ELDLM_LOCK_ABORTED) {
3117 struct ldlm_reply *rep;
3118 rep = req_capsule_server_get(&req->rq_pill,
3121 LASSERT(rep != NULL);
3122 if (rep->lock_policy_res1)
3123 rc = rep->lock_policy_res1;
3127 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3128 *flags |= LDLM_FL_LVB_READY;
3129 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3130 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3133 /* Call the update callback. */
3134 rc = (*upcall)(cookie, rc);
3138 static int osc_enqueue_interpret(const struct lu_env *env,
3139 struct ptlrpc_request *req,
3140 struct osc_enqueue_args *aa, int rc)
/* ptlrpc interpret callback for an async lock enqueue: finishes the ldlm
 * side (ldlm_cli_enqueue_fini), then the osc side (osc_enqueue_fini /
 * caller upcall), carefully ordering lock references so a blocking AST
 * for a failed lock cannot run before the upcall has executed. */
3142 struct ldlm_lock *lock;
3143 struct lustre_handle handle;
3146 /* Make a local copy of a lock handle and a mode, because aa->oa_*
3147 * might be freed anytime after lock upcall has been called. */
3148 lustre_handle_copy(&handle, aa->oa_lockh);
3149 mode = aa->oa_ei->ei_mode;
3151 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3153 lock = ldlm_handle2lock(&handle);
3155 /* Take an additional reference so that a blocking AST that
3156 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3157 * to arrive after an upcall has been executed by
3158 * osc_enqueue_fini(). */
3159 ldlm_lock_addref(&handle, mode);
3161 /* Complete obtaining the lock procedure. */
3162 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3163 mode, aa->oa_flags, aa->oa_lvb,
3164 sizeof(*aa->oa_lvb), &handle, rc);
3165 /* Complete osc stuff. */
3166 rc = osc_enqueue_fini(req, aa->oa_lvb,
3167 aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
3169 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3171 /* Release the lock for async request. */
3172 if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3174 * Releases a reference taken by ldlm_cli_enqueue(), if it is
3175 * not already released by
3176 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3178 ldlm_lock_decref(&handle, mode);
3180 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3181 aa->oa_lockh, req, aa);
/* drop the extra reference taken above, then the handle2lock reference */
3182 ldlm_lock_decref(&handle, mode);
3183 LDLM_LOCK_PUT(lock);
3187 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3188 struct lov_oinfo *loi, int flags,
3189 struct ost_lvb *lvb, __u32 mode, int rc)
/* Update per-stripe state after an enqueue completes: on ELDLM_OK copy
 * the server LVB into the loi and extend the known minimum size (KMS) up
 * to (but not past) the end of the granted extent; on an aborted intent
 * (glimpse) just record the LVB.  ldlm_lock_allow_match() publishes the
 * lock for subsequent matches. */
3191 if (rc == ELDLM_OK) {
3192 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3195 LASSERT(lock != NULL);
3196 loi->loi_lvb = *lvb;
3197 tmp = loi->loi_lvb.lvb_size;
3198 /* Extend KMS up to the end of this lock and no further
3199 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3200 if (tmp > lock->l_policy_data.l_extent.end)
3201 tmp = lock->l_policy_data.l_extent.end + 1;
3202 if (tmp >= loi->loi_kms) {
3203 LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3204 ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3205 loi_kms_set(loi, tmp);
3207 LDLM_DEBUG(lock, "lock acquired, setting rss="
3208 LPU64"; leaving kms="LPU64", end="LPU64,
3209 loi->loi_lvb.lvb_size, loi->loi_kms,
3210 lock->l_policy_data.l_extent.end);
3212 ldlm_lock_allow_match(lock);
3213 LDLM_LOCK_PUT(lock);
3214 } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3215 loi->loi_lvb = *lvb;
3216 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3217 " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3221 EXPORT_SYMBOL(osc_update_enqueue);
3223 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3225 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3226 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3227 * other synchronous requests, however keeping some locks and trying to obtain
3228 * others may take a considerable amount of time in a case of ost failure; and
3229 * when other sync requests do not get released lock from a client, the client
3230 * is excluded from the cluster -- such scenarios make life difficult, so
3231 * release locks just after they are obtained. */
3232 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3233 int *flags, ldlm_policy_data_t *policy,
3234 struct ost_lvb *lvb, int kms_valid,
3235 obd_enqueue_update_f upcall, void *cookie,
3236 struct ldlm_enqueue_info *einfo,
3237 struct lustre_handle *lockh,
3238 struct ptlrpc_request_set *rqset, int async)
/* Core OSC lock enqueue.  Widens the extent to page boundaries, then
 * tries to match an existing cached lock (a PR request may be satisfied
 * by a cached PW lock); on a match the upcall is invoked immediately and
 * no RPC is sent.  Otherwise, for an intent enqueue a request with an LVB
 * reply buffer is built, and the enqueue is issued either synchronously
 * or asynchronously via @rqset with osc_enqueue_interpret() as the
 * completion handler. */
3240 struct obd_device *obd = exp->exp_obd;
3241 struct ptlrpc_request *req = NULL;
3242 int intent = *flags & LDLM_FL_HAS_INTENT;
3247 /* Filesystem lock extents are extended to page boundaries so that
3248 * dealing with the page cache is a little smoother. */
3249 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3250 policy->l_extent.end |= ~CFS_PAGE_MASK;
3253 * kms is not valid when either object is completely fresh (so that no
3254 * locks are cached), or object was evicted. In the latter case cached
3255 * lock cannot be used, because it would prime inode state with
3256 * potentially stale LVB.
3261 /* Next, search for already existing extent locks that will cover us */
3262 /* If we're trying to read, we also search for an existing PW lock. The
3263 * VFS and page cache already protect us locally, so lots of readers/
3264 * writers can share a single PW lock.
3266 * There are problems with conversion deadlocks, so instead of
3267 * converting a read lock to a write lock, we'll just enqueue a new
3270 * At some point we should cancel the read lock instead of making them
3271 * send us a blocking callback, but there are problems with canceling
3272 * locks out from other users right now, too. */
3273 mode = einfo->ei_mode;
3274 if (einfo->ei_mode == LCK_PR)
3276 mode = ldlm_lock_match(obd->obd_namespace,
3277 *flags | LDLM_FL_LVB_READY, res_id,
3278 einfo->ei_type, policy, mode, lockh, 0);
3280 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3282 if (matched->l_ast_data == NULL ||
3283 matched->l_ast_data == einfo->ei_cbdata) {
3284 /* addref the lock only if not async requests and PW
3285 * lock is matched whereas we asked for PR. */
3286 if (!rqset && einfo->ei_mode != mode)
3287 ldlm_lock_addref(lockh, LCK_PR);
3288 osc_set_lock_data_with_check(matched, einfo, *flags)
3290 /* I would like to be able to ASSERT here that
3291 * rss <= kms, but I can't, for reasons which
3292 * are explained in lov_enqueue() */
3295 /* We already have a lock, and it's referenced */
3296 (*upcall)(cookie, ELDLM_OK);
3298 /* For async requests, decref the lock. */
3299 if (einfo->ei_mode != mode)
3300 ldlm_lock_decref(lockh, LCK_PW);
3302 ldlm_lock_decref(lockh, einfo->ei_mode);
3303 LDLM_LOCK_PUT(matched);
/* matched lock has foreign ast_data: cannot reuse it */
3306 ldlm_lock_decref(lockh, mode);
3307 LDLM_LOCK_PUT(matched);
3312 CFS_LIST_HEAD(cancels);
3313 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3314 &RQF_LDLM_ENQUEUE_LVB);
3318 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3322 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3324 ptlrpc_request_set_replen(req);
3327 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3328 *flags &= ~LDLM_FL_BLOCK_GRANTED;
3330 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3331 sizeof(*lvb), lockh, async);
3334 struct osc_enqueue_args *aa;
3335 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3336 aa = ptlrpc_req_async_args(req);
3339 aa->oa_flags = flags;
3340 aa->oa_upcall = upcall;
3341 aa->oa_cookie = cookie;
3343 aa->oa_lockh = lockh;
3345 req->rq_interpret_reply =
3346 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3347 if (rqset == PTLRPCD_SET)
3348 ptlrpcd_add_req(req, PSCOPE_OTHER);
3350 ptlrpc_set_add_req(rqset, req);
3351 } else if (intent) {
3352 ptlrpc_req_finished(req);
3357 rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3359 ptlrpc_req_finished(req);
3364 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3365 struct ldlm_enqueue_info *einfo,
3366 struct ptlrpc_request_set *rqset)
/* obd_ops-level enqueue wrapper: build the resource name from the stripe
 * metadata and delegate to osc_enqueue_base(); the call is asynchronous
 * exactly when a request set is supplied. */
3368 struct ldlm_res_id res_id;
3372 osc_build_res_name(oinfo->oi_md->lsm_object_id,
3373 oinfo->oi_md->lsm_object_gr, &res_id);
3375 rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3376 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3377 oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3378 oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3379 rqset, rqset != NULL);
3383 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3384 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3385 int *flags, void *data, struct lustre_handle *lockh,
/* Try to match an already-cached DLM lock covering the (page-aligned)
 * extent.  On a real (non-TEST_LOCK) match where a PW lock satisfied a PR
 * request, the reference is converted from PW to PR so the caller holds
 * the mode it asked for. */
3388 struct obd_device *obd = exp->exp_obd;
3389 int lflags = *flags;
3393 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3396 /* Filesystem lock extents are extended to page boundaries so that
3397 * dealing with the page cache is a little smoother */
3398 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3399 policy->l_extent.end |= ~CFS_PAGE_MASK;
3401 /* Next, search for already existing extent locks that will cover us */
3402 /* If we're trying to read, we also search for an existing PW lock. The
3403 * VFS and page cache already protect us locally, so lots of readers/
3404 * writers can share a single PW lock. */
3408 rc = ldlm_lock_match(obd->obd_namespace, lflags,
3409 res_id, type, policy, rc, lockh, unref);
3412 osc_set_data_with_check(lockh, data, lflags);
3413 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3414 ldlm_lock_addref(lockh, LCK_PR);
3415 ldlm_lock_decref(lockh, LCK_PW);
3422 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
/* Drop one reference on @lockh in @mode; group locks are also cancelled
 * immediately since they are not kept in the LRU. */
3426 if (unlikely(mode == LCK_GROUP))
3427 ldlm_lock_decref_and_cancel(lockh, mode);
3429 ldlm_lock_decref(lockh, mode);
3434 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3435 __u32 mode, struct lustre_handle *lockh)
/* obd_ops-level cancel: thin wrapper around osc_cancel_base(). */
3438 RETURN(osc_cancel_base(lockh, mode));
3441 static int osc_cancel_unused(struct obd_export *exp,
3442 struct lov_stripe_md *lsm, int flags,
/* Cancel unused cached locks, restricted to this stripe's resource when
 * @lsm is given (otherwise resp stays NULL = whole namespace). */
3445 struct obd_device *obd = class_exp2obd(exp);
3446 struct ldlm_res_id res_id, *resp = NULL;
3449 resp = osc_build_res_name(lsm->lsm_object_id,
3450 lsm->lsm_object_gr, &res_id);
3453 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3456 static int osc_statfs_interpret(const struct lu_env *env,
3457 struct ptlrpc_request *req,
3458 struct osc_async_args *aa, int rc)
/* Interpret callback for an async OST_STATFS: refresh the object-create
 * cache (oscc) DEGRADED/RDONLY/NOSPC flags from the reply's os_state and
 * free-space counters, copy the statfs result out, and invoke the
 * caller's oi_cb_up.  NODELAY requests tolerate -ENOTCONN/-EAGAIN. */
3460 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3461 struct obd_statfs *msfs;
3466 /* The request has in fact never been sent
3467 * due to issues at a higher level (LOV).
3468 * Exit immediately since the caller is
3469 * aware of the problem and takes care
3470 * of the clean up */
3473 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3474 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3480 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3482 GOTO(out, rc = -EPROTO);
3485 /* Reinitialize the RDONLY and DEGRADED flags at the client
3486 * on each statfs, so they don't stay set permanently. */
3487 cfs_spin_lock(&cli->cl_oscc.oscc_lock);
3489 if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3490 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3491 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3492 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3494 if (unlikely(msfs->os_state & OS_STATE_READONLY))
3495 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3496 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3497 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3499 /* Add a bit of hysteresis so this flag isn't continually flapping,
3500 * and ensure that new files don't get extremely fragmented due to
3501 * only a small amount of available space in the filesystem.
3502 * We want to set the NOSPC flag when there is less than ~0.1% free
3503 * and clear it when there is at least ~0.2% free space, so:
3504 * avail < ~0.1% max max = avail + used
3505 * 1025 * avail < avail + used used = blocks - free
3506 * 1024 * avail < used
3507 * 1024 * avail < blocks - free
3508 * avail < ((blocks - free) >> 10)
3510 * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
3511 * lose that amount of space so in those cases we report no space left
3512 * if there is less than 1 GB left. */
3513 used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
3514 if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3515 ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3516 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3517 else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3518 (msfs->os_ffree > 64) && (msfs->os_bavail > (used << 1))))
3519 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_NOSPC;
3521 cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
3523 *aa->aa_oi->oi_osfs = *msfs;
3525 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Asynchronously fetch filesystem statistics from the OST (OST_STATFS RPC).
 * The reply is delivered through osc_statfs_interpret() via the request's
 * async args; the packed request is added to @rqset rather than waited on.
 * @max_age is accepted for interface symmetry but not transmitted (see the
 * in-line note below).
 */
3529 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3530 __u64 max_age, struct ptlrpc_request_set *rqset)
3532 struct ptlrpc_request *req;
3533 struct osc_async_args *aa;
3537 /* We could possibly pass max_age in the request (as an absolute
3538 * timestamp or a "seconds.usec ago") so the target can avoid doing
3539 * extra calls into the filesystem if that isn't necessary (e.g.
3540 * during mount that would help a bit). Having relative timestamps
3541 * is not so great if request processing is slow, while absolute
3542 * timestamps are not ideal because they need time synchronization. */
3543 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3547 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3549 ptlrpc_request_free(req);
3552 ptlrpc_request_set_replen(req);
/* Send statfs to the CREATE portal so it does not queue behind bulk I/O. */
3553 req->rq_request_portal = OST_CREATE_PORTAL;
3554 ptlrpc_at_set_req_timeout(req);
3556 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3557 /* procfs requests not want stat in wait for avoid deadlock */
3558 req->rq_no_resend = 1;
3559 req->rq_no_delay = 1;
/* NOTE: "ptlrpc_interpterer_t" is the historical (misspelled) typedef name. */
3562 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
/* Compile-time check that osc_async_args fits in the request's scratch area. */
3563 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3564 aa = ptlrpc_req_async_args(req);
3567 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous statfs: send OST_STATFS and copy the server's obd_statfs
 * reply into @osfs.  The import is pinned with class_import_get() under
 * cl_sem because the call may originate from lprocfs and race with
 * client_disconnect_export() (bug 15684).
 */
3571 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3572 __u64 max_age, __u32 flags)
3574 struct obd_statfs *msfs;
3575 struct ptlrpc_request *req;
3576 struct obd_import *imp = NULL;
3580 /*Since the request might also come from lprocfs, so we need
3581 *sync this with client_disconnect_export Bug15684*/
3582 cfs_down_read(&obd->u.cli.cl_sem);
3583 if (obd->u.cli.cl_import)
3584 imp = class_import_get(obd->u.cli.cl_import);
3585 cfs_up_read(&obd->u.cli.cl_sem);
3589 /* We could possibly pass max_age in the request (as an absolute
3590 * timestamp or a "seconds.usec ago") so the target can avoid doing
3591 * extra calls into the filesystem if that isn't necessary (e.g.
3592 * during mount that would help a bit). Having relative timestamps
3593 * is not so great if request processing is slow, while absolute
3594 * timestamps are not ideal because they need time synchronization. */
3595 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
/* The request holds its own import reference once allocated; drop ours. */
3597 class_import_put(imp);
3602 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3604 ptlrpc_request_free(req);
3607 ptlrpc_request_set_replen(req);
/* Same portal choice as osc_statfs_async(): avoid the bulk I/O portal. */
3608 req->rq_request_portal = OST_CREATE_PORTAL;
3609 ptlrpc_at_set_req_timeout(req);
3611 if (flags & OBD_STATFS_NODELAY) {
3612 /* procfs requests not want stat in wait for avoid deadlock */
3613 req->rq_no_resend = 1;
3614 req->rq_no_delay = 1;
3617 rc = ptlrpc_queue_wait(req);
3621 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3623 GOTO(out, rc = -EPROTO);
3630 ptlrpc_req_finished(req);
3634 /* Retrieve object striping information.
3636 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3637 * the maximum number of OST indices which will fit in the user buffer.
3638 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
/*
 * Copies a single-stripe lov_user_md (v1 or v3, per the magic the caller
 * supplied) describing @lsm back to user space at @lump.
 */
3640 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3642 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3643 struct lov_user_md_v3 lum, *lumk;
3644 struct lov_user_ost_data_v1 *lmm_objects;
3645 int rc = 0, lum_size;
3651 /* we only need the header part from user space to get lmm_magic and
3652 * lmm_stripe_count, (the header part is common to v1 and v3) */
3653 lum_size = sizeof(struct lov_user_md_v1);
3654 if (cfs_copy_from_user(&lum, lump, lum_size))
/* Only V1/V3 user magics are supported; anything else is rejected. */
3657 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3658 (lum.lmm_magic != LOV_USER_MAGIC_V3))
3661 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3662 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3663 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3664 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3666 /* we can use lov_mds_md_size() to compute lum_size
3667 * because lov_user_md_vX and lov_mds_md_vX have the same size */
3668 if (lum.lmm_stripe_count > 0) {
3669 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3670 OBD_ALLOC(lumk, lum_size);
/* lmm_objects sits at a different offset in v1 vs v3; pick the right one. */
3674 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3675 lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3677 lmm_objects = &(lumk->lmm_objects[0]);
3678 lmm_objects->l_object_id = lsm->lsm_object_id;
/* Caller left no room for objects: return only the header. */
3680 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3684 lumk->lmm_object_id = lsm->lsm_object_id;
3685 lumk->lmm_object_gr = lsm->lsm_object_gr;
/* An OSC always represents exactly one stripe. */
3686 lumk->lmm_stripe_count = 1;
3688 if (cfs_copy_to_user(lump, lumk, lum_size))
3692 OBD_FREE(lumk, lum_size);
/*
 * ioctl dispatcher for the OSC device.  Takes a module reference for the
 * duration of the call (dropped via cfs_module_put() at "out") so the
 * module cannot be unloaded mid-ioctl, then switches on @cmd.
 */
3698 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3699 void *karg, void *uarg)
3701 struct obd_device *obd = exp->exp_obd;
3702 struct obd_ioctl_data *data = karg;
3706 if (!cfs_try_module_get(THIS_MODULE)) {
3707 CERROR("Can't get module. Is it alive?");
3711 case OBD_IOC_LOV_GET_CONFIG: {
3713 struct lov_desc *desc;
3714 struct obd_uuid uuid;
3718 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3719 GOTO(out, err = -EINVAL);
3721 data = (struct obd_ioctl_data *)buf;
/* Validate the user-supplied inline buffer sizes before writing. */
3723 if (sizeof(*desc) > data->ioc_inllen1) {
3724 obd_ioctl_freedata(buf, len);
3725 GOTO(out, err = -EINVAL);
3728 if (data->ioc_inllen2 < sizeof(uuid)) {
3729 obd_ioctl_freedata(buf, len);
3730 GOTO(out, err = -EINVAL);
/* Synthesize a one-target LOV descriptor for this single OSC. */
3733 desc = (struct lov_desc *)data->ioc_inlbuf1;
3734 desc->ld_tgt_count = 1;
3735 desc->ld_active_tgt_count = 1;
3736 desc->ld_default_stripe_count = 1;
3737 desc->ld_default_stripe_size = 0;
3738 desc->ld_default_stripe_offset = 0;
3739 desc->ld_pattern = 0;
3740 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3742 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3744 err = cfs_copy_to_user((void *)uarg, buf, len);
3747 obd_ioctl_freedata(buf, len);
3750 case LL_IOC_LOV_SETSTRIPE:
3751 err = obd_alloc_memmd(exp, karg);
3755 case LL_IOC_LOV_GETSTRIPE:
3756 err = osc_getstripe(karg, uarg);
3758 case OBD_IOC_CLIENT_RECOVER:
3759 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3764 case IOC_OSC_SET_ACTIVE:
3765 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3768 case OBD_IOC_POLL_QUOTACHECK:
3769 err = lquota_poll_check(quota_interface, exp,
3770 (struct if_quotacheck *)karg);
3772 case OBD_IOC_PING_TARGET:
3773 err = ptlrpc_obd_ping(obd);
3776 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3777 cmd, cfs_curproc_comm());
3778 GOTO(out, err = -ENOTTY);
3781 cfs_module_put(THIS_MODULE);
/*
 * obd_ops o_get_info hook.  Handles three keys:
 *   KEY_LOCK_TO_STRIPE - answered locally (an OSC is a single stripe);
 *   KEY_LAST_ID        - OST_GET_INFO RPC returning an obd_id;
 *   KEY_FIEMAP         - OST_GET_INFO RPC carrying a fiemap request in
 *                        @val and returning the filled fiemap in place.
 * @vallen is both the input buffer size and (for local keys) the output
 * size.  Returns 0 on success or a negative errno.
 */
3785 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3786 void *key, __u32 *vallen, void *val,
3787 struct lov_stripe_md *lsm)
3790 if (!vallen || !val)
3793 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3794 __u32 *stripe = val;
3795 *vallen = sizeof(*stripe);
3798 } else if (KEY_IS(KEY_LAST_ID)) {
3799 struct ptlrpc_request *req;
3804 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3805 &RQF_OST_GET_INFO_LAST_ID);
/* Size the request's key field before packing. */
3809 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3810 RCL_CLIENT, keylen);
3811 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3813 ptlrpc_request_free(req);
3817 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3818 memcpy(tmp, key, keylen);
3820 req->rq_no_delay = req->rq_no_resend = 1;
3821 ptlrpc_request_set_replen(req);
3822 rc = ptlrpc_queue_wait(req);
3826 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3828 GOTO(out, rc = -EPROTO);
3830 *((obd_id *)val) = *reply;
3832 ptlrpc_req_finished(req);
3834 } else if (KEY_IS(KEY_FIEMAP)) {
3835 struct ptlrpc_request *req;
3836 struct ll_user_fiemap *reply;
3840 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3841 &RQF_OST_GET_INFO_FIEMAP);
/* The fiemap value buffer is used in both directions: the client sends
 * the request template and the server returns it filled in. */
3845 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3846 RCL_CLIENT, keylen);
3847 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3848 RCL_CLIENT, *vallen);
3849 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3850 RCL_SERVER, *vallen);
3852 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3854 ptlrpc_request_free(req);
3858 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3859 memcpy(tmp, key, keylen);
3860 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3861 memcpy(tmp, val, *vallen);
3863 ptlrpc_request_set_replen(req);
3864 rc = ptlrpc_queue_wait(req);
3868 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3870 GOTO(out1, rc = -EPROTO);
3872 memcpy(val, reply, *vallen);
3874 ptlrpc_req_finished(req);
/*
 * Called when the MDS has connected through this OSC: connect the
 * origin llog initiator and mark the import as server-side (pingable,
 * server timeouts).  An llog connect failure is logged but deliberately
 * not fatal (see the XXX below).
 */
3882 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
3884 struct llog_ctxt *ctxt;
3888 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3890 rc = llog_initiator_connect(ctxt);
3891 llog_ctxt_put(ctxt);
3893 /* XXX return an error? skip setting below flags? */
/* imp_lock protects the import state flags. */
3896 cfs_spin_lock(&imp->imp_lock);
3897 imp->imp_server_timeout = 1;
3898 imp->imp_pingable = 1;
3899 cfs_spin_unlock(&imp->imp_lock);
3900 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
/* Reply interpreter for the KEY_MDS_CONN set_info RPC: on success,
 * finish the MDS connection setup on this request's import. */
3905 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3906 struct ptlrpc_request *req,
3913 RETURN(osc_setinfo_mds_connect_import(req->rq_import));
/*
 * obd_ops o_set_info_async hook.  Several keys are handled locally
 * (KEY_NEXT_ID, KEY_INIT_RECOV, KEY_CHECKSUM, KEY_SPTLRPC_CONF,
 * KEY_FLUSH_CTX); everything else is forwarded to the OST as an
 * OST_SET_INFO RPC.  KEY_MDS_CONN and KEY_GRANT_SHRINK get dedicated
 * reply interpreters; KEY_GRANT_SHRINK is sent via ptlrpcd instead of
 * the caller's @set.
 */
3916 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3917 void *key, obd_count vallen, void *val,
3918 struct ptlrpc_request_set *set)
3920 struct ptlrpc_request *req;
3921 struct obd_device *obd = exp->exp_obd;
3922 struct obd_import *imp = class_exp2cliimp(exp);
3927 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3929 if (KEY_IS(KEY_NEXT_ID)) {
3931 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3933 if (vallen != sizeof(obd_id))
3938 if (vallen != sizeof(obd_id))
3941 /* avoid race between allocate new object and set next id
3942 * from ll_sync thread */
3943 cfs_spin_lock(&oscc->oscc_lock);
/* oscc_next_id only moves forward: take the max of old and new. */
3944 new_val = *((obd_id*)val) + 1;
3945 if (new_val > oscc->oscc_next_id)
3946 oscc->oscc_next_id = new_val;
3947 cfs_spin_unlock(&oscc->oscc_lock);
3948 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3949 exp->exp_obd->obd_name,
3950 obd->u.cli.cl_oscc.oscc_next_id);
3955 if (KEY_IS(KEY_INIT_RECOV)) {
3956 if (vallen != sizeof(int))
3958 cfs_spin_lock(&imp->imp_lock);
3959 imp->imp_initial_recov = *(int *)val;
3960 cfs_spin_unlock(&imp->imp_lock);
3961 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3962 exp->exp_obd->obd_name,
3963 imp->imp_initial_recov);
3967 if (KEY_IS(KEY_CHECKSUM)) {
3968 if (vallen != sizeof(int))
/* Normalize any non-zero value to 1. */
3970 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3974 if (KEY_IS(KEY_SPTLRPC_CONF)) {
3975 sptlrpc_conf_client_adapt(obd);
3979 if (KEY_IS(KEY_FLUSH_CTX)) {
3980 sptlrpc_import_flush_my_ctx(imp);
/* All remaining keys except GRANT_SHRINK need a caller-provided set. */
3984 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3987 /* We pass all other commands directly to OST. Since nobody calls osc
3988 methods directly and everybody is supposed to go through LOV, we
3989 assume lov checked invalid values for us.
3990 The only recognised values so far are evict_by_nid and mds_conn.
3991 Even if something bad goes through, we'd get a -EINVAL from OST
3994 if (KEY_IS(KEY_GRANT_SHRINK))
3995 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
3997 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
4002 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
4003 RCL_CLIENT, keylen);
4004 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
4005 RCL_CLIENT, vallen);
4006 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
4008 ptlrpc_request_free(req);
4012 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
4013 memcpy(tmp, key, keylen);
4014 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
4015 memcpy(tmp, val, vallen);
4017 if (KEY_IS(KEY_MDS_CONN)) {
4018 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4020 oscc->oscc_oa.o_gr = (*(__u32 *)val);
4021 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
4022 LASSERT_MDS_GROUP(oscc->oscc_oa.o_gr);
4023 req->rq_no_delay = req->rq_no_resend = 1;
4024 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4025 } else if (KEY_IS(KEY_GRANT_SHRINK)) {
4026 struct osc_grant_args *aa;
4029 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4030 aa = ptlrpc_req_async_args(req);
4033 ptlrpc_req_finished(req);
4036 *oa = ((struct ost_body *)val)->oa;
4038 req->rq_interpret_reply = osc_shrink_grant_interpret;
4041 ptlrpc_request_set_replen(req);
4042 if (!KEY_IS(KEY_GRANT_SHRINK)) {
4043 LASSERT(set != NULL);
4044 ptlrpc_set_add_req(set, req);
4045 ptlrpc_check_set(NULL, set);
/* Grant shrink runs on the ptlrpcd daemons, detached from the caller. */
4047 ptlrpcd_add_req(req, PSCOPE_OTHER);
4053 static struct llog_operations osc_size_repl_logops = {
4054 lop_cancel: llog_obd_repl_cancel
/* Origin-side llog operations; populated in osc_init() by copying
 * llog_lvfs_ops and overriding the setup/cleanup/add/connect hooks. */
4057 static struct llog_operations osc_mds_ost_orig_logops;
/*
 * Set up the two llog contexts this OSC needs: the MDS->OST origin
 * context (MDS_OST_ORIG, using @catid's log id) and the size-replication
 * context (SIZE_REPL).  If the second setup fails, the first context is
 * looked up again so it can be torn down; errors are logged with the
 * catalog id for diagnosis.
 */
4059 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4060 struct obd_device *tgt, struct llog_catid *catid)
4065 rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
4066 &catid->lci_logid, &osc_mds_ost_orig_logops);
4068 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4072 rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
4073 NULL, &osc_size_repl_logops);
4075 struct llog_ctxt *ctxt =
4076 llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4079 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4084 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
4085 obd->obd_name, tgt->obd_name, catid, rc);
4086 CERROR("logid "LPX64":0x%x\n",
4087 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
/*
 * obd_ops o_llog_init hook.  Reads catalog id @*index from the CATLIST
 * file on @disk_obd, initializes the llog contexts via __osc_llog_init(),
 * and writes the (possibly updated) catalog id back.  Serialized by
 * olg_cat_processing since llog_get/put_cat_list share the CATLIST file.
 */
4092 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4093 struct obd_device *disk_obd, int *index)
4095 struct llog_catid catid;
4096 static char name[32] = CATLIST;
4100 LASSERT(olg == &obd->obd_olg);
4102 cfs_mutex_down(&olg->olg_cat_processing);
4103 rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
4105 CERROR("rc: %d\n", rc);
4109 CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
4110 obd->obd_name, *index, catid.lci_logid.lgl_oid,
4111 catid.lci_logid.lgl_ogr, catid.lci_logid.lgl_ogen);
4113 rc = __osc_llog_init(obd, olg, disk_obd, &catid);
4115 CERROR("rc: %d\n", rc);
4119 rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
4121 CERROR("rc: %d\n", rc);
4126 cfs_mutex_up(&olg->olg_cat_processing);
/*
 * obd_ops o_llog_finish hook: tear down both llog contexts created by
 * __osc_llog_init() (origin and size-replication).  Both cleanups are
 * attempted regardless of the first one's result.
 */
4131 static int osc_llog_finish(struct obd_device *obd, int count)
4133 struct llog_ctxt *ctxt;
4134 int rc = 0, rc2 = 0;
4137 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4139 rc = llog_cleanup(ctxt);
4141 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4143 rc2 = llog_cleanup(ctxt);
/*
 * obd_ops o_reconnect hook.  If the server supports grants, report our
 * current grant position in @data->ocd_grant (falling back to twice the
 * max RPC size when we hold nothing) and zero cl_lost_grant, all under
 * cl_loi_list_lock.
 */
4150 static int osc_reconnect(const struct lu_env *env,
4151 struct obd_export *exp, struct obd_device *obd,
4152 struct obd_uuid *cluuid,
4153 struct obd_connect_data *data,
4156 struct client_obd *cli = &obd->u.cli;
4158 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4161 client_obd_list_lock(&cli->cl_loi_list_lock);
/* GNU "?:" shorthand: use avail+dirty if non-zero, else the fallback. */
4162 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
4163 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4164 lost_grant = cli->cl_lost_grant;
4165 cli->cl_lost_grant = 0;
4166 client_obd_list_unlock(&cli->cl_loi_list_lock);
4168 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4169 "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
4170 cli->cl_avail_grant, cli->cl_dirty, lost_grant);
4171 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4172 " ocd_grant: %d\n", data->ocd_connect_flags,
4173 data->ocd_version, data->ocd_grant);
/*
 * obd_ops o_disconnect hook.  On the last connection, flush pending
 * llog cancels to the OST, then disconnect the export.  Removing this
 * client from the grant-shrink list must happen only after the import
 * is gone, to avoid the setup/cleanup race documented below (BUG18662).
 */
4179 static int osc_disconnect(struct obd_export *exp)
4181 struct obd_device *obd = class_exp2obd(exp);
4182 struct llog_ctxt *ctxt;
4185 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4187 if (obd->u.cli.cl_conn_count == 1) {
4188 /* Flush any remaining cancel messages out to the
4190 llog_sync(ctxt, exp);
4192 llog_ctxt_put(ctxt);
4194 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4198 rc = client_disconnect_export(exp);
4200 * Initially we put del_shrink_grant before disconnect_export, but it
4201 * causes the following problem if setup (connect) and cleanup
4202 * (disconnect) are tangled together.
4203 * connect p1 disconnect p2
4204 * ptlrpc_connect_import
4205 * ............... class_manual_cleanup
4208 * ptlrpc_connect_interrupt
4210 * add this client to shrink list
4212 * Bang! pinger trigger the shrink.
4213 * So the osc should be disconnected from the shrink list, after we
4214 * are sure the import has been destroyed. BUG18662
4216 if (obd->u.cli.cl_import == NULL)
4217 osc_del_shrink_grant(&obd->u.cli);
/*
 * React to import state changes:
 *   DISCON     - mark the creator recovering (MDS OSCs only) and forget
 *                all grant state;
 *   INACTIVE   - notify the observer;
 *   INVALIDATE - fail queued pages against the now-invalid import and
 *                flush local LDLM locks;
 *   ACTIVE     - clear the creator's NOSPC flag (MDS OSCs) and notify;
 *   OCD        - apply the negotiated connect data (grants, portal).
 */
4221 static int osc_import_event(struct obd_device *obd,
4222 struct obd_import *imp,
4223 enum obd_import_event event)
4225 struct client_obd *cli;
4229 LASSERT(imp->imp_obd == obd);
4232 case IMP_EVENT_DISCON: {
4233 /* Only do this on the MDS OSC's */
4234 if (imp->imp_server_timeout) {
4235 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4237 cfs_spin_lock(&oscc->oscc_lock);
4238 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4239 cfs_spin_unlock(&oscc->oscc_lock);
/* All grant state is stale once the server is gone. */
4242 client_obd_list_lock(&cli->cl_loi_list_lock);
4243 cli->cl_avail_grant = 0;
4244 cli->cl_lost_grant = 0;
4245 client_obd_list_unlock(&cli->cl_loi_list_lock);
4248 case IMP_EVENT_INACTIVE: {
4249 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4252 case IMP_EVENT_INVALIDATE: {
4253 struct ldlm_namespace *ns = obd->obd_namespace;
4257 env = cl_env_get(&refcheck);
4261 client_obd_list_lock(&cli->cl_loi_list_lock);
4262 /* all pages go to failing rpcs due to the invalid
4264 osc_check_rpcs(env, cli);
4265 client_obd_list_unlock(&cli->cl_loi_list_lock);
4267 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4268 cl_env_put(env, &refcheck);
4273 case IMP_EVENT_ACTIVE: {
4274 /* Only do this on the MDS OSC's */
4275 if (imp->imp_server_timeout) {
4276 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4278 cfs_spin_lock(&oscc->oscc_lock);
4279 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
4280 cfs_spin_unlock(&oscc->oscc_lock);
4282 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4285 case IMP_EVENT_OCD: {
4286 struct obd_connect_data *ocd = &imp->imp_connect_data;
4288 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4289 osc_init_grant(&obd->u.cli, ocd);
4292 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4293 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
4295 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4299 CERROR("Unknown import event %d\n", event);
/*
 * obd_ops o_setup hook: take a ptlrpcd reference, run the common client
 * setup, register lprocfs entries, pre-allocate a request pool sized
 * for max_rpcs_in_flight + 2 (see comment), and initialize grant-shrink
 * bookkeeping.
 */
4305 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4311 rc = ptlrpcd_addref();
4315 rc = client_obd_setup(obd, lcfg);
4319 struct lprocfs_static_vars lvars = { 0 };
4320 struct client_obd *cli = &obd->u.cli;
4322 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4323 lprocfs_osc_init_vars(&lvars);
4324 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4325 lproc_osc_attach_seqstat(obd);
4326 sptlrpc_lprocfs_cliobd_attach(obd);
4327 ptlrpc_lprocfs_register_obd(obd);
4331 /* We need to allocate a few requests more, because
4332 brw_interpret tries to create new requests before freeing
4333 previous ones. Ideally we want to have 2x max_rpcs_in_flight
4334 reserved, but I afraid that might be too much wasted RAM
4335 in fact, so 2 is just my guess and still should work. */
4336 cli->cl_import->imp_rq_pool =
4337 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4339 ptlrpc_add_rqs_to_pool);
4341 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4342 cfs_sema_init(&cli->cl_grant_sem, 1);
/*
 * Staged teardown.  EARLY: deactivate the import and stop pinging so
 * recovery traffic (e.g. mds_lov_synchronize) is aborted.  EXPORTS:
 * invalidate and destroy the import (it survives if we set up but never
 * connected), free its request pool, and finish the llog contexts.
 */
4348 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4354 case OBD_CLEANUP_EARLY: {
4355 struct obd_import *imp;
4356 imp = obd->u.cli.cl_import;
4357 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4358 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4359 ptlrpc_deactivate_import(imp);
4360 cfs_spin_lock(&imp->imp_lock);
4361 imp->imp_pingable = 0;
4362 cfs_spin_unlock(&imp->imp_lock);
4365 case OBD_CLEANUP_EXPORTS: {
4366 /* If we set up but never connected, the
4367 client import will not have been cleaned. */
4368 if (obd->u.cli.cl_import) {
4369 struct obd_import *imp;
/* cl_sem (write) serializes against readers such as osc_statfs(). */
4370 cfs_down_write(&obd->u.cli.cl_sem);
4371 imp = obd->u.cli.cl_import;
4372 CDEBUG(D_CONFIG, "%s: client import never connected\n",
4374 ptlrpc_invalidate_import(imp);
4375 if (imp->imp_rq_pool) {
4376 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4377 imp->imp_rq_pool = NULL;
4379 class_destroy_import(imp);
4380 cfs_up_write(&obd->u.cli.cl_sem);
4381 obd->u.cli.cl_import = NULL;
4383 rc = obd_llog_finish(obd, 0);
4385 CERROR("failed to cleanup llogging subsystems\n");
/*
 * Final obd cleanup: unregister lprocfs entries, release the quota
 * cache, and run the common client teardown.
 */
4392 int osc_cleanup(struct obd_device *obd)
4397 ptlrpc_lprocfs_unregister_obd(obd);
4398 lprocfs_obd_cleanup(obd);
4400 /* free memory of osc quota cache */
4401 lquota_cleanup(quota_interface, obd);
4403 rc = client_obd_cleanup(obd);
/*
 * Shared config-log handler for OSC-like devices: currently only
 * processes proc parameter records (PARAM_OSC prefix) against the OSC
 * lprocfs variable table.
 */
4409 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4411 struct lprocfs_static_vars lvars = { 0 };
4414 lprocfs_osc_init_vars(&lvars);
4416 switch (lcfg->lcfg_command) {
4418 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
4428 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4430 return osc_process_config_base(obd, buf);
/*
 * The OSC's obd_ops method table.  Generic client_* helpers handle
 * connection management; everything else is implemented in this file
 * (or elsewhere in the OSC module for entries not defined here).
 */
4433 struct obd_ops osc_obd_ops = {
4434 .o_owner = THIS_MODULE,
4435 .o_setup = osc_setup,
4436 .o_precleanup = osc_precleanup,
4437 .o_cleanup = osc_cleanup,
4438 .o_add_conn = client_import_add_conn,
4439 .o_del_conn = client_import_del_conn,
4440 .o_connect = client_connect_import,
4441 .o_reconnect = osc_reconnect,
4442 .o_disconnect = osc_disconnect,
4443 .o_statfs = osc_statfs,
4444 .o_statfs_async = osc_statfs_async,
4445 .o_packmd = osc_packmd,
4446 .o_unpackmd = osc_unpackmd,
4447 .o_precreate = osc_precreate,
4448 .o_create = osc_create,
4449 .o_create_async = osc_create_async,
4450 .o_destroy = osc_destroy,
4451 .o_getattr = osc_getattr,
4452 .o_getattr_async = osc_getattr_async,
4453 .o_setattr = osc_setattr,
4454 .o_setattr_async = osc_setattr_async,
4456 .o_punch = osc_punch,
4458 .o_enqueue = osc_enqueue,
4459 .o_change_cbdata = osc_change_cbdata,
4460 .o_cancel = osc_cancel,
4461 .o_cancel_unused = osc_cancel_unused,
4462 .o_iocontrol = osc_iocontrol,
4463 .o_get_info = osc_get_info,
4464 .o_set_info_async = osc_set_info_async,
4465 .o_import_event = osc_import_event,
4466 .o_llog_init = osc_llog_init,
4467 .o_llog_finish = osc_llog_finish,
4468 .o_process_config = osc_process_config,
/* Defined elsewhere in the OSC module (osc_cl_internal/osc_lock code);
 * declared here for use by module init below. */
4471 extern struct lu_kmem_descr osc_caches[];
4472 extern cfs_spinlock_t osc_ast_guard;
4473 extern cfs_lock_class_key_t osc_ast_guard_class;
/*
 * Module init: create the OSC kmem caches, hook up the quota interface
 * (optional - the lquota module is loaded on demand), register the OSC
 * obd type, initialize the AST guard lock, and build the origin llog
 * operations table from llog_lvfs_ops.  On registration failure the
 * quota symbol and caches are released again.
 */
4475 int __init osc_init(void)
4477 struct lprocfs_static_vars lvars = { 0 };
4481 /* print an address of _any_ initialized kernel symbol from this
4482 * module, to allow debugging with gdb that doesn't support data
4483 * symbols from modules.*/
4484 CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4486 rc = lu_kmem_init(osc_caches);
4488 lprocfs_osc_init_vars(&lvars);
4490 cfs_request_module("lquota");
4491 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4492 lquota_init(quota_interface);
4493 init_obd_quota_ops(quota_interface, &osc_obd_ops);
4495 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4496 LUSTRE_OSC_NAME, &osc_device_type);
/* Error path: undo the symbol get and cache creation. */
4498 if (quota_interface)
4499 PORTAL_SYMBOL_PUT(osc_quota_interface);
4500 lu_kmem_fini(osc_caches);
4504 cfs_spin_lock_init(&osc_ast_guard);
4505 cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
/* Origin llog ops: start from the lvfs implementation and override
 * the client-origin entry points. */
4507 osc_mds_ost_orig_logops = llog_lvfs_ops;
4508 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4509 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4510 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4511 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
/*
 * Module exit: mirror osc_init() in reverse - device type, quota
 * interface, obd type registration, kmem caches.
 */
4517 static void /*__exit*/ osc_exit(void)
4519 lu_device_type_fini(&osc_device_type);
4521 lquota_exit(quota_interface);
4522 if (quota_interface)
4523 PORTAL_SYMBOL_PUT(osc_quota_interface);
4525 class_unregister_type(LUSTRE_OSC_NAME);
4526 lu_kmem_fini(osc_caches);
/* Standard kernel module metadata and the libcfs module declaration
 * binding osc_init/osc_exit as the init/exit entry points. */
4529 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4530 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4531 MODULE_LICENSE("GPL");
4533 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);