4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2013, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_OSC
39 #include <libcfs/libcfs.h>
42 # include <liblustre.h>
45 #include <lustre_dlm.h>
46 #include <lustre_net.h>
47 #include <lustre/lustre_user.h>
48 #include <obd_cksum.h>
49 #include <lustre_ha.h>
50 #include <lprocfs_status.h>
51 #include <lustre_ioctl.h>
52 #include <lustre_log.h>
53 #include <lustre_debug.h>
54 #include <lustre_param.h>
55 #include <lustre_fid.h>
56 #include "osc_internal.h"
57 #include "osc_cl_internal.h"
/*
 * Per-request state stashed in rq_async_args for an asynchronous bulk
 * read/write (BRW) RPC; consumed by brw_interpret() on completion.
 * NOTE(review): several members (e.g. the obdo pointer used elsewhere as
 * aa_oa) appear elided in this excerpt.
 */
59 struct osc_brw_async_args {
63 obd_count aa_page_count;
65 struct brw_page **aa_ppga;
66 struct client_obd *aa_cli;
67 struct list_head aa_oaps;
68 struct list_head aa_exts;
69 struct obd_capa *aa_ocapa;
70 struct cl_req *aa_clerq;
/* Grant-shrink RPCs reuse the same async-args layout under another name. */
73 #define osc_grant_args osc_brw_async_args
/* Async-args for simple attribute RPCs: the obd_info whose oi_oa gets
 * filled from the reply and whose oi_cb_up is invoked on completion. */
75 struct osc_async_args {
76 struct obd_info *aa_oi;
/* Async-args for setattr/punch RPCs: sa_upcall is called with sa_cookie
 * from osc_setattr_interpret(). NOTE(review): sa_oa/sa_cookie members are
 * elided in this excerpt but referenced below. */
79 struct osc_setattr_args {
81 obd_enqueue_update_f sa_upcall;
/* Async-args for OST_SYNC RPCs: fa_upcall(fa_cookie, rc) is invoked from
 * osc_sync_interpret() after the reply obdo is copied into fa_oi->oi_oa. */
85 struct osc_fsync_args {
86 struct obd_info *fa_oi;
87 obd_enqueue_update_f fa_upcall;
/*
 * Async-args for LDLM lock enqueue requests issued by this OSC.
 * oa_agl:1 — bitfield flag; presumably marks AGL (asynchronous glimpse
 * lock) enqueues — confirm against the enqueue path, which is not
 * visible in this excerpt.
 */
91 struct osc_enqueue_args {
92 struct obd_export *oa_exp;
94 obd_enqueue_update_f oa_upcall;
96 struct ost_lvb *oa_lvb;
97 struct lustre_handle *oa_lockh;
98 struct ldlm_enqueue_info *oa_ei;
99 unsigned int oa_agl:1;
/* Forward declarations for helpers defined later in this file. */
102 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
103 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
106 /* Unpack OSC object metadata from disk storage (LE byte order). */
/*
 * Converts an on-wire/on-disk lov_mds_md (lmm) into an in-memory
 * lov_stripe_md (*lsmp), allocating it if needed.  Passing lmm == NULL
 * with an existing *lsmp frees the in-memory copy instead.
 * NOTE(review): error-return lines and several braces are elided in this
 * excerpt; the control flow below is partial.
 */
107 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
108 struct lov_mds_md *lmm, int lmm_bytes)
111 struct obd_import *imp = class_exp2cliimp(exp);
/* Reject buffers too small to hold even the fixed lov_mds_md header. */
115 if (lmm_bytes < sizeof(*lmm)) {
116 CERROR("%s: lov_mds_md too small: %d, need %d\n",
117 exp->exp_obd->obd_name, lmm_bytes,
121 /* XXX LOV_MAGIC etc check? */
/* A zero object id is never valid metadata. */
123 if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
124 CERROR("%s: zero lmm_object_id: rc = %d\n",
125 exp->exp_obd->obd_name, -EINVAL);
/* OSC objects are always single-striped from this layer's view. */
130 lsm_size = lov_stripe_md_size(1);
/* lmm == NULL means "free the existing in-memory md". */
134 if (*lsmp != NULL && lmm == NULL) {
135 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
136 OBD_FREE(*lsmp, lsm_size);
142 OBD_ALLOC(*lsmp, lsm_size);
143 if (unlikely(*lsmp == NULL))
145 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
146 if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
/* Undo the outer allocation on partial-allocation failure. */
147 OBD_FREE(*lsmp, lsm_size);
150 loi_init((*lsmp)->lsm_oinfo[0]);
151 } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
156 /* XXX zero *lsmp? */
/* Object id arrives little-endian on the wire; convert to host order. */
157 ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
/* Use the server-advertised maximum object size when supported,
 * otherwise fall back to the compile-time stripe maximum. */
160 (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
161 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
163 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/*
 * Copy the client's capability (capa), if any, into the RMF_CAPA1 field
 * of the request capsule and mark its presence in the obdo valid bits.
 * NOTE(review): the early-return for a NULL capa and the capa_cpy() call
 * appear elided in this excerpt.
 */
168 static inline void osc_pack_capa(struct ptlrpc_request *req,
169 struct ost_body *body, void *capa)
171 struct obd_capa *oc = (struct obd_capa *)capa;
172 struct lustre_capa *c;
177 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
180 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
181 DEBUG_CAPA(D_SEC, c, "pack");
/*
 * Fill the OST_BODY field of a request from oinfo: convert the in-memory
 * obdo to wire format and append the capability, if present.
 */
184 static inline void osc_pack_req_body(struct ptlrpc_request *req,
185 struct obd_info *oinfo)
187 struct ost_body *body;
189 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
192 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
194 osc_pack_capa(req, body, oinfo->oi_capa);
/*
 * Size the capability field of the (not-yet-packed) request: zero when no
 * capability is supplied, otherwise the default (sizeof struct obd_capa).
 */
197 static inline void osc_set_capa_size(struct ptlrpc_request *req,
198 const struct req_msg_field *field,
202 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
204 /* it is already calculated as sizeof struct obd_capa */
/*
 * Reply interpreter for async OST_GETATTR: unpack the returned obdo into
 * aa->aa_oi->oi_oa, then invoke the caller's oi_cb_up callback with rc.
 */
208 static int osc_getattr_interpret(const struct lu_env *env,
209 struct ptlrpc_request *req,
210 struct osc_async_args *aa, int rc)
212 struct ost_body *body;
218 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
220 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
221 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
222 aa->aa_oi->oi_oa, &body->oa);
224 /* This should really be sent by the OST */
225 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
226 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
/* Unpack failure: report and invalidate the attrs before the upcall. */
228 CDEBUG(D_INFO, "can't unpack ost_body\n");
230 aa->aa_oi->oi_oa->o_valid = 0;
233 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Issue an asynchronous OST_GETATTR and add it to @set; the reply is
 * processed by osc_getattr_interpret().
 */
237 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
238 struct ptlrpc_request_set *set)
240 struct ptlrpc_request *req;
241 struct osc_async_args *aa;
245 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
/* Reserve space for the (optional) capability before packing. */
249 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
250 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
252 ptlrpc_request_free(req);
256 osc_pack_req_body(req, oinfo);
258 ptlrpc_request_set_replen(req);
259 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
/* Async-args must fit inside the request's embedded scratch space. */
261 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
262 aa = ptlrpc_req_async_args(req);
265 ptlrpc_set_add_req(set, req);
/*
 * Synchronous OST_GETATTR: send the request, wait for the reply, and
 * unpack the returned attributes into oinfo->oi_oa.
 */
269 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
270 struct obd_info *oinfo)
272 struct ptlrpc_request *req;
273 struct ost_body *body;
277 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
281 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
282 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
284 ptlrpc_request_free(req);
288 osc_pack_req_body(req, oinfo);
290 ptlrpc_request_set_replen(req);
292 rc = ptlrpc_queue_wait(req);
296 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
298 GOTO(out, rc = -EPROTO);
300 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
301 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
/* Fake the block size locally; the OST does not send it. */
304 oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
305 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
309 ptlrpc_req_finished(req);
/*
 * Synchronous OST_SETATTR: push the attributes in oinfo->oi_oa to the OST
 * and read back the server's view of the obdo on success.
 */
313 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
314 struct obd_info *oinfo, struct obd_trans_info *oti)
316 struct ptlrpc_request *req;
317 struct ost_body *body;
/* Callers must always identify the object group (sequence). */
321 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
323 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
327 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
328 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
330 ptlrpc_request_free(req);
334 osc_pack_req_body(req, oinfo);
336 ptlrpc_request_set_replen(req);
338 rc = ptlrpc_queue_wait(req);
342 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
344 GOTO(out, rc = -EPROTO);
346 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
351 ptlrpc_req_finished(req);
/*
 * Reply interpreter shared by async setattr and punch: unpack the reply
 * obdo into sa->sa_oa, then invoke sa->sa_upcall(sa->sa_cookie, rc).
 */
355 static int osc_setattr_interpret(const struct lu_env *env,
356 struct ptlrpc_request *req,
357 struct osc_setattr_args *sa, int rc)
359 struct ost_body *body;
365 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
367 GOTO(out, rc = -EPROTO);
369 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
372 rc = sa->sa_upcall(sa->sa_cookie, rc);
/*
 * Asynchronous OST_SETATTR.  If @rqset is NULL the request is handed to
 * ptlrpcd fire-and-forget; PTLRPCD_SET routes it to ptlrpcd with the
 * interpret callback; otherwise it is added to the caller's set.
 * @upcall/@cookie are delivered via osc_setattr_interpret().
 * NOTE(review): branch structure (the rqset == NULL vs else paths) is
 * partially elided in this excerpt.
 */
376 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
377 struct obd_trans_info *oti,
378 obd_enqueue_update_f upcall, void *cookie,
379 struct ptlrpc_request_set *rqset)
381 struct ptlrpc_request *req;
382 struct osc_setattr_args *sa;
386 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
390 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
391 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
393 ptlrpc_request_free(req);
/* Carry the MDS llog cookie along when one was supplied. */
397 if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
398 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
400 osc_pack_req_body(req, oinfo);
402 ptlrpc_request_set_replen(req);
404 /* do mds to ost setattr asynchronously */
406 /* Do not wait for response. */
407 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
409 req->rq_interpret_reply =
410 (ptlrpc_interpterer_t)osc_setattr_interpret;
412 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
413 sa = ptlrpc_req_async_args(req);
414 sa->sa_oa = oinfo->oi_oa;
415 sa->sa_upcall = upcall;
416 sa->sa_cookie = cookie;
418 if (rqset == PTLRPCD_SET)
419 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
421 ptlrpc_set_add_req(rqset, req);
/*
 * Thin wrapper: async setattr using oinfo's own completion callback
 * (oi_cb_up) and oinfo itself as the cookie.
 */
427 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
428 struct obd_trans_info *oti,
429 struct ptlrpc_request_set *rqset)
431 return osc_setattr_async_base(exp, oinfo, oti,
432 oinfo->oi_cb_up, oinfo, rqset);
/*
 * Synchronous OST_CREATE: allocate striping metadata, send the create
 * request, and copy the returned object identity/attributes back into
 * @oa and the lsm.  On success *ea receives the (possibly new) lsm.
 * NOTE(review): cleanup labels (out_req/out) and several returns are
 * elided in this excerpt.
 */
435 int osc_real_create(struct obd_export *exp, struct obdo *oa,
436 struct lov_stripe_md **ea, struct obd_trans_info *oti)
438 struct ptlrpc_request *req;
439 struct ost_body *body;
440 struct lov_stripe_md *lsm;
449 rc = obd_alloc_memmd(exp, &lsm);
454 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
456 GOTO(out, rc = -ENOMEM);
458 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
460 ptlrpc_request_free(req);
464 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
467 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
469 ptlrpc_request_set_replen(req);
471 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
472 oa->o_flags == OBD_FL_DELORPHAN) {
474 "delorphan from OST integration");
475 /* Don't resend the delorphan req */
476 req->rq_no_resend = req->rq_no_delay = 1;
479 rc = ptlrpc_queue_wait(req);
483 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
485 GOTO(out_req, rc = -EPROTO);
487 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
488 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
/* Block size is synthesized locally from the client BRW size. */
490 oa->o_blksize = cli_brw_size(exp->exp_obd);
491 oa->o_valid |= OBD_MD_FLBLKSZ;
493 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
494 * have valid lsm_oinfo data structs, so don't go touching that.
495 * This needs to be fixed in a big way.
497 lsm->lsm_oi = oa->o_oi;
/* Propagate the server's unlink llog cookie back via oti. */
501 if (oa->o_valid & OBD_MD_FLCOOKIE) {
502 if (oti->oti_logcookies == NULL)
503 oti->oti_logcookies = &oti->oti_onecookie;
505 *oti->oti_logcookies = oa->o_lcookie;
509 CDEBUG(D_HA, "transno: "LPD64"\n",
510 lustre_msg_get_transno(req->rq_repmsg));
512 ptlrpc_req_finished(req);
515 obd_free_memmd(exp, &lsm);
/*
 * Asynchronous OST_PUNCH (truncate a byte range).  Completion is reported
 * through osc_setattr_interpret() -> @upcall(@cookie, rc).  The request
 * either goes to ptlrpcd (rqset == PTLRPCD_SET) or into the caller's set.
 */
519 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
520 obd_enqueue_update_f upcall, void *cookie,
521 struct ptlrpc_request_set *rqset)
523 struct ptlrpc_request *req;
524 struct osc_setattr_args *sa;
525 struct ost_body *body;
529 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
533 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
534 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
536 ptlrpc_request_free(req);
539 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
540 ptlrpc_at_set_req_timeout(req);
542 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
544 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
546 osc_pack_capa(req, body, oinfo->oi_capa);
548 ptlrpc_request_set_replen(req);
550 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
551 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
552 sa = ptlrpc_req_async_args(req);
553 sa->sa_oa = oinfo->oi_oa;
554 sa->sa_upcall = upcall;
555 sa->sa_cookie = cookie;
556 if (rqset == PTLRPCD_SET)
557 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
559 ptlrpc_set_add_req(rqset, req);
/*
 * Reply interpreter for OST_SYNC: copy the reply obdo into the caller's
 * obd_info and invoke the stored upcall with the final rc.
 */
564 static int osc_sync_interpret(const struct lu_env *env,
565 struct ptlrpc_request *req,
568 struct osc_fsync_args *fa = arg;
569 struct ost_body *body;
575 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
577 CERROR ("can't unpack ost_body\n");
578 GOTO(out, rc = -EPROTO);
/* Note: direct struct copy, not lustre_get_wire_obdo() conversion. */
581 *fa->fa_oi->oi_oa = body->oa;
583 rc = fa->fa_upcall(fa->fa_cookie, rc);
/*
 * Asynchronous OST_SYNC over a byte range (start/end are carried in the
 * obdo's size/blocks fields — see comment below).  Completion reported
 * via osc_sync_interpret() -> @upcall(@cookie, rc).
 */
587 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
588 obd_enqueue_update_f upcall, void *cookie,
589 struct ptlrpc_request_set *rqset)
591 struct ptlrpc_request *req;
592 struct ost_body *body;
593 struct osc_fsync_args *fa;
597 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
601 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
602 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
604 ptlrpc_request_free(req);
608 /* overload the size and blocks fields in the oa with start/end */
609 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
611 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
613 osc_pack_capa(req, body, oinfo->oi_capa);
615 ptlrpc_request_set_replen(req);
616 req->rq_interpret_reply = osc_sync_interpret;
618 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
619 fa = ptlrpc_req_async_args(req);
621 fa->fa_upcall = upcall;
622 fa->fa_cookie = cookie;
624 if (rqset == PTLRPCD_SET)
625 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
627 ptlrpc_set_add_req(rqset, req);
632 /* Find and cancel locally locks matched by @mode in the resource found by
633 * @objid. Found locks are added into @cancel list. Returns the amount of
634 * locks added to @cancels list. */
635 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
636 struct list_head *cancels,
637 ldlm_mode_t mode, __u64 lock_flags)
639 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
640 struct ldlm_res_id res_id;
641 struct ldlm_resource *res;
645 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
646 * export) but disabled through procfs (flag in NS).
648 * This distinguishes from a case when ELC is not supported originally,
649 * when we still want to cancel locks in advance and just cancel them
650 * locally, without sending any RPC. */
651 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
/* Map the object id to its LDLM resource and cancel matching locks
 * locally while holding a resource reference. */
654 ostid_build_res_name(&oa->o_oi, &res_id);
655 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
659 LDLM_RESOURCE_ADDREF(res);
660 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
661 lock_flags, 0, NULL);
662 LDLM_RESOURCE_DELREF(res);
663 ldlm_resource_putref(res);
/*
 * Completion callback for async OST_DESTROY: release our slot in the
 * in-flight-destroy throttle and wake any waiter in osc_destroy().
 */
667 static int osc_destroy_interpret(const struct lu_env *env,
668 struct ptlrpc_request *req, void *data,
671 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
673 atomic_dec(&cli->cl_destroy_in_flight);
674 wake_up(&cli->cl_destroy_waitq);
/*
 * Try to reserve a slot for a destroy RPC: returns non-zero (slot held)
 * when in-flight destroys stay within cl_max_rpcs_in_flight, otherwise
 * releases the tentative slot and returns 0.  Lock-free: inc/dec race is
 * resolved by re-checking and waking waiters.
 */
678 static int osc_can_send_destroy(struct client_obd *cli)
680 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
681 cli->cl_max_rpcs_in_flight) {
682 /* The destroy request can be sent */
685 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
686 cli->cl_max_rpcs_in_flight) {
688 * The counter has been modified between the two atomic
691 wake_up(&cli->cl_destroy_waitq);
/*
 * Object creation entry point.  Only recreate-objects requests and
 * non-MDT sequences take the real-create RPC path; anything else is
 * unexpected here (MDT-sequence creates are handled elsewhere).
 */
696 int osc_create(const struct lu_env *env, struct obd_export *exp,
697 struct obdo *oa, struct lov_stripe_md **ea,
698 struct obd_trans_info *oti)
705 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
707 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
708 oa->o_flags == OBD_FL_RECREATE_OBJS) {
709 RETURN(osc_real_create(exp, oa, ea, oti));
712 if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
713 RETURN(osc_real_create(exp, oa, ea, oti));
715 /* we should not get here anymore */
721 /* Destroy requests can be async always on the client, and we don't even really
722 * care about the return code since the client cannot do anything at all about
724 * When the MDS is unlinking a filename, it saves the file objects into a
725 * recovery llog, and these object records are cancelled when the OST reports
726 * they were destroyed and sync'd to disk (i.e. transaction committed).
727 * If the client dies, or the OST is down when the object should be destroyed,
728 * the records are not cancelled, and when the OST reconnects to the MDS next,
729 * it will retrieve the llog unlink logs and then sends the log cancellation
730 * cookies to the MDS after committing destroy transactions. */
731 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
732 struct obdo *oa, struct lov_stripe_md *ea,
733 struct obd_trans_info *oti, struct obd_export *md_export,
736 struct client_obd *cli = &exp->exp_obd->u.cli;
737 struct ptlrpc_request *req;
738 struct ost_body *body;
739 struct list_head cancels = LIST_HEAD_INIT(cancels);
744 CDEBUG(D_INFO, "oa NULL\n");
/* Early-lock-cancel: drop matching PW locks locally before destroying,
 * discarding cached data for the object. */
748 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
749 LDLM_FL_DISCARD_DATA);
751 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* Allocation failed: give the collected cancel locks back. */
753 ldlm_lock_list_put(&cancels, l_bl_ast, count);
757 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
758 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
761 ptlrpc_request_free(req);
765 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
766 ptlrpc_at_set_req_timeout(req);
768 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
769 oa->o_lcookie = *oti->oti_logcookies;
770 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
772 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
774 osc_pack_capa(req, body, (struct obd_capa *)capa);
775 ptlrpc_request_set_replen(req);
777 /* If osc_destory is for destroying the unlink orphan,
778 * sent from MDT to OST, which should not be blocked here,
779 * because the process might be triggered by ptlrpcd, and
780 * it is not good to block ptlrpcd thread (b=16006)*/
781 if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
782 req->rq_interpret_reply = osc_destroy_interpret;
783 if (!osc_can_send_destroy(cli)) {
784 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
788 * Wait until the number of on-going destroy RPCs drops
789 * under max_rpc_in_flight
791 l_wait_event_exclusive(cli->cl_destroy_waitq,
792 osc_can_send_destroy(cli), &lwi);
796 /* Do not wait for response */
797 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
/*
 * Fill the grant/dirty accounting fields of @oa (o_dirty, o_undirty,
 * o_grant, o_dropped) from the client_obd counters, under the loi list
 * lock.  Performs sanity CERRORs when accounting looks inconsistent.
 */
801 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
804 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
/* Caller must not have pre-set the bits we are about to fill. */
806 LASSERT(!(oa->o_valid & bits));
809 client_obd_list_lock(&cli->cl_loi_list_lock);
810 oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
811 if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
812 cli->cl_dirty_max_pages)) {
813 CERROR("dirty %lu - %lu > dirty_max %lu\n",
814 cli->cl_dirty_pages, cli->cl_dirty_transit,
815 cli->cl_dirty_max_pages);
817 } else if (unlikely(atomic_read(&obd_dirty_pages) -
818 atomic_read(&obd_dirty_transit_pages) >
819 (long)(obd_max_dirty_pages + 1))) {
820 /* The atomic_read() allowing the atomic_inc() are
821 * not covered by a lock thus they may safely race and trip
822 * this CERROR() unless we add in a small fudge factor (+1). */
823 CERROR("%s: dirty %d - %d > system dirty_max %d\n",
824 cli->cl_import->imp_obd->obd_name,
825 atomic_read(&obd_dirty_pages),
826 atomic_read(&obd_dirty_transit_pages),
827 obd_max_dirty_pages);
829 } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
831 CERROR("dirty %lu - dirty_max %lu too big???\n",
832 cli->cl_dirty_pages, cli->cl_dirty_max_pages);
/* Undirty estimate: room for a full pipeline of max-sized RPCs. */
835 unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
837 (cli->cl_max_rpcs_in_flight + 1);
838 oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
841 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
842 oa->o_dropped = cli->cl_lost_grant;
843 cli->cl_lost_grant = 0;
844 client_obd_list_unlock(&cli->cl_loi_list_lock);
845 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
846 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Schedule the next grant-shrink attempt one interval from now. */
850 void osc_update_next_shrink(struct client_obd *cli)
852 cli->cl_next_shrink_grant =
853 cfs_time_shift(cli->cl_grant_shrink_interval);
854 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
855 cli->cl_next_shrink_grant);
/* Add @grant to the client's available grant, under the loi list lock. */
858 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
860 client_obd_list_lock(&cli->cl_loi_list_lock);
861 cli->cl_avail_grant += grant;
862 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Absorb any extra grant the server returned in an RPC reply body. */
865 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
867 if (body->oa.o_valid & OBD_MD_FLGRANT) {
868 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
869 __osc_update_grant(cli, body->oa.o_grant);
/* Forward declaration: defined later in this file, used by the
 * grant-shrink path below. */
873 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
874 obd_count keylen, void *key, obd_count vallen,
875 void *val, struct ptlrpc_request_set *set);
/*
 * Completion for a grant-shrink RPC: on failure the grant we tried to
 * give back is restored locally; on success the reply body's grant is
 * absorbed via osc_update_grant().
 */
877 static int osc_shrink_grant_interpret(const struct lu_env *env,
878 struct ptlrpc_request *req,
881 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
882 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
883 struct ost_body *body;
/* RPC failed: re-add the grant we had set aside for the server. */
886 __osc_update_grant(cli, oa->o_grant);
890 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
892 osc_update_grant(cli, body);
/*
 * Prepare @oa to return a quarter of the currently available grant to
 * the server, marking it with OBD_FL_SHRINK_GRANT, and reschedule the
 * next shrink.
 */
898 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
900 client_obd_list_lock(&cli->cl_loi_list_lock);
901 oa->o_grant = cli->cl_avail_grant / 4;
902 cli->cl_avail_grant -= oa->o_grant;
903 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Ensure o_flags is valid before OR-ing in the shrink flag. */
904 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
905 oa->o_valid |= OBD_MD_FLFLAGS;
908 oa->o_flags |= OBD_FL_SHRINK_GRANT;
909 osc_update_next_shrink(cli);
912 /* Shrink the current grant, either from some large amount to enough for a
913 * full set of in-flight RPCs, or if we have already shrunk to that limit
914 * then to enough for a single RPC. This avoids keeping more grant than
915 * needed, and avoids shrinking the grant piecemeal. */
916 static int osc_shrink_grant(struct client_obd *cli)
918 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
919 (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
921 client_obd_list_lock(&cli->cl_loi_list_lock);
/* Already at/below pipeline size: fall back to a single-RPC target. */
922 if (cli->cl_avail_grant <= target_bytes)
923 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
924 client_obd_list_unlock(&cli->cl_loi_list_lock);
926 return osc_shrink_grant_to_target(cli, target_bytes);
/*
 * Return grant above @target_bytes to the server via a KEY_GRANT_SHRINK
 * set_info RPC.  The local available grant is reduced first; on RPC
 * failure the interpret callback restores it.
 * NOTE(review): the allocation of @body and its free are elided in this
 * excerpt.
 */
929 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
932 struct ost_body *body;
935 client_obd_list_lock(&cli->cl_loi_list_lock);
936 /* Don't shrink if we are already above or below the desired limit
937 * We don't want to shrink below a single RPC, as that will negatively
938 * impact block allocation and long-term performance. */
939 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
940 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
942 if (target_bytes >= cli->cl_avail_grant) {
943 client_obd_list_unlock(&cli->cl_loi_list_lock);
946 client_obd_list_unlock(&cli->cl_loi_list_lock);
952 osc_announce_cached(cli, &body->oa, 0);
954 client_obd_list_lock(&cli->cl_loi_list_lock);
/* Give back everything above the target. */
955 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
956 cli->cl_avail_grant = target_bytes;
957 client_obd_list_unlock(&cli->cl_loi_list_lock);
958 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
959 body->oa.o_valid |= OBD_MD_FLFLAGS;
960 body->oa.o_flags = 0;
962 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
963 osc_update_next_shrink(cli);
965 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
966 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
967 sizeof(*body), body, NULL);
/* On failure, restore the grant we deducted above. */
969 __osc_update_grant(cli, body->oa.o_grant);
/*
 * Decide whether it is time to shrink this client's grant: requires the
 * server to support GRANT_SHRINK, the shrink deadline to have passed,
 * a FULL import, and more available grant than one RPC's worth.
 */
974 static int osc_should_shrink_grant(struct client_obd *client)
976 cfs_time_t time = cfs_time_current();
977 cfs_time_t next_shrink = client->cl_next_shrink_grant;
979 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
980 OBD_CONNECT_GRANT_SHRINK) == 0)
/* Allow a small (5-tick) early margin on the deadline. */
983 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
984 /* Get the current RPC size directly, instead of going via:
985 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
986 * Keep comment here so that it can be found by searching. */
987 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
989 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
990 client->cl_avail_grant > brw_size)
/* Not worth shrinking now; push the deadline forward. */
993 osc_update_next_shrink(client);
/*
 * Periodic timeout callback: walk every client on this timeout item's
 * list and shrink grant for those that should.
 */
998 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1000 struct client_obd *client;
1002 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
1003 if (osc_should_shrink_grant(client))
1004 osc_shrink_grant(client);
/*
 * Register this client with the periodic grant-shrink timeout machinery
 * and schedule its first shrink deadline.
 */
1009 static int osc_add_shrink_grant(struct client_obd *client)
1013 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1015 osc_grant_shrink_grant_cb, NULL,
1016 &client->cl_grant_shrink_list);
1018 CERROR("add grant client %s error %d\n",
1019 client->cl_import->imp_obd->obd_name, rc);
1022 CDEBUG(D_CACHE, "add grant client %s \n",
1023 client->cl_import->imp_obd->obd_name);
1024 osc_update_next_shrink(client);
/* Unregister this client from the grant-shrink timeout list. */
1028 static int osc_del_shrink_grant(struct client_obd *client)
1030 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/*
 * Initialize grant accounting from connect data at (re)connect time:
 * set cl_avail_grant from ocd_grant (adjusted for already-dirty pages
 * unless we were evicted), derive the extent chunk size, and enable
 * periodic grant shrinking if the server supports it.
 */
1034 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1037 * ocd_grant is the total grant amount we're expect to hold: if we've
1038 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
1039 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
1042 * race is tolerable here: if we're evicted, but imp_state already
1043 * left EVICTED state, then cl_dirty_pages must be 0 already.
1045 client_obd_list_lock(&cli->cl_loi_list_lock);
1046 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1047 cli->cl_avail_grant = ocd->ocd_grant;
1049 cli->cl_avail_grant = ocd->ocd_grant -
1050 (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
1052 if (cli->cl_avail_grant < 0) {
1053 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
1054 cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
1055 ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
1056 /* workaround for servers which do not have the patch from
1058 cli->cl_avail_grant = ocd->ocd_grant;
1061 /* determine the appropriate chunk size used by osc_extent. */
1062 cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
1063 client_obd_list_unlock(&cli->cl_loi_list_lock);
1065 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1066 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
1067 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1069 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1070 list_empty(&cli->cl_grant_shrink_list))
1071 osc_add_shrink_grant(cli);
1074 /* We assume that the reason this OSC got a short read is because it read
1075 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1076 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1077 * this stripe never got written at or beyond this stripe offset yet. */
/*
 * Zero-fill the tail of the page array past @nob_read bytes so callers
 * see zeros beyond EOF.  NOTE(review): kunmap() calls and loop advances
 * are elided in this excerpt.
 */
1078 static void handle_short_read(int nob_read, obd_count page_count,
1079 struct brw_page **pga)
1084 /* skip bytes read OK */
1085 while (nob_read > 0) {
1086 LASSERT (page_count > 0);
1088 if (pga[i]->count > nob_read) {
1089 /* EOF inside this page */
1090 ptr = kmap(pga[i]->pg) +
1091 (pga[i]->off & ~CFS_PAGE_MASK);
1092 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1099 nob_read -= pga[i]->count;
1104 /* zero remaining pages */
1105 while (page_count-- > 0) {
1106 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1107 memset(ptr, 0, pga[i]->count);
/*
 * Validate the per-niobuf return codes in a BRW_WRITE reply: fail on a
 * missing/short RC vector, propagate the first negative rc, reject any
 * nonzero rc, and verify the bulk transferred exactly @requested_nob.
 */
1113 static int check_write_rcs(struct ptlrpc_request *req,
1114 int requested_nob, int niocount,
1115 obd_count page_count, struct brw_page **pga)
1120 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1121 sizeof(*remote_rcs) *
1123 if (remote_rcs == NULL) {
1124 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1128 /* return error if any niobuf was in error */
1129 for (i = 0; i < niocount; i++) {
1130 if ((int)remote_rcs[i] < 0)
1131 return(remote_rcs[i]);
1133 if (remote_rcs[i] != 0) {
1134 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1135 i, remote_rcs[i], req);
1140 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1141 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1142 req->rq_bulk->bd_nob_transferred, requested_nob);
/*
 * Two brw_pages can be merged into one niobuf when they are contiguous
 * (p1 ends where p2 starts) and their flags differ only in bits known to
 * be safe to combine; unknown flag differences are warned about.
 */
1149 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1151 if (p1->flag != p2->flag) {
1152 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1153 OBD_BRW_SYNC | OBD_BRW_ASYNC |
1154 OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
1156 /* warn if we try to combine flags that we don't know to be
1157 * safe to combine */
1158 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1159 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1160 "report this at https://jira.hpdd.intel.com/\n",
1161 p1->flag, p2->flag);
1166 return (p1->off + p1->count == p2->off);
/*
 * Compute the bulk-data checksum over up to @nob bytes of @pga pages
 * using the libcfs crypto hash selected by @cksum_type.  Includes two
 * fault-injection hooks: corrupt received data (OST_READ) or return a
 * deliberately wrong checksum on send (OST_WRITE).
 */
1169 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1170 struct brw_page **pga, int opc,
1171 cksum_type_t cksum_type)
1175 struct cfs_crypto_hash_desc *hdesc;
1176 unsigned int bufsize;
1178 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1180 LASSERT(pg_count > 0);
1182 hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1183 if (IS_ERR(hdesc)) {
1184 CERROR("Unable to initialize checksum hash %s\n",
1185 cfs_crypto_hash_name(cfs_alg));
1186 return PTR_ERR(hdesc);
1189 while (nob > 0 && pg_count > 0) {
/* Clamp the last page's contribution to the remaining byte count. */
1190 int count = pga[i]->count > nob ? nob : pga[i]->count;
1192 /* corrupt the data before we compute the checksum, to
1193 * simulate an OST->client data error */
1194 if (i == 0 && opc == OST_READ &&
1195 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1196 unsigned char *ptr = kmap(pga[i]->pg);
1197 int off = pga[i]->off & ~CFS_PAGE_MASK;
1199 memcpy(ptr + off, "bad1", min(4, nob));
1202 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1203 pga[i]->off & ~CFS_PAGE_MASK,
1205 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1206 (int)(pga[i]->off & ~CFS_PAGE_MASK));
1208 nob -= pga[i]->count;
1213 bufsize = sizeof(cksum);
1214 err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1216 /* For sending we only compute the wrong checksum instead
1217 * of corrupting the data so it is still correct on a redo */
1218 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/*
 * Build (but do not send) a BRW bulk read/write RPC for @page_count pages.
 *
 * cmd        - OBD_BRW_WRITE or OBD_BRW_READ (plus modifier bits)
 * cli        - client_obd the RPC is issued against
 * oa         - obdo of the target object; checksum flags/values computed
 *              here are mirrored back into it for later reply checking
 * lsm        - stripe md (unused by OSC at this level)
 * page_count - number of entries in @pga, must be > 0
 * pga        - pages sorted by ascending file offset, no interior gaps
 * reqp       - out: the prepared request on success
 * ocapa      - capability to pack, may be NULL
 * reserve    - non-zero: stash an extra capa reference in the async args
 *              so the resend path can reuse it
 *
 * Returns 0 on success, negative errno on failure.
 *
 * NOTE(review): the original numbering in this listing is discontinuous;
 * several lines (error labels, closing braces, RETURN macros) were elided
 * by extraction, so comments below describe only what is visible.
 */
1224 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1225 struct lov_stripe_md *lsm, obd_count page_count,
1226 struct brw_page **pga,
1227 struct ptlrpc_request **reqp,
1228 struct obd_capa *ocapa, int reserve,
1231 struct ptlrpc_request *req;
1232 struct ptlrpc_bulk_desc *desc;
1233 struct ost_body *body;
1234 struct obd_ioobj *ioobj;
1235 struct niobuf_remote *niobuf;
1236 int niocount, i, requested_nob, opc, rc;
1237 struct osc_brw_async_args *aa;
1238 struct req_capsule *pill;
1239 struct brw_page *pg_prev;
/* Fault-injection hooks used by recovery tests. */
1242 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1243 RETURN(-ENOMEM); /* Recoverable */
1244 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1245 RETURN(-EINVAL); /* Fatal */
/* Writes allocate from the import's pre-allocated request pool;
 * reads use a plain allocation. */
1247 if ((cmd & OBD_BRW_WRITE) != 0) {
1249 req = ptlrpc_request_alloc_pool(cli->cl_import,
1250 cli->cl_import->imp_rq_pool,
1251 &RQF_OST_BRW_WRITE);
1254 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* Count remote niobufs: adjacent pages that can be merged share one. */
1259 for (niocount = i = 1; i < page_count; i++) {
1260 if (!can_merge_pages(pga[i - 1], pga[i]))
1264 pill = &req->rq_pill;
1265 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1267 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1268 niocount * sizeof(*niobuf));
1269 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1271 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1273 ptlrpc_request_free(req);
1276 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1277 ptlrpc_at_set_req_timeout(req);
1278 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1280 req->rq_no_retry_einprogress = 1;
/* Bulk descriptor sized by the negotiated maximum BRW size
 * (ocd_brw_size) expressed in LNET MTU units. */
1282 desc = ptlrpc_prep_bulk_imp(req, page_count,
1283 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1284 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1288 GOTO(out, rc = -ENOMEM);
1289 /* NB request now owns desc and will free it when it gets freed */
1291 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1292 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1293 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1294 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1296 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1298 obdo_to_ioobj(oa, ioobj);
1299 ioobj->ioo_bufcnt = niocount;
1300 /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1301 * that might be send for this request. The actual number is decided
1302 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1303 * "max - 1" for old client compatibility sending "0", and also so the
1304 * the actual maximum is a power-of-two number, not one less. LU-1431 */
1305 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1306 osc_pack_capa(req, body, ocapa);
1307 LASSERT(page_count > 0);
/* Attach every page to the bulk descriptor, validating along the way
 * that the array is offset-sorted and gap-free in the middle, and
 * building one niobuf per run of mergeable pages. */
1309 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1310 struct brw_page *pg = pga[i];
1311 int poff = pg->off & ~CFS_PAGE_MASK;
1313 LASSERT(pg->count > 0);
1314 /* make sure there is no gap in the middle of page array */
1315 LASSERTF(page_count == 1 ||
1316 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1317 ergo(i > 0 && i < page_count - 1,
1318 poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
1319 ergo(i == page_count - 1, poff == 0)),
1320 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1321 i, page_count, pg, pg->off, pg->count);
1323 LASSERTF(i == 0 || pg->off > pg_prev->off,
1324 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1325 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1327 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1328 pg_prev->pg, page_private(pg_prev->pg),
1329 pg_prev->pg->index, pg_prev->off);
1331 LASSERTF(i == 0 || pg->off > pg_prev->off,
1332 "i %d p_c %u\n", i, page_count);
1334 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1335 (pg->flag & OBD_BRW_SRVLOCK));
1337 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1338 requested_nob += pg->count;
1340 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1342 niobuf->rnb_len += pg->count;
1344 niobuf->rnb_offset = pg->off;
1345 niobuf->rnb_len = pg->count;
1346 niobuf->rnb_flags = pg->flag;
/* Sanity: we must have filled exactly niocount niobufs. */
1351 LASSERTF((void *)(niobuf - niocount) ==
1352 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1353 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1354 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1356 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1358 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1359 body->oa.o_valid |= OBD_MD_FLFLAGS;
1360 body->oa.o_flags = 0;
/* NOTE(review): the condition guarding this resend flag was elided in
 * this listing; presumably it is set on resent requests — confirm. */
1362 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1365 if (osc_should_shrink_grant(cli))
1366 osc_shrink_grant_local(cli, &body->oa);
1368 /* size[REQ_REC_OFF] still sizeof (*body) */
/* Checksums: for writes compute the bulk checksum over the outgoing
 * pages now and mirror it into @oa for the reply check; for reads just
 * request that the server checksum its reply. */
1369 if (opc == OST_WRITE) {
1370 if (cli->cl_checksum &&
1371 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1372 /* store cl_cksum_type in a local variable since
1373 * it can be changed via lprocfs */
1374 cksum_type_t cksum_type = cli->cl_cksum_type;
1376 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1377 oa->o_flags &= OBD_FL_LOCAL_MASK;
1378 body->oa.o_flags = 0;
1380 body->oa.o_flags |= cksum_type_pack(cksum_type);
1381 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1382 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1386 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1388 /* save this in 'oa', too, for later checking */
1389 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1390 oa->o_flags |= cksum_type_pack(cksum_type);
1392 /* clear out the checksum flag, in case this is a
1393 * resend but cl_checksum is no longer set. b=11238 */
1394 oa->o_valid &= ~OBD_MD_FLCKSUM;
1396 oa->o_cksum = body->oa.o_cksum;
1397 /* 1 RC per niobuf */
1398 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1399 sizeof(__u32) * niocount);
1401 if (cli->cl_checksum &&
1402 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1403 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1404 body->oa.o_flags = 0;
1405 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1406 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1409 ptlrpc_request_set_replen(req);
/* Stash everything the reply interpreter (osc_brw_fini_request /
 * brw_interpret) and the resend path will need. */
1411 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1412 aa = ptlrpc_req_async_args(req);
1414 aa->aa_requested_nob = requested_nob;
1415 aa->aa_nio_count = niocount;
1416 aa->aa_page_count = page_count;
1420 INIT_LIST_HEAD(&aa->aa_oaps);
1421 if (ocapa && reserve)
1422 aa->aa_ocapa = capa_get(ocapa);
/* Error path: drop the request (label itself elided in this listing). */
1428 ptlrpc_req_finished(req);
/*
 * Diagnose a write checksum mismatch reported by the server.
 *
 * Recomputes the checksum over the local pages with the type the server
 * actually used and classifies the failure (server used wrong type, data
 * changed locally after checksumming — typical of mmap IO, corrupted in
 * transit, or both), then logs a console error with FID/object/extent.
 *
 * Returns 0 when client and server checksums agree; otherwise logs and
 * returns a mismatch indication (return statement elided in this listing).
 */
1432 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1433 __u32 client_cksum, __u32 server_cksum, int nob,
1434 obd_count page_count, struct brw_page **pga,
1435 cksum_type_t client_cksum_type)
1439 cksum_type_t cksum_type;
1441 if (server_cksum == client_cksum) {
1442 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* Re-checksum with the type the server reported so we can tell a type
 * mismatch apart from real data corruption. */
1446 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1448 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1451 if (cksum_type != client_cksum_type)
1452 msg = "the server did not use the checksum type specified in "
1453 "the original request - likely a protocol problem";
1454 else if (new_cksum == server_cksum)
1455 msg = "changed on the client after we checksummed it - "
1456 "likely false positive due to mmap IO (bug 11742)";
1457 else if (new_cksum == client_cksum)
1458 msg = "changed in transit before arrival at OST";
1460 msg = "changed in transit AND doesn't match the original - "
1461 "likely false positive due to mmap IO (bug 11742)";
1463 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1464 " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1465 msg, libcfs_nid2str(peer->nid),
1466 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1467 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1468 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1469 POSTID(&oa->o_oi), pga[0]->off,
1470 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1471 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1472 "client csum now %x\n", client_cksum, client_cksum_type,
1473 server_cksum, cksum_type, new_cksum);
1477 /* Note rc enters this function as number of bytes transferred */
/*
 * Post-process a completed BRW RPC: unpack the reply body, update quota
 * and grant state, verify checksums (write: compare against the value we
 * saved in the obdo; read: recompute over the received pages), handle
 * short reads, and copy the reply obdo back into the async args.
 *
 * Returns 0 or bytes-dependent status; -EAGAIN requests a bulk-level
 * retry when security unwrap of the bulk fails.
 *
 * NOTE(review): this listing has lines elided (error branches, EXIT
 * paths); comments describe only what is visible.
 */
1478 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1480 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1481 const lnet_process_id_t *peer =
1482 &req->rq_import->imp_connection->c_peer;
1483 struct client_obd *cli = aa->aa_cli;
1484 struct ost_body *body;
1485 __u32 client_cksum = 0;
/* -EDQUOT still carries a usable reply body (quota flags below). */
1488 if (rc < 0 && rc != -EDQUOT) {
1489 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1493 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1494 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1496 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1500 /* set/clear over quota flag for a uid/gid */
1501 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1502 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1503 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1505 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1506 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1508 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1511 osc_update_grant(cli, body);
/* Remember the checksum we sent; used for the write verification below. */
1516 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1517 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1519 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1521 CERROR("Unexpected +ve rc %d\n", rc);
1524 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1526 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1529 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1530 check_write_checksum(&body->oa, peer, client_cksum,
1531 body->oa.o_cksum, aa->aa_requested_nob,
1532 aa->aa_page_count, aa->aa_ppga,
1533 cksum_type_unpack(aa->aa_oa->o_flags)))
1536 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1537 aa->aa_page_count, aa->aa_ppga);
1541 /* The rest of this function executes only for OST_READs */
1543 /* if unwrap_bulk failed, return -EAGAIN to retry */
1544 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1546 GOTO(out, rc = -EAGAIN);
/* rc is the byte count actually transferred; sanity-check it. */
1548 if (rc > aa->aa_requested_nob) {
1549 CERROR("Unexpected rc %d (%d requested)\n", rc,
1550 aa->aa_requested_nob);
1554 if (rc != req->rq_bulk->bd_nob_transferred) {
1555 CERROR ("Unexpected rc %d (%d transferred)\n",
1556 rc, req->rq_bulk->bd_nob_transferred);
1560 if (rc < aa->aa_requested_nob)
1561 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
/* Server sent a read checksum: recompute locally and compare. */
1563 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1564 static int cksum_counter;
1565 __u32 server_cksum = body->oa.o_cksum;
1568 cksum_type_t cksum_type;
1570 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1571 body->oa.o_flags : 0);
1572 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1573 aa->aa_ppga, OST_READ,
/* Identify whether the bulk came via an LNET router (for the log). */
1576 if (peer->nid == req->rq_bulk->bd_sender) {
1580 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1583 if (server_cksum != client_cksum) {
1584 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1585 "%s%s%s inode "DFID" object "DOSTID
1586 " extent ["LPU64"-"LPU64"]\n",
1587 req->rq_import->imp_obd->obd_name,
1588 libcfs_nid2str(peer->nid),
1590 body->oa.o_valid & OBD_MD_FLFID ?
1591 body->oa.o_parent_seq : (__u64)0,
1592 body->oa.o_valid & OBD_MD_FLFID ?
1593 body->oa.o_parent_oid : 0,
1594 body->oa.o_valid & OBD_MD_FLFID ?
1595 body->oa.o_parent_ver : 0,
1596 POSTID(&body->oa.o_oi),
1597 aa->aa_ppga[0]->off,
1598 aa->aa_ppga[aa->aa_page_count-1]->off +
1599 aa->aa_ppga[aa->aa_page_count-1]->count -
1601 CERROR("client %x, server %x, cksum_type %x\n",
1602 client_cksum, server_cksum, cksum_type);
1604 aa->aa_oa->o_cksum = client_cksum;
1608 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* We asked for a checksum but the server did not send one; log
 * at exponentially-decaying frequency. */
1611 } else if (unlikely(client_cksum)) {
1612 static int cksum_missed;
1615 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1616 CERROR("Checksum %u requested from %s but not sent\n",
1617 cksum_missed, libcfs_nid2str(peer->nid));
/* Propagate the reply obdo back to the caller's copy. */
1623 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1624 aa->aa_oa, &body->oa);
/*
 * Rebuild and resubmit a BRW RPC after a recoverable error (e.g.
 * -EINPROGRESS). A brand-new request is prepared from the async args of
 * the failed one; the page array, oap and extent lists, capa reference
 * and interpreter/commit callbacks are transferred to it, then it is
 * queued on the same ptlrpcd thread.
 *
 * Returns 0 on success (resend queued) or the error from
 * osc_brw_prep_request(); bails out early if any oap was interrupted.
 */
1629 static int osc_brw_redo_request(struct ptlrpc_request *request,
1630 struct osc_brw_async_args *aa, int rc)
1632 struct ptlrpc_request *new_req;
1633 struct osc_brw_async_args *new_aa;
1634 struct osc_async_page *oap;
1637 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1638 "redo for recoverable error %d", rc);
/* Re-prepare with the same pages and obdo; reserve=1 so the capa
 * reference survives into the new async args. */
1640 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1641 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1642 aa->aa_cli, aa->aa_oa,
1643 NULL /* lsm unused by osc currently */,
1644 aa->aa_page_count, aa->aa_ppga,
1645 &new_req, aa->aa_ocapa, 0, 1);
/* Abort the resend if any page's IO was interrupted by a signal. */
1649 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1650 if (oap->oap_request != NULL) {
1651 LASSERTF(request == oap->oap_request,
1652 "request %p != oap_request %p\n",
1653 request, oap->oap_request);
1654 if (oap->oap_interrupted) {
1655 ptlrpc_req_finished(new_req);
1660 /* New request takes over pga and oaps from old request.
1661 * Note that copying a list_head doesn't work, need to move it... */
1663 new_req->rq_interpret_reply = request->rq_interpret_reply;
1664 new_req->rq_async_args = request->rq_async_args;
1665 new_req->rq_commit_cb = request->rq_commit_cb;
1666 /* cap resend delay to the current request timeout, this is similar to
1667 * what ptlrpc does (see after_reply()) */
1668 if (aa->aa_resends > new_req->rq_timeout)
1669 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1671 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1672 new_req->rq_generation_set = 1;
1673 new_req->rq_import_generation = request->rq_import_generation;
1675 new_aa = ptlrpc_req_async_args(new_req);
/* Move (not copy) the oap and extent lists onto the new args. */
1677 INIT_LIST_HEAD(&new_aa->aa_oaps);
1678 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1679 INIT_LIST_HEAD(&new_aa->aa_exts);
1680 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1681 new_aa->aa_resends = aa->aa_resends;
/* Re-point every oap's request reference at the new request. */
1683 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1684 if (oap->oap_request) {
1685 ptlrpc_req_finished(oap->oap_request);
1686 oap->oap_request = ptlrpc_request_addref(new_req);
/* Ownership of the capa reference moves to the new request. */
1690 new_aa->aa_ocapa = aa->aa_ocapa;
1691 aa->aa_ocapa = NULL;
1693 /* XXX: This code will run into problem if we're going to support
1694 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1695 * and wait for all of them to be finished. We should inherit request
1696 * set from old request. */
1697 ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1699 DEBUG_REQ(D_INFO, new_req, "new request");
1704 * ugh, we want disk allocation on the target to happen in offset order. we'll
1705 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1706 * fine for our small page arrays and doesn't require allocation. its an
1707 * insertion sort that swaps elements that are strides apart, shrinking the
1708 * stride down until its '1' and the array is sorted.
/*
 * Shellsort @array of @num brw_page pointers into ascending ->off order.
 * In-place, no allocation; O(n^2) worst case but fine for RPC-sized arrays.
 */
1710 static void sort_brw_pages(struct brw_page **array, int num)
1713 struct brw_page *tmp;
/* Knuth gap sequence: 1, 4, 13, 40, ... largest gap below num. */
1717 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
/* Gapped insertion sort for the current stride. */
1722 for (i = stride ; i < num ; i++) {
1725 while (j >= stride && array[j - stride]->off > tmp->off) {
1726 array[j] = array[j - stride];
1731 } while (stride > 1);
/*
 * Free a brw_page pointer array of @count entries allocated for a BRW
 * RPC. Frees only the array itself, not the pages it references.
 */
1734 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1736 LASSERT(ppga != NULL);
1737 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * Reply interpreter for BRW RPCs (set as rq_interpret_reply).
 *
 * Finalizes the RPC via osc_brw_fini_request(), resends on recoverable
 * errors, updates the cl_object attributes (size/blocks/times/KMS) from
 * the reply obdo for writes, finishes all attached extents, releases the
 * page array, adjusts the in-flight RPC counters and kicks the IO queue.
 *
 * NOTE(review): elided lines hide some branches (e.g. the rc==0 guard
 * before the attribute update); comments describe visible code only.
 */
1740 static int brw_interpret(const struct lu_env *env,
1741 struct ptlrpc_request *req, void *data, int rc)
1743 struct osc_brw_async_args *aa = data;
1744 struct osc_extent *ext;
1745 struct osc_extent *tmp;
1746 struct client_obd *cli = aa->aa_cli;
1749 rc = osc_brw_fini_request(req, rc);
1750 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1751 /* When server return -EINPROGRESS, client should always retry
1752 * regardless of the number of times the bulk was resent already. */
1753 if (osc_recoverable_error(rc)) {
1754 if (req->rq_import_generation !=
1755 req->rq_import->imp_generation) {
1756 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1757 ""DOSTID", rc = %d.\n",
1758 req->rq_import->imp_obd->obd_name,
1759 POSTID(&aa->aa_oa->o_oi), rc);
1760 } else if (rc == -EINPROGRESS ||
1761 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1762 rc = osc_brw_redo_request(req, aa, rc);
1764 CERROR("%s: too many resent retries for object: "
1765 ""LPU64":"LPU64", rc = %d.\n",
1766 req->rq_import->imp_obd->obd_name,
1767 POSTID(&aa->aa_oa->o_oi), rc);
1772 else if (rc == -EAGAIN || rc == -EINPROGRESS)
/* Drop the capa reference taken at prep time. */
1777 capa_put(aa->aa_ocapa);
1778 aa->aa_ocapa = NULL;
/* Mirror the reply obdo's attributes into the cl_object. */
1782 struct obdo *oa = aa->aa_oa;
1783 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1784 unsigned long valid = 0;
1785 struct cl_object *obj;
1786 struct osc_async_page *last;
1788 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1789 obj = osc2cl(last->oap_obj);
1791 cl_object_attr_lock(obj);
1792 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1793 attr->cat_blocks = oa->o_blocks;
1794 valid |= CAT_BLOCKS;
1796 if (oa->o_valid & OBD_MD_FLMTIME) {
1797 attr->cat_mtime = oa->o_mtime;
1800 if (oa->o_valid & OBD_MD_FLATIME) {
1801 attr->cat_atime = oa->o_atime;
1804 if (oa->o_valid & OBD_MD_FLCTIME) {
1805 attr->cat_ctime = oa->o_ctime;
1809 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1810 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1811 loff_t last_off = last->oap_count + last->oap_obj_off +
1814 /* Change file size if this is an out of quota or
1815 * direct IO write and it extends the file size */
1816 if (loi->loi_lvb.lvb_size < last_off) {
1817 attr->cat_size = last_off;
1820 /* Extend KMS if it's not a lockless write */
1821 if (loi->loi_kms < last_off &&
1822 oap2osc_page(last)->ops_srvlock == 0) {
1823 attr->cat_kms = last_off;
1829 cl_object_attr_set(env, obj, attr, valid);
1830 cl_object_attr_unlock(obj);
1832 OBDO_FREE(aa->aa_oa);
/* Successful writes leave pages "unstable" until commit (brw_commit). */
1834 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1835 osc_inc_unstable_pages(req);
/* Finish every extent carried by this RPC. */
1837 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1838 list_del_init(&ext->oe_link);
1839 osc_extent_finish(env, ext, 1, rc);
1841 LASSERT(list_empty(&aa->aa_exts));
1842 LASSERT(list_empty(&aa->aa_oaps));
1844 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1845 req->rq_bulk->bd_nob_transferred);
1846 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1847 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1849 client_obd_list_lock(&cli->cl_loi_list_lock);
1850 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1851 * is called so we know whether to go to sync BRWs or wait for more
1852 * RPCs to complete */
1853 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1854 cli->cl_w_in_flight--;
1856 cli->cl_r_in_flight--;
1857 osc_wake_cache_waiters(cli);
1858 client_obd_list_unlock(&cli->cl_loi_list_lock);
1860 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
/*
 * rq_commit_cb for BRW write RPCs: called when the server commits the
 * transaction. Clears rq_unstable (under rq_lock, to close the race
 * with osc_inc_unstable_pages) and releases the unstable-page count;
 * otherwise just marks the request committed.
 */
1864 static void brw_commit(struct ptlrpc_request *req)
1866 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1867 * this called via the rq_commit_cb, I need to ensure
1868 * osc_dec_unstable_pages is still called. Otherwise unstable
1869 * pages may be leaked. */
1870 spin_lock(&req->rq_lock);
1871 if (likely(req->rq_unstable)) {
1872 req->rq_unstable = 0;
1873 spin_unlock(&req->rq_lock);
/* Drop the unstable-page accounting outside the lock. */
1875 osc_dec_unstable_pages(req);
1877 req->rq_committed = 1;
1878 spin_unlock(&req->rq_lock);
1883 * Build an RPC by the list of extent @ext_list. The caller must ensure
1884 * that the total pages in this list are NOT over max pages per RPC.
1885 * Extents in the list must be in OES_RPC state.
/*
 * Assemble the pages of every extent in @ext_list into a single BRW RPC
 * (read or write per @cmd), attach the cl_req/attr plumbing, wire up the
 * interpret/commit callbacks, bump the in-flight counters and hand the
 * request to a ptlrpcd thread chosen by @pol.
 *
 * On failure all extents are finished with the error and all partial
 * allocations are released. Returns 0 or negative errno.
 *
 * NOTE(review): several allocation/error lines are elided in this
 * listing (e.g. the OBDO_ALLOC for @oa, loop counters); comments
 * describe only what is visible.
 */
1887 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1888 struct list_head *ext_list, int cmd, pdl_policy_t pol)
1890 struct ptlrpc_request *req = NULL;
1891 struct osc_extent *ext;
1892 struct brw_page **pga = NULL;
1893 struct osc_brw_async_args *aa = NULL;
1894 struct obdo *oa = NULL;
1895 struct osc_async_page *oap;
1896 struct osc_async_page *tmp;
1897 struct cl_req *clerq = NULL;
1898 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
1900 struct cl_req_attr *crattr = NULL;
1901 obd_off starting_offset = OBD_OBJECT_EOF;
1902 obd_off ending_offset = 0;
1906 bool soft_sync = false;
1909 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
1912 LASSERT(!list_empty(ext_list));
1914 /* add pages into rpc_list to build BRW rpc */
1915 list_for_each_entry(ext, ext_list, oe_link) {
1916 LASSERT(ext->oe_state == OES_RPC);
1917 mem_tight |= ext->oe_memalloc;
1918 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1920 list_add_tail(&oap->oap_rpc_item, &rpc_list);
/* Track the overall [start, end) byte range of the RPC. */
1921 if (starting_offset > oap->oap_obj_off)
1922 starting_offset = oap->oap_obj_off;
1924 LASSERT(oap->oap_page_off == 0);
1925 if (ending_offset < oap->oap_obj_off + oap->oap_count)
1926 ending_offset = oap->oap_obj_off +
1929 LASSERT(oap->oap_page_off + oap->oap_count ==
1934 soft_sync = osc_over_unstable_soft_limit(cli);
/* Under memory pressure, flag allocations accordingly. */
1936 mpflag = cfs_memory_pressure_get_and_set();
1938 OBD_ALLOC(crattr, sizeof(*crattr));
1940 GOTO(out, rc = -ENOMEM);
1942 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1944 GOTO(out, rc = -ENOMEM);
1948 GOTO(out, rc = -ENOMEM);
/* Build the brw_page array and the cl_req covering these pages. */
1951 list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1952 struct cl_page *page = oap2cl_page(oap);
1953 if (clerq == NULL) {
1954 clerq = cl_req_alloc(env, page, crt,
1955 1 /* only 1-object rpcs for now */);
1957 GOTO(out, rc = PTR_ERR(clerq));
1960 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1962 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1963 pga[i] = &oap->oap_brw_page;
1964 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1965 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1966 pga[i]->pg, page_index(oap->oap_page), oap,
1969 cl_req_page_add(env, clerq, page);
1972 /* always get the data for the obdo for the rpc */
1973 LASSERT(clerq != NULL);
1974 crattr->cra_oa = oa;
1975 cl_req_attr_set(env, clerq, crattr, ~0ULL);
1977 rc = cl_req_prep(env, clerq);
1979 CERROR("cl_req_prep failed: %d\n", rc);
/* Pages must be offset-sorted before osc_brw_prep_request(). */
1983 sort_brw_pages(pga, page_count);
1984 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
1985 pga, &req, crattr->cra_capa, 1, 0);
1987 CERROR("prep_req failed: %d\n", rc);
1991 req->rq_commit_cb = brw_commit;
1992 req->rq_interpret_reply = brw_interpret;
1995 req->rq_memalloc = 1;
1997 /* Need to update the timestamps after the request is built in case
1998 * we race with setattr (locally or in queue at OST). If OST gets
1999 * later setattr before earlier BRW (as determined by the request xid),
2000 * the OST will not use BRW timestamps. Sadly, there is no obvious
2001 * way to do this in a single call. bug 10150 */
2002 cl_req_attr_set(env, clerq, crattr,
2003 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2005 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
/* Hand page/extent ownership over to the request's async args. */
2007 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2008 aa = ptlrpc_req_async_args(req);
2009 INIT_LIST_HEAD(&aa->aa_oaps);
2010 list_splice_init(&rpc_list, &aa->aa_oaps);
2011 INIT_LIST_HEAD(&aa->aa_exts);
2012 list_splice_init(ext_list, &aa->aa_exts);
2013 aa->aa_clerq = clerq;
2015 /* queued sync pages can be torn down while the pages
2016 * were between the pending list and the rpc */
2018 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2019 /* only one oap gets a request reference */
2022 if (oap->oap_interrupted && !req->rq_intr) {
2023 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2025 ptlrpc_mark_interrupted(req);
2029 tmp->oap_request = ptlrpc_request_addref(req);
/* Account the RPC in the read/write in-flight stats. */
2031 client_obd_list_lock(&cli->cl_loi_list_lock);
2032 starting_offset >>= PAGE_CACHE_SHIFT;
2033 if (cmd == OBD_BRW_READ) {
2034 cli->cl_r_in_flight++;
2035 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2036 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2037 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2038 starting_offset + 1);
2040 cli->cl_w_in_flight++;
2041 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2042 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2043 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2044 starting_offset + 1);
2046 client_obd_list_unlock(&cli->cl_loi_list_lock);
2048 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
2049 page_count, aa, cli->cl_r_in_flight,
2050 cli->cl_w_in_flight);
2052 /* XXX: Maybe the caller can check the RPC bulk descriptor to
2053 * see which CPU/NUMA node the majority of pages were allocated
2054 * on, and try to assign the async RPC to the CPU core
2055 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2057 * But on the other hand, we expect that multiple ptlrpcd
2058 * threads and the initial write sponsor can run in parallel,
2059 * especially when data checksum is enabled, which is CPU-bound
2060 * operation and single ptlrpcd thread cannot process in time.
2061 * So more ptlrpcd threads sharing BRW load
2062 * (with PDL_POLICY_ROUND) seems better.
2064 ptlrpcd_add_req(req, pol, -1);
/* Cleanup path: release whatever was allocated before the failure. */
2070 cfs_memory_pressure_restore(mpflag);
2072 if (crattr != NULL) {
2073 capa_put(crattr->cra_capa);
2074 OBD_FREE(crattr, sizeof(*crattr));
2078 LASSERT(req == NULL);
2083 OBD_FREE(pga, sizeof(*pga) * page_count);
2084 /* this should happen rarely and is pretty bad, it makes the
2085 * pending list not follow the dirty order */
2086 while (!list_empty(ext_list)) {
2087 ext = list_entry(ext_list->next, struct osc_extent,
2089 list_del_init(&ext->oe_link);
2090 osc_extent_finish(env, ext, 0, rc);
2092 if (clerq && !IS_ERR(clerq))
2093 cl_req_completion(env, clerq, rc);
/*
 * Attach @einfo->ei_cbdata to @lock->l_ast_data if it is unset, after
 * asserting the lock's ASTs and resource type match what we enqueued.
 * Done under both the resource lock and osc_ast_guard.
 *
 * Returns non-zero when l_ast_data ends up equal to the data (already
 * set to it, or just set); the success/return lines are partially
 * elided in this listing.
 */
2098 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2099 struct ldlm_enqueue_info *einfo)
2101 void *data = einfo->ei_cbdata;
2104 LASSERT(lock != NULL);
2105 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2106 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2107 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2108 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2110 lock_res_and_lock(lock);
2111 spin_lock(&osc_ast_guard);
2113 if (lock->l_ast_data == NULL)
2114 lock->l_ast_data = data;
2115 if (lock->l_ast_data == data)
2118 spin_unlock(&osc_ast_guard);
2119 unlock_res_and_lock(lock);
/*
 * Handle-based wrapper around osc_set_lock_data_with_check(): resolve
 * @lockh to a lock, set its ast data, and drop the reference. A NULL
 * resolution (logged below) typically means the client was evicted.
 */
2124 static int osc_set_data_with_check(struct lustre_handle *lockh,
2125 struct ldlm_enqueue_info *einfo)
2127 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2131 set = osc_set_lock_data_with_check(lock, einfo);
2132 LDLM_LOCK_PUT(lock);
2134 CERROR("lockh %p, data %p - client evicted?\n",
2135 lockh, einfo->ei_cbdata);
/*
 * Iterate over every DLM lock on the object named by @lsm, applying
 * @replace(@data) to each (used e.g. to swap the lock callback data).
 * The iterator's return value is not propagated here.
 */
2139 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2140 ldlm_iterator_t replace, void *data)
2142 struct ldlm_res_id res_id;
2143 struct obd_device *obd = class_exp2obd(exp);
2145 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2146 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2150 /* find any ldlm lock of the inode in osc
/*
 * Like osc_change_cbdata() but the iterator's verdict matters: the
 * LDLM_ITER_STOP / LDLM_ITER_CONTINUE result is translated into the
 * return value (exact mapping lines elided in this listing).
 */
2154 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2155 ldlm_iterator_t replace, void *data)
2157 struct ldlm_res_id res_id;
2158 struct obd_device *obd = class_exp2obd(exp);
2161 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2162 rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2163 if (rc == LDLM_ITER_STOP)
2165 if (rc == LDLM_ITER_CONTINUE)
/*
 * Common completion for OSC lock enqueues: translate an intent-aborted
 * reply into its server-side status, mark the LVB ready when it is
 * valid, and invoke the caller's @upcall with the final result.
 *
 * @agl distinguishes asynchronous glimpse locks, which treat an aborted
 * intent differently from a real failure.
 */
2170 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2171 obd_enqueue_update_f upcall, void *cookie,
2172 __u64 *flags, int agl, int rc)
2174 int intent = *flags & LDLM_FL_HAS_INTENT;
2178 /* The request was created before ldlm_cli_enqueue call. */
2179 if (rc == ELDLM_LOCK_ABORTED) {
2180 struct ldlm_reply *rep;
2181 rep = req_capsule_server_get(&req->rq_pill,
2184 LASSERT(rep != NULL);
/* lock_policy_res1 carries the server's real status; convert
 * from network byte order / wire status encoding. */
2185 rep->lock_policy_res1 =
2186 ptlrpc_status_ntoh(rep->lock_policy_res1);
2187 if (rep->lock_policy_res1)
2188 rc = rep->lock_policy_res1;
2192 if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2194 *flags |= LDLM_FL_LVB_READY;
2195 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2196 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2199 /* Call the update callback. */
2200 rc = (*upcall)(cookie, rc);
/*
 * Reply interpreter for asynchronous lock enqueues. Copies the handle
 * and mode out of @aa first (the upcall may free @aa's storage), holds
 * an extra lock reference across ldlm_cli_enqueue_fini() and the
 * osc_enqueue_fini() upcall, then drops the enqueue references.
 */
2204 static int osc_enqueue_interpret(const struct lu_env *env,
2205 struct ptlrpc_request *req,
2206 struct osc_enqueue_args *aa, int rc)
2208 struct ldlm_lock *lock;
2209 struct lustre_handle handle;
2211 struct ost_lvb *lvb;
2213 __u64 *flags = aa->oa_flags;
2215 /* Make a local copy of a lock handle and a mode, because aa->oa_*
2216 * might be freed anytime after lock upcall has been called. */
2217 lustre_handle_copy(&handle, aa->oa_lockh);
2218 mode = aa->oa_ei->ei_mode;
2220 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2222 lock = ldlm_handle2lock(&handle);
2224 /* Take an additional reference so that a blocking AST that
2225 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2226 * to arrive after an upcall has been executed by
2227 * osc_enqueue_fini(). */
2228 ldlm_lock_addref(&handle, mode);
2230 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2231 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2233 /* Let CP AST to grant the lock first. */
2234 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
/* AGL aborted-intent replies carry no LVB; otherwise expect one. */
2236 if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2241 lvb_len = sizeof(*aa->oa_lvb);
2244 /* Complete obtaining the lock procedure. */
2245 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2246 mode, flags, lvb, lvb_len, &handle, rc);
2247 /* Complete osc stuff. */
2248 rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2249 flags, aa->oa_agl, rc);
2251 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2253 /* Release the lock for async request. */
2254 if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2256 * Releases a reference taken by ldlm_cli_enqueue(), if it is
2257 * not already released by
2258 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2260 ldlm_lock_decref(&handle, mode);
2262 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2263 aa->oa_lockh, req, aa);
/* Drop the extra reference taken at the top of this function. */
2264 ldlm_lock_decref(&handle, mode);
2265 LDLM_LOCK_PUT(lock);
/* Sentinel request-set pointer: "queue on a ptlrpcd thread" rather than
 * on a caller-supplied set. Never dereferenced, only compared. */
2269 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2271 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2272 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2273 * other synchronous requests, however keeping some locks and trying to obtain
2274 * others may take a considerable amount of time in a case of ost failure; and
2275 * when other sync requests do not get released lock from a client, the client
2276 * is excluded from the cluster -- such scenarious make the life difficult, so
2277 * release locks just after they are obtained. */
/*
 * Enqueue (or match an existing) extent lock on @res_id.
 *
 * First tries ldlm_lock_match() — a cached compatible lock (including a
 * PW lock when PR was asked for) short-circuits the RPC entirely and the
 * upcall is invoked immediately. Otherwise an LDLM_ENQUEUE RPC is built;
 * with @async it completes via osc_enqueue_interpret(), else the result
 * is finalized inline through osc_enqueue_fini().
 *
 * @agl marks asynchronous glimpse locks: a matched-but-not-granted lock
 * yields -ECANCELED so the caller skips the stripe.
 *
 * NOTE(review): elided lines hide parts of the matched-lock and error
 * paths; comments describe visible code only.
 */
2278 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2279 __u64 *flags, ldlm_policy_data_t *policy,
2280 struct ost_lvb *lvb, int kms_valid,
2281 obd_enqueue_update_f upcall, void *cookie,
2282 struct ldlm_enqueue_info *einfo,
2283 struct lustre_handle *lockh,
2284 struct ptlrpc_request_set *rqset, int async, int agl)
2286 struct obd_device *obd = exp->exp_obd;
2287 struct ptlrpc_request *req = NULL;
2288 int intent = *flags & LDLM_FL_HAS_INTENT;
2289 __u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2294 /* Filesystem lock extents are extended to page boundaries so that
2295 * dealing with the page cache is a little smoother. */
2296 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2297 policy->l_extent.end |= ~CFS_PAGE_MASK;
2300 * kms is not valid when either object is completely fresh (so that no
2301 * locks are cached), or object was evicted. In the latter case cached
2302 * lock cannot be used, because it would prime inode state with
2303 * potentially stale LVB.
2308 /* Next, search for already existing extent locks that will cover us */
2309 /* If we're trying to read, we also search for an existing PW lock. The
2310 * VFS and page cache already protect us locally, so lots of readers/
2311 * writers can share a single PW lock.
2313 * There are problems with conversion deadlocks, so instead of
2314 * converting a read lock to a write lock, we'll just enqueue a new
2317 * At some point we should cancel the read lock instead of making them
2318 * send us a blocking callback, but there are problems with canceling
2319 * locks out from other users right now, too. */
2320 mode = einfo->ei_mode;
2321 if (einfo->ei_mode == LCK_PR)
2323 mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2324 einfo->ei_type, policy, mode, lockh, 0);
2326 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2328 if ((agl != 0) && !ldlm_is_lvb_ready(matched)) {
2329 /* For AGL, if enqueue RPC is sent but the lock is not
2330 * granted, then skip to process this strpe.
2331 * Return -ECANCELED to tell the caller. */
2332 ldlm_lock_decref(lockh, mode);
2333 LDLM_LOCK_PUT(matched);
2335 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2336 *flags |= LDLM_FL_LVB_READY;
2337 /* addref the lock only if not async requests and PW
2338 * lock is matched whereas we asked for PR. */
2339 if (!rqset && einfo->ei_mode != mode)
2340 ldlm_lock_addref(lockh, LCK_PR);
2342 /* I would like to be able to ASSERT here that
2343 * rss <= kms, but I can't, for reasons which
2344 * are explained in lov_enqueue() */
2347 /* We already have a lock, and it's referenced.
2349 * At this point, the cl_lock::cll_state is CLS_QUEUING,
2350 * AGL upcall may change it to CLS_HELD directly. */
2351 (*upcall)(cookie, ELDLM_OK);
2353 if (einfo->ei_mode != mode)
2354 ldlm_lock_decref(lockh, LCK_PW);
2356 /* For async requests, decref the lock. */
2357 ldlm_lock_decref(lockh, einfo->ei_mode);
2358 LDLM_LOCK_PUT(matched);
/* Matched lock was unusable (data check failed): drop it and
 * fall through to a fresh enqueue. */
2361 ldlm_lock_decref(lockh, mode);
2362 LDLM_LOCK_PUT(matched);
/* No usable cached lock: build the enqueue RPC (intent path). */
2368 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2369 &RQF_LDLM_ENQUEUE_LVB);
2373 rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
2375 ptlrpc_request_free(req);
2379 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2381 ptlrpc_request_set_replen(req);
2384 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2385 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2387 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2388 sizeof(*lvb), LVB_T_OST, lockh, async);
/* Async: stash the completion context and queue the request. */
2391 struct osc_enqueue_args *aa;
2392 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2393 aa = ptlrpc_req_async_args(req);
2396 aa->oa_flags = flags;
2397 aa->oa_upcall = upcall;
2398 aa->oa_cookie = cookie;
2400 aa->oa_lockh = lockh;
2403 req->rq_interpret_reply =
2404 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2405 if (rqset == PTLRPCD_SET)
2406 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2408 ptlrpc_set_add_req(rqset, req);
2409 } else if (intent) {
2410 ptlrpc_req_finished(req);
/* Synchronous path: finish the enqueue inline. */
2415 rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2417 ptlrpc_req_finished(req);
/*
 * osc_match_base() - try to match an already-cached DLM extent lock that
 * covers @policy's extent, without sending a new enqueue RPC.
 *
 * The extent is first widened to page boundaries so page-cache handling
 * stays simple.  A reader may also match an existing PW lock, since the
 * VFS and page cache serialize access locally; in that case a PR
 * reference is taken and the transient PW reference is dropped below.
 *
 * NOTE(review): this view of the file is elided; some statements between
 * the visible lines (return paths, rc setup) are not shown here.
 */
2422 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2423 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2424 __u64 *flags, void *data, struct lustre_handle *lockh,
2427 struct obd_device *obd = exp->exp_obd;
2428 __u64 lflags = *flags;
/* Fault-injection hook: pretend no lock matches. */
2432 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2435 /* Filesystem lock extents are extended to page boundaries so that
2436 * dealing with the page cache is a little smoother */
2437 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2438 policy->l_extent.end |= ~CFS_PAGE_MASK;
2440 /* Next, search for already existing extent locks that will cover us */
2441 /* If we're trying to read, we also search for an existing PW lock. The
2442 * VFS and page cache already protect us locally, so lots of readers/
2443 * writers can share a single PW lock. */
2447 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2448 res_id, type, policy, rc, lockh, unref);
/* Attach @data to the matched lock; on mismatch drop our reference
 * (unless this was only a LDLM_FL_TEST_LOCK probe, which takes none). */
2451 if (!osc_set_data_with_check(lockh, data)) {
2452 if (!(lflags & LDLM_FL_TEST_LOCK))
2453 ldlm_lock_decref(lockh, rc);
/* Matched a PW lock while the caller asked for PR: convert our
 * reference from PW to PR so the decref mode matches the request. */
2457 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2458 ldlm_lock_addref(lockh, LCK_PR);
2459 ldlm_lock_decref(lockh, LCK_PW);
/*
 * osc_cancel_base() - release one reference on a client-side DLM lock.
 *
 * GROUP locks are cancelled together with the decref (they are not left
 * cached like ordinary extent locks — presumably because group locks
 * must be dropped explicitly; confirm against ldlm documentation);
 * all other modes just drop the reference.
 */
2466 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2470 if (unlikely(mode == LCK_GROUP))
2471 ldlm_lock_decref_and_cancel(lockh, mode);
2473 ldlm_lock_decref(lockh, mode);
/*
 * osc_statfs_interpret() - reply-interpret callback for the async
 * OST_STATFS request issued by osc_statfs_async().
 *
 * Unpacks the obd_statfs reply into the caller-supplied buffer and then
 * invokes the caller's completion callback (oi_cb_up) with the final rc.
 */
2478 static int osc_statfs_interpret(const struct lu_env *env,
2479 struct ptlrpc_request *req,
2480 struct osc_async_args *aa, int rc)
2482 struct obd_statfs *msfs;
2486 /* The request has in fact never been sent
2487 * due to issues at a higher level (LOV).
2488 * Exit immediately since the caller is
2489 * aware of the problem and takes care
2490 * of the clean up */
/* NODELAY statfs (e.g. from procfs) must not block on a dead import:
 * treat connection-level failures as final rather than retrying. */
2493 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2494 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2500 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
/* Reply buffer missing/unpackable: protocol error. */
2502 GOTO(out, rc = -EPROTO);
2505 *aa->aa_oi->oi_osfs = *msfs;
2507 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * osc_statfs_async() - send an OST_STATFS request without blocking.
 *
 * The request is added to @rqset; osc_statfs_interpret() unpacks the
 * reply and calls oinfo->oi_cb_up when it completes.  @max_age is
 * currently not transmitted to the server (see comment below).
 */
2511 static int osc_statfs_async(struct obd_export *exp,
2512 struct obd_info *oinfo, __u64 max_age,
2513 struct ptlrpc_request_set *rqset)
2515 struct obd_device *obd = class_exp2obd(exp);
2516 struct ptlrpc_request *req;
2517 struct osc_async_args *aa;
2521 /* We could possibly pass max_age in the request (as an absolute
2522 * timestamp or a "seconds.usec ago") so the target can avoid doing
2523 * extra calls into the filesystem if that isn't necessary (e.g.
2524 * during mount that would help a bit). Having relative timestamps
2525 * is not so great if request processing is slow, while absolute
2526 * timestamps are not ideal because they need time synchronization. */
2527 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2531 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2533 ptlrpc_request_free(req);
2536 ptlrpc_request_set_replen(req);
/* statfs is served from the create portal on the OST side. */
2537 req->rq_request_portal = OST_CREATE_PORTAL;
2538 ptlrpc_at_set_req_timeout(req);
2540 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2541 /* procfs requests not want stat in wait for avoid deadlock */
2542 req->rq_no_resend = 1;
2543 req->rq_no_delay = 1;
2546 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
/* Stash caller context in the request's embedded async-args area. */
2547 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2548 aa = ptlrpc_req_async_args(req);
2551 ptlrpc_set_add_req(rqset, req);
/*
 * osc_statfs() - synchronous OST_STATFS: send the request and wait for
 * the reply, copying the returned obd_statfs into @osfs.
 *
 * The import pointer is sampled under cl_sem because this path can race
 * with client_disconnect_export (see bug 15684 note below).
 */
2555 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2556 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2558 struct obd_device *obd = class_exp2obd(exp);
2559 struct obd_statfs *msfs;
2560 struct ptlrpc_request *req;
2561 struct obd_import *imp = NULL;
2565 /*Since the request might also come from lprocfs, so we need
2566 *sync this with client_disconnect_export Bug15684*/
2567 down_read(&obd->u.cli.cl_sem);
2568 if (obd->u.cli.cl_import)
2569 imp = class_import_get(obd->u.cli.cl_import);
2570 up_read(&obd->u.cli.cl_sem);
2574 /* We could possibly pass max_age in the request (as an absolute
2575 * timestamp or a "seconds.usec ago") so the target can avoid doing
2576 * extra calls into the filesystem if that isn't necessary (e.g.
2577 * during mount that would help a bit). Having relative timestamps
2578 * is not so great if request processing is slow, while absolute
2579 * timestamps are not ideal because they need time synchronization. */
2580 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
/* Our extra import reference is no longer needed once the request
 * holds its own. */
2582 class_import_put(imp);
2587 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2589 ptlrpc_request_free(req);
2592 ptlrpc_request_set_replen(req);
2593 req->rq_request_portal = OST_CREATE_PORTAL;
2594 ptlrpc_at_set_req_timeout(req);
2596 if (flags & OBD_STATFS_NODELAY) {
2597 /* procfs requests not want stat in wait for avoid deadlock */
2598 req->rq_no_resend = 1;
2599 req->rq_no_delay = 1;
2602 rc = ptlrpc_queue_wait(req);
2606 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
/* Unpackable reply: protocol error. */
2608 GOTO(out, rc = -EPROTO);
2615 ptlrpc_req_finished(req);
/*
 * osc_iocontrol() - ioctl dispatcher for the OSC device.
 *
 * Takes a module reference for the duration of the call, then switches
 * on @cmd: import recovery, import (de)activation, quota-check polling,
 * and target ping are handled; anything else returns -ENOTTY.
 */
2619 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2620 void *karg, void *uarg)
2622 struct obd_device *obd = exp->exp_obd;
2623 struct obd_ioctl_data *data = karg;
/* Pin the module so it cannot be unloaded mid-ioctl. */
2627 if (!try_module_get(THIS_MODULE)) {
2628 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2629 module_name(THIS_MODULE));
2633 case OBD_IOC_CLIENT_RECOVER:
2634 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2635 data->ioc_inlbuf1, 0);
2639 case IOC_OSC_SET_ACTIVE:
2640 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2643 case OBD_IOC_POLL_QUOTACHECK:
2644 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2646 case OBD_IOC_PING_TARGET:
2647 err = ptlrpc_obd_ping(obd);
2650 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2651 cmd, current_comm());
2652 GOTO(out, err = -ENOTTY);
2655 module_put(THIS_MODULE);
/*
 * osc_get_info() - obd_ops get_info handler; answers a few well-known
 * keys, fetching from the OST over the wire where necessary:
 *
 *   KEY_LOCK_TO_STRIPE - purely local: reports the stripe index size.
 *   KEY_LAST_ID        - OST_GET_INFO RPC returning the last allocated
 *                        object id on the target.
 *   KEY_FIEMAP         - OST_GET_INFO RPC carrying a fiemap request;
 *                        takes (or requests server-side) a PR extent
 *                        lock over the mapped range for consistency.
 *
 * Unknown keys fall through (handling not visible in this view).
 */
2659 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2660 obd_count keylen, void *key, __u32 *vallen, void *val,
2661 struct lov_stripe_md *lsm)
2664 if (!vallen || !val)
2667 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2668 __u32 *stripe = val;
2669 *vallen = sizeof(*stripe);
2672 } else if (KEY_IS(KEY_LAST_ID)) {
2673 struct ptlrpc_request *req;
2678 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2679 &RQF_OST_GET_INFO_LAST_ID);
2683 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2684 RCL_CLIENT, keylen);
2685 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2687 ptlrpc_request_free(req);
2691 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2692 memcpy(tmp, key, keylen);
/* Don't stall callers on a flaky import for an informational query. */
2694 req->rq_no_delay = req->rq_no_resend = 1;
2695 ptlrpc_request_set_replen(req);
2696 rc = ptlrpc_queue_wait(req);
2700 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
2702 GOTO(out, rc = -EPROTO);
2704 *((obd_id *)val) = *reply;
2706 ptlrpc_req_finished(req);
2708 } else if (KEY_IS(KEY_FIEMAP)) {
2709 struct ll_fiemap_info_key *fm_key =
2710 (struct ll_fiemap_info_key *)key;
2711 struct ldlm_res_id res_id;
2712 ldlm_policy_data_t policy;
2713 struct lustre_handle lockh;
2714 ldlm_mode_t mode = 0;
2715 struct ptlrpc_request *req;
2716 struct ll_user_fiemap *reply;
/* FIEMAP_FLAG_SYNC requires consistent data, which needs locking. */
2720 if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
2723 policy.l_extent.start = fm_key->fiemap.fm_start &
/* Clamp the end of the locked extent: if start+length would overflow
 * past OBD_OBJECT_EOF, lock to EOF instead. */
2726 if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
2727 fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
2728 policy.l_extent.end = OBD_OBJECT_EOF;
2730 policy.l_extent.end = (fm_key->fiemap.fm_start +
2731 fm_key->fiemap.fm_length +
2732 PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
2734 ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
2735 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
2736 LDLM_FL_BLOCK_GRANTED |
2738 &res_id, LDLM_EXTENT, &policy,
2739 LCK_PR | LCK_PW, &lockh, 0);
2740 if (mode) { /* lock is cached on client */
2741 if (mode != LCK_PR) {
/* Matched a PW lock: convert our reference to PR. */
2742 ldlm_lock_addref(&lockh, LCK_PR);
2743 ldlm_lock_decref(&lockh, LCK_PW);
2745 } else { /* no cached lock, needs acquire lock on server side */
2746 fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
2747 fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
2751 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2752 &RQF_OST_GET_INFO_FIEMAP);
2754 GOTO(drop_lock, rc = -ENOMEM);
2756 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
2757 RCL_CLIENT, keylen);
2758 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2759 RCL_CLIENT, *vallen);
2760 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2761 RCL_SERVER, *vallen);
2763 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2765 ptlrpc_request_free(req);
2766 GOTO(drop_lock, rc);
2769 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
2770 memcpy(tmp, key, keylen);
2771 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2772 memcpy(tmp, val, *vallen);
2774 ptlrpc_request_set_replen(req);
2775 rc = ptlrpc_queue_wait(req);
2779 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2781 GOTO(fini_req, rc = -EPROTO);
2783 memcpy(val, reply, *vallen);
2785 ptlrpc_req_finished(req);
/* Drop the PR reference taken (or converted) above, if any. */
2788 ldlm_lock_decref(&lockh, LCK_PR);
/*
 * osc_set_info_async() - obd_ops set_info_async handler.
 *
 * Several keys are handled entirely on the client (checksum toggle,
 * sptlrpc config/ctx flush, LRU cache registration and shrinking);
 * everything else is forwarded to the OST via an OST_SET_INFO RPC.
 * KEY_GRANT_SHRINK requests get a grant-specific request format and are
 * queued on the ptlrpcd set with a dedicated interpret callback.
 */
2795 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2796 obd_count keylen, void *key, obd_count vallen,
2797 void *val, struct ptlrpc_request_set *set)
2799 struct ptlrpc_request *req;
2800 struct obd_device *obd = exp->exp_obd;
2801 struct obd_import *imp = class_exp2cliimp(exp);
2806 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
/* Local-only: enable/disable wire checksums for this client. */
2808 if (KEY_IS(KEY_CHECKSUM)) {
2809 if (vallen != sizeof(int))
2811 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2815 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2816 sptlrpc_conf_client_adapt(obd);
2820 if (KEY_IS(KEY_FLUSH_CTX)) {
2821 sptlrpc_import_flush_my_ctx(imp);
/* Local-only: attach this OSC to a shared client page cache. */
2825 if (KEY_IS(KEY_CACHE_SET)) {
2826 struct client_obd *cli = &obd->u.cli;
2828 LASSERT(cli->cl_cache == NULL); /* only once */
2829 cli->cl_cache = (struct cl_client_cache *)val;
2830 atomic_inc(&cli->cl_cache->ccc_users);
2831 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2833 /* add this osc into entity list */
2834 LASSERT(list_empty(&cli->cl_lru_osc));
2835 spin_lock(&cli->cl_cache->ccc_lru_lock);
2836 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2837 spin_unlock(&cli->cl_cache->ccc_lru_lock);
/* Local-only: shrink the LRU by at most half its current size,
 * capped by the caller-requested target. */
2842 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2843 struct client_obd *cli = &obd->u.cli;
2844 int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
2845 int target = *(int *)val;
2847 nr = osc_lru_shrink(env, cli, min(nr, target), true);
/* All remaining keys except GRANT_SHRINK require a request set. */
2852 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2855 /* We pass all other commands directly to OST. Since nobody calls osc
2856 methods directly and everybody is supposed to go through LOV, we
2857 assume lov checked invalid values for us.
2858 The only recognised values so far are evict_by_nid and mds_conn.
2859 Even if something bad goes through, we'd get a -EINVAL from OST
2862 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2863 &RQF_OST_SET_GRANT_INFO :
2868 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2869 RCL_CLIENT, keylen);
2870 if (!KEY_IS(KEY_GRANT_SHRINK))
2871 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2872 RCL_CLIENT, vallen);
2873 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2875 ptlrpc_request_free(req);
2879 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2880 memcpy(tmp, key, keylen);
2881 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2884 memcpy(tmp, val, vallen);
2886 if (KEY_IS(KEY_GRANT_SHRINK)) {
2887 struct osc_grant_args *aa;
2890 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2891 aa = ptlrpc_req_async_args(req);
2894 ptlrpc_req_finished(req);
2897 *oa = ((struct ost_body *)val)->oa;
2899 req->rq_interpret_reply = osc_shrink_grant_interpret;
2902 ptlrpc_request_set_replen(req);
2903 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2904 LASSERT(set != NULL);
2905 ptlrpc_set_add_req(set, req);
2906 ptlrpc_check_set(NULL, set);
/* GRANT_SHRINK goes to ptlrpcd instead of the caller's set. */
2908 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
/*
 * osc_reconnect() - refresh grant-related connect data before the
 * import reconnects to the OST.
 *
 * Under cl_loi_list_lock, reports the client's current grant usage
 * (available grant plus grant consumed by dirty pages, with a fallback
 * of 2x the BRW size when that sum is zero) and resets the lost-grant
 * counter, which is only logged here.
 */
2913 static int osc_reconnect(const struct lu_env *env,
2914 struct obd_export *exp, struct obd_device *obd,
2915 struct obd_uuid *cluuid,
2916 struct obd_connect_data *data,
2919 struct client_obd *cli = &obd->u.cli;
2921 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2924 client_obd_list_lock(&cli->cl_loi_list_lock);
2925 data->ocd_grant = (cli->cl_avail_grant +
2926 (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
2927 2 * cli_brw_size(obd);
2928 lost_grant = cli->cl_lost_grant;
2929 cli->cl_lost_grant = 0;
2930 client_obd_list_unlock(&cli->cl_loi_list_lock);
2932 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2933 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2934 data->ocd_version, data->ocd_grant, lost_grant);
/*
 * osc_disconnect() - disconnect this OSC from its OST.
 *
 * On the last connection, flushes any pending size-llog cancel records,
 * then performs the generic client disconnect.  Removal from the grant
 * shrink list is deliberately done AFTER the import is gone — see the
 * bug 18662 race description below.
 */
2940 static int osc_disconnect(struct obd_export *exp)
2942 struct obd_device *obd = class_exp2obd(exp);
2943 struct llog_ctxt *ctxt;
2946 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
2948 if (obd->u.cli.cl_conn_count == 1) {
2949 /* Flush any remaining cancel messages out to the
2951 llog_sync(ctxt, exp, 0);
2953 llog_ctxt_put(ctxt);
2955 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
2959 rc = client_disconnect_export(exp);
2961 * Initially we put del_shrink_grant before disconnect_export, but it
2962 * causes the following problem if setup (connect) and cleanup
2963 * (disconnect) are tangled together.
2964 * connect p1 disconnect p2
2965 * ptlrpc_connect_import
2966 * ............... class_manual_cleanup
2969 * ptlrpc_connect_interrupt
2971 * add this client to shrink list
2973 * Bang! pinger trigger the shrink.
2974 * So the osc should be disconnected from the shrink list, after we
2975 * are sure the import has been destroyed. BUG18662
2977 if (obd->u.cli.cl_import == NULL)
2978 osc_del_shrink_grant(&obd->u.cli);
/*
 * osc_import_event() - react to import state changes (disconnect,
 * invalidate, activate, connect-data updates, ...) and forward most
 * events to the OBD observer (typically the LOV layer above).
 */
2982 static int osc_import_event(struct obd_device *obd,
2983 struct obd_import *imp,
2984 enum obd_import_event event)
2986 struct client_obd *cli;
2990 LASSERT(imp->imp_obd == obd);
2993 case IMP_EVENT_DISCON: {
/* Connection lost: any previously granted space is void. */
2995 client_obd_list_lock(&cli->cl_loi_list_lock);
2996 cli->cl_avail_grant = 0;
2997 cli->cl_lost_grant = 0;
2998 client_obd_list_unlock(&cli->cl_loi_list_lock);
3001 case IMP_EVENT_INACTIVE: {
3002 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3005 case IMP_EVENT_INVALIDATE: {
3006 struct ldlm_namespace *ns = obd->obd_namespace;
3010 env = cl_env_get(&refcheck);
3014 /* all pages go to failing rpcs due to the invalid
3016 osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
/* Drop every cached lock locally; the server's state is gone. */
3018 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3019 cl_env_put(env, &refcheck);
3024 case IMP_EVENT_ACTIVE: {
3025 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3028 case IMP_EVENT_OCD: {
3029 struct obd_connect_data *ocd = &imp->imp_connect_data;
/* Re-negotiated connect data: re-seed grant accounting and the
 * request portal if the server supports them. */
3031 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3032 osc_init_grant(&obd->u.cli, ocd);
3035 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3036 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3038 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3041 case IMP_EVENT_DEACTIVATE: {
3042 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3045 case IMP_EVENT_ACTIVATE: {
3046 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3050 CERROR("Unknown import event %d\n", event);
3057 * Determine whether the lock can be canceled before replaying the lock
3058 * during recovery, see bug16774 for detailed information.
3060 * \retval zero the lock can't be canceled
3061 * \retval other ok to cancel
3063 static int osc_cancel_weight(struct ldlm_lock *lock)
3066 * Cancel all unused and granted extent lock.
/* "Unused" here is determined by osc_ldlm_weigh_ast() returning 0. */
3068 if (lock->l_resource->lr_type == LDLM_EXTENT &&
3069 lock->l_granted_mode == lock->l_req_mode &&
3070 osc_ldlm_weigh_ast(lock) == 0)
/*
 * brw_queue_work() - ptlrpcd work callback that kicks writeback for a
 * client obd by unplugging its pending OSC I/O.
 */
3076 static int brw_queue_work(const struct lu_env *env, void *data)
3078 struct client_obd *cli = data;
3080 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3082 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
/*
 * osc_setup() - obd setup method for the OSC device.
 *
 * Initializes the generic client obd, allocates the writeback and LRU
 * ptlrpcd work items, sets up quota and grant-shrink state, registers
 * the procfs tree (under the OSP's proc symlink when client and server
 * share a node), pre-allocates a request pool sized to the RPC pipeline,
 * and registers the cancel-weight callback with the lock namespace.
 * Error paths below tear down the work items and the client obd.
 */
3086 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3088 struct client_obd *cli = &obd->u.cli;
3089 struct obd_type *type;
3094 rc = ptlrpcd_addref();
3098 rc = client_obd_setup(obd, lcfg);
3100 GOTO(out_ptlrpcd, rc);
3102 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3103 if (IS_ERR(handler))
3104 GOTO(out_client_setup, rc = PTR_ERR(handler));
3105 cli->cl_writeback_work = handler;
3107 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3108 if (IS_ERR(handler))
3109 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3110 cli->cl_lru_work = handler;
3112 rc = osc_quota_setup(obd);
3114 GOTO(out_ptlrpcd_work, rc);
3116 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3119 obd->obd_vars = lprocfs_osc_obd_vars;
3121 /* If this is true then both client (osc) and server (osp) are on the
3122 * same node. The osp layer if loaded first will register the osc proc
3123 * directory. In that case this obd_device will be attached its proc
3124 * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
3125 type = class_search_type(LUSTRE_OSP_NAME);
3126 if (type && type->typ_procsym) {
3127 obd->obd_proc_entry = lprocfs_seq_register(obd->obd_name,
3129 obd->obd_vars, obd);
3130 if (IS_ERR(obd->obd_proc_entry)) {
3131 rc = PTR_ERR(obd->obd_proc_entry);
3132 CERROR("error %d setting up lprocfs for %s\n", rc,
/* proc failure is non-fatal: continue without the entry. */
3134 obd->obd_proc_entry = NULL;
3137 rc = lprocfs_seq_obd_setup(obd);
3140 /* If the basic OSC proc tree construction succeeded then
3141 * lets do the rest. */
3143 lproc_osc_attach_seqstat(obd);
3144 sptlrpc_lprocfs_cliobd_attach(obd);
3145 ptlrpc_lprocfs_register_obd(obd);
3148 /* We need to allocate a few requests more, because
3149 * brw_interpret tries to create new requests before freeing
3150 * previous ones, Ideally we want to have 2x max_rpcs_in_flight
3151 * reserved, but I'm afraid that might be too much wasted RAM
3152 * in fact, so 2 is just my guess and still should work. */
3153 cli->cl_import->imp_rq_pool =
3154 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3156 ptlrpc_add_rqs_to_pool);
3158 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3159 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
/* --- error unwind: release work items, then the client obd --- */
3163 if (cli->cl_writeback_work != NULL) {
3164 ptlrpcd_destroy_work(cli->cl_writeback_work);
3165 cli->cl_writeback_work = NULL;
3167 if (cli->cl_lru_work != NULL) {
3168 ptlrpcd_destroy_work(cli->cl_lru_work);
3169 cli->cl_lru_work = NULL;
3172 client_obd_cleanup(obd);
/*
 * osc_precleanup() - staged teardown before osc_cleanup().
 *
 * OBD_CLEANUP_EARLY deactivates the import and stops pinging it;
 * OBD_CLEANUP_EXPORTS destroys the ptlrpcd work items, the client
 * import, and the procfs/llog state.  The obd_zombie_barrier() is
 * needed for the echo-client ordering described below.
 */
3178 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3184 case OBD_CLEANUP_EARLY: {
3185 struct obd_import *imp;
3186 imp = obd->u.cli.cl_import;
3187 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3188 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3189 ptlrpc_deactivate_import(imp);
3190 spin_lock(&imp->imp_lock);
3191 imp->imp_pingable = 0;
3192 spin_unlock(&imp->imp_lock);
3195 case OBD_CLEANUP_EXPORTS: {
3196 struct client_obd *cli = &obd->u.cli;
3198 * for echo client, export may be on zombie list, wait for
3199 * zombie thread to cull it, because cli.cl_import will be
3200 * cleared in client_disconnect_export():
3201 * class_export_destroy() -> obd_cleanup() ->
3202 * echo_device_free() -> echo_client_cleanup() ->
3203 * obd_disconnect() -> osc_disconnect() ->
3204 * client_disconnect_export()
3206 obd_zombie_barrier();
3207 if (cli->cl_writeback_work) {
3208 ptlrpcd_destroy_work(cli->cl_writeback_work);
3209 cli->cl_writeback_work = NULL;
3211 if (cli->cl_lru_work) {
3212 ptlrpcd_destroy_work(cli->cl_lru_work);
3213 cli->cl_lru_work = NULL;
3215 obd_cleanup_client_import(obd);
3216 ptlrpc_lprocfs_unregister_obd(obd);
3217 lprocfs_obd_cleanup(obd);
3218 rc = obd_llog_finish(obd, 0);
3220 CERROR("failed to cleanup llogging subsystems\n");
/*
 * osc_cleanup() - final obd cleanup: detach this OSC from the shared
 * client cache (if registered via KEY_CACHE_SET), release quota state,
 * and tear down the generic client obd.
 */
3227 int osc_cleanup(struct obd_device *obd)
3229 struct client_obd *cli = &obd->u.cli;
/* Undo the KEY_CACHE_SET registration performed in set_info_async. */
3235 if (cli->cl_cache != NULL) {
3236 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3237 spin_lock(&cli->cl_cache->ccc_lru_lock);
3238 list_del_init(&cli->cl_lru_osc);
3239 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3240 cli->cl_lru_left = NULL;
3241 atomic_dec(&cli->cl_cache->ccc_users);
3242 cli->cl_cache = NULL;
3245 /* free memory of osc quota cache */
3246 osc_quota_cleanup(obd);
3248 rc = client_obd_cleanup(obd);
/*
 * osc_process_config_base() - apply an lcfg parameter via the shared
 * PARAM_OSC proc-parameter table.  A positive return from the parser
 * (number of entries matched) is normalized to success.
 */
3254 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3256 int rc = class_process_proc_seq_param(PARAM_OSC, obd->obd_vars,
3258 return rc > 0 ? 0: rc;
/* obd_ops wrapper: @buf carries the lustre_cfg; length is unused here. */
3261 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3263 return osc_process_config_base(obd, buf);
/*
 * Method table exported by the OSC: connection management comes from
 * the generic client_* helpers; object, attribute, statfs, info and
 * quota operations are the osc_* implementations in this file (and its
 * siblings).
 */
3266 struct obd_ops osc_obd_ops = {
3267 .o_owner = THIS_MODULE,
3268 .o_setup = osc_setup,
3269 .o_precleanup = osc_precleanup,
3270 .o_cleanup = osc_cleanup,
3271 .o_add_conn = client_import_add_conn,
3272 .o_del_conn = client_import_del_conn,
3273 .o_connect = client_connect_import,
3274 .o_reconnect = osc_reconnect,
3275 .o_disconnect = osc_disconnect,
3276 .o_statfs = osc_statfs,
3277 .o_statfs_async = osc_statfs_async,
3278 .o_unpackmd = osc_unpackmd,
3279 .o_create = osc_create,
3280 .o_destroy = osc_destroy,
3281 .o_getattr = osc_getattr,
3282 .o_getattr_async = osc_getattr_async,
3283 .o_setattr = osc_setattr,
3284 .o_setattr_async = osc_setattr_async,
3285 .o_change_cbdata = osc_change_cbdata,
3286 .o_find_cbdata = osc_find_cbdata,
3287 .o_iocontrol = osc_iocontrol,
3288 .o_get_info = osc_get_info,
3289 .o_set_info_async = osc_set_info_async,
3290 .o_import_event = osc_import_event,
3291 .o_process_config = osc_process_config,
3292 .o_quotactl = osc_quotactl,
3293 .o_quotacheck = osc_quotacheck,
3296 extern struct lu_kmem_descr osc_caches[];
3297 extern spinlock_t osc_ast_guard;
3298 extern struct lock_class_key osc_ast_guard_class;
/*
 * osc_init() - module entry point: initialize the cl-object kmem caches,
 * register the OSC obd type (proc disabled when an OSP type with a proc
 * symlink already exists on this node), and set up the AST guard lock.
 */
3300 int __init osc_init(void)
3302 bool enable_proc = true;
3303 struct obd_type *type;
3307 /* print an address of _any_ initialized kernel symbol from this
3308 * module, to allow debugging with gdb that doesn't support data
3309 * symbols from modules.*/
3310 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3312 rc = lu_kmem_init(osc_caches);
3316 type = class_search_type(LUSTRE_OSP_NAME);
3317 if (type != NULL && type->typ_procsym != NULL)
3318 enable_proc = false;
3320 rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3321 #ifndef HAVE_ONLY_PROCFS_SEQ
3324 LUSTRE_OSC_NAME, &osc_device_type);
/* Registration failed: undo the cache initialization. */
3326 lu_kmem_fini(osc_caches);
3330 spin_lock_init(&osc_ast_guard);
3331 lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
/* Module exit: unregister the OSC obd type and free the kmem caches. */
3337 static void /*__exit*/ osc_exit(void)
3339 class_unregister_type(LUSTRE_OSC_NAME);
3340 lu_kmem_fini(osc_caches);
3343 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3344 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3345 MODULE_LICENSE("GPL");
3347 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);