4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2013, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_OSC
39 #include <libcfs/libcfs.h>
41 #include <lustre_dlm.h>
42 #include <lustre_net.h>
43 #include <lustre/lustre_user.h>
44 #include <obd_cksum.h>
45 #include <lustre_ha.h>
46 #include <lprocfs_status.h>
47 #include <lustre_ioctl.h>
48 #include <lustre_debug.h>
49 #include <lustre_param.h>
50 #include <lustre_fid.h>
51 #include <obd_class.h>
52 #include "osc_internal.h"
53 #include "osc_cl_internal.h"
55 struct osc_brw_async_args {
59 obd_count aa_page_count;
61 struct brw_page **aa_ppga;
62 struct client_obd *aa_cli;
63 struct list_head aa_oaps;
64 struct list_head aa_exts;
65 struct obd_capa *aa_ocapa;
66 struct cl_req *aa_clerq;
69 #define osc_grant_args osc_brw_async_args
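/* Note: the grant shrink path reuses the BRW async args through this alias;
 * see osc_shrink_grant_interpret() below, which casts its argument to
 * struct osc_grant_args to reach aa_oa. */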
71 struct osc_async_args {
72 struct obd_info *aa_oi;
75 struct osc_setattr_args {
77 obd_enqueue_update_f sa_upcall;
81 struct osc_fsync_args {
82 struct obd_info *fa_oi;
83 obd_enqueue_update_f fa_upcall;
87 struct osc_enqueue_args {
88 struct obd_export *oa_exp;
92 osc_enqueue_upcall_f oa_upcall;
94 struct ost_lvb *oa_lvb;
95 struct lustre_handle oa_lockh;
96 unsigned int oa_agl:1;
99 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
100 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
103 /* Unpack OSC object metadata from disk storage (LE byte order). */
104 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
105 struct lov_mds_md *lmm, int lmm_bytes)
108 struct obd_import *imp = class_exp2cliimp(exp);
112 if (lmm_bytes < sizeof(*lmm)) {
113 CERROR("%s: lov_mds_md too small: %d, need %d\n",
114 exp->exp_obd->obd_name, lmm_bytes,
118 /* XXX LOV_MAGIC etc check? */
120 if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
121 CERROR("%s: zero lmm_object_id: rc = %d\n",
122 exp->exp_obd->obd_name, -EINVAL);
127 lsm_size = lov_stripe_md_size(1);
131 if (*lsmp != NULL && lmm == NULL) {
132 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
133 OBD_FREE(*lsmp, lsm_size);
139 OBD_ALLOC(*lsmp, lsm_size);
140 if (unlikely(*lsmp == NULL))
142 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
143 if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
144 OBD_FREE(*lsmp, lsm_size);
147 loi_init((*lsmp)->lsm_oinfo[0]);
148 } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
153 /* XXX zero *lsmp? */
154 ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
157 (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
158 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
160 (*lsmp)->lsm_maxbytes = LUSTRE_EXT3_STRIPE_MAXBYTES;
165 static inline void osc_pack_capa(struct ptlrpc_request *req,
166 struct ost_body *body, void *capa)
168 struct obd_capa *oc = (struct obd_capa *)capa;
169 struct lustre_capa *c;
174 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
177 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
178 DEBUG_CAPA(D_SEC, c, "pack");
181 static inline void osc_pack_req_body(struct ptlrpc_request *req,
182 struct obd_info *oinfo)
184 struct ost_body *body;
186 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
189 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
191 osc_pack_capa(req, body, oinfo->oi_capa);
194 static inline void osc_set_capa_size(struct ptlrpc_request *req,
195 const struct req_msg_field *field,
199 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
201 /* it is already calculated as sizeof struct obd_capa */
205 static int osc_getattr_interpret(const struct lu_env *env,
206 struct ptlrpc_request *req,
207 struct osc_async_args *aa, int rc)
209 struct ost_body *body;
215 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
217 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
218 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
219 aa->aa_oi->oi_oa, &body->oa);
221 /* This should really be sent by the OST */
222 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
223 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
225 CDEBUG(D_INFO, "can't unpack ost_body\n");
227 aa->aa_oi->oi_oa->o_valid = 0;
230 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
234 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
235 struct ptlrpc_request_set *set)
237 struct ptlrpc_request *req;
238 struct osc_async_args *aa;
242 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
246 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
247 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
249 ptlrpc_request_free(req);
253 osc_pack_req_body(req, oinfo);
255 ptlrpc_request_set_replen(req);
256 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
258 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
259 aa = ptlrpc_req_async_args(req);
262 ptlrpc_set_add_req(set, req);
266 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
267 struct obd_info *oinfo)
269 struct ptlrpc_request *req;
270 struct ost_body *body;
274 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
278 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
279 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
281 ptlrpc_request_free(req);
285 osc_pack_req_body(req, oinfo);
287 ptlrpc_request_set_replen(req);
289 rc = ptlrpc_queue_wait(req);
293 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
295 GOTO(out, rc = -EPROTO);
297 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
298 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
301 oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
302 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
306 ptlrpc_req_finished(req);
310 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
311 struct obd_info *oinfo, struct obd_trans_info *oti)
313 struct ptlrpc_request *req;
314 struct ost_body *body;
318 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
320 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
324 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
325 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
327 ptlrpc_request_free(req);
331 osc_pack_req_body(req, oinfo);
333 ptlrpc_request_set_replen(req);
335 rc = ptlrpc_queue_wait(req);
339 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
341 GOTO(out, rc = -EPROTO);
343 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
348 ptlrpc_req_finished(req);
352 static int osc_setattr_interpret(const struct lu_env *env,
353 struct ptlrpc_request *req,
354 struct osc_setattr_args *sa, int rc)
356 struct ost_body *body;
362 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
364 GOTO(out, rc = -EPROTO);
366 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
369 rc = sa->sa_upcall(sa->sa_cookie, rc);
373 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
374 struct obd_trans_info *oti,
375 obd_enqueue_update_f upcall, void *cookie,
376 struct ptlrpc_request_set *rqset)
378 struct ptlrpc_request *req;
379 struct osc_setattr_args *sa;
383 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
387 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
388 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
390 ptlrpc_request_free(req);
394 if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
395 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
397 osc_pack_req_body(req, oinfo);
399 ptlrpc_request_set_replen(req);
401 /* do mds to ost setattr asynchronously */
403 /* Do not wait for response. */
404 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
406 req->rq_interpret_reply =
407 (ptlrpc_interpterer_t)osc_setattr_interpret;
409 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
410 sa = ptlrpc_req_async_args(req);
411 sa->sa_oa = oinfo->oi_oa;
412 sa->sa_upcall = upcall;
413 sa->sa_cookie = cookie;
415 if (rqset == PTLRPCD_SET)
416 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
418 ptlrpc_set_add_req(rqset, req);
424 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
425 struct obd_trans_info *oti,
426 struct ptlrpc_request_set *rqset)
428 return osc_setattr_async_base(exp, oinfo, oti,
429 oinfo->oi_cb_up, oinfo, rqset);
432 int osc_real_create(struct obd_export *exp, struct obdo *oa,
433 struct lov_stripe_md **ea, struct obd_trans_info *oti)
435 struct ptlrpc_request *req;
436 struct ost_body *body;
437 struct lov_stripe_md *lsm;
446 rc = obd_alloc_memmd(exp, &lsm);
451 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
453 GOTO(out, rc = -ENOMEM);
455 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
457 ptlrpc_request_free(req);
461 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
464 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
466 ptlrpc_request_set_replen(req);
468 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
469 oa->o_flags == OBD_FL_DELORPHAN) {
471 "delorphan from OST integration");
472 /* Don't resend the delorphan req */
473 req->rq_no_resend = req->rq_no_delay = 1;
476 rc = ptlrpc_queue_wait(req);
480 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
482 GOTO(out_req, rc = -EPROTO);
484 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
485 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
487 oa->o_blksize = cli_brw_size(exp->exp_obd);
488 oa->o_valid |= OBD_MD_FLBLKSZ;
490 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
491 * have valid lsm_oinfo data structs, so don't go touching that.
492 * This needs to be fixed in a big way.
494 lsm->lsm_oi = oa->o_oi;
498 if (oa->o_valid & OBD_MD_FLCOOKIE) {
499 if (oti->oti_logcookies == NULL)
500 oti->oti_logcookies = &oti->oti_onecookie;
502 *oti->oti_logcookies = oa->o_lcookie;
506 CDEBUG(D_HA, "transno: "LPD64"\n",
507 lustre_msg_get_transno(req->rq_repmsg));
509 ptlrpc_req_finished(req);
512 obd_free_memmd(exp, &lsm);
516 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
517 obd_enqueue_update_f upcall, void *cookie,
518 struct ptlrpc_request_set *rqset)
520 struct ptlrpc_request *req;
521 struct osc_setattr_args *sa;
522 struct ost_body *body;
526 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
530 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
531 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
533 ptlrpc_request_free(req);
536 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
537 ptlrpc_at_set_req_timeout(req);
539 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
541 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
543 osc_pack_capa(req, body, oinfo->oi_capa);
545 ptlrpc_request_set_replen(req);
547 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
548 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
549 sa = ptlrpc_req_async_args(req);
550 sa->sa_oa = oinfo->oi_oa;
551 sa->sa_upcall = upcall;
552 sa->sa_cookie = cookie;
553 if (rqset == PTLRPCD_SET)
554 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
556 ptlrpc_set_add_req(rqset, req);
561 static int osc_sync_interpret(const struct lu_env *env,
562 struct ptlrpc_request *req,
565 struct osc_fsync_args *fa = arg;
566 struct ost_body *body;
572 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
574 CERROR("can't unpack ost_body\n");
575 GOTO(out, rc = -EPROTO);
578 *fa->fa_oi->oi_oa = body->oa;
580 rc = fa->fa_upcall(fa->fa_cookie, rc);
584 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
585 obd_enqueue_update_f upcall, void *cookie,
586 struct ptlrpc_request_set *rqset)
588 struct ptlrpc_request *req;
589 struct ost_body *body;
590 struct osc_fsync_args *fa;
594 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
598 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
599 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
601 ptlrpc_request_free(req);
605 /* overload the size and blocks fields in the oa with start/end */
606 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
608 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
610 osc_pack_capa(req, body, oinfo->oi_capa);
612 ptlrpc_request_set_replen(req);
613 req->rq_interpret_reply = osc_sync_interpret;
615 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
616 fa = ptlrpc_req_async_args(req);
618 fa->fa_upcall = upcall;
619 fa->fa_cookie = cookie;
621 if (rqset == PTLRPCD_SET)
622 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
624 ptlrpc_set_add_req(rqset, req);
629 /* Find and cancel locally the locks matched by @mode in the resource found by
630 * @objid. Found locks are added to the @cancels list. Returns the number of
631 * locks added to the @cancels list. */
632 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
633 struct list_head *cancels,
634 ldlm_mode_t mode, __u64 lock_flags)
636 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
637 struct ldlm_res_id res_id;
638 struct ldlm_resource *res;
642 /* Return, i.e. cancel nothing, only if ELC is supported (flag in the
643 * export) but disabled through procfs (flag in the NS).
645 * This distinguishes it from the case when ELC is not supported in the
646 * first place, in which we still want to cancel locks in advance and just
647 * cancel them locally, without sending any RPC. */
648 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
651 ostid_build_res_name(&oa->o_oi, &res_id);
652 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
656 LDLM_RESOURCE_ADDREF(res);
657 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
658 lock_flags, 0, NULL);
659 LDLM_RESOURCE_DELREF(res);
660 ldlm_resource_putref(res);
664 static int osc_destroy_interpret(const struct lu_env *env,
665 struct ptlrpc_request *req, void *data,
668 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
670 atomic_dec(&cli->cl_destroy_in_flight);
671 wake_up(&cli->cl_destroy_waitq);
675 static int osc_can_send_destroy(struct client_obd *cli)
677 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
678 cli->cl_max_rpcs_in_flight) {
679 /* The destroy request can be sent */
682 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
683 cli->cl_max_rpcs_in_flight) {
685 * The counter has been modified between the two atomic
688 wake_up(&cli->cl_destroy_waitq);
693 int osc_create(const struct lu_env *env, struct obd_export *exp,
694 struct obdo *oa, struct lov_stripe_md **ea,
695 struct obd_trans_info *oti)
702 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
704 if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
705 RETURN(osc_real_create(exp, oa, ea, oti));
707 /* we should not get here anymore */
713 /* Destroy requests can always be async on the client, and we don't even really
714 * care about the return code since the client cannot do anything at all about
716 * When the MDS is unlinking a filename, it saves the file objects into a
717 * recovery llog, and these object records are cancelled when the OST reports
718 * they were destroyed and synced to disk (i.e. the transaction committed).
719 * If the client dies, or the OST is down when the object should be destroyed,
720 * the records are not cancelled, and when the OST next reconnects to the MDS,
721 * it will retrieve the llog unlink logs and then send the log cancellation
722 * cookies to the MDS after committing the destroy transactions. */
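/* A rough sketch of the sequence described above (step boundaries are
 * paraphrased from the comment, not tied to specific functions):
 *   1. The MDS unlinks a filename and records the file's OST objects in a
 *      recovery llog before asking the OSTs to destroy them.
 *   2. Each OST destroys its object and commits the transaction to disk.
 *   3. The OST reports the committed destroy, and the matching llog record
 *      is cancelled.
 *   4. If the client or OST dies first, the record survives; on the next
 *      OST<->MDS reconnect the unlink logs are replayed and the cancellation
 *      cookies are resent after the destroys commit. */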
723 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
724 struct obdo *oa, struct lov_stripe_md *ea,
725 struct obd_trans_info *oti, struct obd_export *md_export,
728 struct client_obd *cli = &exp->exp_obd->u.cli;
729 struct ptlrpc_request *req;
730 struct ost_body *body;
731 struct list_head cancels = LIST_HEAD_INIT(cancels);
736 CDEBUG(D_INFO, "oa NULL\n");
740 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
741 LDLM_FL_DISCARD_DATA);
743 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
745 ldlm_lock_list_put(&cancels, l_bl_ast, count);
749 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
750 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
753 ptlrpc_request_free(req);
757 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
758 ptlrpc_at_set_req_timeout(req);
760 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
761 oa->o_lcookie = *oti->oti_logcookies;
762 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
764 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
766 osc_pack_capa(req, body, (struct obd_capa *)capa);
767 ptlrpc_request_set_replen(req);
769 /* If osc_destroy is for destroying an unlink orphan,
770 * sent from the MDT to the OST, it should not be blocked here,
771 * because the process might be triggered by ptlrpcd, and
772 * it is not good to block a ptlrpcd thread (b=16006). */
773 if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
774 req->rq_interpret_reply = osc_destroy_interpret;
775 if (!osc_can_send_destroy(cli)) {
776 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
780 * Wait until the number of on-going destroy RPCs drops
781 * below max_rpcs_in_flight
783 l_wait_event_exclusive(cli->cl_destroy_waitq,
784 osc_can_send_destroy(cli), &lwi);
788 /* Do not wait for response */
789 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
793 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
796 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
798 LASSERT(!(oa->o_valid & bits));
801 client_obd_list_lock(&cli->cl_loi_list_lock);
802 oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
803 if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
804 cli->cl_dirty_max_pages)) {
805 CERROR("dirty %lu - %lu > dirty_max %lu\n",
806 cli->cl_dirty_pages, cli->cl_dirty_transit,
807 cli->cl_dirty_max_pages);
809 } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
810 atomic_long_read(&obd_dirty_transit_pages) >
811 (obd_max_dirty_pages + 1))) {
812 /* The atomic_read() and the atomic_inc() are
813 * not covered by a lock, thus they may safely race and trip
814 * this CERROR() unless we add in a small fudge factor (+1). */
815 CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n",
816 cli->cl_import->imp_obd->obd_name,
817 atomic_long_read(&obd_dirty_pages),
818 atomic_long_read(&obd_dirty_transit_pages),
819 obd_max_dirty_pages);
821 } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
823 CERROR("dirty %lu - dirty_max %lu too big???\n",
824 cli->cl_dirty_pages, cli->cl_dirty_max_pages);
827 unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
829 (cli->cl_max_rpcs_in_flight + 1);
830 oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
833 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
834 oa->o_dropped = cli->cl_lost_grant;
835 cli->cl_lost_grant = 0;
836 client_obd_list_unlock(&cli->cl_loi_list_lock);
837 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
838 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
842 void osc_update_next_shrink(struct client_obd *cli)
844 cli->cl_next_shrink_grant =
845 cfs_time_shift(cli->cl_grant_shrink_interval);
846 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
847 cli->cl_next_shrink_grant);
850 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
852 client_obd_list_lock(&cli->cl_loi_list_lock);
853 cli->cl_avail_grant += grant;
854 client_obd_list_unlock(&cli->cl_loi_list_lock);
857 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
859 if (body->oa.o_valid & OBD_MD_FLGRANT) {
860 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
861 __osc_update_grant(cli, body->oa.o_grant);
865 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
866 obd_count keylen, void *key, obd_count vallen,
867 void *val, struct ptlrpc_request_set *set);
869 static int osc_shrink_grant_interpret(const struct lu_env *env,
870 struct ptlrpc_request *req,
873 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
874 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
875 struct ost_body *body;
878 __osc_update_grant(cli, oa->o_grant);
882 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
884 osc_update_grant(cli, body);
890 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
892 client_obd_list_lock(&cli->cl_loi_list_lock);
893 oa->o_grant = cli->cl_avail_grant / 4;
894 cli->cl_avail_grant -= oa->o_grant;
895 client_obd_list_unlock(&cli->cl_loi_list_lock);
896 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
897 oa->o_valid |= OBD_MD_FLFLAGS;
900 oa->o_flags |= OBD_FL_SHRINK_GRANT;
901 osc_update_next_shrink(cli);
904 /* Shrink the current grant, either from some large amount to enough for a
905 * full set of in-flight RPCs, or if we have already shrunk to that limit
906 * then to enough for a single RPC. This avoids keeping more grant than
907 * needed, and avoids shrinking the grant piecemeal. */
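/* A worked example (hypothetical values, not read from any configuration):
 * with cl_max_pages_per_rpc = 256 and 4KB pages each RPC covers 1MB, so with
 * cl_max_rpcs_in_flight = 8 the first shrink targets (8 + 1) * 1MB = 9MB of
 * grant. If avail_grant is already at or below that target, the target drops
 * to a single RPC's worth, i.e. 1MB. */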
908 static int osc_shrink_grant(struct client_obd *cli)
910 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
911 (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
913 client_obd_list_lock(&cli->cl_loi_list_lock);
914 if (cli->cl_avail_grant <= target_bytes)
915 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
916 client_obd_list_unlock(&cli->cl_loi_list_lock);
918 return osc_shrink_grant_to_target(cli, target_bytes);
921 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
924 struct ost_body *body;
927 client_obd_list_lock(&cli->cl_loi_list_lock);
928 /* Don't shrink if we are already above or below the desired limit.
929 * We don't want to shrink below a single RPC, as that will negatively
930 * impact block allocation and long-term performance. */
931 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
932 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
934 if (target_bytes >= cli->cl_avail_grant) {
935 client_obd_list_unlock(&cli->cl_loi_list_lock);
938 client_obd_list_unlock(&cli->cl_loi_list_lock);
944 osc_announce_cached(cli, &body->oa, 0);
946 client_obd_list_lock(&cli->cl_loi_list_lock);
947 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
948 cli->cl_avail_grant = target_bytes;
949 client_obd_list_unlock(&cli->cl_loi_list_lock);
950 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
951 body->oa.o_valid |= OBD_MD_FLFLAGS;
952 body->oa.o_flags = 0;
954 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
955 osc_update_next_shrink(cli);
957 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
958 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
959 sizeof(*body), body, NULL);
961 __osc_update_grant(cli, body->oa.o_grant);
966 static int osc_should_shrink_grant(struct client_obd *client)
968 cfs_time_t time = cfs_time_current();
969 cfs_time_t next_shrink = client->cl_next_shrink_grant;
971 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
972 OBD_CONNECT_GRANT_SHRINK) == 0)
975 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
976 /* Get the current RPC size directly, instead of going via:
977 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
978 * Keep comment here so that it can be found by searching. */
979 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
981 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
982 client->cl_avail_grant > brw_size)
985 osc_update_next_shrink(client);
990 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
992 struct client_obd *client;
994 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
995 if (osc_should_shrink_grant(client))
996 osc_shrink_grant(client);
1001 static int osc_add_shrink_grant(struct client_obd *client)
1005 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1007 osc_grant_shrink_grant_cb, NULL,
1008 &client->cl_grant_shrink_list);
1010 CERROR("add grant client %s error %d\n",
1011 client->cl_import->imp_obd->obd_name, rc);
1014 CDEBUG(D_CACHE, "add grant client %s \n",
1015 client->cl_import->imp_obd->obd_name);
1016 osc_update_next_shrink(client);
1020 static int osc_del_shrink_grant(struct client_obd *client)
1022 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1026 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1029 * ocd_grant is the total grant amount we expect to hold: if we've
1030 * been evicted, it's the new avail_grant amount, and cl_dirty_pages will
1031 * drop to 0 as in-flight RPCs fail out; otherwise, it's avail_grant +
1034 * race is tolerable here: if we're evicted, but imp_state has already
1035 * left the EVICTED state, then cl_dirty_pages must be 0 already.
1037 client_obd_list_lock(&cli->cl_loi_list_lock);
1038 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1039 cli->cl_avail_grant = ocd->ocd_grant;
1041 cli->cl_avail_grant = ocd->ocd_grant -
1042 (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
1044 if (cli->cl_avail_grant < 0) {
1045 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
1046 cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
1047 ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
1048 /* workaround for servers which do not have the patch from
1050 cli->cl_avail_grant = ocd->ocd_grant;
1053 /* determine the appropriate chunk size used by osc_extent. */
1054 cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
1055 client_obd_list_unlock(&cli->cl_loi_list_lock);
1057 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1058 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
1059 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1061 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1062 list_empty(&cli->cl_grant_shrink_list))
1063 osc_add_shrink_grant(cli);
1066 /* We assume that the reason this OSC got a short read is that it read
1067 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
1068 * via the LOV, and it _knows_ it's reading inside the file; it's just that
1069 * this stripe never got written at or beyond this stripe offset yet. */
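/* A small illustration (hypothetical numbers): for a 3-page read of
 * 4096-byte pages where the OST returned only 6000 bytes, page 0 is left
 * untouched, the tail of page 1 beyond byte 1904 is zero-filled, and
 * page 2 is zeroed entirely -- which is what the loops below do. */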
1070 static void handle_short_read(int nob_read, obd_count page_count,
1071 struct brw_page **pga)
1076 /* skip bytes read OK */
1077 while (nob_read > 0) {
1078 LASSERT(page_count > 0);
1080 if (pga[i]->count > nob_read) {
1081 /* EOF inside this page */
1082 ptr = kmap(pga[i]->pg) +
1083 (pga[i]->off & ~CFS_PAGE_MASK);
1084 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1091 nob_read -= pga[i]->count;
1096 /* zero remaining pages */
1097 while (page_count-- > 0) {
1098 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1099 memset(ptr, 0, pga[i]->count);
1105 static int check_write_rcs(struct ptlrpc_request *req,
1106 int requested_nob, int niocount,
1107 obd_count page_count, struct brw_page **pga)
1112 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1113 sizeof(*remote_rcs) *
1115 if (remote_rcs == NULL) {
1116 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1120 /* return error if any niobuf was in error */
1121 for (i = 0; i < niocount; i++) {
1122 if ((int)remote_rcs[i] < 0)
1123 return(remote_rcs[i]);
1125 if (remote_rcs[i] != 0) {
1126 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1127 i, remote_rcs[i], req);
1132 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1133 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1134 req->rq_bulk->bd_nob_transferred, requested_nob);
1141 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1143 if (p1->flag != p2->flag) {
1144 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1145 OBD_BRW_SYNC | OBD_BRW_ASYNC |
1146 OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
1148 /* warn if we try to combine flags that we don't know to be
1149 * safe to combine */
1150 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1151 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1152 "report this at https://jira.hpdd.intel.com/\n",
1153 p1->flag, p2->flag);
1158 return (p1->off + p1->count == p2->off);
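/* For illustration (hypothetical page values): two brw_pages with identical
 * flags covering [0, 4096) and [4096, 8192) can merge into one niobuf; a gap
 * between the two extents prevents merging. */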
1161 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1162 struct brw_page **pga, int opc,
1163 cksum_type_t cksum_type)
1167 struct cfs_crypto_hash_desc *hdesc;
1168 unsigned int bufsize;
1170 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1172 LASSERT(pg_count > 0);
1174 hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1175 if (IS_ERR(hdesc)) {
1176 CERROR("Unable to initialize checksum hash %s\n",
1177 cfs_crypto_hash_name(cfs_alg));
1178 return PTR_ERR(hdesc);
1181 while (nob > 0 && pg_count > 0) {
1182 int count = pga[i]->count > nob ? nob : pga[i]->count;
1184 /* corrupt the data before we compute the checksum, to
1185 * simulate an OST->client data error */
1186 if (i == 0 && opc == OST_READ &&
1187 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1188 unsigned char *ptr = kmap(pga[i]->pg);
1189 int off = pga[i]->off & ~CFS_PAGE_MASK;
1191 memcpy(ptr + off, "bad1", min(4, nob));
1194 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1195 pga[i]->off & ~CFS_PAGE_MASK,
1197 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1198 (int)(pga[i]->off & ~CFS_PAGE_MASK));
1200 nob -= pga[i]->count;
1205 bufsize = sizeof(cksum);
1206 err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1208 /* For sending we only compute the wrong checksum instead
1209 * of corrupting the data, so it is still correct on a redo. */
1210 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1216 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1217 struct lov_stripe_md *lsm, obd_count page_count,
1218 struct brw_page **pga,
1219 struct ptlrpc_request **reqp,
1220 struct obd_capa *ocapa, int reserve,
1223 struct ptlrpc_request *req;
1224 struct ptlrpc_bulk_desc *desc;
1225 struct ost_body *body;
1226 struct obd_ioobj *ioobj;
1227 struct niobuf_remote *niobuf;
1228 int niocount, i, requested_nob, opc, rc;
1229 struct osc_brw_async_args *aa;
1230 struct req_capsule *pill;
1231 struct brw_page *pg_prev;
1234 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1235 RETURN(-ENOMEM); /* Recoverable */
1236 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1237 RETURN(-EINVAL); /* Fatal */
1239 if ((cmd & OBD_BRW_WRITE) != 0) {
1241 req = ptlrpc_request_alloc_pool(cli->cl_import,
1242 cli->cl_import->imp_rq_pool,
1243 &RQF_OST_BRW_WRITE);
1246 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1251 for (niocount = i = 1; i < page_count; i++) {
1252 if (!can_merge_pages(pga[i - 1], pga[i]))
1256 pill = &req->rq_pill;
1257 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1259 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1260 niocount * sizeof(*niobuf));
1261 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1263 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1265 ptlrpc_request_free(req);
1268 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1269 ptlrpc_at_set_req_timeout(req);
1270 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1272 req->rq_no_retry_einprogress = 1;
1274 desc = ptlrpc_prep_bulk_imp(req, page_count,
1275 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1276 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1280 GOTO(out, rc = -ENOMEM);
1281 /* NB: the request now owns desc and will free it when the request gets freed */
1283 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1284 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1285 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1286 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1288 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1290 obdo_to_ioobj(oa, ioobj);
1291 ioobj->ioo_bufcnt = niocount;
1292 /* The high bits of ioo_max_brw tell the server the _maximum_ number of bulks
1293 * that might be sent for this request. The actual number is decided
1294 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1295 * "max - 1" for compatibility with old clients that send "0", and also so
1296 * that the actual maximum is a power-of-two number, not one less. LU-1431 */
1297 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1298 osc_pack_capa(req, body, ocapa);
1299 LASSERT(page_count > 0);
1301 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1302 struct brw_page *pg = pga[i];
1303 int poff = pg->off & ~CFS_PAGE_MASK;
1305 LASSERT(pg->count > 0);
1306 /* make sure there is no gap in the middle of the page array */
1307 LASSERTF(page_count == 1 ||
1308 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1309 ergo(i > 0 && i < page_count - 1,
1310 poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
1311 ergo(i == page_count - 1, poff == 0)),
1312 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1313 i, page_count, pg, pg->off, pg->count);
1314 LASSERTF(i == 0 || pg->off > pg_prev->off,
1315 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1316 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1318 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1319 pg_prev->pg, page_private(pg_prev->pg),
1320 pg_prev->pg->index, pg_prev->off);
1321 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1322 (pg->flag & OBD_BRW_SRVLOCK));
1324 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1325 requested_nob += pg->count;
1327 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1329 niobuf->rnb_len += pg->count;
1331 niobuf->rnb_offset = pg->off;
1332 niobuf->rnb_len = pg->count;
1333 niobuf->rnb_flags = pg->flag;
1338 LASSERTF((void *)(niobuf - niocount) ==
1339 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1340 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1341 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1343 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1345 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1346 body->oa.o_valid |= OBD_MD_FLFLAGS;
1347 body->oa.o_flags = 0;
1349 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1352 if (osc_should_shrink_grant(cli))
1353 osc_shrink_grant_local(cli, &body->oa);
1355 /* size[REQ_REC_OFF] still sizeof (*body) */
1356 if (opc == OST_WRITE) {
1357 if (cli->cl_checksum &&
1358 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1359 /* store cl_cksum_type in a local variable since
1360 * it can be changed via lprocfs */
1361 cksum_type_t cksum_type = cli->cl_cksum_type;
1363 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1364 oa->o_flags &= OBD_FL_LOCAL_MASK;
1365 body->oa.o_flags = 0;
1367 body->oa.o_flags |= cksum_type_pack(cksum_type);
1368 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1369 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1373 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1375 /* save this in 'oa', too, for later checking */
1376 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1377 oa->o_flags |= cksum_type_pack(cksum_type);
1379 /* clear out the checksum flag, in case this is a
1380 * resend but cl_checksum is no longer set. b=11238 */
1381 oa->o_valid &= ~OBD_MD_FLCKSUM;
1383 oa->o_cksum = body->oa.o_cksum;
1384 /* 1 RC per niobuf */
1385 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1386 sizeof(__u32) * niocount);
1388 if (cli->cl_checksum &&
1389 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1390 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1391 body->oa.o_flags = 0;
1392 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1393 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1396 ptlrpc_request_set_replen(req);
1398 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1399 aa = ptlrpc_req_async_args(req);
1401 aa->aa_requested_nob = requested_nob;
1402 aa->aa_nio_count = niocount;
1403 aa->aa_page_count = page_count;
1407 INIT_LIST_HEAD(&aa->aa_oaps);
1408 if (ocapa && reserve)
1409 aa->aa_ocapa = capa_get(ocapa);
1412 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1413 CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1414 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1415 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1419 ptlrpc_req_finished(req);
1423 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1424 __u32 client_cksum, __u32 server_cksum, int nob,
1425 obd_count page_count, struct brw_page **pga,
1426 cksum_type_t client_cksum_type)
1430 cksum_type_t cksum_type;
1432 if (server_cksum == client_cksum) {
1433 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1437 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1439 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1442 if (cksum_type != client_cksum_type)
1443 msg = "the server did not use the checksum type specified in "
1444 "the original request - likely a protocol problem";
1445 else if (new_cksum == server_cksum)
1446 msg = "changed on the client after we checksummed it - "
1447 "likely false positive due to mmap IO (bug 11742)";
1448 else if (new_cksum == client_cksum)
1449 msg = "changed in transit before arrival at OST";
1451 msg = "changed in transit AND doesn't match the original - "
1452 "likely false positive due to mmap IO (bug 11742)";
1454 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1455 " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1456 msg, libcfs_nid2str(peer->nid),
1457 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1458 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1459 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1460 POSTID(&oa->o_oi), pga[0]->off,
1461 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1462 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1463 "client csum now %x\n", client_cksum, client_cksum_type,
1464 server_cksum, cksum_type, new_cksum);
1468 /* Note rc enters this function as the number of bytes transferred */
1469 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1471 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1472 const lnet_process_id_t *peer =
1473 &req->rq_import->imp_connection->c_peer;
1474 struct client_obd *cli = aa->aa_cli;
1475 struct ost_body *body;
1476 __u32 client_cksum = 0;
1479 if (rc < 0 && rc != -EDQUOT) {
1480 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1484 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1485 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1487 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1491 /* set/clear over quota flag for a uid/gid */
1492 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1493 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1494 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1496 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1497 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1499 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1502 osc_update_grant(cli, body);
1507 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1508 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1510 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1512 CERROR("Unexpected +ve rc %d\n", rc);
1515 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1517 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1520 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1521 check_write_checksum(&body->oa, peer, client_cksum,
1522 body->oa.o_cksum, aa->aa_requested_nob,
1523 aa->aa_page_count, aa->aa_ppga,
1524 cksum_type_unpack(aa->aa_oa->o_flags)))
1527 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1528 aa->aa_page_count, aa->aa_ppga);
1532 /* The rest of this function executes only for OST_READs */
1534 /* if unwrap_bulk failed, return -EAGAIN to retry */
1535 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1537 GOTO(out, rc = -EAGAIN);
1539 if (rc > aa->aa_requested_nob) {
1540 CERROR("Unexpected rc %d (%d requested)\n", rc,
1541 aa->aa_requested_nob);
1545 if (rc != req->rq_bulk->bd_nob_transferred) {
1546 CERROR("Unexpected rc %d (%d transferred)\n",
1547 rc, req->rq_bulk->bd_nob_transferred);
1551 if (rc < aa->aa_requested_nob)
1552 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1554 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1555 static int cksum_counter;
1556 __u32 server_cksum = body->oa.o_cksum;
1559 cksum_type_t cksum_type;
1561 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1562 body->oa.o_flags : 0);
1563 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1564 aa->aa_ppga, OST_READ,
1567 if (peer->nid == req->rq_bulk->bd_sender) {
1571 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1574 if (server_cksum != client_cksum) {
1575 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1576 "%s%s%s inode "DFID" object "DOSTID
1577 " extent ["LPU64"-"LPU64"]\n",
1578 req->rq_import->imp_obd->obd_name,
1579 libcfs_nid2str(peer->nid),
1581 body->oa.o_valid & OBD_MD_FLFID ?
1582 body->oa.o_parent_seq : (__u64)0,
1583 body->oa.o_valid & OBD_MD_FLFID ?
1584 body->oa.o_parent_oid : 0,
1585 body->oa.o_valid & OBD_MD_FLFID ?
1586 body->oa.o_parent_ver : 0,
1587 POSTID(&body->oa.o_oi),
1588 aa->aa_ppga[0]->off,
1589 aa->aa_ppga[aa->aa_page_count-1]->off +
1590 aa->aa_ppga[aa->aa_page_count-1]->count -
1592 CERROR("client %x, server %x, cksum_type %x\n",
1593 client_cksum, server_cksum, cksum_type);
1595 aa->aa_oa->o_cksum = client_cksum;
1599 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1602 } else if (unlikely(client_cksum)) {
1603 static int cksum_missed;
1606 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1607 CERROR("Checksum %u requested from %s but not sent\n",
1608 cksum_missed, libcfs_nid2str(peer->nid));
1614 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1615 aa->aa_oa, &body->oa);
1620 static int osc_brw_redo_request(struct ptlrpc_request *request,
1621 struct osc_brw_async_args *aa, int rc)
1623 struct ptlrpc_request *new_req;
1624 struct osc_brw_async_args *new_aa;
1625 struct osc_async_page *oap;
1628 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1629 "redo for recoverable error %d", rc);
1631 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1632 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1633 aa->aa_cli, aa->aa_oa,
1634 NULL /* lsm unused by osc currently */,
1635 aa->aa_page_count, aa->aa_ppga,
1636 &new_req, aa->aa_ocapa, 0, 1);
1640 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1641 if (oap->oap_request != NULL) {
1642 LASSERTF(request == oap->oap_request,
1643 "request %p != oap_request %p\n",
1644 request, oap->oap_request);
1645 if (oap->oap_interrupted) {
1646 ptlrpc_req_finished(new_req);
1651 /* The new request takes over pga and oaps from the old request.
1652 * Note that copying a list_head doesn't work; we need to move it... */
1654 new_req->rq_interpret_reply = request->rq_interpret_reply;
1655 new_req->rq_async_args = request->rq_async_args;
1656 new_req->rq_commit_cb = request->rq_commit_cb;
1657 /* Cap the resend delay to the current request timeout; this is similar to
1658 * what ptlrpc does (see after_reply()). */
1659 if (aa->aa_resends > new_req->rq_timeout)
1660 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1662 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1663 new_req->rq_generation_set = 1;
1664 new_req->rq_import_generation = request->rq_import_generation;
1666 new_aa = ptlrpc_req_async_args(new_req);
1668 INIT_LIST_HEAD(&new_aa->aa_oaps);
1669 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1670 INIT_LIST_HEAD(&new_aa->aa_exts);
1671 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1672 new_aa->aa_resends = aa->aa_resends;
1674 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1675 if (oap->oap_request) {
1676 ptlrpc_req_finished(oap->oap_request);
1677 oap->oap_request = ptlrpc_request_addref(new_req);
1681 new_aa->aa_ocapa = aa->aa_ocapa;
1682 aa->aa_ocapa = NULL;
1684 /* XXX: This code will run into problems if we're going to support
1685 * adding a series of BRW RPCs into a self-defined ptlrpc_request_set
1686 * and waiting for all of them to finish. We should inherit the request
1687 * set from the old request. */
1688 ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1690 DEBUG_REQ(D_INFO, new_req, "new request");
1695 * Ugh, we want disk allocation on the target to happen in offset order. We'll
1696 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1697 * fine for our small page arrays and doesn't require allocation. It's an
1698 * insertion sort that swaps elements that are strides apart, shrinking the
1699 * stride down until it's '1' and the array is sorted.
1701 static void sort_brw_pages(struct brw_page **array, int num)
1704 struct brw_page *tmp;
1708 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1713 for (i = stride ; i < num ; i++) {
1716 while (j >= stride && array[j - stride]->off > tmp->off) {
1717 array[j] = array[j - stride];
1722 } while (stride > 1);
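/* For reference, the stride sequence grows as 1, 4, 13, 40, 121, ...
 * (stride = stride * 3 + 1) and is then shrunk back toward 1 for the
 * insertion passes; e.g. for a 16-entry array (and assuming the usual
 * divide-by-3 shrink step) the passes use strides 13, 4 and 1. */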
1725 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1727 LASSERT(ppga != NULL);
1728 OBD_FREE(ppga, sizeof(*ppga) * count);
1731 static int brw_interpret(const struct lu_env *env,
1732 struct ptlrpc_request *req, void *data, int rc)
1734 struct osc_brw_async_args *aa = data;
1735 struct osc_extent *ext;
1736 struct osc_extent *tmp;
1737 struct client_obd *cli = aa->aa_cli;
1740 rc = osc_brw_fini_request(req, rc);
1741 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1742 /* When the server returns -EINPROGRESS, the client should always retry
1743 * regardless of the number of times the bulk was already resent. */
1744 if (osc_recoverable_error(rc)) {
1745 if (req->rq_import_generation !=
1746 req->rq_import->imp_generation) {
1747 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1748 ""DOSTID", rc = %d.\n",
1749 req->rq_import->imp_obd->obd_name,
1750 POSTID(&aa->aa_oa->o_oi), rc);
1751 } else if (rc == -EINPROGRESS ||
1752 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1753 rc = osc_brw_redo_request(req, aa, rc);
1755 CERROR("%s: too many resent retries for object: "
1756 ""LPU64":"LPU64", rc = %d.\n",
1757 req->rq_import->imp_obd->obd_name,
1758 POSTID(&aa->aa_oa->o_oi), rc);
1763 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1768 capa_put(aa->aa_ocapa);
1769 aa->aa_ocapa = NULL;
1773 struct obdo *oa = aa->aa_oa;
1774 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1775 unsigned long valid = 0;
1776 struct cl_object *obj;
1777 struct osc_async_page *last;
1779 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1780 obj = osc2cl(last->oap_obj);
1782 cl_object_attr_lock(obj);
1783 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1784 attr->cat_blocks = oa->o_blocks;
1785 valid |= CAT_BLOCKS;
1787 if (oa->o_valid & OBD_MD_FLMTIME) {
1788 attr->cat_mtime = oa->o_mtime;
1791 if (oa->o_valid & OBD_MD_FLATIME) {
1792 attr->cat_atime = oa->o_atime;
1795 if (oa->o_valid & OBD_MD_FLCTIME) {
1796 attr->cat_ctime = oa->o_ctime;
1800 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1801 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1802 loff_t last_off = last->oap_count + last->oap_obj_off +
1805 /* Change the file size if this is an out-of-quota or
1806 * direct IO write and it extends the file size. */
1807 if (loi->loi_lvb.lvb_size < last_off) {
1808 attr->cat_size = last_off;
1811 /* Extend KMS if it's not a lockless write */
1812 if (loi->loi_kms < last_off &&
1813 oap2osc_page(last)->ops_srvlock == 0) {
1814 attr->cat_kms = last_off;
1820 cl_object_attr_set(env, obj, attr, valid);
1821 cl_object_attr_unlock(obj);
1823 OBDO_FREE(aa->aa_oa);
1825 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1826 osc_inc_unstable_pages(req);
1828 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1829 list_del_init(&ext->oe_link);
1830 osc_extent_finish(env, ext, 1, rc);
1832 LASSERT(list_empty(&aa->aa_exts));
1833 LASSERT(list_empty(&aa->aa_oaps));
1835 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1836 req->rq_bulk->bd_nob_transferred);
1837 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1838 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1840 client_obd_list_lock(&cli->cl_loi_list_lock);
1841 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1842 * is called so we know whether to go to sync BRWs or wait for more
1843 * RPCs to complete */
1844 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1845 cli->cl_w_in_flight--;
1847 cli->cl_r_in_flight--;
1848 osc_wake_cache_waiters(cli);
1849 client_obd_list_unlock(&cli->cl_loi_list_lock);
1851 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
1855 static void brw_commit(struct ptlrpc_request *req)
1857 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1858 * this being called via rq_commit_cb, we need to ensure
1859 * osc_dec_unstable_pages is still called. Otherwise unstable
1860 * pages may be leaked. */
1861 spin_lock(&req->rq_lock);
1862 if (likely(req->rq_unstable)) {
1863 req->rq_unstable = 0;
1864 spin_unlock(&req->rq_lock);
1866 osc_dec_unstable_pages(req);
1868 req->rq_committed = 1;
1869 spin_unlock(&req->rq_lock);
1874 * Build an RPC from the list of extents @ext_list. The caller must ensure
1875 * that the total number of pages in this list does NOT exceed the maximum
1876 * pages per RPC. Extents in the list must be in the OES_RPC state.
1878 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1879 struct list_head *ext_list, int cmd, pdl_policy_t pol)
1881 struct ptlrpc_request *req = NULL;
1882 struct osc_extent *ext;
1883 struct brw_page **pga = NULL;
1884 struct osc_brw_async_args *aa = NULL;
1885 struct obdo *oa = NULL;
1886 struct osc_async_page *oap;
1887 struct osc_async_page *tmp;
1888 struct cl_req *clerq = NULL;
1889 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
1891 struct cl_req_attr *crattr = NULL;
1892 obd_off starting_offset = OBD_OBJECT_EOF;
1893 obd_off ending_offset = 0;
1897 bool soft_sync = false;
1900 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
1903 LASSERT(!list_empty(ext_list));
1905 /* add pages into rpc_list to build BRW rpc */
1906 list_for_each_entry(ext, ext_list, oe_link) {
1907 LASSERT(ext->oe_state == OES_RPC);
1908 mem_tight |= ext->oe_memalloc;
1909 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1911 list_add_tail(&oap->oap_rpc_item, &rpc_list);
1912 if (starting_offset > oap->oap_obj_off)
1913 starting_offset = oap->oap_obj_off;
1915 LASSERT(oap->oap_page_off == 0);
1916 if (ending_offset < oap->oap_obj_off + oap->oap_count)
1917 ending_offset = oap->oap_obj_off +
1920 LASSERT(oap->oap_page_off + oap->oap_count ==
1925 soft_sync = osc_over_unstable_soft_limit(cli);
1927 mpflag = cfs_memory_pressure_get_and_set();
1929 OBD_ALLOC(crattr, sizeof(*crattr));
1931 GOTO(out, rc = -ENOMEM);
1933 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1935 GOTO(out, rc = -ENOMEM);
1939 GOTO(out, rc = -ENOMEM);
1942 list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1943 struct cl_page *page = oap2cl_page(oap);
1944 if (clerq == NULL) {
1945 clerq = cl_req_alloc(env, page, crt,
1946 1 /* only 1-object rpcs for now */);
1948 GOTO(out, rc = PTR_ERR(clerq));
1951 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1953 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1954 pga[i] = &oap->oap_brw_page;
1955 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1956 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1957 pga[i]->pg, page_index(oap->oap_page), oap,
1960 cl_req_page_add(env, clerq, page);
1963 /* always get the data for the obdo for the rpc */
1964 LASSERT(clerq != NULL);
1965 crattr->cra_oa = oa;
1966 cl_req_attr_set(env, clerq, crattr, ~0ULL);
1968 rc = cl_req_prep(env, clerq);
1970 CERROR("cl_req_prep failed: %d\n", rc);
1974 sort_brw_pages(pga, page_count);
1975 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
1976 pga, &req, crattr->cra_capa, 1, 0);
1978 CERROR("prep_req failed: %d\n", rc);
1982 req->rq_commit_cb = brw_commit;
1983 req->rq_interpret_reply = brw_interpret;
1986 req->rq_memalloc = 1;
1988 /* Need to update the timestamps after the request is built in case
1989 * we race with setattr (locally or in the queue at the OST). If the OST gets
1990 * a later setattr before an earlier BRW (as determined by the request xid),
1991 * the OST will not use the BRW timestamps. Sadly, there is no obvious
1992 * way to do this in a single call. bug 10150 */
1993 cl_req_attr_set(env, clerq, crattr,
1994 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
1996 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1998 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1999 aa = ptlrpc_req_async_args(req);
2000 INIT_LIST_HEAD(&aa->aa_oaps);
2001 list_splice_init(&rpc_list, &aa->aa_oaps);
2002 INIT_LIST_HEAD(&aa->aa_exts);
2003 list_splice_init(ext_list, &aa->aa_exts);
2004 aa->aa_clerq = clerq;
2006 /* Queued sync pages can be torn down while the pages
2007 * are between the pending list and the RPC. */
2009 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2010 /* only one oap gets a request reference */
2013 if (oap->oap_interrupted && !req->rq_intr) {
2014 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2016 ptlrpc_mark_interrupted(req);
2020 tmp->oap_request = ptlrpc_request_addref(req);
2022 client_obd_list_lock(&cli->cl_loi_list_lock);
2023 starting_offset >>= PAGE_CACHE_SHIFT;
2024 if (cmd == OBD_BRW_READ) {
2025 cli->cl_r_in_flight++;
2026 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2027 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2028 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2029 starting_offset + 1);
2031 cli->cl_w_in_flight++;
2032 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2033 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2034 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2035 starting_offset + 1);
2037 client_obd_list_unlock(&cli->cl_loi_list_lock);
2039 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
2040 page_count, aa, cli->cl_r_in_flight,
2041 cli->cl_w_in_flight);
2043 /* XXX: Maybe the caller can check the RPC bulk descriptor to
2044 * see which CPU/NUMA node the majority of pages were allocated
2045 * on, and try to assign the async RPC to the CPU core
2046 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2048 * But on the other hand, we expect that multiple ptlrpcd
2049 * threads and the initial write sponsor can run in parallel,
2050 * especially when data checksumming is enabled, which is a CPU-bound
2051 * operation that a single ptlrpcd thread cannot process in time.
2052 * So having more ptlrpcd threads share the BRW load
2053 * (with PDL_POLICY_ROUND) seems better.
2055 ptlrpcd_add_req(req, pol, -1);
2061 cfs_memory_pressure_restore(mpflag);
2063 if (crattr != NULL) {
2064 capa_put(crattr->cra_capa);
2065 OBD_FREE(crattr, sizeof(*crattr));
2069 LASSERT(req == NULL);
2074 OBD_FREE(pga, sizeof(*pga) * page_count);
2075 /* This should happen rarely and is pretty bad; it makes the
2076 * pending list not follow the dirty order. */
2077 while (!list_empty(ext_list)) {
2078 ext = list_entry(ext_list->next, struct osc_extent,
2080 list_del_init(&ext->oe_link);
2081 osc_extent_finish(env, ext, 0, rc);
2083 if (clerq && !IS_ERR(clerq))
2084 cl_req_completion(env, clerq, rc);
2089 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2090 struct ldlm_enqueue_info *einfo)
2092 void *data = einfo->ei_cbdata;
2095 LASSERT(lock != NULL);
2096 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2097 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2098 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2099 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2101 lock_res_and_lock(lock);
2103 if (lock->l_ast_data == NULL)
2104 lock->l_ast_data = data;
2105 if (lock->l_ast_data == data)
2108 unlock_res_and_lock(lock);
2113 static int osc_set_data_with_check(struct lustre_handle *lockh,
2114 struct ldlm_enqueue_info *einfo)
2116 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2120 set = osc_set_lock_data_with_check(lock, einfo);
2121 LDLM_LOCK_PUT(lock);
2123 CERROR("lockh %p, data %p - client evicted?\n",
2124 lockh, einfo->ei_cbdata);
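/*
 * Iterate over all DLM locks on the object's resource, applying @replace
 * with @data to each of them.
 */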
2128 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2129 ldlm_iterator_t replace, void *data)
2131 struct ldlm_res_id res_id;
2132 struct obd_device *obd = class_exp2obd(exp);
2134 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2135 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2139 /* Find any LDLM lock of the object in this OSC; returns 1 if one is found, 0 otherwise. */
2143 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2144 ldlm_iterator_t replace, void *data)
2146 struct ldlm_res_id res_id;
2147 struct obd_device *obd = class_exp2obd(exp);
2150 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2151 rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2152 if (rc == LDLM_ITER_STOP)
2154 if (rc == LDLM_ITER_CONTINUE)
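/*
 * Common post-enqueue processing: decode the intent reply status if the
 * enqueue carried an intent, mark the LVB as ready on success, invoke the
 * caller's upcall and drop the lock reference taken in ldlm_cli_enqueue().
 */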
2159 static int osc_enqueue_fini(struct ptlrpc_request *req,
2160 osc_enqueue_upcall_f upcall, void *cookie,
2161 struct lustre_handle *lockh, ldlm_mode_t mode,
2162 __u64 *flags, int agl, int errcode)
2164 bool intent = *flags & LDLM_FL_HAS_INTENT;
2168 /* The request was created before ldlm_cli_enqueue call. */
2169 if (intent && errcode == ELDLM_LOCK_ABORTED) {
2170 struct ldlm_reply *rep;
2172 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2173 LASSERT(rep != NULL);
2175 rep->lock_policy_res1 =
2176 ptlrpc_status_ntoh(rep->lock_policy_res1);
2177 if (rep->lock_policy_res1)
2178 errcode = rep->lock_policy_res1;
2180 *flags |= LDLM_FL_LVB_READY;
2181 } else if (errcode == ELDLM_OK) {
2182 *flags |= LDLM_FL_LVB_READY;
2185 /* Call the update callback. */
2186 rc = (*upcall)(cookie, lockh, errcode);
2188 /* release the reference taken in ldlm_cli_enqueue() */
2189 if (errcode == ELDLM_LOCK_MATCHED)
2191 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2192 ldlm_lock_decref(lockh, mode);
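/*
 * Interpret callback for an asynchronous lock enqueue: finish the enqueue
 * via ldlm_cli_enqueue_fini() and run the OSC-level completion in
 * osc_enqueue_fini(), holding an extra lock reference across the upcall.
 */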
2197 static int osc_enqueue_interpret(const struct lu_env *env,
2198 struct ptlrpc_request *req,
2199 struct osc_enqueue_args *aa, int rc)
2201 struct ldlm_lock *lock;
2202 struct lustre_handle *lockh = &aa->oa_lockh;
2203 ldlm_mode_t mode = aa->oa_mode;
2204 struct ost_lvb *lvb = aa->oa_lvb;
2205 __u32 lvb_len = sizeof(*lvb);
2210 /* ldlm_cli_enqueue() holds a reference on the lock, so it must still be valid. */
2212 lock = ldlm_handle2lock(lockh);
2213 LASSERTF(lock != NULL,
2214 "lockh "LPX64", req %p, aa %p - client evicted?\n",
2215 lockh->cookie, req, aa);
2217 /* Take an additional reference so that a blocking AST that
2218 * ldlm_cli_enqueue_fini() might post for a failed lock is guaranteed
2219 * to arrive after the upcall has been executed by
2220 * osc_enqueue_fini(). */
2221 ldlm_lock_addref(lockh, mode);
2223 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2224 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2226 /* Let the CP AST grant the lock first. */
2227 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2230 LASSERT(aa->oa_lvb == NULL);
2231 LASSERT(aa->oa_flags == NULL);
2232 aa->oa_flags = &flags;
2235 /* Finish obtaining the lock. */
2236 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2237 aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2239 /* Finish OSC-level processing. */
2240 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2241 aa->oa_flags, aa->oa_agl, rc);
2243 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2245 ldlm_lock_decref(lockh, mode);
2246 LDLM_LOCK_PUT(lock);
2250 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2252 /* When enqueuing asynchronously, locks are not ordered; we can obtain a lock
2253 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2254 * other synchronous requests; however, keeping some locks while trying to obtain
2255 * others may take a considerable amount of time in the case of an OST failure, and
2256 * when other sync requests do not get the released lock from a client, the client
2257 * is evicted from the cluster -- such scenarios make life difficult, so we
2258 * release locks just after they are obtained. */
2259 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2260 __u64 *flags, ldlm_policy_data_t *policy,
2261 struct ost_lvb *lvb, int kms_valid,
2262 osc_enqueue_upcall_f upcall, void *cookie,
2263 struct ldlm_enqueue_info *einfo,
2264 struct ptlrpc_request_set *rqset, int async, int agl)
2266 struct obd_device *obd = exp->exp_obd;
2267 struct lustre_handle lockh = { 0 };
2268 struct ptlrpc_request *req = NULL;
2269 int intent = *flags & LDLM_FL_HAS_INTENT;
2270 __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
2275 /* Filesystem lock extents are extended to page boundaries so that
2276 * dealing with the page cache is a little smoother. */
2277 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2278 policy->l_extent.end |= ~CFS_PAGE_MASK;
2281 * kms is not valid when either the object is completely fresh (so that no
2282 * locks are cached), or the object was evicted. In the latter case a cached
2283 * lock cannot be used, because it would prime the inode state with a
2284 * potentially stale LVB.
2289 /* Next, search for already existing extent locks that will cover us */
2290 /* If we're trying to read, we also search for an existing PW lock. The
2291 * VFS and page cache already protect us locally, so lots of readers/
2292 * writers can share a single PW lock.
2294 * There are problems with conversion deadlocks, so instead of
2295 * converting a read lock to a write lock, we'll just enqueue a new one.
2298 * At some point we should cancel the read lock instead of making them
2299 * send us a blocking callback, but there are problems with canceling
2300 * locks out from under other users right now, too. */
2301 mode = einfo->ei_mode;
2302 if (einfo->ei_mode == LCK_PR)
2304 mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2305 einfo->ei_type, policy, mode, &lockh, 0);
2307 struct ldlm_lock *matched;
2309 if (*flags & LDLM_FL_TEST_LOCK)
2312 matched = ldlm_handle2lock(&lockh);
2314 /* AGL enqueues DLM locks speculatively. Therefore if
2315 * a DLM lock already exists, it will just inform the
2316 * caller to cancel the AGL process for this stripe. */
2317 ldlm_lock_decref(&lockh, mode);
2318 LDLM_LOCK_PUT(matched);
2320 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2321 *flags |= LDLM_FL_LVB_READY;
2323 /* We already have a lock, and it's referenced. */
2324 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2326 ldlm_lock_decref(&lockh, mode);
2327 LDLM_LOCK_PUT(matched);
2330 ldlm_lock_decref(&lockh, mode);
2331 LDLM_LOCK_PUT(matched);
2336 if (*flags & LDLM_FL_TEST_LOCK)
2340 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2341 &RQF_LDLM_ENQUEUE_LVB);
2345 rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
2347 ptlrpc_request_free(req);
2351 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2353 ptlrpc_request_set_replen(req);
2356 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2357 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2359 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2360 sizeof(*lvb), LVB_T_OST, &lockh, async);
2363 struct osc_enqueue_args *aa;
2364 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2365 aa = ptlrpc_req_async_args(req);
2367 aa->oa_mode = einfo->ei_mode;
2368 aa->oa_type = einfo->ei_type;
2369 lustre_handle_copy(&aa->oa_lockh, &lockh);
2370 aa->oa_upcall = upcall;
2371 aa->oa_cookie = cookie;
2374 aa->oa_flags = flags;
2377 /* AGL essentially enqueues a DLM lock
2378 * in advance, so we don't care about the
2379 * result of the AGL enqueue. */
2381 aa->oa_flags = NULL;
2384 req->rq_interpret_reply =
2385 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2386 if (rqset == PTLRPCD_SET)
2387 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2389 ptlrpc_set_add_req(rqset, req);
2390 } else if (intent) {
2391 ptlrpc_req_finished(req);
2396 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2399 ptlrpc_req_finished(req);
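/*
 * Look for an already granted lock in the local namespace that covers the
 * given extent. For read requests a PW lock is accepted as well; in that
 * case a PR reference is taken and the PW reference dropped.
 */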
2404 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2405 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2406 __u64 *flags, void *data, struct lustre_handle *lockh,
2409 struct obd_device *obd = exp->exp_obd;
2410 __u64 lflags = *flags;
2414 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2417 /* Filesystem lock extents are extended to page boundaries so that
2418 * dealing with the page cache is a little smoother */
2419 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2420 policy->l_extent.end |= ~CFS_PAGE_MASK;
2422 /* Next, search for already existing extent locks that will cover us */
2423 /* If we're trying to read, we also search for an existing PW lock. The
2424 * VFS and page cache already protect us locally, so lots of readers/
2425 * writers can share a single PW lock. */
2429 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2430 res_id, type, policy, rc, lockh, unref);
2433 if (!osc_set_data_with_check(lockh, data)) {
2434 if (!(lflags & LDLM_FL_TEST_LOCK))
2435 ldlm_lock_decref(lockh, rc);
2439 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2440 ldlm_lock_addref(lockh, LCK_PR);
2441 ldlm_lock_decref(lockh, LCK_PW);
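/*
 * Release a lock reference; group locks are cancelled immediately rather
 * than being left cached on the client.
 */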
2448 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2452 if (unlikely(mode == LCK_GROUP))
2453 ldlm_lock_decref_and_cancel(lockh, mode);
2455 ldlm_lock_decref(lockh, mode);
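/*
 * Interpret callback for an asynchronous OST_STATFS request: unpack the
 * obd_statfs reply and hand the result to the caller through oi_cb_up().
 */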
2460 static int osc_statfs_interpret(const struct lu_env *env,
2461 struct ptlrpc_request *req,
2462 struct osc_async_args *aa, int rc)
2464 struct obd_statfs *msfs;
2468 /* The request has in fact never been sent
2469 * due to issues at a higher level (LOV).
2470 * Exit immediately since the caller is
2471 * aware of the problem and takes care
2472 * of the clean up */
2475 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2476 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2482 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2484 GOTO(out, rc = -EPROTO);
2487 *aa->aa_oi->oi_osfs = *msfs;
2489 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
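/*
 * Send an OST_STATFS request without waiting for the reply; the result is
 * delivered through osc_statfs_interpret() once the request set completes.
 */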
2493 static int osc_statfs_async(struct obd_export *exp,
2494 struct obd_info *oinfo, __u64 max_age,
2495 struct ptlrpc_request_set *rqset)
2497 struct obd_device *obd = class_exp2obd(exp);
2498 struct ptlrpc_request *req;
2499 struct osc_async_args *aa;
2503 /* We could possibly pass max_age in the request (as an absolute
2504 * timestamp or a "seconds.usec ago") so the target can avoid doing
2505 * extra calls into the filesystem if that isn't necessary (e.g.
2506 * during mount that would help a bit). Having relative timestamps
2507 * is not so great if request processing is slow, while absolute
2508 * timestamps are not ideal because they need time synchronization. */
2509 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2513 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2515 ptlrpc_request_free(req);
2518 ptlrpc_request_set_replen(req);
2519 req->rq_request_portal = OST_CREATE_PORTAL;
2520 ptlrpc_at_set_req_timeout(req);
2522 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2523 /* procfs requests should not wait for statfs, to avoid deadlock */
2524 req->rq_no_resend = 1;
2525 req->rq_no_delay = 1;
2528 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2529 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2530 aa = ptlrpc_req_async_args(req);
2533 ptlrpc_set_add_req(rqset, req);
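/*
 * Synchronous statfs: take a reference on the import under cl_sem, send an
 * OST_STATFS request, wait for the reply and return the obd_statfs data in
 * @osfs.
 */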
2537 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2538 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2540 struct obd_device *obd = class_exp2obd(exp);
2541 struct obd_statfs *msfs;
2542 struct ptlrpc_request *req;
2543 struct obd_import *imp = NULL;
2547 /* Since the request might also come from lprocfs, we need to
2548 * sync this with client_disconnect_export() (see bug 15684). */
2549 down_read(&obd->u.cli.cl_sem);
2550 if (obd->u.cli.cl_import)
2551 imp = class_import_get(obd->u.cli.cl_import);
2552 up_read(&obd->u.cli.cl_sem);
2556 /* We could possibly pass max_age in the request (as an absolute
2557 * timestamp or a "seconds.usec ago") so the target can avoid doing
2558 * extra calls into the filesystem if that isn't necessary (e.g.
2559 * during mount that would help a bit). Having relative timestamps
2560 * is not so great if request processing is slow, while absolute
2561 * timestamps are not ideal because they need time synchronization. */
2562 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2564 class_import_put(imp);
2569 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2571 ptlrpc_request_free(req);
2574 ptlrpc_request_set_replen(req);
2575 req->rq_request_portal = OST_CREATE_PORTAL;
2576 ptlrpc_at_set_req_timeout(req);
2578 if (flags & OBD_STATFS_NODELAY) {
2579 /* procfs requests should not wait for statfs, to avoid deadlock */
2580 req->rq_no_resend = 1;
2581 req->rq_no_delay = 1;
2584 rc = ptlrpc_queue_wait(req);
2588 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2590 GOTO(out, rc = -EPROTO);
2597 ptlrpc_req_finished(req);
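/*
 * Handle ioctls directed at the OSC device: import recovery, import
 * (de)activation, quota check polling and target ping.
 */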
2601 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2602 void *karg, void *uarg)
2604 struct obd_device *obd = exp->exp_obd;
2605 struct obd_ioctl_data *data = karg;
2609 if (!try_module_get(THIS_MODULE)) {
2610 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2611 module_name(THIS_MODULE));
2615 case OBD_IOC_CLIENT_RECOVER:
2616 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2617 data->ioc_inlbuf1, 0);
2621 case IOC_OSC_SET_ACTIVE:
2622 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2625 case OBD_IOC_POLL_QUOTACHECK:
2626 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2628 case OBD_IOC_PING_TARGET:
2629 err = ptlrpc_obd_ping(obd);
2632 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2633 cmd, current_comm());
2634 GOTO(out, err = -ENOTTY);
2637 module_put(THIS_MODULE);
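/*
 * obd_get_info handler. For KEY_FIEMAP the extent mapping is fetched from
 * the OST with an OST_GET_INFO request, taking a PR extent lock locally
 * when possible and falling back to a server-side lock otherwise.
 */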
2641 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2642 obd_count keylen, void *key, __u32 *vallen, void *val,
2643 struct lov_stripe_md *lsm)
2646 if (!vallen || !val)
2649 if (KEY_IS(KEY_FIEMAP)) {
2650 struct ll_fiemap_info_key *fm_key =
2651 (struct ll_fiemap_info_key *)key;
2652 struct ldlm_res_id res_id;
2653 ldlm_policy_data_t policy;
2654 struct lustre_handle lockh;
2655 ldlm_mode_t mode = 0;
2656 struct ptlrpc_request *req;
2657 struct ll_user_fiemap *reply;
2661 if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
2664 policy.l_extent.start = fm_key->fiemap.fm_start &
2667 if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
2668 fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
2669 policy.l_extent.end = OBD_OBJECT_EOF;
2671 policy.l_extent.end = (fm_key->fiemap.fm_start +
2672 fm_key->fiemap.fm_length +
2673 PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
2675 ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
2676 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
2677 LDLM_FL_BLOCK_GRANTED |
2679 &res_id, LDLM_EXTENT, &policy,
2680 LCK_PR | LCK_PW, &lockh, 0);
2681 if (mode) { /* lock is cached on client */
2682 if (mode != LCK_PR) {
2683 ldlm_lock_addref(&lockh, LCK_PR);
2684 ldlm_lock_decref(&lockh, LCK_PW);
2686 } else { /* no cached lock, need to acquire the lock on the server side */
2687 fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
2688 fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
2692 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2693 &RQF_OST_GET_INFO_FIEMAP);
2695 GOTO(drop_lock, rc = -ENOMEM);
2697 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
2698 RCL_CLIENT, keylen);
2699 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2700 RCL_CLIENT, *vallen);
2701 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2702 RCL_SERVER, *vallen);
2704 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2706 ptlrpc_request_free(req);
2707 GOTO(drop_lock, rc);
2710 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
2711 memcpy(tmp, key, keylen);
2712 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2713 memcpy(tmp, val, *vallen);
2715 ptlrpc_request_set_replen(req);
2716 rc = ptlrpc_queue_wait(req);
2720 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2722 GOTO(fini_req, rc = -EPROTO);
2724 memcpy(val, reply, *vallen);
2726 ptlrpc_req_finished(req);
2729 ldlm_lock_decref(&lockh, LCK_PR);
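/*
 * obd_set_info_async handler. Keys such as checksum, sptlrpc config, context
 * flush, client cache setup and LRU shrink are handled locally; anything
 * else is forwarded to the OST as an OST_SET_INFO request, with
 * KEY_GRANT_SHRINK getting its own interpret callback.
 */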
2736 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2737 obd_count keylen, void *key, obd_count vallen,
2738 void *val, struct ptlrpc_request_set *set)
2740 struct ptlrpc_request *req;
2741 struct obd_device *obd = exp->exp_obd;
2742 struct obd_import *imp = class_exp2cliimp(exp);
2747 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2749 if (KEY_IS(KEY_CHECKSUM)) {
2750 if (vallen != sizeof(int))
2752 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2756 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2757 sptlrpc_conf_client_adapt(obd);
2761 if (KEY_IS(KEY_FLUSH_CTX)) {
2762 sptlrpc_import_flush_my_ctx(imp);
2766 if (KEY_IS(KEY_CACHE_SET)) {
2767 struct client_obd *cli = &obd->u.cli;
2769 LASSERT(cli->cl_cache == NULL); /* only once */
2770 cli->cl_cache = (struct cl_client_cache *)val;
2771 atomic_inc(&cli->cl_cache->ccc_users);
2772 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2774 /* add this osc to the shared cache's LRU list */
2775 LASSERT(list_empty(&cli->cl_lru_osc));
2776 spin_lock(&cli->cl_cache->ccc_lru_lock);
2777 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2778 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2783 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2784 struct client_obd *cli = &obd->u.cli;
2785 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2786 long target = *(long *)val;
2788 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2793 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2796 /* We pass all other commands directly to the OST. Since nobody calls osc
2797 methods directly and everybody is supposed to go through LOV, we
2798 assume LOV checked invalid values for us.
2799 The only recognised values so far are evict_by_nid and mds_conn.
2800 Even if something bad goes through, we'd get a -EINVAL from the OST anyway. */
2803 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2804 &RQF_OST_SET_GRANT_INFO :
2809 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2810 RCL_CLIENT, keylen);
2811 if (!KEY_IS(KEY_GRANT_SHRINK))
2812 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2813 RCL_CLIENT, vallen);
2814 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2816 ptlrpc_request_free(req);
2820 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2821 memcpy(tmp, key, keylen);
2822 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2825 memcpy(tmp, val, vallen);
2827 if (KEY_IS(KEY_GRANT_SHRINK)) {
2828 struct osc_grant_args *aa;
2831 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2832 aa = ptlrpc_req_async_args(req);
2835 ptlrpc_req_finished(req);
2838 *oa = ((struct ost_body *)val)->oa;
2840 req->rq_interpret_reply = osc_shrink_grant_interpret;
2843 ptlrpc_request_set_replen(req);
2844 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2845 LASSERT(set != NULL);
2846 ptlrpc_set_add_req(set, req);
2847 ptlrpc_check_set(NULL, set);
2849 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
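/*
 * On reconnect, report the grant this client still holds (available grant
 * plus grant consumed by dirty pages) so the server can restore it, and
 * reset the lost-grant counter.
 */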
2854 static int osc_reconnect(const struct lu_env *env,
2855 struct obd_export *exp, struct obd_device *obd,
2856 struct obd_uuid *cluuid,
2857 struct obd_connect_data *data,
2860 struct client_obd *cli = &obd->u.cli;
2862 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2865 client_obd_list_lock(&cli->cl_loi_list_lock);
2866 data->ocd_grant = (cli->cl_avail_grant +
2867 (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
2868 2 * cli_brw_size(obd);
2869 lost_grant = cli->cl_lost_grant;
2870 cli->cl_lost_grant = 0;
2871 client_obd_list_unlock(&cli->cl_loi_list_lock);
2873 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2874 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2875 data->ocd_version, data->ocd_grant, lost_grant);
2881 static int osc_disconnect(struct obd_export *exp)
2883 struct obd_device *obd = class_exp2obd(exp);
2886 rc = client_disconnect_export(exp);
2888 * Initially we put del_shrink_grant before disconnect_export, but it
2889 * causes the following problem if setup (connect) and cleanup
2890 * (disconnect) are tangled together.
2891 * connect p1 disconnect p2
2892 * ptlrpc_connect_import
2893 * ............... class_manual_cleanup
2896 * ptlrpc_connect_interrupt
2898 * add this client to shrink list
2900 * Bang! The pinger triggers the shrink.
2901 * So the osc should be removed from the shrink list only after we
2902 * are sure the import has been destroyed (see bug 18662).
2904 if (obd->u.cli.cl_import == NULL)
2905 osc_del_shrink_grant(&obd->u.cli);
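/*
 * React to import state changes: reset grant on disconnect, flush cached
 * pages and clean the namespace on invalidation, (re)initialize grant and
 * the request portal on OCD, and notify the observer of the other
 * active/inactive transitions.
 */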
2909 static int osc_import_event(struct obd_device *obd,
2910 struct obd_import *imp,
2911 enum obd_import_event event)
2913 struct client_obd *cli;
2917 LASSERT(imp->imp_obd == obd);
2920 case IMP_EVENT_DISCON: {
2922 client_obd_list_lock(&cli->cl_loi_list_lock);
2923 cli->cl_avail_grant = 0;
2924 cli->cl_lost_grant = 0;
2925 client_obd_list_unlock(&cli->cl_loi_list_lock);
2928 case IMP_EVENT_INACTIVE: {
2929 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2932 case IMP_EVENT_INVALIDATE: {
2933 struct ldlm_namespace *ns = obd->obd_namespace;
2937 env = cl_env_get(&refcheck);
2941 /* all pages go to failing RPCs due to the invalid import */
2943 osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
2945 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2946 cl_env_put(env, &refcheck);
2951 case IMP_EVENT_ACTIVE: {
2952 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2955 case IMP_EVENT_OCD: {
2956 struct obd_connect_data *ocd = &imp->imp_connect_data;
2958 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2959 osc_init_grant(&obd->u.cli, ocd);
2962 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2963 imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
2965 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2968 case IMP_EVENT_DEACTIVATE: {
2969 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2972 case IMP_EVENT_ACTIVATE: {
2973 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2977 CERROR("Unknown import event %d\n", event);
2984 * Determine whether the lock can be canceled before replaying the lock
2985 * during recovery, see bug16774 for detailed information.
2987 * \retval zero the lock can't be canceled
2988 * \retval other ok to cancel
2990 static int osc_cancel_weight(struct ldlm_lock *lock)
2993 * Cancel all unused and granted extent locks.
2995 if (lock->l_resource->lr_type == LDLM_EXTENT &&
2996 lock->l_granted_mode == lock->l_req_mode &&
2997 osc_ldlm_weigh_ast(lock) == 0)
3003 static int brw_queue_work(const struct lu_env *env, void *data)
3005 struct client_obd *cli = data;
3007 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3009 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
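/*
 * Set up the OSC device: generic client setup, writeback and LRU ptlrpcd
 * work items, quota setup, procfs registration, a request pool for BRW
 * RPCs, the grant shrink list and the cancel-weight callback.
 */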
3013 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3015 struct client_obd *cli = &obd->u.cli;
3016 struct obd_type *type;
3021 rc = ptlrpcd_addref();
3025 rc = client_obd_setup(obd, lcfg);
3027 GOTO(out_ptlrpcd, rc);
3029 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3030 if (IS_ERR(handler))
3031 GOTO(out_client_setup, rc = PTR_ERR(handler));
3032 cli->cl_writeback_work = handler;
3034 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3035 if (IS_ERR(handler))
3036 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3037 cli->cl_lru_work = handler;
3039 rc = osc_quota_setup(obd);
3041 GOTO(out_ptlrpcd_work, rc);
3043 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3046 obd->obd_vars = lprocfs_osc_obd_vars;
3048 /* If this is true then both the client (osc) and server (osp) are on the
3049 * same node. If the osp layer is loaded first, it will register the osc proc
3050 * directory. In that case this obd_device will attach its proc
3051 * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
3052 type = class_search_type(LUSTRE_OSP_NAME);
3053 if (type && type->typ_procsym) {
3054 obd->obd_proc_entry = lprocfs_seq_register(obd->obd_name,
3056 obd->obd_vars, obd);
3057 if (IS_ERR(obd->obd_proc_entry)) {
3058 rc = PTR_ERR(obd->obd_proc_entry);
3059 CERROR("error %d setting up lprocfs for %s\n", rc,
3061 obd->obd_proc_entry = NULL;
3064 rc = lprocfs_obd_setup(obd);
3067 /* If the basic OSC proc tree construction succeeded then
3068 * let's do the rest. */
3070 lproc_osc_attach_seqstat(obd);
3071 sptlrpc_lprocfs_cliobd_attach(obd);
3072 ptlrpc_lprocfs_register_obd(obd);
3075 /* We need to allocate a few more requests, because
3076 * brw_interpret tries to create new requests before freeing
3077 * previous ones. Ideally we want to have 2x max_rpcs_in_flight
3078 * reserved, but I'm afraid that might be too much wasted RAM
3079 * in fact, so 2 is just my guess and should still work. */
3080 cli->cl_import->imp_rq_pool =
3081 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3083 ptlrpc_add_rqs_to_pool);
3085 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3086 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3090 if (cli->cl_writeback_work != NULL) {
3091 ptlrpcd_destroy_work(cli->cl_writeback_work);
3092 cli->cl_writeback_work = NULL;
3094 if (cli->cl_lru_work != NULL) {
3095 ptlrpcd_destroy_work(cli->cl_lru_work);
3096 cli->cl_lru_work = NULL;
3099 client_obd_cleanup(obd);
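/*
 * Pre-cleanup: deactivate the import and stop pinging it in the early
 * stage, then destroy the ptlrpcd work items, clean up the client import
 * and unregister procfs entries once exports are being cleaned up.
 */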
3105 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3111 case OBD_CLEANUP_EARLY: {
3112 struct obd_import *imp;
3113 imp = obd->u.cli.cl_import;
3114 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3115 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3116 ptlrpc_deactivate_import(imp);
3117 spin_lock(&imp->imp_lock);
3118 imp->imp_pingable = 0;
3119 spin_unlock(&imp->imp_lock);
3122 case OBD_CLEANUP_EXPORTS: {
3123 struct client_obd *cli = &obd->u.cli;
3125 * for echo client, export may be on zombie list, wait for
3126 * zombie thread to cull it, because cli.cl_import will be
3127 * cleared in client_disconnect_export():
3128 * class_export_destroy() -> obd_cleanup() ->
3129 * echo_device_free() -> echo_client_cleanup() ->
3130 * obd_disconnect() -> osc_disconnect() ->
3131 * client_disconnect_export()
3133 obd_zombie_barrier();
3134 if (cli->cl_writeback_work) {
3135 ptlrpcd_destroy_work(cli->cl_writeback_work);
3136 cli->cl_writeback_work = NULL;
3138 if (cli->cl_lru_work) {
3139 ptlrpcd_destroy_work(cli->cl_lru_work);
3140 cli->cl_lru_work = NULL;
3142 obd_cleanup_client_import(obd);
3143 ptlrpc_lprocfs_unregister_obd(obd);
3144 lprocfs_obd_cleanup(obd);
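/*
 * Final cleanup: detach this OSC from the shared client cache, release the
 * quota cache and tear down the generic client state.
 */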
3151 int osc_cleanup(struct obd_device *obd)
3153 struct client_obd *cli = &obd->u.cli;
3159 if (cli->cl_cache != NULL) {
3160 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3161 spin_lock(&cli->cl_cache->ccc_lru_lock);
3162 list_del_init(&cli->cl_lru_osc);
3163 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3164 cli->cl_lru_left = NULL;
3165 atomic_dec(&cli->cl_cache->ccc_users);
3166 cli->cl_cache = NULL;
3169 /* free memory of osc quota cache */
3170 osc_quota_cleanup(obd);
3172 rc = client_obd_cleanup(obd);
3178 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3180 int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
3181 return rc > 0 ? 0 : rc;
3184 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3186 return osc_process_config_base(obd, buf);
3189 struct obd_ops osc_obd_ops = {
3190 .o_owner = THIS_MODULE,
3191 .o_setup = osc_setup,
3192 .o_precleanup = osc_precleanup,
3193 .o_cleanup = osc_cleanup,
3194 .o_add_conn = client_import_add_conn,
3195 .o_del_conn = client_import_del_conn,
3196 .o_connect = client_connect_import,
3197 .o_reconnect = osc_reconnect,
3198 .o_disconnect = osc_disconnect,
3199 .o_statfs = osc_statfs,
3200 .o_statfs_async = osc_statfs_async,
3201 .o_unpackmd = osc_unpackmd,
3202 .o_create = osc_create,
3203 .o_destroy = osc_destroy,
3204 .o_getattr = osc_getattr,
3205 .o_getattr_async = osc_getattr_async,
3206 .o_setattr = osc_setattr,
3207 .o_setattr_async = osc_setattr_async,
3208 .o_change_cbdata = osc_change_cbdata,
3209 .o_find_cbdata = osc_find_cbdata,
3210 .o_iocontrol = osc_iocontrol,
3211 .o_get_info = osc_get_info,
3212 .o_set_info_async = osc_set_info_async,
3213 .o_import_event = osc_import_event,
3214 .o_process_config = osc_process_config,
3215 .o_quotactl = osc_quotactl,
3216 .o_quotacheck = osc_quotacheck,
3219 extern struct lu_kmem_descr osc_caches[];
3220 extern struct lock_class_key osc_ast_guard_class;
3222 int __init osc_init(void)
3224 bool enable_proc = true;
3225 struct obd_type *type;
3229 /* print an address of _any_ initialized kernel symbol from this
3230 * module, to allow debugging with gdb that doesn't support data
3231 * symbols from modules. */
3232 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3234 rc = lu_kmem_init(osc_caches);
3238 type = class_search_type(LUSTRE_OSP_NAME);
3239 if (type != NULL && type->typ_procsym != NULL)
3240 enable_proc = false;
3242 rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3243 LUSTRE_OSC_NAME, &osc_device_type);
3245 lu_kmem_fini(osc_caches);
3252 static void /*__exit*/ osc_exit(void)
3254 class_unregister_type(LUSTRE_OSC_NAME);
3255 lu_kmem_fini(osc_caches);
3258 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3259 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3260 MODULE_LICENSE("GPL");
3262 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);