1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_OSC
42 #include <libcfs/libcfs.h>
45 # include <liblustre.h>
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include "osc_internal.h"
66 static quota_interface_t *quota_interface = NULL;
67 extern quota_interface_t osc_quota_interface;
69 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
70 static int brw_interpret(const struct lu_env *env,
71 struct ptlrpc_request *req, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
74 /* Pack OSC object metadata for disk storage (LE byte order). */
75 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
76 struct lov_stripe_md *lsm)
81 lmm_size = sizeof(**lmmp);
86 OBD_FREE(*lmmp, lmm_size);
92 OBD_ALLOC(*lmmp, lmm_size);
98 LASSERT(lsm->lsm_object_id);
99 LASSERT_MDS_GROUP(lsm->lsm_object_gr);
100 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
101 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
107 /* Unpack OSC object metadata from disk storage (LE byte order). */
108 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
109 struct lov_mds_md *lmm, int lmm_bytes)
115 if (lmm_bytes < sizeof (*lmm)) {
116 CERROR("lov_mds_md too small: %d, need %d\n",
117 lmm_bytes, (int)sizeof(*lmm));
120 /* XXX LOV_MAGIC etc check? */
122 if (lmm->lmm_object_id == 0) {
123 CERROR("lov_mds_md: zero lmm_object_id\n");
128 lsm_size = lov_stripe_md_size(1);
132 if (*lsmp != NULL && lmm == NULL) {
133 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
134 OBD_FREE(*lsmp, lsm_size);
140 OBD_ALLOC(*lsmp, lsm_size);
143 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
144 if ((*lsmp)->lsm_oinfo[0] == NULL) {
145 OBD_FREE(*lsmp, lsm_size);
148 loi_init((*lsmp)->lsm_oinfo[0]);
152 /* XXX zero *lsmp? */
153 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
154 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
155 LASSERT((*lsmp)->lsm_object_id);
156 LASSERT_MDS_GROUP((*lsmp)->lsm_object_gr);
159 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165 struct ost_body *body, void *capa)
167 struct obd_capa *oc = (struct obd_capa *)capa;
168 struct lustre_capa *c;
173 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
176 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177 DEBUG_CAPA(D_SEC, c, "pack");
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181 struct obd_info *oinfo)
183 struct ost_body *body;
185 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
188 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
189 osc_pack_capa(req, body, oinfo->oi_capa);
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193 const struct req_msg_field *field,
197 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
199 /* it is already calculated as sizeof struct obd_capa */
203 static int osc_getattr_interpret(const struct lu_env *env,
204 struct ptlrpc_request *req,
205 struct osc_async_args *aa, int rc)
207 struct ost_body *body;
213 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
215 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
216 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
218 /* This should really be sent by the OST */
219 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
220 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
222 CDEBUG(D_INFO, "can't unpack ost_body\n");
224 aa->aa_oi->oi_oa->o_valid = 0;
227 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
231 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
232 struct ptlrpc_request_set *set)
234 struct ptlrpc_request *req;
235 struct osc_async_args *aa;
239 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
243 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
244 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
246 ptlrpc_request_free(req);
250 osc_pack_req_body(req, oinfo);
252 ptlrpc_request_set_replen(req);
253 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
255 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
256 aa = ptlrpc_req_async_args(req);
259 ptlrpc_set_add_req(set, req);
263 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
265 struct ptlrpc_request *req;
266 struct ost_body *body;
270 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
274 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
275 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
277 ptlrpc_request_free(req);
281 osc_pack_req_body(req, oinfo);
283 ptlrpc_request_set_replen(req);
285 rc = ptlrpc_queue_wait(req);
289 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
291 GOTO(out, rc = -EPROTO);
293 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
294 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
296 /* This should really be sent by the OST */
297 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
298 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
302 ptlrpc_req_finished(req);
306 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
307 struct obd_trans_info *oti)
309 struct ptlrpc_request *req;
310 struct ost_body *body;
314 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
316 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
320 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
321 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
323 ptlrpc_request_free(req);
327 osc_pack_req_body(req, oinfo);
329 ptlrpc_request_set_replen(req);
331 rc = ptlrpc_queue_wait(req);
335 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
337 GOTO(out, rc = -EPROTO);
339 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
343 ptlrpc_req_finished(req);
347 static int osc_setattr_interpret(const struct lu_env *env,
348 struct ptlrpc_request *req,
349 struct osc_async_args *aa, int rc)
351 struct ost_body *body;
357 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
359 GOTO(out, rc = -EPROTO);
361 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
363 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
367 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
368 struct obd_trans_info *oti,
369 struct ptlrpc_request_set *rqset)
371 struct ptlrpc_request *req;
372 struct osc_async_args *aa;
376 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
380 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
381 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
383 ptlrpc_request_free(req);
387 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
388 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
390 osc_pack_req_body(req, oinfo);
392 ptlrpc_request_set_replen(req);
394 /* do mds to ost setattr asynchronously */
396 /* Do not wait for response. */
397 ptlrpcd_add_req(req, PSCOPE_OTHER);
399 req->rq_interpret_reply =
400 (ptlrpc_interpterer_t)osc_setattr_interpret;
402 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
403 aa = ptlrpc_req_async_args(req);
406 ptlrpc_set_add_req(rqset, req);
412 int osc_real_create(struct obd_export *exp, struct obdo *oa,
413 struct lov_stripe_md **ea, struct obd_trans_info *oti)
415 struct ptlrpc_request *req;
416 struct ost_body *body;
417 struct lov_stripe_md *lsm;
426 rc = obd_alloc_memmd(exp, &lsm);
431 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
433 GOTO(out, rc = -ENOMEM);
435 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
437 ptlrpc_request_free(req);
441 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
443 lustre_set_wire_obdo(&body->oa, oa);
445 ptlrpc_request_set_replen(req);
447 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
448 oa->o_flags == OBD_FL_DELORPHAN) {
450 "delorphan from OST integration");
451 /* Don't resend the delorphan req */
452 req->rq_no_resend = req->rq_no_delay = 1;
455 rc = ptlrpc_queue_wait(req);
459 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
461 GOTO(out_req, rc = -EPROTO);
463 lustre_get_wire_obdo(oa, &body->oa);
465 /* This should really be sent by the OST */
466 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
467 oa->o_valid |= OBD_MD_FLBLKSZ;
469 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
470 * have valid lsm_oinfo data structs, so don't go touching that.
471 * This needs to be fixed in a big way.
473 lsm->lsm_object_id = oa->o_id;
474 lsm->lsm_object_gr = oa->o_gr;
478 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
480 if (oa->o_valid & OBD_MD_FLCOOKIE) {
481 if (!oti->oti_logcookies)
482 oti_alloc_cookies(oti, 1);
483 *oti->oti_logcookies = oa->o_lcookie;
487 CDEBUG(D_HA, "transno: "LPD64"\n",
488 lustre_msg_get_transno(req->rq_repmsg));
490 ptlrpc_req_finished(req);
493 obd_free_memmd(exp, &lsm);
497 static int osc_punch_interpret(const struct lu_env *env,
498 struct ptlrpc_request *req,
499 struct osc_punch_args *aa, int rc)
501 struct ost_body *body;
507 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
509 GOTO(out, rc = -EPROTO);
511 lustre_get_wire_obdo(aa->pa_oa, &body->oa);
513 rc = aa->pa_upcall(aa->pa_cookie, rc);
517 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
518 struct obd_capa *capa,
519 obd_enqueue_update_f upcall, void *cookie,
520 struct ptlrpc_request_set *rqset)
522 struct ptlrpc_request *req;
523 struct osc_punch_args *aa;
524 struct ost_body *body;
528 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
532 osc_set_capa_size(req, &RMF_CAPA1, capa);
533 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
535 ptlrpc_request_free(req);
538 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
539 ptlrpc_at_set_req_timeout(req);
541 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
543 lustre_set_wire_obdo(&body->oa, oa);
544 osc_pack_capa(req, body, capa);
546 ptlrpc_request_set_replen(req);
549 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
550 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
551 aa = ptlrpc_req_async_args(req);
553 aa->pa_upcall = upcall;
554 aa->pa_cookie = cookie;
555 if (rqset == PTLRPCD_SET)
556 ptlrpcd_add_req(req, PSCOPE_OTHER);
558 ptlrpc_set_add_req(rqset, req);
563 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
564 struct obd_trans_info *oti,
565 struct ptlrpc_request_set *rqset)
567 oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
568 oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
569 oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
570 return osc_punch_base(exp, oinfo->oi_oa, oinfo->oi_capa,
571 oinfo->oi_cb_up, oinfo, rqset);
574 static int osc_sync(struct obd_export *exp, struct obdo *oa,
575 struct lov_stripe_md *md, obd_size start, obd_size end,
578 struct ptlrpc_request *req;
579 struct ost_body *body;
584 CDEBUG(D_INFO, "oa NULL\n");
588 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
592 osc_set_capa_size(req, &RMF_CAPA1, capa);
593 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
595 ptlrpc_request_free(req);
599 /* overload the size and blocks fields in the oa with start/end */
600 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
602 lustre_set_wire_obdo(&body->oa, oa);
603 body->oa.o_size = start;
604 body->oa.o_blocks = end;
605 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
606 osc_pack_capa(req, body, capa);
608 ptlrpc_request_set_replen(req);
610 rc = ptlrpc_queue_wait(req);
614 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
616 GOTO(out, rc = -EPROTO);
618 lustre_get_wire_obdo(oa, &body->oa);
622 ptlrpc_req_finished(req);
626 /* Find and cancel locally locks matched by @mode in the resource found by
627 * @objid. Found locks are added into @cancel list. Returns the amount of
628 * locks added to @cancels list. */
629 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
630 struct list_head *cancels, ldlm_mode_t mode,
633 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
634 struct ldlm_res_id res_id;
635 struct ldlm_resource *res;
639 osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
640 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
644 LDLM_RESOURCE_ADDREF(res);
645 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
646 lock_flags, 0, NULL);
647 LDLM_RESOURCE_DELREF(res);
648 ldlm_resource_putref(res);
652 static int osc_destroy_interpret(const struct lu_env *env,
653 struct ptlrpc_request *req, void *data,
656 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
658 atomic_dec(&cli->cl_destroy_in_flight);
659 cfs_waitq_signal(&cli->cl_destroy_waitq);
663 static int osc_can_send_destroy(struct client_obd *cli)
665 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
666 cli->cl_max_rpcs_in_flight) {
667 /* The destroy request can be sent */
670 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
671 cli->cl_max_rpcs_in_flight) {
673 * The counter has been modified between the two atomic
676 cfs_waitq_signal(&cli->cl_destroy_waitq);
681 /* Destroy requests can be async always on the client, and we don't even really
682 * care about the return code since the client cannot do anything at all about
684 * When the MDS is unlinking a filename, it saves the file objects into a
685 * recovery llog, and these object records are cancelled when the OST reports
686 * they were destroyed and sync'd to disk (i.e. transaction committed).
687 * If the client dies, or the OST is down when the object should be destroyed,
688 * the records are not cancelled, and when the OST reconnects to the MDS next,
689 * it will retrieve the llog unlink logs and then sends the log cancellation
690 * cookies to the MDS after committing destroy transactions. */
691 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
692 struct lov_stripe_md *ea, struct obd_trans_info *oti,
693 struct obd_export *md_export, void *capa)
695 struct client_obd *cli = &exp->exp_obd->u.cli;
696 struct ptlrpc_request *req;
697 struct ost_body *body;
698 CFS_LIST_HEAD(cancels);
703 CDEBUG(D_INFO, "oa NULL\n");
707 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
708 LDLM_FL_DISCARD_DATA);
710 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
712 ldlm_lock_list_put(&cancels, l_bl_ast, count);
716 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
717 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
720 ptlrpc_request_free(req);
724 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
725 ptlrpc_at_set_req_timeout(req);
727 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
728 oa->o_lcookie = *oti->oti_logcookies;
729 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
731 lustre_set_wire_obdo(&body->oa, oa);
733 osc_pack_capa(req, body, (struct obd_capa *)capa);
734 ptlrpc_request_set_replen(req);
736 /* don't throttle destroy RPCs for the MDT */
737 if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
738 req->rq_interpret_reply = osc_destroy_interpret;
739 if (!osc_can_send_destroy(cli)) {
740 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
744 * Wait until the number of on-going destroy RPCs drops
745 * under max_rpc_in_flight
747 l_wait_event_exclusive(cli->cl_destroy_waitq,
748 osc_can_send_destroy(cli), &lwi);
752 /* Do not wait for response */
753 ptlrpcd_add_req(req, PSCOPE_OTHER);
757 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
760 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
762 LASSERT(!(oa->o_valid & bits));
765 client_obd_list_lock(&cli->cl_loi_list_lock);
766 oa->o_dirty = cli->cl_dirty;
767 if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
768 CERROR("dirty %lu - %lu > dirty_max %lu\n",
769 cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
771 } else if (atomic_read(&obd_dirty_pages) -
772 atomic_read(&obd_dirty_transit_pages) > obd_max_dirty_pages){
773 CERROR("dirty %d - %d > system dirty_max %d\n",
774 atomic_read(&obd_dirty_pages),
775 atomic_read(&obd_dirty_transit_pages),
776 obd_max_dirty_pages);
778 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
779 CERROR("dirty %lu - dirty_max %lu too big???\n",
780 cli->cl_dirty, cli->cl_dirty_max);
783 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
784 (cli->cl_max_rpcs_in_flight + 1);
785 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
787 oa->o_grant = cli->cl_avail_grant;
788 oa->o_dropped = cli->cl_lost_grant;
789 cli->cl_lost_grant = 0;
790 client_obd_list_unlock(&cli->cl_loi_list_lock);
791 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
792 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
796 static void osc_update_next_shrink(struct client_obd *cli)
798 cli->cl_next_shrink_grant =
799 cfs_time_shift(cli->cl_grant_shrink_interval);
800 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
801 cli->cl_next_shrink_grant);
804 /* caller must hold loi_list_lock */
805 static void osc_consume_write_grant(struct client_obd *cli,
806 struct brw_page *pga)
808 LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
809 LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
810 atomic_inc(&obd_dirty_pages);
811 cli->cl_dirty += CFS_PAGE_SIZE;
812 cli->cl_avail_grant -= CFS_PAGE_SIZE;
813 pga->flag |= OBD_BRW_FROM_GRANT;
814 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
815 CFS_PAGE_SIZE, pga, pga->pg);
816 LASSERT(cli->cl_avail_grant >= 0);
817 osc_update_next_shrink(cli);
820 /* the companion to osc_consume_write_grant, called when a brw has completed.
821 * must be called with the loi lock held. */
822 static void osc_release_write_grant(struct client_obd *cli,
823 struct brw_page *pga, int sent)
825 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
828 LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
829 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
834 pga->flag &= ~OBD_BRW_FROM_GRANT;
835 atomic_dec(&obd_dirty_pages);
836 cli->cl_dirty -= CFS_PAGE_SIZE;
837 if (pga->flag & OBD_BRW_NOCACHE) {
838 pga->flag &= ~OBD_BRW_NOCACHE;
839 atomic_dec(&obd_dirty_transit_pages);
840 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
843 cli->cl_lost_grant += CFS_PAGE_SIZE;
844 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
845 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
846 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
847 /* For short writes we shouldn't count parts of pages that
848 * span a whole block on the OST side, or our accounting goes
849 * wrong. Should match the code in filter_grant_check. */
850 int offset = pga->off & ~CFS_PAGE_MASK;
851 int count = pga->count + (offset & (blocksize - 1));
852 int end = (offset + pga->count) & (blocksize - 1);
854 count += blocksize - end;
856 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
857 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
858 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
859 cli->cl_avail_grant, cli->cl_dirty);
865 static unsigned long rpcs_in_flight(struct client_obd *cli)
867 return cli->cl_r_in_flight + cli->cl_w_in_flight;
870 /* caller must hold loi_list_lock */
871 void osc_wake_cache_waiters(struct client_obd *cli)
873 struct list_head *l, *tmp;
874 struct osc_cache_waiter *ocw;
877 list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
878 /* if we can't dirty more, we must wait until some is written */
879 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
880 (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
881 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
882 "osc max %ld, sys max %d\n", cli->cl_dirty,
883 cli->cl_dirty_max, obd_max_dirty_pages);
887 /* if still dirty cache but no grant wait for pending RPCs that
888 * may yet return us some grant before doing sync writes */
889 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
890 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
891 cli->cl_w_in_flight);
895 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
896 list_del_init(&ocw->ocw_entry);
897 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
898 /* no more RPCs in flight to return grant, do sync IO */
899 ocw->ocw_rc = -EDQUOT;
900 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
902 osc_consume_write_grant(cli,
903 &ocw->ocw_oap->oap_brw_page);
906 cfs_waitq_signal(&ocw->ocw_waitq);
912 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
914 client_obd_list_lock(&cli->cl_loi_list_lock);
915 cli->cl_avail_grant += grant;
916 client_obd_list_unlock(&cli->cl_loi_list_lock);
919 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
921 if (body->oa.o_valid & OBD_MD_FLGRANT) {
922 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
923 __osc_update_grant(cli, body->oa.o_grant);
927 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
928 void *key, obd_count vallen, void *val,
929 struct ptlrpc_request_set *set);
931 static int osc_shrink_grant_interpret(const struct lu_env *env,
932 struct ptlrpc_request *req,
935 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
936 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
937 struct ost_body *body;
940 __osc_update_grant(cli, oa->o_grant);
944 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
946 osc_update_grant(cli, body);
952 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
954 client_obd_list_lock(&cli->cl_loi_list_lock);
955 oa->o_grant = cli->cl_avail_grant / 4;
956 cli->cl_avail_grant -= oa->o_grant;
957 client_obd_list_unlock(&cli->cl_loi_list_lock);
958 oa->o_flags |= OBD_FL_SHRINK_GRANT;
959 osc_update_next_shrink(cli);
962 /* Shrink the current grant, either from some large amount to enough for a
963 * full set of in-flight RPCs, or if we have already shrunk to that limit
964 * then to enough for a single RPC. This avoids keeping more grant than
965 * needed, and avoids shrinking the grant piecemeal. */
966 static int osc_shrink_grant(struct client_obd *cli)
968 long target = (cli->cl_max_rpcs_in_flight + 1) *
969 cli->cl_max_pages_per_rpc;
971 client_obd_list_lock(&cli->cl_loi_list_lock);
972 if (cli->cl_avail_grant <= target)
973 target = cli->cl_max_pages_per_rpc;
974 client_obd_list_unlock(&cli->cl_loi_list_lock);
976 return osc_shrink_grant_to_target(cli, target);
979 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
982 struct ost_body *body;
985 client_obd_list_lock(&cli->cl_loi_list_lock);
986 /* Don't shrink if we are already above or below the desired limit
987 * We don't want to shrink below a single RPC, as that will negatively
988 * impact block allocation and long-term performance. */
989 if (target < cli->cl_max_pages_per_rpc)
990 target = cli->cl_max_pages_per_rpc;
992 if (target >= cli->cl_avail_grant) {
993 client_obd_list_unlock(&cli->cl_loi_list_lock);
996 client_obd_list_unlock(&cli->cl_loi_list_lock);
1002 osc_announce_cached(cli, &body->oa, 0);
1004 client_obd_list_lock(&cli->cl_loi_list_lock);
1005 body->oa.o_grant = cli->cl_avail_grant - target;
1006 cli->cl_avail_grant = target;
1007 client_obd_list_unlock(&cli->cl_loi_list_lock);
1008 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
1009 osc_update_next_shrink(cli);
1011 rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
1012 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
1013 sizeof(*body), body, NULL);
1015 __osc_update_grant(cli, body->oa.o_grant);
1020 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
1021 static int osc_should_shrink_grant(struct client_obd *client)
1023 cfs_time_t time = cfs_time_current();
1024 cfs_time_t next_shrink = client->cl_next_shrink_grant;
1025 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1026 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1027 client->cl_avail_grant > GRANT_SHRINK_LIMIT)
1030 osc_update_next_shrink(client);
1035 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1037 struct client_obd *client;
1039 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
1040 if (osc_should_shrink_grant(client))
1041 osc_shrink_grant(client);
1046 static int osc_add_shrink_grant(struct client_obd *client)
1050 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1052 osc_grant_shrink_grant_cb, NULL,
1053 &client->cl_grant_shrink_list);
1055 CERROR("add grant client %s error %d\n",
1056 client->cl_import->imp_obd->obd_name, rc);
1059 CDEBUG(D_CACHE, "add grant client %s \n",
1060 client->cl_import->imp_obd->obd_name);
1061 osc_update_next_shrink(client);
1065 static int osc_del_shrink_grant(struct client_obd *client)
1067 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1071 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1074 * ocd_grant is the total grant amount we're expect to hold: if we've
1075 * been evicted, it's the new avail_grant amount, cl_dirty will drop
1076 * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1078 * race is tolerable here: if we're evicted, but imp_state already
1079 * left EVICTED state, then cl_dirty must be 0 already.
1081 client_obd_list_lock(&cli->cl_loi_list_lock);
1082 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1083 cli->cl_avail_grant = ocd->ocd_grant;
1085 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1086 client_obd_list_unlock(&cli->cl_loi_list_lock);
1088 CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1089 cli->cl_avail_grant, cli->cl_lost_grant);
1090 LASSERT(cli->cl_avail_grant >= 0);
1092 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1093 list_empty(&cli->cl_grant_shrink_list))
1094 osc_add_shrink_grant(cli);
1097 /* We assume that the reason this OSC got a short read is because it read
1098 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1099 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1100 * this stripe never got written at or beyond this stripe offset yet. */
1101 static void handle_short_read(int nob_read, obd_count page_count,
1102 struct brw_page **pga)
1107 /* skip bytes read OK */
1108 while (nob_read > 0) {
1109 LASSERT (page_count > 0);
1111 if (pga[i]->count > nob_read) {
1112 /* EOF inside this page */
1113 ptr = cfs_kmap(pga[i]->pg) +
1114 (pga[i]->off & ~CFS_PAGE_MASK);
1115 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1116 cfs_kunmap(pga[i]->pg);
1122 nob_read -= pga[i]->count;
1127 /* zero remaining pages */
1128 while (page_count-- > 0) {
1129 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1130 memset(ptr, 0, pga[i]->count);
1131 cfs_kunmap(pga[i]->pg);
1136 static int check_write_rcs(struct ptlrpc_request *req,
1137 int requested_nob, int niocount,
1138 obd_count page_count, struct brw_page **pga)
1143 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1144 sizeof(*remote_rcs) *
1146 if (remote_rcs == NULL) {
1147 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1151 /* return error if any niobuf was in error */
1152 for (i = 0; i < niocount; i++) {
1153 if (remote_rcs[i] < 0)
1154 return(remote_rcs[i]);
1156 if (remote_rcs[i] != 0) {
1157 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1158 i, remote_rcs[i], req);
1163 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1164 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1165 req->rq_bulk->bd_nob_transferred, requested_nob);
1172 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1174 if (p1->flag != p2->flag) {
1175 unsigned mask = ~(OBD_BRW_FROM_GRANT|
1176 OBD_BRW_NOCACHE|OBD_BRW_SYNC);
1178 /* warn if we try to combine flags that we don't know to be
1179 * safe to combine */
1180 if ((p1->flag & mask) != (p2->flag & mask))
1181 CERROR("is it ok to have flags 0x%x and 0x%x in the "
1182 "same brw?\n", p1->flag, p2->flag);
1186 return (p1->off + p1->count == p2->off);
1189 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1190 struct brw_page **pga, int opc,
1191 cksum_type_t cksum_type)
1196 LASSERT (pg_count > 0);
1197 cksum = init_checksum(cksum_type);
1198 while (nob > 0 && pg_count > 0) {
1199 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1200 int off = pga[i]->off & ~CFS_PAGE_MASK;
1201 int count = pga[i]->count > nob ? nob : pga[i]->count;
1203 /* corrupt the data before we compute the checksum, to
1204 * simulate an OST->client data error */
1205 if (i == 0 && opc == OST_READ &&
1206 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1207 memcpy(ptr + off, "bad1", min(4, nob));
1208 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1209 cfs_kunmap(pga[i]->pg);
1210 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1213 nob -= pga[i]->count;
1217 /* For sending we only compute the wrong checksum instead
1218 * of corrupting the data so it is still correct on a redo */
1219 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1225 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1226 struct lov_stripe_md *lsm, obd_count page_count,
1227 struct brw_page **pga,
1228 struct ptlrpc_request **reqp,
1229 struct obd_capa *ocapa, int reserve)
1231 struct ptlrpc_request *req;
1232 struct ptlrpc_bulk_desc *desc;
1233 struct ost_body *body;
1234 struct obd_ioobj *ioobj;
1235 struct niobuf_remote *niobuf;
1236 int niocount, i, requested_nob, opc, rc;
1237 struct osc_brw_async_args *aa;
1238 struct req_capsule *pill;
1239 struct brw_page *pg_prev;
1242 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1243 RETURN(-ENOMEM); /* Recoverable */
1244 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1245 RETURN(-EINVAL); /* Fatal */
1247 if ((cmd & OBD_BRW_WRITE) != 0) {
1249 req = ptlrpc_request_alloc_pool(cli->cl_import,
1250 cli->cl_import->imp_rq_pool,
1254 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
1259 for (niocount = i = 1; i < page_count; i++) {
1260 if (!can_merge_pages(pga[i - 1], pga[i]))
1264 pill = &req->rq_pill;
1265 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1267 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1268 niocount * sizeof(*niobuf));
1269 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1271 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1273 ptlrpc_request_free(req);
1276 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1277 ptlrpc_at_set_req_timeout(req);
1279 if (opc == OST_WRITE)
1280 desc = ptlrpc_prep_bulk_imp(req, page_count,
1281 BULK_GET_SOURCE, OST_BULK_PORTAL);
1283 desc = ptlrpc_prep_bulk_imp(req, page_count,
1284 BULK_PUT_SINK, OST_BULK_PORTAL);
1287 GOTO(out, rc = -ENOMEM);
1288 /* NB request now owns desc and will free it when it gets freed */
1290 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1291 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1292 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1293 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1295 lustre_set_wire_obdo(&body->oa, oa);
1297 obdo_to_ioobj(oa, ioobj);
1298 ioobj->ioo_bufcnt = niocount;
1299 osc_pack_capa(req, body, ocapa);
1300 LASSERT (page_count > 0);
1302 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1303 struct brw_page *pg = pga[i];
1305 LASSERT(pg->count > 0);
1306 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1307 "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1308 pg->off, pg->count);
1310 LASSERTF(i == 0 || pg->off > pg_prev->off,
1311 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1312 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1314 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1315 pg_prev->pg, page_private(pg_prev->pg),
1316 pg_prev->pg->index, pg_prev->off);
1318 LASSERTF(i == 0 || pg->off > pg_prev->off,
1319 "i %d p_c %u\n", i, page_count);
1321 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1322 (pg->flag & OBD_BRW_SRVLOCK));
1324 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1326 requested_nob += pg->count;
1328 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1330 niobuf->len += pg->count;
1332 niobuf->offset = pg->off;
1333 niobuf->len = pg->count;
1334 niobuf->flags = pg->flag;
1339 LASSERTF((void *)(niobuf - niocount) ==
1340 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1341 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1342 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1344 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1345 if (osc_should_shrink_grant(cli))
1346 osc_shrink_grant_local(cli, &body->oa);
1348 /* size[REQ_REC_OFF] still sizeof (*body) */
1349 if (opc == OST_WRITE) {
1350 if (unlikely(cli->cl_checksum) &&
1351 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1352 /* store cl_cksum_type in a local variable since
1353 * it can be changed via lprocfs */
1354 cksum_type_t cksum_type = cli->cl_cksum_type;
1356 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1357 oa->o_flags &= OBD_FL_LOCAL_MASK;
1358 body->oa.o_flags = 0;
1360 body->oa.o_flags |= cksum_type_pack(cksum_type);
1361 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1362 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1366 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1368 /* save this in 'oa', too, for later checking */
1369 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1370 oa->o_flags |= cksum_type_pack(cksum_type);
1372 /* clear out the checksum flag, in case this is a
1373 * resend but cl_checksum is no longer set. b=11238 */
1374 oa->o_valid &= ~OBD_MD_FLCKSUM;
1376 oa->o_cksum = body->oa.o_cksum;
1377 /* 1 RC per niobuf */
1378 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1379 sizeof(__u32) * niocount);
1381 if (unlikely(cli->cl_checksum) &&
1382 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1383 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1384 body->oa.o_flags = 0;
1385 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1386 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1388 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER, 0);
1389 /* 1 RC for the whole I/O */
1391 ptlrpc_request_set_replen(req);
1393 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1394 aa = ptlrpc_req_async_args(req);
1396 aa->aa_requested_nob = requested_nob;
1397 aa->aa_nio_count = niocount;
1398 aa->aa_page_count = page_count;
1402 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1403 if (ocapa && reserve)
1404 aa->aa_ocapa = capa_get(ocapa);
1410 ptlrpc_req_finished(req);
1414 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1415 __u32 client_cksum, __u32 server_cksum, int nob,
1416 obd_count page_count, struct brw_page **pga,
1417 cksum_type_t client_cksum_type)
1421 cksum_type_t cksum_type;
1423 if (server_cksum == client_cksum) {
1424 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1428 if (oa->o_valid & OBD_MD_FLFLAGS)
1429 cksum_type = cksum_type_unpack(oa->o_flags);
1431 cksum_type = OBD_CKSUM_CRC32;
1433 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1436 if (cksum_type != client_cksum_type)
1437 msg = "the server did not use the checksum type specified in "
1438 "the original request - likely a protocol problem";
1439 else if (new_cksum == server_cksum)
1440 msg = "changed on the client after we checksummed it - "
1441 "likely false positive due to mmap IO (bug 11742)";
1442 else if (new_cksum == client_cksum)
1443 msg = "changed in transit before arrival at OST";
1445 msg = "changed in transit AND doesn't match the original - "
1446 "likely false positive due to mmap IO (bug 11742)";
1448 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1449 LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1450 "["LPU64"-"LPU64"]\n",
1451 msg, libcfs_nid2str(peer->nid),
1452 oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1453 oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1456 oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1458 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1459 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1460 "client csum now %x\n", client_cksum, client_cksum_type,
1461 server_cksum, cksum_type, new_cksum);
1465 /* Note rc enters this function as number of bytes transferred */
1466 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1468 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1469 const lnet_process_id_t *peer =
1470 &req->rq_import->imp_connection->c_peer;
1471 struct client_obd *cli = aa->aa_cli;
1472 struct ost_body *body;
1473 __u32 client_cksum = 0;
1476 if (rc < 0 && rc != -EDQUOT)
1479 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1480 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1482 CDEBUG(D_INFO, "Can't unpack body\n");
1486 /* set/clear over quota flag for a uid/gid */
1487 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1488 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1489 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1491 lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
1498 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1499 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1501 osc_update_grant(cli, body);
1503 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1505 CERROR("Unexpected +ve rc %d\n", rc);
1508 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1510 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1513 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1514 check_write_checksum(&body->oa, peer, client_cksum,
1515 body->oa.o_cksum, aa->aa_requested_nob,
1516 aa->aa_page_count, aa->aa_ppga,
1517 cksum_type_unpack(aa->aa_oa->o_flags)))
1520 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1521 aa->aa_page_count, aa->aa_ppga);
1525 /* The rest of this function executes only for OST_READs */
1527 /* if unwrap_bulk failed, return -EAGAIN to retry */
1528 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1530 GOTO(out, rc = -EAGAIN);
1532 if (rc > aa->aa_requested_nob) {
1533 CERROR("Unexpected rc %d (%d requested)\n", rc,
1534 aa->aa_requested_nob);
1538 if (rc != req->rq_bulk->bd_nob_transferred) {
1539 CERROR ("Unexpected rc %d (%d transferred)\n",
1540 rc, req->rq_bulk->bd_nob_transferred);
1544 if (rc < aa->aa_requested_nob)
1545 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1547 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1548 static int cksum_counter;
1549 __u32 server_cksum = body->oa.o_cksum;
1552 cksum_type_t cksum_type;
1554 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1555 cksum_type = cksum_type_unpack(body->oa.o_flags);
1557 cksum_type = OBD_CKSUM_CRC32;
1558 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1559 aa->aa_ppga, OST_READ,
1562 if (peer->nid == req->rq_bulk->bd_sender) {
1566 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1569 if (server_cksum == ~0 && rc > 0) {
1570 CERROR("Protocol error: server %s set the 'checksum' "
1571 "bit, but didn't send a checksum. Not fatal, "
1572 "but please notify on http://bugzilla.lustre.org/\n",
1573 libcfs_nid2str(peer->nid));
1574 } else if (server_cksum != client_cksum) {
1575 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1576 "%s%s%s inum "LPU64"/"LPU64" object "
1577 LPU64"/"LPU64" extent "
1578 "["LPU64"-"LPU64"]\n",
1579 req->rq_import->imp_obd->obd_name,
1580 libcfs_nid2str(peer->nid),
1582 body->oa.o_valid & OBD_MD_FLFID ?
1583 body->oa.o_fid : (__u64)0,
1584 body->oa.o_valid & OBD_MD_FLFID ?
1585 body->oa.o_generation :(__u64)0,
1587 body->oa.o_valid & OBD_MD_FLGROUP ?
1588 body->oa.o_gr : (__u64)0,
1589 aa->aa_ppga[0]->off,
1590 aa->aa_ppga[aa->aa_page_count-1]->off +
1591 aa->aa_ppga[aa->aa_page_count-1]->count -
1593 CERROR("client %x, server %x, cksum_type %x\n",
1594 client_cksum, server_cksum, cksum_type);
1596 aa->aa_oa->o_cksum = client_cksum;
1600 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1603 } else if (unlikely(client_cksum)) {
1604 static int cksum_missed;
1607 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1608 CERROR("Checksum %u requested from %s but not sent\n",
1609 cksum_missed, libcfs_nid2str(peer->nid));
1615 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1620 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1621 struct lov_stripe_md *lsm,
1622 obd_count page_count, struct brw_page **pga,
1623 struct obd_capa *ocapa)
1625 struct ptlrpc_request *req;
1629 struct l_wait_info lwi;
1633 cfs_waitq_init(&waitq);
1636 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1637 page_count, pga, &req, ocapa, 0);
1641 rc = ptlrpc_queue_wait(req);
1643 if (rc == -ETIMEDOUT && req->rq_resend) {
1644 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1645 ptlrpc_req_finished(req);
1649 rc = osc_brw_fini_request(req, rc);
1651 ptlrpc_req_finished(req);
1652 if (osc_recoverable_error(rc)) {
1654 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1655 CERROR("too many resend retries, returning error\n");
1659 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1660 l_wait_event(waitq, 0, &lwi);
1668 int osc_brw_redo_request(struct ptlrpc_request *request,
1669 struct osc_brw_async_args *aa)
1671 struct ptlrpc_request *new_req;
1672 struct ptlrpc_request_set *set = request->rq_set;
1673 struct osc_brw_async_args *new_aa;
1674 struct osc_async_page *oap;
1678 if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1679 CERROR("too many resend retries, returning error\n");
1683 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1685 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1686 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1687 aa->aa_cli, aa->aa_oa,
1688 NULL /* lsm unused by osc currently */,
1689 aa->aa_page_count, aa->aa_ppga,
1690 &new_req, aa->aa_ocapa, 0);
1694 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1696 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1697 if (oap->oap_request != NULL) {
1698 LASSERTF(request == oap->oap_request,
1699 "request %p != oap_request %p\n",
1700 request, oap->oap_request);
1701 if (oap->oap_interrupted) {
1702 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1703 ptlrpc_req_finished(new_req);
1708 /* New request takes over pga and oaps from old request.
1709 * Note that copying a list_head doesn't work, need to move it... */
1711 new_req->rq_interpret_reply = request->rq_interpret_reply;
1712 new_req->rq_async_args = request->rq_async_args;
1713 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1715 new_aa = ptlrpc_req_async_args(new_req);
1717 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1718 list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1719 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1721 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1722 if (oap->oap_request) {
1723 ptlrpc_req_finished(oap->oap_request);
1724 oap->oap_request = ptlrpc_request_addref(new_req);
1728 new_aa->aa_ocapa = aa->aa_ocapa;
1729 aa->aa_ocapa = NULL;
1731 /* use ptlrpc_set_add_req is safe because interpret functions work
1732 * in check_set context. only one way exist with access to request
1733 * from different thread got -EINTR - this way protected with
1734 * cl_loi_list_lock */
1735 ptlrpc_set_add_req(set, new_req);
1737 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1739 DEBUG_REQ(D_INFO, new_req, "new request");
1744 * ugh, we want disk allocation on the target to happen in offset order. we'll
1745 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1746 * fine for our small page arrays and doesn't require allocation. its an
1747 * insertion sort that swaps elements that are strides apart, shrinking the
1748 * stride down until its '1' and the array is sorted.
1750 static void sort_brw_pages(struct brw_page **array, int num)
1753 struct brw_page *tmp;
1757 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1762 for (i = stride ; i < num ; i++) {
1765 while (j >= stride && array[j - stride]->off > tmp->off) {
1766 array[j] = array[j - stride];
1771 } while (stride > 1);
1774 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1780 LASSERT (pages > 0);
1781 offset = pg[i]->off & ~CFS_PAGE_MASK;
1785 if (pages == 0) /* that's all */
1788 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1789 return count; /* doesn't end on page boundary */
1792 offset = pg[i]->off & ~CFS_PAGE_MASK;
1793 if (offset != 0) /* doesn't start on page boundary */
1800 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1802 struct brw_page **ppga;
1805 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1809 for (i = 0; i < count; i++)
1814 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1816 LASSERT(ppga != NULL);
1817 OBD_FREE(ppga, sizeof(*ppga) * count);
1820 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1821 obd_count page_count, struct brw_page *pga,
1822 struct obd_trans_info *oti)
1824 struct obdo *saved_oa = NULL;
1825 struct brw_page **ppga, **orig;
1826 struct obd_import *imp = class_exp2cliimp(exp);
1827 struct client_obd *cli;
1828 int rc, page_count_orig;
1831 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1832 cli = &imp->imp_obd->u.cli;
1834 if (cmd & OBD_BRW_CHECK) {
1835 /* The caller just wants to know if there's a chance that this
1836 * I/O can succeed */
1838 if (imp->imp_invalid)
1843 /* test_brw with a failed create can trip this, maybe others. */
1844 LASSERT(cli->cl_max_pages_per_rpc);
1848 orig = ppga = osc_build_ppga(pga, page_count);
1851 page_count_orig = page_count;
1853 sort_brw_pages(ppga, page_count);
1854 while (page_count) {
1855 obd_count pages_per_brw;
1857 if (page_count > cli->cl_max_pages_per_rpc)
1858 pages_per_brw = cli->cl_max_pages_per_rpc;
1860 pages_per_brw = page_count;
1862 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1864 if (saved_oa != NULL) {
1865 /* restore previously saved oa */
1866 *oinfo->oi_oa = *saved_oa;
1867 } else if (page_count > pages_per_brw) {
1868 /* save a copy of oa (brw will clobber it) */
1869 OBDO_ALLOC(saved_oa);
1870 if (saved_oa == NULL)
1871 GOTO(out, rc = -ENOMEM);
1872 *saved_oa = *oinfo->oi_oa;
1875 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1876 pages_per_brw, ppga, oinfo->oi_capa);
1881 page_count -= pages_per_brw;
1882 ppga += pages_per_brw;
1886 osc_release_ppga(orig, page_count_orig);
1888 if (saved_oa != NULL)
1889 OBDO_FREE(saved_oa);
1894 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1895 * the dirty accounting. Writeback completes or truncate happens before
1896 * writing starts. Must be called with the loi lock held. */
1897 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1900 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1904 /* This maintains the lists of pending pages to read/write for a given object
1905 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1906 * to quickly find objects that are ready to send an RPC. */
1907 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1913 if (lop->lop_num_pending == 0)
1916 /* if we have an invalid import we want to drain the queued pages
1917 * by forcing them through rpcs that immediately fail and complete
1918 * the pages. recovery relies on this to empty the queued pages
1919 * before canceling the locks and evicting down the llite pages */
1920 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1923 /* stream rpcs in queue order as long as as there is an urgent page
1924 * queued. this is our cheap solution for good batching in the case
1925 * where writepage marks some random page in the middle of the file
1926 * as urgent because of, say, memory pressure */
1927 if (!list_empty(&lop->lop_urgent)) {
1928 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1931 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1932 optimal = cli->cl_max_pages_per_rpc;
1933 if (cmd & OBD_BRW_WRITE) {
1934 /* trigger a write rpc stream as long as there are dirtiers
1935 * waiting for space. as they're waiting, they're not going to
1936 * create more pages to coallesce with what's waiting.. */
1937 if (!list_empty(&cli->cl_cache_waiters)) {
1938 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1941 /* +16 to avoid triggering rpcs that would want to include pages
1942 * that are being queued but which can't be made ready until
1943 * the queuer finishes with the page. this is a wart for
1944 * llite::commit_write() */
1947 if (lop->lop_num_pending >= optimal)
1953 static int lop_makes_hprpc(struct loi_oap_pages *lop)
1955 struct osc_async_page *oap;
1958 if (list_empty(&lop->lop_urgent))
1961 oap = list_entry(lop->lop_urgent.next,
1962 struct osc_async_page, oap_urgent_item);
1964 if (oap->oap_async_flags & ASYNC_HP) {
1965 CDEBUG(D_CACHE, "hp request forcing RPC\n");
1972 static void on_list(struct list_head *item, struct list_head *list,
1975 if (list_empty(item) && should_be_on)
1976 list_add_tail(item, list);
1977 else if (!list_empty(item) && !should_be_on)
1978 list_del_init(item);
1981 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1982 * can find pages to build into rpcs quickly */
1983 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1985 if (lop_makes_hprpc(&loi->loi_write_lop) ||
1986 lop_makes_hprpc(&loi->loi_read_lop)) {
1988 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
1989 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
1991 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
1992 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
1993 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
1994 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1997 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1998 loi->loi_write_lop.lop_num_pending);
2000 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2001 loi->loi_read_lop.lop_num_pending);
2004 static void lop_update_pending(struct client_obd *cli,
2005 struct loi_oap_pages *lop, int cmd, int delta)
2007 lop->lop_num_pending += delta;
2008 if (cmd & OBD_BRW_WRITE)
2009 cli->cl_pending_w_pages += delta;
2011 cli->cl_pending_r_pages += delta;
2015 * this is called when a sync waiter receives an interruption. Its job is to
2016 * get the caller woken as soon as possible. If its page hasn't been put in an
2017 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
2018 * desiring interruption which will forcefully complete the rpc once the rpc
2021 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2023 struct loi_oap_pages *lop;
2024 struct lov_oinfo *loi;
2028 LASSERT(!oap->oap_interrupted);
2029 oap->oap_interrupted = 1;
2031 /* ok, it's been put in an rpc. only one oap gets a request reference */
2032 if (oap->oap_request != NULL) {
2033 ptlrpc_mark_interrupted(oap->oap_request);
2034 ptlrpcd_wake(oap->oap_request);
2035 ptlrpc_req_finished(oap->oap_request);
2036 oap->oap_request = NULL;
2040 * page completion may be called only if ->cpo_prep() method was
2041 * executed by osc_io_submit(), that also adds page the to pending list
2043 if (!list_empty(&oap->oap_pending_item)) {
2044 list_del_init(&oap->oap_pending_item);
2045 list_del_init(&oap->oap_urgent_item);
2048 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2049 &loi->loi_write_lop : &loi->loi_read_lop;
2050 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2051 loi_list_maint(oap->oap_cli, oap->oap_loi);
2052 rc = oap->oap_caller_ops->ap_completion(env,
2053 oap->oap_caller_data,
2054 oap->oap_cmd, NULL, -EINTR);
2060 /* this is trying to propogate async writeback errors back up to the
2061 * application. As an async write fails we record the error code for later if
2062 * the app does an fsync. As long as errors persist we force future rpcs to be
2063 * sync so that the app can get a sync error and break the cycle of queueing
2064 * pages for which writeback will fail. */
2065 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2072 ar->ar_force_sync = 1;
2073 ar->ar_min_xid = ptlrpc_sample_next_xid();
2078 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2079 ar->ar_force_sync = 0;
2082 void osc_oap_to_pending(struct osc_async_page *oap)
2084 struct loi_oap_pages *lop;
2086 if (oap->oap_cmd & OBD_BRW_WRITE)
2087 lop = &oap->oap_loi->loi_write_lop;
2089 lop = &oap->oap_loi->loi_read_lop;
2091 if (oap->oap_async_flags & ASYNC_HP)
2092 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2093 else if (oap->oap_async_flags & ASYNC_URGENT)
2094 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2095 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2096 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2099 /* this must be called holding the loi list lock to give coverage to exit_cache,
2100 * async_flag maintenance, and oap_request */
2101 static void osc_ap_completion(const struct lu_env *env,
2102 struct client_obd *cli, struct obdo *oa,
2103 struct osc_async_page *oap, int sent, int rc)
2108 if (oap->oap_request != NULL) {
2109 xid = ptlrpc_req_xid(oap->oap_request);
2110 ptlrpc_req_finished(oap->oap_request);
2111 oap->oap_request = NULL;
2114 spin_lock(&oap->oap_lock);
2115 oap->oap_async_flags = 0;
2116 spin_unlock(&oap->oap_lock);
2117 oap->oap_interrupted = 0;
2119 if (oap->oap_cmd & OBD_BRW_WRITE) {
2120 osc_process_ar(&cli->cl_ar, xid, rc);
2121 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2124 if (rc == 0 && oa != NULL) {
2125 if (oa->o_valid & OBD_MD_FLBLOCKS)
2126 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2127 if (oa->o_valid & OBD_MD_FLMTIME)
2128 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2129 if (oa->o_valid & OBD_MD_FLATIME)
2130 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2131 if (oa->o_valid & OBD_MD_FLCTIME)
2132 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2135 rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2136 oap->oap_cmd, oa, rc);
2138 /* ll_ap_completion (from llite) drops PG_locked. so, a new
2139 * I/O on the page could start, but OSC calls it under lock
2140 * and thus we can add oap back to pending safely */
2142 /* upper layer wants to leave the page on pending queue */
2143 osc_oap_to_pending(oap);
2145 osc_exit_cache(cli, oap, sent);
2149 static int brw_interpret(const struct lu_env *env,
2150 struct ptlrpc_request *req, void *data, int rc)
2152 struct osc_brw_async_args *aa = data;
2153 struct client_obd *cli;
2157 rc = osc_brw_fini_request(req, rc);
2158 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2159 if (osc_recoverable_error(rc)) {
2160 rc = osc_brw_redo_request(req, aa);
2166 capa_put(aa->aa_ocapa);
2167 aa->aa_ocapa = NULL;
2172 client_obd_list_lock(&cli->cl_loi_list_lock);
2174 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2175 * is called so we know whether to go to sync BRWs or wait for more
2176 * RPCs to complete */
2177 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2178 cli->cl_w_in_flight--;
2180 cli->cl_r_in_flight--;
2182 async = list_empty(&aa->aa_oaps);
2183 if (!async) { /* from osc_send_oap_rpc() */
2184 struct osc_async_page *oap, *tmp;
2185 /* the caller may re-use the oap after the completion call so
2186 * we need to clean it up a little */
2187 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2188 list_del_init(&oap->oap_rpc_item);
2189 osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2191 OBDO_FREE(aa->aa_oa);
2192 } else { /* from async_internal() */
2194 for (i = 0; i < aa->aa_page_count; i++)
2195 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2197 if (aa->aa_oa->o_flags & OBD_FL_TEMPORARY)
2198 OBDO_FREE(aa->aa_oa);
2200 osc_wake_cache_waiters(cli);
2201 osc_check_rpcs(env, cli);
2202 client_obd_list_unlock(&cli->cl_loi_list_lock);
2204 cl_req_completion(env, aa->aa_clerq, rc);
2205 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2209 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2210 struct client_obd *cli,
2211 struct list_head *rpc_list,
2212 int page_count, int cmd)
2214 struct ptlrpc_request *req;
2215 struct brw_page **pga = NULL;
2216 struct osc_brw_async_args *aa;
2217 struct obdo *oa = NULL;
2218 const struct obd_async_page_ops *ops = NULL;
2219 void *caller_data = NULL;
2220 struct osc_async_page *oap;
2221 struct osc_async_page *tmp;
2222 struct ost_body *body;
2223 struct cl_req *clerq = NULL;
2224 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2225 struct ldlm_lock *lock = NULL;
2226 struct cl_req_attr crattr;
2230 LASSERT(!list_empty(rpc_list));
2232 memset(&crattr, 0, sizeof crattr);
2233 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2235 GOTO(out, req = ERR_PTR(-ENOMEM));
2239 GOTO(out, req = ERR_PTR(-ENOMEM));
2242 list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2243 struct cl_page *page = osc_oap2cl_page(oap);
2245 ops = oap->oap_caller_ops;
2246 caller_data = oap->oap_caller_data;
2248 clerq = cl_req_alloc(env, page, crt,
2249 1 /* only 1-object rpcs for
2252 GOTO(out, req = (void *)clerq);
2253 lock = oap->oap_ldlm_lock;
2255 pga[i] = &oap->oap_brw_page;
2256 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2257 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2258 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2260 cl_req_page_add(env, clerq, page);
2263 /* always get the data for the obdo for the rpc */
2264 LASSERT(ops != NULL);
2266 crattr.cra_capa = NULL;
2267 cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2269 oa->o_handle = lock->l_remote_handle;
2270 oa->o_valid |= OBD_MD_FLHANDLE;
2273 rc = cl_req_prep(env, clerq);
2275 CERROR("cl_req_prep failed: %d\n", rc);
2276 GOTO(out, req = ERR_PTR(rc));
2279 sort_brw_pages(pga, page_count);
2280 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2281 pga, &req, crattr.cra_capa, 1);
2283 CERROR("prep_req failed: %d\n", rc);
2284 GOTO(out, req = ERR_PTR(rc));
2287 /* Need to update the timestamps after the request is built in case
2288 * we race with setattr (locally or in queue at OST). If OST gets
2289 * later setattr before earlier BRW (as determined by the request xid),
2290 * the OST will not use BRW timestamps. Sadly, there is no obvious
2291 * way to do this in a single call. bug 10150 */
2292 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2293 cl_req_attr_set(env, clerq, &crattr,
2294 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2296 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2297 aa = ptlrpc_req_async_args(req);
2298 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2299 list_splice(rpc_list, &aa->aa_oaps);
2300 CFS_INIT_LIST_HEAD(rpc_list);
2301 aa->aa_clerq = clerq;
2303 capa_put(crattr.cra_capa);
2308 OBD_FREE(pga, sizeof(*pga) * page_count);
2309 /* this should happen rarely and is pretty bad, it makes the
2310 * pending list not follow the dirty order */
2311 client_obd_list_lock(&cli->cl_loi_list_lock);
2312 list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2313 list_del_init(&oap->oap_rpc_item);
2315 /* queued sync pages can be torn down while the pages
2316 * were between the pending list and the rpc */
2317 if (oap->oap_interrupted) {
2318 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2319 osc_ap_completion(env, cli, NULL, oap, 0,
2323 osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2325 if (clerq && !IS_ERR(clerq))
2326 cl_req_completion(env, clerq, PTR_ERR(req));
2332 * prepare pages for ASYNC io and put pages in send queue.
2334 * \param cmd OBD_BRW_* macroses
2335 * \param lop pending pages
2337 * \return zero if pages successfully add to send queue.
2338 * \return not zere if error occurring.
2341 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2342 struct lov_oinfo *loi,
2343 int cmd, struct loi_oap_pages *lop)
2345 struct ptlrpc_request *req;
2346 obd_count page_count = 0;
2347 struct osc_async_page *oap = NULL, *tmp;
2348 struct osc_brw_async_args *aa;
2349 const struct obd_async_page_ops *ops;
2350 CFS_LIST_HEAD(rpc_list);
2351 CFS_LIST_HEAD(tmp_list);
2352 unsigned int ending_offset;
2353 unsigned starting_offset = 0;
2355 struct cl_object *clob = NULL;
2358 /* ASYNC_HP pages first. At present, when the lock the pages is
2359 * to be canceled, the pages covered by the lock will be sent out
2360 * with ASYNC_HP. We have to send out them as soon as possible. */
2361 list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2362 if (oap->oap_async_flags & ASYNC_HP)
2363 list_move(&oap->oap_pending_item, &tmp_list);
2365 list_move_tail(&oap->oap_pending_item, &tmp_list);
2366 if (++page_count >= cli->cl_max_pages_per_rpc)
2370 list_splice(&tmp_list, &lop->lop_pending);
2373 /* first we find the pages we're allowed to work with */
2374 list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2376 ops = oap->oap_caller_ops;
2378 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2379 "magic 0x%x\n", oap, oap->oap_magic);
2382 /* pin object in memory, so that completion call-backs
2383 * can be safely called under client_obd_list lock. */
2384 clob = osc_oap2cl_page(oap)->cp_obj;
2385 cl_object_get(clob);
2388 if (page_count != 0 &&
2389 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2390 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2391 " oap %p, page %p, srvlock %u\n",
2392 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2396 /* If there is a gap at the start of this page, it can't merge
2397 * with any previous page, so we'll hand the network a
2398 * "fragmented" page array that it can't transfer in 1 RDMA */
2399 if (page_count != 0 && oap->oap_page_off != 0)
2402 /* in llite being 'ready' equates to the page being locked
2403 * until completion unlocks it. commit_write submits a page
2404 * as not ready because its unlock will happen unconditionally
2405 * as the call returns. if we race with commit_write giving
2406 * us that page we dont' want to create a hole in the page
2407 * stream, so we stop and leave the rpc to be fired by
2408 * another dirtier or kupdated interval (the not ready page
2409 * will still be on the dirty list). we could call in
2410 * at the end of ll_file_write to process the queue again. */
2411 if (!(oap->oap_async_flags & ASYNC_READY)) {
2412 int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2415 CDEBUG(D_INODE, "oap %p page %p returned %d "
2416 "instead of ready\n", oap,
2420 /* llite is telling us that the page is still
2421 * in commit_write and that we should try
2422 * and put it in an rpc again later. we
2423 * break out of the loop so we don't create
2424 * a hole in the sequence of pages in the rpc
2429 /* the io isn't needed.. tell the checks
2430 * below to complete the rpc with EINTR */
2431 spin_lock(&oap->oap_lock);
2432 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2433 spin_unlock(&oap->oap_lock);
2434 oap->oap_count = -EINTR;
2437 spin_lock(&oap->oap_lock);
2438 oap->oap_async_flags |= ASYNC_READY;
2439 spin_unlock(&oap->oap_lock);
2442 LASSERTF(0, "oap %p page %p returned %d "
2443 "from make_ready\n", oap,
2451 * Page submitted for IO has to be locked. Either by
2452 * ->ap_make_ready() or by higher layers.
2454 #if defined(__KERNEL__) && defined(__linux__)
2456 struct cl_page *page;
2458 page = osc_oap2cl_page(oap);
2460 if (page->cp_type == CPT_CACHEABLE &&
2461 !(PageLocked(oap->oap_page) &&
2462 (CheckWriteback(oap->oap_page, cmd)))) {
2463 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2465 (long)oap->oap_page->flags,
2466 oap->oap_async_flags);
2472 /* take the page out of our book-keeping */
2473 list_del_init(&oap->oap_pending_item);
2474 lop_update_pending(cli, lop, cmd, -1);
2475 list_del_init(&oap->oap_urgent_item);
2477 if (page_count == 0)
2478 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2479 (PTLRPC_MAX_BRW_SIZE - 1);
2481 /* ask the caller for the size of the io as the rpc leaves. */
2482 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2484 ops->ap_refresh_count(env, oap->oap_caller_data,
2486 LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2488 if (oap->oap_count <= 0) {
2489 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2491 osc_ap_completion(env, cli, NULL,
2492 oap, 0, oap->oap_count);
2496 /* now put the page back in our accounting */
2497 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2498 if (page_count == 0)
2499 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2500 if (++page_count >= cli->cl_max_pages_per_rpc)
2503 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2504 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2505 * have the same alignment as the initial writes that allocated
2506 * extents on the server. */
2507 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2508 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2509 if (ending_offset == 0)
2512 /* If there is a gap at the end of this page, it can't merge
2513 * with any subsequent pages, so we'll hand the network a
2514 * "fragmented" page array that it can't transfer in 1 RDMA */
2515 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2519 osc_wake_cache_waiters(cli);
2521 loi_list_maint(cli, loi);
2523 client_obd_list_unlock(&cli->cl_loi_list_lock);
2526 cl_object_put(env, clob);
2528 if (page_count == 0) {
2529 client_obd_list_lock(&cli->cl_loi_list_lock);
2533 req = osc_build_req(env, cli, &rpc_list, page_count, cmd);
2535 LASSERT(list_empty(&rpc_list));
2536 loi_list_maint(cli, loi);
2537 RETURN(PTR_ERR(req));
2540 aa = ptlrpc_req_async_args(req);
2542 if (cmd == OBD_BRW_READ) {
2543 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2544 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2545 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2546 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2548 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2549 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2550 cli->cl_w_in_flight);
2551 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2552 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2554 ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2556 client_obd_list_lock(&cli->cl_loi_list_lock);
2558 if (cmd == OBD_BRW_READ)
2559 cli->cl_r_in_flight++;
2561 cli->cl_w_in_flight++;
2563 /* queued sync pages can be torn down while the pages
2564 * were between the pending list and the rpc */
2566 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2567 /* only one oap gets a request reference */
2570 if (oap->oap_interrupted && !req->rq_intr) {
2571 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2573 ptlrpc_mark_interrupted(req);
2577 tmp->oap_request = ptlrpc_request_addref(req);
2579 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2580 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2582 req->rq_interpret_reply = brw_interpret;
2583 ptlrpcd_add_req(req, PSCOPE_BRW);
2587 #define LOI_DEBUG(LOI, STR, args...) \
2588 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2589 !list_empty(&(LOI)->loi_ready_item) || \
2590 !list_empty(&(LOI)->loi_hp_ready_item), \
2591 (LOI)->loi_write_lop.lop_num_pending, \
2592 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2593 (LOI)->loi_read_lop.lop_num_pending, \
2594 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2597 /* This is called by osc_check_rpcs() to find which objects have pages that
2598 * we could be sending. These lists are maintained by lop_makes_rpc(). */
2599 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2603 /* First return objects that have blocked locks so that they
2604 * will be flushed quickly and other clients can get the lock,
2605 * then objects which have pages ready to be stuffed into RPCs */
2606 if (!list_empty(&cli->cl_loi_hp_ready_list))
2607 RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
2608 struct lov_oinfo, loi_hp_ready_item));
2609 if (!list_empty(&cli->cl_loi_ready_list))
2610 RETURN(list_entry(cli->cl_loi_ready_list.next,
2611 struct lov_oinfo, loi_ready_item));
2613 /* then if we have cache waiters, return all objects with queued
2614 * writes. This is especially important when many small files
2615 * have filled up the cache and not been fired into rpcs because
2616 * they don't pass the nr_pending/object threshhold */
2617 if (!list_empty(&cli->cl_cache_waiters) &&
2618 !list_empty(&cli->cl_loi_write_list))
2619 RETURN(list_entry(cli->cl_loi_write_list.next,
2620 struct lov_oinfo, loi_write_item));
2622 /* then return all queued objects when we have an invalid import
2623 * so that they get flushed */
2624 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2625 if (!list_empty(&cli->cl_loi_write_list))
2626 RETURN(list_entry(cli->cl_loi_write_list.next,
2627 struct lov_oinfo, loi_write_item));
2628 if (!list_empty(&cli->cl_loi_read_list))
2629 RETURN(list_entry(cli->cl_loi_read_list.next,
2630 struct lov_oinfo, loi_read_item));
2635 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2637 struct osc_async_page *oap;
2640 if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
2641 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2642 struct osc_async_page, oap_urgent_item);
2643 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2646 if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
2647 oap = list_entry(loi->loi_read_lop.lop_urgent.next,
2648 struct osc_async_page, oap_urgent_item);
2649 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2652 return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2655 /* called with the loi list lock held */
2656 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2658 struct lov_oinfo *loi;
2659 int rc = 0, race_counter = 0;
2662 while ((loi = osc_next_loi(cli)) != NULL) {
2663 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2665 if (osc_max_rpc_in_flight(cli, loi))
2668 /* attempt some read/write balancing by alternating between
2669 * reads and writes in an object. The makes_rpc checks here
2670 * would be redundant if we were getting read/write work items
2671 * instead of objects. we don't want send_oap_rpc to drain a
2672 * partial read pending queue when we're given this object to
2673 * do io on writes while there are cache waiters */
2674 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2675 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2676 &loi->loi_write_lop);
2678 CERROR("Write request failed with %d\n", rc);
2680 /* osc_send_oap_rpc failed, mostly because of
2683 * It can't break here, because if:
2684 * - a page was submitted by osc_io_submit, so
2686 * - no request in flight
2687 * - no subsequent request
2688 * The system will be in live-lock state,
2689 * because there is no chance to call
2690 * osc_io_unplug() and osc_check_rpcs() any
2691 * more. pdflush can't help in this case,
2692 * because it might be blocked at grabbing
2693 * the page lock as we mentioned.
2695 * Anyway, continue to drain pages. */
2704 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2705 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2706 &loi->loi_read_lop);
2708 CERROR("Read request failed with %d\n", rc);
2716 /* attempt some inter-object balancing by issueing rpcs
2717 * for each object in turn */
2718 if (!list_empty(&loi->loi_hp_ready_item))
2719 list_del_init(&loi->loi_hp_ready_item);
2720 if (!list_empty(&loi->loi_ready_item))
2721 list_del_init(&loi->loi_ready_item);
2722 if (!list_empty(&loi->loi_write_item))
2723 list_del_init(&loi->loi_write_item);
2724 if (!list_empty(&loi->loi_read_item))
2725 list_del_init(&loi->loi_read_item);
2727 loi_list_maint(cli, loi);
2729 /* send_oap_rpc fails with 0 when make_ready tells it to
2730 * back off. llite's make_ready does this when it tries
2731 * to lock a page queued for write that is already locked.
2732 * we want to try sending rpcs from many objects, but we
2733 * don't want to spin failing with 0. */
2734 if (race_counter == 10)
2740 /* we're trying to queue a page in the osc so we're subject to the
2741 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2742 * If the osc's queued pages are already at that limit, then we want to sleep
2743 * until there is space in the osc's queue for us. We also may be waiting for
2744 * write credits from the OST if there are RPCs in flight that may return some
2745 * before we fall back to sync writes.
2747 * We need this know our allocation was granted in the presence of signals */
2748 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2752 client_obd_list_lock(&cli->cl_loi_list_lock);
2753 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2754 client_obd_list_unlock(&cli->cl_loi_list_lock);
2759 * Non-blocking version of osc_enter_cache() that consumes grant only when it
2762 int osc_enter_cache_try(const struct lu_env *env,
2763 struct client_obd *cli, struct lov_oinfo *loi,
2764 struct osc_async_page *oap, int transient)
2768 has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2770 osc_consume_write_grant(cli, &oap->oap_brw_page);
2772 cli->cl_dirty_transit += CFS_PAGE_SIZE;
2773 atomic_inc(&obd_dirty_transit_pages);
2774 oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2780 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2781 * grant or cache space. */
2782 static int osc_enter_cache(const struct lu_env *env,
2783 struct client_obd *cli, struct lov_oinfo *loi,
2784 struct osc_async_page *oap)
2786 struct osc_cache_waiter ocw;
2787 struct l_wait_info lwi = { 0 };
2791 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2792 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2793 cli->cl_dirty_max, obd_max_dirty_pages,
2794 cli->cl_lost_grant, cli->cl_avail_grant);
2796 /* force the caller to try sync io. this can jump the list
2797 * of queued writes and create a discontiguous rpc stream */
2798 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2799 loi->loi_ar.ar_force_sync)
2802 /* Hopefully normal case - cache space and write credits available */
2803 if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2804 atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2805 osc_enter_cache_try(env, cli, loi, oap, 0))
2808 /* Make sure that there are write rpcs in flight to wait for. This
2809 * is a little silly as this object may not have any pending but
2810 * other objects sure might. */
2811 if (cli->cl_w_in_flight) {
2812 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2813 cfs_waitq_init(&ocw.ocw_waitq);
2817 loi_list_maint(cli, loi);
2818 osc_check_rpcs(env, cli);
2819 client_obd_list_unlock(&cli->cl_loi_list_lock);
2821 CDEBUG(D_CACHE, "sleeping for cache space\n");
2822 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2824 client_obd_list_lock(&cli->cl_loi_list_lock);
2825 if (!list_empty(&ocw.ocw_entry)) {
2826 list_del(&ocw.ocw_entry);
2836 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2837 struct lov_oinfo *loi, cfs_page_t *page,
2838 obd_off offset, const struct obd_async_page_ops *ops,
2839 void *data, void **res, int nocache,
2840 struct lustre_handle *lockh)
2842 struct osc_async_page *oap;
2847 return size_round(sizeof(*oap));
2850 oap->oap_magic = OAP_MAGIC;
2851 oap->oap_cli = &exp->exp_obd->u.cli;
2854 oap->oap_caller_ops = ops;
2855 oap->oap_caller_data = data;
2857 oap->oap_page = page;
2858 oap->oap_obj_off = offset;
2859 if (!client_is_remote(exp) &&
2860 cfs_capable(CFS_CAP_SYS_RESOURCE))
2861 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2863 LASSERT(!(offset & ~CFS_PAGE_MASK));
2865 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2866 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2867 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2868 CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2870 spin_lock_init(&oap->oap_lock);
2871 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2875 struct osc_async_page *oap_from_cookie(void *cookie)
2877 struct osc_async_page *oap = cookie;
2878 if (oap->oap_magic != OAP_MAGIC)
2879 return ERR_PTR(-EINVAL);
2883 int osc_queue_async_io(const struct lu_env *env,
2884 struct obd_export *exp, struct lov_stripe_md *lsm,
2885 struct lov_oinfo *loi, void *cookie,
2886 int cmd, obd_off off, int count,
2887 obd_flag brw_flags, enum async_flags async_flags)
2889 struct client_obd *cli = &exp->exp_obd->u.cli;
2890 struct osc_async_page *oap;
2894 oap = oap_from_cookie(cookie);
2896 RETURN(PTR_ERR(oap));
2898 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2901 if (!list_empty(&oap->oap_pending_item) ||
2902 !list_empty(&oap->oap_urgent_item) ||
2903 !list_empty(&oap->oap_rpc_item))
2906 /* check if the file's owner/group is over quota */
2907 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2908 struct cl_object *obj;
2909 struct cl_attr attr; /* XXX put attr into thread info */
2910 unsigned int qid[MAXQUOTAS];
2912 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
2914 cl_object_attr_lock(obj);
2915 rc = cl_object_attr_get(env, obj, &attr);
2916 cl_object_attr_unlock(obj);
2918 qid[USRQUOTA] = attr.cat_uid;
2919 qid[GRPQUOTA] = attr.cat_gid;
2921 lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
2928 loi = lsm->lsm_oinfo[0];
2930 client_obd_list_lock(&cli->cl_loi_list_lock);
2932 LASSERT(off + count <= CFS_PAGE_SIZE);
2934 oap->oap_page_off = off;
2935 oap->oap_count = count;
2936 oap->oap_brw_flags = brw_flags;
2937 /* Give a hint to OST that requests are coming from kswapd - bug19529 */
2938 if (libcfs_memory_pressure_get())
2939 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2940 spin_lock(&oap->oap_lock);
2941 oap->oap_async_flags = async_flags;
2942 spin_unlock(&oap->oap_lock);
2944 if (cmd & OBD_BRW_WRITE) {
2945 rc = osc_enter_cache(env, cli, loi, oap);
2947 client_obd_list_unlock(&cli->cl_loi_list_lock);
2952 osc_oap_to_pending(oap);
2953 loi_list_maint(cli, loi);
2955 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2958 osc_check_rpcs(env, cli);
2959 client_obd_list_unlock(&cli->cl_loi_list_lock);
2964 /* aka (~was & now & flag), but this is more clear :) */
2965 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
2967 int osc_set_async_flags_base(struct client_obd *cli,
2968 struct lov_oinfo *loi, struct osc_async_page *oap,
2969 obd_flag async_flags)
2971 struct loi_oap_pages *lop;
2975 LASSERT(!list_empty(&oap->oap_pending_item));
2977 if (oap->oap_cmd & OBD_BRW_WRITE) {
2978 lop = &loi->loi_write_lop;
2980 lop = &loi->loi_read_lop;
2983 if ((oap->oap_async_flags & async_flags) == async_flags)
2986 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2987 flags |= ASYNC_READY;
2989 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
2990 list_empty(&oap->oap_rpc_item)) {
2991 if (oap->oap_async_flags & ASYNC_HP)
2992 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2994 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2995 flags |= ASYNC_URGENT;
2996 loi_list_maint(cli, loi);
2998 spin_lock(&oap->oap_lock);
2999 oap->oap_async_flags |= flags;
3000 spin_unlock(&oap->oap_lock);
3002 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3003 oap->oap_async_flags);
3007 int osc_teardown_async_page(struct obd_export *exp,
3008 struct lov_stripe_md *lsm,
3009 struct lov_oinfo *loi, void *cookie)
3011 struct client_obd *cli = &exp->exp_obd->u.cli;
3012 struct loi_oap_pages *lop;
3013 struct osc_async_page *oap;
3017 oap = oap_from_cookie(cookie);
3019 RETURN(PTR_ERR(oap));
3022 loi = lsm->lsm_oinfo[0];
3024 if (oap->oap_cmd & OBD_BRW_WRITE) {
3025 lop = &loi->loi_write_lop;
3027 lop = &loi->loi_read_lop;
3030 client_obd_list_lock(&cli->cl_loi_list_lock);
3032 if (!list_empty(&oap->oap_rpc_item))
3033 GOTO(out, rc = -EBUSY);
3035 osc_exit_cache(cli, oap, 0);
3036 osc_wake_cache_waiters(cli);
3038 if (!list_empty(&oap->oap_urgent_item)) {
3039 list_del_init(&oap->oap_urgent_item);
3040 spin_lock(&oap->oap_lock);
3041 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3042 spin_unlock(&oap->oap_lock);
3044 if (!list_empty(&oap->oap_pending_item)) {
3045 list_del_init(&oap->oap_pending_item);
3046 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3048 loi_list_maint(cli, loi);
3049 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3051 client_obd_list_unlock(&cli->cl_loi_list_lock);
3055 static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
3056 struct ldlm_enqueue_info *einfo,
3059 void *data = einfo->ei_cbdata;
3061 LASSERT(lock != NULL);
3062 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3063 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3064 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3065 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3067 lock_res_and_lock(lock);
3068 spin_lock(&osc_ast_guard);
3069 LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
3070 lock->l_ast_data = data;
3071 spin_unlock(&osc_ast_guard);
3072 unlock_res_and_lock(lock);
3075 static void osc_set_data_with_check(struct lustre_handle *lockh,
3076 struct ldlm_enqueue_info *einfo,
3079 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3082 osc_set_lock_data_with_check(lock, einfo, flags);
3083 LDLM_LOCK_PUT(lock);
3085 CERROR("lockh %p, data %p - client evicted?\n",
3086 lockh, einfo->ei_cbdata);
3089 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3090 ldlm_iterator_t replace, void *data)
3092 struct ldlm_res_id res_id;
3093 struct obd_device *obd = class_exp2obd(exp);
3095 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3096 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3100 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3101 obd_enqueue_update_f upcall, void *cookie,
3104 int intent = *flags & LDLM_FL_HAS_INTENT;
3108 /* The request was created before ldlm_cli_enqueue call. */
3109 if (rc == ELDLM_LOCK_ABORTED) {
3110 struct ldlm_reply *rep;
3111 rep = req_capsule_server_get(&req->rq_pill,
3114 LASSERT(rep != NULL);
3115 if (rep->lock_policy_res1)
3116 rc = rep->lock_policy_res1;
3120 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3121 *flags |= LDLM_FL_LVB_READY;
3122 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3123 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3126 /* Call the update callback. */
3127 rc = (*upcall)(cookie, rc);
3131 static int osc_enqueue_interpret(const struct lu_env *env,
3132 struct ptlrpc_request *req,
3133 struct osc_enqueue_args *aa, int rc)
3135 struct ldlm_lock *lock;
3136 struct lustre_handle handle;
3139 /* Make a local copy of a lock handle and a mode, because aa->oa_*
3140 * might be freed anytime after lock upcall has been called. */
3141 lustre_handle_copy(&handle, aa->oa_lockh);
3142 mode = aa->oa_ei->ei_mode;
3144 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3146 lock = ldlm_handle2lock(&handle);
3148 /* Take an additional reference so that a blocking AST that
3149 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3150 * to arrive after an upcall has been executed by
3151 * osc_enqueue_fini(). */
3152 ldlm_lock_addref(&handle, mode);
3154 /* Complete obtaining the lock procedure. */
3155 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3156 mode, aa->oa_flags, aa->oa_lvb,
3157 sizeof(*aa->oa_lvb), &handle, rc);
3158 /* Complete osc stuff. */
3159 rc = osc_enqueue_fini(req, aa->oa_lvb,
3160 aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
3162 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3164 /* Release the lock for async request. */
3165 if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3167 * Releases a reference taken by ldlm_cli_enqueue(), if it is
3168 * not already released by
3169 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3171 ldlm_lock_decref(&handle, mode);
3173 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3174 aa->oa_lockh, req, aa);
3175 ldlm_lock_decref(&handle, mode);
3176 LDLM_LOCK_PUT(lock);
3180 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3181 struct lov_oinfo *loi, int flags,
3182 struct ost_lvb *lvb, __u32 mode, int rc)
3184 if (rc == ELDLM_OK) {
3185 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3188 LASSERT(lock != NULL);
3189 loi->loi_lvb = *lvb;
3190 tmp = loi->loi_lvb.lvb_size;
3191 /* Extend KMS up to the end of this lock and no further
3192 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3193 if (tmp > lock->l_policy_data.l_extent.end)
3194 tmp = lock->l_policy_data.l_extent.end + 1;
3195 if (tmp >= loi->loi_kms) {
3196 LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3197 ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3198 loi_kms_set(loi, tmp);
3200 LDLM_DEBUG(lock, "lock acquired, setting rss="
3201 LPU64"; leaving kms="LPU64", end="LPU64,
3202 loi->loi_lvb.lvb_size, loi->loi_kms,
3203 lock->l_policy_data.l_extent.end);
3205 ldlm_lock_allow_match(lock);
3206 LDLM_LOCK_PUT(lock);
3207 } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3208 loi->loi_lvb = *lvb;
3209 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3210 " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3214 EXPORT_SYMBOL(osc_update_enqueue);
3216 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3218 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3219 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3220 * other synchronous requests, however keeping some locks and trying to obtain
3221 * others may take a considerable amount of time in a case of ost failure; and
3222 * when other sync requests do not get released lock from a client, the client
3223 * is excluded from the cluster -- such scenarious make the life difficult, so
3224 * release locks just after they are obtained. */
3225 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3226 int *flags, ldlm_policy_data_t *policy,
3227 struct ost_lvb *lvb, int kms_valid,
3228 obd_enqueue_update_f upcall, void *cookie,
3229 struct ldlm_enqueue_info *einfo,
3230 struct lustre_handle *lockh,
3231 struct ptlrpc_request_set *rqset, int async)
3233 struct obd_device *obd = exp->exp_obd;
3234 struct ptlrpc_request *req = NULL;
3235 int intent = *flags & LDLM_FL_HAS_INTENT;
3240 /* Filesystem lock extents are extended to page boundaries so that
3241 * dealing with the page cache is a little smoother. */
3242 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3243 policy->l_extent.end |= ~CFS_PAGE_MASK;
3246 * kms is not valid when either object is completely fresh (so that no
3247 * locks are cached), or object was evicted. In the latter case cached
3248 * lock cannot be used, because it would prime inode state with
3249 * potentially stale LVB.
3254 /* Next, search for already existing extent locks that will cover us */
3255 /* If we're trying to read, we also search for an existing PW lock. The
3256 * VFS and page cache already protect us locally, so lots of readers/
3257 * writers can share a single PW lock.
3259 * There are problems with conversion deadlocks, so instead of
3260 * converting a read lock to a write lock, we'll just enqueue a new
3263 * At some point we should cancel the read lock instead of making them
3264 * send us a blocking callback, but there are problems with canceling
3265 * locks out from other users right now, too. */
3266 mode = einfo->ei_mode;
3267 if (einfo->ei_mode == LCK_PR)
3269 mode = ldlm_lock_match(obd->obd_namespace,
3270 *flags | LDLM_FL_LVB_READY, res_id,
3271 einfo->ei_type, policy, mode, lockh, 0);
3273 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3275 if (matched->l_ast_data == NULL ||
3276 matched->l_ast_data == einfo->ei_cbdata) {
3277 /* addref the lock only if not async requests and PW
3278 * lock is matched whereas we asked for PR. */
3279 if (!rqset && einfo->ei_mode != mode)
3280 ldlm_lock_addref(lockh, LCK_PR);
3281 osc_set_lock_data_with_check(matched, einfo, *flags);
3283 /* I would like to be able to ASSERT here that
3284 * rss <= kms, but I can't, for reasons which
3285 * are explained in lov_enqueue() */
3288 /* We already have a lock, and it's referenced */
3289 (*upcall)(cookie, ELDLM_OK);
3291 /* For async requests, decref the lock. */
3292 if (einfo->ei_mode != mode)
3293 ldlm_lock_decref(lockh, LCK_PW);
3295 ldlm_lock_decref(lockh, einfo->ei_mode);
3296 LDLM_LOCK_PUT(matched);
3299 ldlm_lock_decref(lockh, mode);
3300 LDLM_LOCK_PUT(matched);
3305 CFS_LIST_HEAD(cancels);
3306 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3307 &RQF_LDLM_ENQUEUE_LVB);
3311 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3315 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3317 ptlrpc_request_set_replen(req);
3320 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3321 *flags &= ~LDLM_FL_BLOCK_GRANTED;
3323 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3324 sizeof(*lvb), lockh, async);
3327 struct osc_enqueue_args *aa;
3328 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3329 aa = ptlrpc_req_async_args(req);
3332 aa->oa_flags = flags;
3333 aa->oa_upcall = upcall;
3334 aa->oa_cookie = cookie;
3336 aa->oa_lockh = lockh;
3338 req->rq_interpret_reply =
3339 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3340 if (rqset == PTLRPCD_SET)
3341 ptlrpcd_add_req(req, PSCOPE_OTHER);
3343 ptlrpc_set_add_req(rqset, req);
3344 } else if (intent) {
3345 ptlrpc_req_finished(req);
3350 rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3352 ptlrpc_req_finished(req);
3357 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3358 struct ldlm_enqueue_info *einfo,
3359 struct ptlrpc_request_set *rqset)
3361 struct ldlm_res_id res_id;
3365 osc_build_res_name(oinfo->oi_md->lsm_object_id,
3366 oinfo->oi_md->lsm_object_gr, &res_id);
3368 rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3369 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3370 oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3371 oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3372 rqset, rqset != NULL);
3376 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3377 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3378 int *flags, void *data, struct lustre_handle *lockh,
3381 struct obd_device *obd = exp->exp_obd;
3382 int lflags = *flags;
3386 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3389 /* Filesystem lock extents are extended to page boundaries so that
3390 * dealing with the page cache is a little smoother */
3391 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3392 policy->l_extent.end |= ~CFS_PAGE_MASK;
3394 /* Next, search for already existing extent locks that will cover us */
3395 /* If we're trying to read, we also search for an existing PW lock. The
3396 * VFS and page cache already protect us locally, so lots of readers/
3397 * writers can share a single PW lock. */
3401 rc = ldlm_lock_match(obd->obd_namespace, lflags,
3402 res_id, type, policy, rc, lockh, unref);
3405 osc_set_data_with_check(lockh, data, lflags);
3406 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3407 ldlm_lock_addref(lockh, LCK_PR);
3408 ldlm_lock_decref(lockh, LCK_PW);
3415 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3419 if (unlikely(mode == LCK_GROUP))
3420 ldlm_lock_decref_and_cancel(lockh, mode);
3422 ldlm_lock_decref(lockh, mode);
3427 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3428 __u32 mode, struct lustre_handle *lockh)
3431 RETURN(osc_cancel_base(lockh, mode));
3434 static int osc_cancel_unused(struct obd_export *exp,
3435 struct lov_stripe_md *lsm, int flags,
3438 struct obd_device *obd = class_exp2obd(exp);
3439 struct ldlm_res_id res_id, *resp = NULL;
3442 resp = osc_build_res_name(lsm->lsm_object_id,
3443 lsm->lsm_object_gr, &res_id);
3446 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3449 static int osc_statfs_interpret(const struct lu_env *env,
3450 struct ptlrpc_request *req,
3451 struct osc_async_args *aa, int rc)
3453 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3454 struct obd_statfs *msfs;
3459 /* The request has in fact never been sent
3460 * due to issues at a higher level (LOV).
3461 * Exit immediately since the caller is
3462 * aware of the problem and takes care
3463 * of the clean up */
3466 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3467 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3473 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3475 GOTO(out, rc = -EPROTO);
3478 /* Reinitialize the RDONLY and DEGRADED flags at the client
3479 * on each statfs, so they don't stay set permanently. */
3480 spin_lock(&cli->cl_oscc.oscc_lock);
3482 if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3483 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3484 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3485 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3487 if (unlikely(msfs->os_state & OS_STATE_READONLY))
3488 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3489 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3490 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3492 /* Add a bit of hysteresis so this flag isn't continually flapping,
3493 * and ensure that new files don't get extremely fragmented due to
3494 * only a small amount of available space in the filesystem.
3495 * We want to set the NOSPC flag when there is less than ~0.1% free
3496 * and clear it when there is at least ~0.2% free space, so:
3497 * avail < ~0.1% max max = avail + used
3498 * 1025 * avail < avail + used used = blocks - free
3499 * 1024 * avail < used
3500 * 1024 * avail < blocks - free
3501 * avail < ((blocks - free) >> 10)
3503 * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
3504 * lose that amount of space so in those cases we report no space left
3505 * if their is less than 1 GB left. */
3506 used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
3507 if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3508 ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3509 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3510 else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3511 (msfs->os_ffree > 64) && (msfs->os_bavail > (used << 1))))
3512 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_NOSPC;
3514 spin_unlock(&cli->cl_oscc.oscc_lock);
3516 *aa->aa_oi->oi_osfs = *msfs;
3518 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3522 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3523 __u64 max_age, struct ptlrpc_request_set *rqset)
3525 struct ptlrpc_request *req;
3526 struct osc_async_args *aa;
3530 /* We could possibly pass max_age in the request (as an absolute
3531 * timestamp or a "seconds.usec ago") so the target can avoid doing
3532 * extra calls into the filesystem if that isn't necessary (e.g.
3533 * during mount that would help a bit). Having relative timestamps
3534 * is not so great if request processing is slow, while absolute
3535 * timestamps are not ideal because they need time synchronization. */
3536 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3540 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3542 ptlrpc_request_free(req);
3545 ptlrpc_request_set_replen(req);
3546 req->rq_request_portal = OST_CREATE_PORTAL;
3547 ptlrpc_at_set_req_timeout(req);
3549 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3550 /* procfs requests not want stat in wait for avoid deadlock */
3551 req->rq_no_resend = 1;
3552 req->rq_no_delay = 1;
3555 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3556 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3557 aa = ptlrpc_req_async_args(req);
3560 ptlrpc_set_add_req(rqset, req);
3564 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3565 __u64 max_age, __u32 flags)
3567 struct obd_statfs *msfs;
3568 struct ptlrpc_request *req;
3569 struct obd_import *imp = NULL;
3573 /*Since the request might also come from lprocfs, so we need
3574 *sync this with client_disconnect_export Bug15684*/
3575 down_read(&obd->u.cli.cl_sem);
3576 if (obd->u.cli.cl_import)
3577 imp = class_import_get(obd->u.cli.cl_import);
3578 up_read(&obd->u.cli.cl_sem);
3582 /* We could possibly pass max_age in the request (as an absolute
3583 * timestamp or a "seconds.usec ago") so the target can avoid doing
3584 * extra calls into the filesystem if that isn't necessary (e.g.
3585 * during mount that would help a bit). Having relative timestamps
3586 * is not so great if request processing is slow, while absolute
3587 * timestamps are not ideal because they need time synchronization. */
3588 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3590 class_import_put(imp);
3595 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3597 ptlrpc_request_free(req);
3600 ptlrpc_request_set_replen(req);
3601 req->rq_request_portal = OST_CREATE_PORTAL;
3602 ptlrpc_at_set_req_timeout(req);
3604 if (flags & OBD_STATFS_NODELAY) {
3605 /* procfs requests not want stat in wait for avoid deadlock */
3606 req->rq_no_resend = 1;
3607 req->rq_no_delay = 1;
3610 rc = ptlrpc_queue_wait(req);
3614 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3616 GOTO(out, rc = -EPROTO);
3623 ptlrpc_req_finished(req);
3627 /* Retrieve object striping information.
3629 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3630 * the maximum number of OST indices which will fit in the user buffer.
3631 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3633 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3635 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3636 struct lov_user_md_v3 lum, *lumk;
3637 struct lov_user_ost_data_v1 *lmm_objects;
3638 int rc = 0, lum_size;
3644 /* we only need the header part from user space to get lmm_magic and
3645 * lmm_stripe_count, (the header part is common to v1 and v3) */
3646 lum_size = sizeof(struct lov_user_md_v1);
3647 if (copy_from_user(&lum, lump, lum_size))
3650 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3651 (lum.lmm_magic != LOV_USER_MAGIC_V3))
3654 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3655 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3656 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3657 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3659 /* we can use lov_mds_md_size() to compute lum_size
3660 * because lov_user_md_vX and lov_mds_md_vX have the same size */
3661 if (lum.lmm_stripe_count > 0) {
3662 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3663 OBD_ALLOC(lumk, lum_size);
3667 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3668 lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3670 lmm_objects = &(lumk->lmm_objects[0]);
3671 lmm_objects->l_object_id = lsm->lsm_object_id;
3673 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3677 lumk->lmm_object_id = lsm->lsm_object_id;
3678 lumk->lmm_object_gr = lsm->lsm_object_gr;
3679 lumk->lmm_stripe_count = 1;
3681 if (copy_to_user(lump, lumk, lum_size))
3685 OBD_FREE(lumk, lum_size);
3691 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3692 void *karg, void *uarg)
3694 struct obd_device *obd = exp->exp_obd;
3695 struct obd_ioctl_data *data = karg;
3699 if (!try_module_get(THIS_MODULE)) {
3700 CERROR("Can't get module. Is it alive?");
3704 case OBD_IOC_LOV_GET_CONFIG: {
3706 struct lov_desc *desc;
3707 struct obd_uuid uuid;
3711 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3712 GOTO(out, err = -EINVAL);
3714 data = (struct obd_ioctl_data *)buf;
3716 if (sizeof(*desc) > data->ioc_inllen1) {
3717 obd_ioctl_freedata(buf, len);
3718 GOTO(out, err = -EINVAL);
3721 if (data->ioc_inllen2 < sizeof(uuid)) {
3722 obd_ioctl_freedata(buf, len);
3723 GOTO(out, err = -EINVAL);
3726 desc = (struct lov_desc *)data->ioc_inlbuf1;
3727 desc->ld_tgt_count = 1;
3728 desc->ld_active_tgt_count = 1;
3729 desc->ld_default_stripe_count = 1;
3730 desc->ld_default_stripe_size = 0;
3731 desc->ld_default_stripe_offset = 0;
3732 desc->ld_pattern = 0;
3733 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3735 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3737 err = copy_to_user((void *)uarg, buf, len);
3740 obd_ioctl_freedata(buf, len);
3743 case LL_IOC_LOV_SETSTRIPE:
3744 err = obd_alloc_memmd(exp, karg);
3748 case LL_IOC_LOV_GETSTRIPE:
3749 err = osc_getstripe(karg, uarg);
3751 case OBD_IOC_CLIENT_RECOVER:
3752 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3757 case IOC_OSC_SET_ACTIVE:
3758 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3761 case OBD_IOC_POLL_QUOTACHECK:
3762 err = lquota_poll_check(quota_interface, exp,
3763 (struct if_quotacheck *)karg);
3765 case OBD_IOC_PING_TARGET:
3766 err = ptlrpc_obd_ping(obd);
3769 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3770 cmd, cfs_curproc_comm());
3771 GOTO(out, err = -ENOTTY);
3774 module_put(THIS_MODULE);
3778 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3779 void *key, __u32 *vallen, void *val,
3780 struct lov_stripe_md *lsm)
3783 if (!vallen || !val)
3786 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3787 __u32 *stripe = val;
3788 *vallen = sizeof(*stripe);
3791 } else if (KEY_IS(KEY_LAST_ID)) {
3792 struct ptlrpc_request *req;
3797 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3798 &RQF_OST_GET_INFO_LAST_ID);
3802 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3803 RCL_CLIENT, keylen);
3804 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3806 ptlrpc_request_free(req);
3810 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3811 memcpy(tmp, key, keylen);
3813 req->rq_no_delay = req->rq_no_resend = 1;
3814 ptlrpc_request_set_replen(req);
3815 rc = ptlrpc_queue_wait(req);
3819 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3821 GOTO(out, rc = -EPROTO);
3823 *((obd_id *)val) = *reply;
3825 ptlrpc_req_finished(req);
3827 } else if (KEY_IS(KEY_FIEMAP)) {
3828 struct ptlrpc_request *req;
3829 struct ll_user_fiemap *reply;
3833 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3834 &RQF_OST_GET_INFO_FIEMAP);
3838 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3839 RCL_CLIENT, keylen);
3840 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3841 RCL_CLIENT, *vallen);
3842 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3843 RCL_SERVER, *vallen);
3845 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3847 ptlrpc_request_free(req);
3851 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3852 memcpy(tmp, key, keylen);
3853 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3854 memcpy(tmp, val, *vallen);
3856 ptlrpc_request_set_replen(req);
3857 rc = ptlrpc_queue_wait(req);
3861 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3863 GOTO(out1, rc = -EPROTO);
3865 memcpy(val, reply, *vallen);
3867 ptlrpc_req_finished(req);
3875 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
3877 struct llog_ctxt *ctxt;
3881 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3883 rc = llog_initiator_connect(ctxt);
3884 llog_ctxt_put(ctxt);
3886 /* XXX return an error? skip setting below flags? */
3889 spin_lock(&imp->imp_lock);
3890 imp->imp_server_timeout = 1;
3891 imp->imp_pingable = 1;
3892 spin_unlock(&imp->imp_lock);
3893 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3898 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3899 struct ptlrpc_request *req,
3906 RETURN(osc_setinfo_mds_connect_import(req->rq_import));
3909 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3910 void *key, obd_count vallen, void *val,
3911 struct ptlrpc_request_set *set)
3913 struct ptlrpc_request *req;
3914 struct obd_device *obd = exp->exp_obd;
3915 struct obd_import *imp = class_exp2cliimp(exp);
3920 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3922 if (KEY_IS(KEY_NEXT_ID)) {
3924 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3926 if (vallen != sizeof(obd_id))
3931 if (vallen != sizeof(obd_id))
3934 /* avoid race between allocate new object and set next id
3935 * from ll_sync thread */
3936 spin_lock(&oscc->oscc_lock);
3937 new_val = *((obd_id*)val) + 1;
3938 if (new_val > oscc->oscc_next_id)
3939 oscc->oscc_next_id = new_val;
3940 spin_unlock(&oscc->oscc_lock);
3941 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3942 exp->exp_obd->obd_name,
3943 obd->u.cli.cl_oscc.oscc_next_id);
3948 if (KEY_IS(KEY_INIT_RECOV)) {
3949 if (vallen != sizeof(int))
3951 spin_lock(&imp->imp_lock);
3952 imp->imp_initial_recov = *(int *)val;
3953 spin_unlock(&imp->imp_lock);
3954 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3955 exp->exp_obd->obd_name,
3956 imp->imp_initial_recov);
3960 if (KEY_IS(KEY_CHECKSUM)) {
3961 if (vallen != sizeof(int))
3963 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3967 if (KEY_IS(KEY_SPTLRPC_CONF)) {
3968 sptlrpc_conf_client_adapt(obd);
3972 if (KEY_IS(KEY_FLUSH_CTX)) {
3973 sptlrpc_import_flush_my_ctx(imp);
3977 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3980 /* We pass all other commands directly to OST. Since nobody calls osc
3981 methods directly and everybody is supposed to go through LOV, we
3982 assume lov checked invalid values for us.
3983 The only recognised values so far are evict_by_nid and mds_conn.
3984 Even if something bad goes through, we'd get a -EINVAL from OST
3987 if (KEY_IS(KEY_GRANT_SHRINK))
3988 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
3990 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
3995 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3996 RCL_CLIENT, keylen);
3997 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3998 RCL_CLIENT, vallen);
3999 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
4001 ptlrpc_request_free(req);
4005 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
4006 memcpy(tmp, key, keylen);
4007 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
4008 memcpy(tmp, val, vallen);
4010 if (KEY_IS(KEY_MDS_CONN)) {
4011 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4013 oscc->oscc_oa.o_gr = (*(__u32 *)val);
4014 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
4015 LASSERT_MDS_GROUP(oscc->oscc_oa.o_gr);
4016 req->rq_no_delay = req->rq_no_resend = 1;
4017 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4018 } else if (KEY_IS(KEY_GRANT_SHRINK)) {
4019 struct osc_grant_args *aa;
4022 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4023 aa = ptlrpc_req_async_args(req);
4026 ptlrpc_req_finished(req);
4029 *oa = ((struct ost_body *)val)->oa;
4031 req->rq_interpret_reply = osc_shrink_grant_interpret;
4034 ptlrpc_request_set_replen(req);
4035 if (!KEY_IS(KEY_GRANT_SHRINK)) {
4036 LASSERT(set != NULL);
4037 ptlrpc_set_add_req(set, req);
4038 ptlrpc_check_set(NULL, set);
4040 ptlrpcd_add_req(req, PSCOPE_OTHER);
4046 static struct llog_operations osc_size_repl_logops = {
4047 lop_cancel: llog_obd_repl_cancel
4050 static struct llog_operations osc_mds_ost_orig_logops;
4052 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4053 struct obd_device *tgt, struct llog_catid *catid)
4058 rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
4059 &catid->lci_logid, &osc_mds_ost_orig_logops);
4061 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4065 rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
4066 NULL, &osc_size_repl_logops);
4068 struct llog_ctxt *ctxt =
4069 llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4072 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4077 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
4078 obd->obd_name, tgt->obd_name, catid, rc);
4079 CERROR("logid "LPX64":0x%x\n",
4080 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
4085 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4086 struct obd_device *disk_obd, int *index)
4088 struct llog_catid catid;
4089 static char name[32] = CATLIST;
4093 LASSERT(olg == &obd->obd_olg);
4095 mutex_down(&olg->olg_cat_processing);
4096 rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
4098 CERROR("rc: %d\n", rc);
4102 CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
4103 obd->obd_name, *index, catid.lci_logid.lgl_oid,
4104 catid.lci_logid.lgl_ogr, catid.lci_logid.lgl_ogen);
4106 rc = __osc_llog_init(obd, olg, disk_obd, &catid);
4108 CERROR("rc: %d\n", rc);
4112 rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
4114 CERROR("rc: %d\n", rc);
4119 mutex_up(&olg->olg_cat_processing);
4124 static int osc_llog_finish(struct obd_device *obd, int count)
4126 struct llog_ctxt *ctxt;
4127 int rc = 0, rc2 = 0;
4130 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4132 rc = llog_cleanup(ctxt);
4134 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4136 rc2 = llog_cleanup(ctxt);
4143 static int osc_reconnect(const struct lu_env *env,
4144 struct obd_export *exp, struct obd_device *obd,
4145 struct obd_uuid *cluuid,
4146 struct obd_connect_data *data,
4149 struct client_obd *cli = &obd->u.cli;
4151 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4154 client_obd_list_lock(&cli->cl_loi_list_lock);
4155 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
4156 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4157 lost_grant = cli->cl_lost_grant;
4158 cli->cl_lost_grant = 0;
4159 client_obd_list_unlock(&cli->cl_loi_list_lock);
4161 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4162 "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
4163 cli->cl_avail_grant, cli->cl_dirty, lost_grant);
4164 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4165 " ocd_grant: %d\n", data->ocd_connect_flags,
4166 data->ocd_version, data->ocd_grant);
4172 static int osc_disconnect(struct obd_export *exp)
4174 struct obd_device *obd = class_exp2obd(exp);
4175 struct llog_ctxt *ctxt;
4178 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4180 if (obd->u.cli.cl_conn_count == 1) {
4181 /* Flush any remaining cancel messages out to the
4183 llog_sync(ctxt, exp);
4185 llog_ctxt_put(ctxt);
4187 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4191 rc = client_disconnect_export(exp);
4193 * Initially we put del_shrink_grant before disconnect_export, but it
4194 * causes the following problem if setup (connect) and cleanup
4195 * (disconnect) are tangled together.
4196 * connect p1 disconnect p2
4197 * ptlrpc_connect_import
4198 * ............... class_manual_cleanup
4201 * ptlrpc_connect_interrupt
4203 * add this client to shrink list
4205 * Bang! pinger trigger the shrink.
4206 * So the osc should be disconnected from the shrink list, after we
4207 * are sure the import has been destroyed. BUG18662
4209 if (obd->u.cli.cl_import == NULL)
4210 osc_del_shrink_grant(&obd->u.cli);
4214 static int osc_import_event(struct obd_device *obd,
4215 struct obd_import *imp,
4216 enum obd_import_event event)
4218 struct client_obd *cli;
4222 LASSERT(imp->imp_obd == obd);
4225 case IMP_EVENT_DISCON: {
4226 /* Only do this on the MDS OSC's */
4227 if (imp->imp_server_timeout) {
4228 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4230 spin_lock(&oscc->oscc_lock);
4231 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4232 spin_unlock(&oscc->oscc_lock);
4235 client_obd_list_lock(&cli->cl_loi_list_lock);
4236 cli->cl_avail_grant = 0;
4237 cli->cl_lost_grant = 0;
4238 client_obd_list_unlock(&cli->cl_loi_list_lock);
4241 case IMP_EVENT_INACTIVE: {
4242 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4245 case IMP_EVENT_INVALIDATE: {
4246 struct ldlm_namespace *ns = obd->obd_namespace;
4250 env = cl_env_get(&refcheck);
4254 client_obd_list_lock(&cli->cl_loi_list_lock);
4255 /* all pages go to failing rpcs due to the invalid
4257 osc_check_rpcs(env, cli);
4258 client_obd_list_unlock(&cli->cl_loi_list_lock);
4260 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4261 cl_env_put(env, &refcheck);
4266 case IMP_EVENT_ACTIVE: {
4267 /* Only do this on the MDS OSC's */
4268 if (imp->imp_server_timeout) {
4269 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4271 spin_lock(&oscc->oscc_lock);
4272 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
4273 spin_unlock(&oscc->oscc_lock);
4275 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4278 case IMP_EVENT_OCD: {
4279 struct obd_connect_data *ocd = &imp->imp_connect_data;
4281 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4282 osc_init_grant(&obd->u.cli, ocd);
4285 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4286 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
4288 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4292 CERROR("Unknown import event %d\n", event);
4298 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4304 rc = ptlrpcd_addref();
4308 rc = client_obd_setup(obd, lcfg);
4312 struct lprocfs_static_vars lvars = { 0 };
4313 struct client_obd *cli = &obd->u.cli;
4315 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4316 lprocfs_osc_init_vars(&lvars);
4317 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4318 lproc_osc_attach_seqstat(obd);
4319 sptlrpc_lprocfs_cliobd_attach(obd);
4320 ptlrpc_lprocfs_register_obd(obd);
4324 /* We need to allocate a few requests more, because
4325 brw_interpret tries to create new requests before freeing
4326 previous ones. Ideally we want to have 2x max_rpcs_in_flight
4327 reserved, but I afraid that might be too much wasted RAM
4328 in fact, so 2 is just my guess and still should work. */
4329 cli->cl_import->imp_rq_pool =
4330 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4332 ptlrpc_add_rqs_to_pool);
4334 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4335 sema_init(&cli->cl_grant_sem, 1);
4341 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4347 case OBD_CLEANUP_EARLY: {
4348 struct obd_import *imp;
4349 imp = obd->u.cli.cl_import;
4350 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4351 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4352 ptlrpc_deactivate_import(imp);
4353 spin_lock(&imp->imp_lock);
4354 imp->imp_pingable = 0;
4355 spin_unlock(&imp->imp_lock);
4358 case OBD_CLEANUP_EXPORTS: {
4359 /* If we set up but never connected, the
4360 client import will not have been cleaned. */
4361 if (obd->u.cli.cl_import) {
4362 struct obd_import *imp;
4363 down_write(&obd->u.cli.cl_sem);
4364 imp = obd->u.cli.cl_import;
4365 CDEBUG(D_CONFIG, "%s: client import never connected\n",
4367 ptlrpc_invalidate_import(imp);
4368 if (imp->imp_rq_pool) {
4369 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4370 imp->imp_rq_pool = NULL;
4372 class_destroy_import(imp);
4373 up_write(&obd->u.cli.cl_sem);
4374 obd->u.cli.cl_import = NULL;
4376 rc = obd_llog_finish(obd, 0);
4378 CERROR("failed to cleanup llogging subsystems\n");
4385 int osc_cleanup(struct obd_device *obd)
4390 ptlrpc_lprocfs_unregister_obd(obd);
4391 lprocfs_obd_cleanup(obd);
4393 /* free memory of osc quota cache */
4394 lquota_cleanup(quota_interface, obd);
4396 rc = client_obd_cleanup(obd);
4402 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4404 struct lprocfs_static_vars lvars = { 0 };
4407 lprocfs_osc_init_vars(&lvars);
4409 switch (lcfg->lcfg_command) {
4411 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
4421 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4423 return osc_process_config_base(obd, buf);
4426 struct obd_ops osc_obd_ops = {
4427 .o_owner = THIS_MODULE,
4428 .o_setup = osc_setup,
4429 .o_precleanup = osc_precleanup,
4430 .o_cleanup = osc_cleanup,
4431 .o_add_conn = client_import_add_conn,
4432 .o_del_conn = client_import_del_conn,
4433 .o_connect = client_connect_import,
4434 .o_reconnect = osc_reconnect,
4435 .o_disconnect = osc_disconnect,
4436 .o_statfs = osc_statfs,
4437 .o_statfs_async = osc_statfs_async,
4438 .o_packmd = osc_packmd,
4439 .o_unpackmd = osc_unpackmd,
4440 .o_precreate = osc_precreate,
4441 .o_create = osc_create,
4442 .o_create_async = osc_create_async,
4443 .o_destroy = osc_destroy,
4444 .o_getattr = osc_getattr,
4445 .o_getattr_async = osc_getattr_async,
4446 .o_setattr = osc_setattr,
4447 .o_setattr_async = osc_setattr_async,
4449 .o_punch = osc_punch,
4451 .o_enqueue = osc_enqueue,
4452 .o_change_cbdata = osc_change_cbdata,
4453 .o_cancel = osc_cancel,
4454 .o_cancel_unused = osc_cancel_unused,
4455 .o_iocontrol = osc_iocontrol,
4456 .o_get_info = osc_get_info,
4457 .o_set_info_async = osc_set_info_async,
4458 .o_import_event = osc_import_event,
4459 .o_llog_init = osc_llog_init,
4460 .o_llog_finish = osc_llog_finish,
4461 .o_process_config = osc_process_config,
4464 extern struct lu_kmem_descr osc_caches[];
4465 extern spinlock_t osc_ast_guard;
4466 extern struct lock_class_key osc_ast_guard_class;
4468 int __init osc_init(void)
4470 struct lprocfs_static_vars lvars = { 0 };
4474 /* print an address of _any_ initialized kernel symbol from this
4475 * module, to allow debugging with gdb that doesn't support data
4476 * symbols from modules.*/
4477 CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4479 rc = lu_kmem_init(osc_caches);
4481 lprocfs_osc_init_vars(&lvars);
4483 request_module("lquota");
4484 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4485 lquota_init(quota_interface);
4486 init_obd_quota_ops(quota_interface, &osc_obd_ops);
4488 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4489 LUSTRE_OSC_NAME, &osc_device_type);
4491 if (quota_interface)
4492 PORTAL_SYMBOL_PUT(osc_quota_interface);
4493 lu_kmem_fini(osc_caches);
4497 spin_lock_init(&osc_ast_guard);
4498 lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4500 osc_mds_ost_orig_logops = llog_lvfs_ops;
4501 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4502 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4503 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4504 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
4510 static void /*__exit*/ osc_exit(void)
4512 lu_device_type_fini(&osc_device_type);
4514 lquota_exit(quota_interface);
4515 if (quota_interface)
4516 PORTAL_SYMBOL_PUT(osc_quota_interface);
4518 class_unregister_type(LUSTRE_OSC_NAME);
4519 lu_kmem_fini(osc_caches);
4522 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4523 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4524 MODULE_LICENSE("GPL");
4526 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);