4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2013, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_OSC
39 #include <libcfs/libcfs.h>
42 #include <lustre_dlm.h>
43 #include <lustre_net.h>
44 #include <lustre/lustre_user.h>
45 #include <obd_cksum.h>
46 #include <lustre_ha.h>
47 #include <lprocfs_status.h>
48 #include <lustre_ioctl.h>
49 #include <lustre_log.h>
50 #include <lustre_debug.h>
51 #include <lustre_param.h>
52 #include <lustre_fid.h>
53 #include "osc_internal.h"
54 #include "osc_cl_internal.h"
56 struct osc_brw_async_args {
60 obd_count aa_page_count;
62 struct brw_page **aa_ppga;
63 struct client_obd *aa_cli;
64 struct list_head aa_oaps;
65 struct list_head aa_exts;
66 struct obd_capa *aa_ocapa;
67 struct cl_req *aa_clerq;
70 #define osc_grant_args osc_brw_async_args
72 struct osc_async_args {
73 struct obd_info *aa_oi;
76 struct osc_setattr_args {
78 obd_enqueue_update_f sa_upcall;
82 struct osc_fsync_args {
83 struct obd_info *fa_oi;
84 obd_enqueue_update_f fa_upcall;
88 struct osc_enqueue_args {
89 struct obd_export *oa_exp;
93 __u64 oa_flags_internal;
94 osc_enqueue_upcall_f oa_upcall;
96 struct ost_lvb *oa_lvb;
97 struct lustre_handle oa_lockh;
98 unsigned int oa_agl:1;
101 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
102 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
105 /* Unpack OSC object metadata from disk storage (LE byte order). */
106 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
107 struct lov_mds_md *lmm, int lmm_bytes)
110 struct obd_import *imp = class_exp2cliimp(exp);
114 if (lmm_bytes < sizeof(*lmm)) {
115 CERROR("%s: lov_mds_md too small: %d, need %d\n",
116 exp->exp_obd->obd_name, lmm_bytes,
120 /* XXX LOV_MAGIC etc check? */
122 if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
123 CERROR("%s: zero lmm_object_id: rc = %d\n",
124 exp->exp_obd->obd_name, -EINVAL);
129 lsm_size = lov_stripe_md_size(1);
133 if (*lsmp != NULL && lmm == NULL) {
134 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
135 OBD_FREE(*lsmp, lsm_size);
141 OBD_ALLOC(*lsmp, lsm_size);
142 if (unlikely(*lsmp == NULL))
144 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
145 if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
146 OBD_FREE(*lsmp, lsm_size);
149 loi_init((*lsmp)->lsm_oinfo[0]);
150 } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
155 /* XXX zero *lsmp? */
156 ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
159 (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
160 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
162 (*lsmp)->lsm_maxbytes = LUSTRE_EXT3_STRIPE_MAXBYTES;
167 static inline void osc_pack_capa(struct ptlrpc_request *req,
168 struct ost_body *body, void *capa)
170 struct obd_capa *oc = (struct obd_capa *)capa;
171 struct lustre_capa *c;
176 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
179 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
180 DEBUG_CAPA(D_SEC, c, "pack");
183 static inline void osc_pack_req_body(struct ptlrpc_request *req,
184 struct obd_info *oinfo)
186 struct ost_body *body;
188 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
191 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
193 osc_pack_capa(req, body, oinfo->oi_capa);
196 static inline void osc_set_capa_size(struct ptlrpc_request *req,
197 const struct req_msg_field *field,
201 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
203 /* it is already calculated as sizeof struct obd_capa */
207 static int osc_getattr_interpret(const struct lu_env *env,
208 struct ptlrpc_request *req,
209 struct osc_async_args *aa, int rc)
211 struct ost_body *body;
217 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
219 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
220 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
221 aa->aa_oi->oi_oa, &body->oa);
223 /* This should really be sent by the OST */
224 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
225 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
227 CDEBUG(D_INFO, "can't unpack ost_body\n");
229 aa->aa_oi->oi_oa->o_valid = 0;
232 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
236 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
237 struct ptlrpc_request_set *set)
239 struct ptlrpc_request *req;
240 struct osc_async_args *aa;
244 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
248 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
249 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
251 ptlrpc_request_free(req);
255 osc_pack_req_body(req, oinfo);
257 ptlrpc_request_set_replen(req);
258 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
260 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
261 aa = ptlrpc_req_async_args(req);
264 ptlrpc_set_add_req(set, req);
268 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
269 struct obd_info *oinfo)
271 struct ptlrpc_request *req;
272 struct ost_body *body;
276 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
280 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
281 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
283 ptlrpc_request_free(req);
287 osc_pack_req_body(req, oinfo);
289 ptlrpc_request_set_replen(req);
291 rc = ptlrpc_queue_wait(req);
295 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
297 GOTO(out, rc = -EPROTO);
299 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
300 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
303 oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
304 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
308 ptlrpc_req_finished(req);
312 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
313 struct obd_info *oinfo, struct obd_trans_info *oti)
315 struct ptlrpc_request *req;
316 struct ost_body *body;
320 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
322 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
326 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
327 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
329 ptlrpc_request_free(req);
333 osc_pack_req_body(req, oinfo);
335 ptlrpc_request_set_replen(req);
337 rc = ptlrpc_queue_wait(req);
341 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
343 GOTO(out, rc = -EPROTO);
345 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
350 ptlrpc_req_finished(req);
354 static int osc_setattr_interpret(const struct lu_env *env,
355 struct ptlrpc_request *req,
356 struct osc_setattr_args *sa, int rc)
358 struct ost_body *body;
364 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
366 GOTO(out, rc = -EPROTO);
368 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
371 rc = sa->sa_upcall(sa->sa_cookie, rc);
375 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
376 struct obd_trans_info *oti,
377 obd_enqueue_update_f upcall, void *cookie,
378 struct ptlrpc_request_set *rqset)
380 struct ptlrpc_request *req;
381 struct osc_setattr_args *sa;
385 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
389 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
390 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
392 ptlrpc_request_free(req);
396 if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
397 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
399 osc_pack_req_body(req, oinfo);
401 ptlrpc_request_set_replen(req);
403 /* do mds to ost setattr asynchronously */
405 /* Do not wait for response. */
406 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
408 req->rq_interpret_reply =
409 (ptlrpc_interpterer_t)osc_setattr_interpret;
411 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
412 sa = ptlrpc_req_async_args(req);
413 sa->sa_oa = oinfo->oi_oa;
414 sa->sa_upcall = upcall;
415 sa->sa_cookie = cookie;
417 if (rqset == PTLRPCD_SET)
418 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
420 ptlrpc_set_add_req(rqset, req);
426 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
427 struct obd_trans_info *oti,
428 struct ptlrpc_request_set *rqset)
430 return osc_setattr_async_base(exp, oinfo, oti,
431 oinfo->oi_cb_up, oinfo, rqset);
434 int osc_real_create(struct obd_export *exp, struct obdo *oa,
435 struct lov_stripe_md **ea, struct obd_trans_info *oti)
437 struct ptlrpc_request *req;
438 struct ost_body *body;
439 struct lov_stripe_md *lsm;
448 rc = obd_alloc_memmd(exp, &lsm);
453 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
455 GOTO(out, rc = -ENOMEM);
457 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
459 ptlrpc_request_free(req);
463 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
466 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
468 ptlrpc_request_set_replen(req);
470 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
471 oa->o_flags == OBD_FL_DELORPHAN) {
473 "delorphan from OST integration");
474 /* Don't resend the delorphan req */
475 req->rq_no_resend = req->rq_no_delay = 1;
478 rc = ptlrpc_queue_wait(req);
482 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
484 GOTO(out_req, rc = -EPROTO);
486 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
487 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
489 oa->o_blksize = cli_brw_size(exp->exp_obd);
490 oa->o_valid |= OBD_MD_FLBLKSZ;
492 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
493 * have valid lsm_oinfo data structs, so don't go touching that.
494 * This needs to be fixed in a big way.
496 lsm->lsm_oi = oa->o_oi;
500 if (oa->o_valid & OBD_MD_FLCOOKIE) {
501 if (oti->oti_logcookies == NULL)
502 oti->oti_logcookies = &oti->oti_onecookie;
504 *oti->oti_logcookies = oa->o_lcookie;
508 CDEBUG(D_HA, "transno: "LPD64"\n",
509 lustre_msg_get_transno(req->rq_repmsg));
511 ptlrpc_req_finished(req);
514 obd_free_memmd(exp, &lsm);
518 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
519 obd_enqueue_update_f upcall, void *cookie,
520 struct ptlrpc_request_set *rqset)
522 struct ptlrpc_request *req;
523 struct osc_setattr_args *sa;
524 struct ost_body *body;
528 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
532 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
533 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
535 ptlrpc_request_free(req);
538 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
539 ptlrpc_at_set_req_timeout(req);
541 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
543 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
545 osc_pack_capa(req, body, oinfo->oi_capa);
547 ptlrpc_request_set_replen(req);
549 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
550 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
551 sa = ptlrpc_req_async_args(req);
552 sa->sa_oa = oinfo->oi_oa;
553 sa->sa_upcall = upcall;
554 sa->sa_cookie = cookie;
555 if (rqset == PTLRPCD_SET)
556 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
558 ptlrpc_set_add_req(rqset, req);
563 static int osc_sync_interpret(const struct lu_env *env,
564 struct ptlrpc_request *req,
567 struct osc_fsync_args *fa = arg;
568 struct ost_body *body;
574 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
576 CERROR ("can't unpack ost_body\n");
577 GOTO(out, rc = -EPROTO);
580 *fa->fa_oi->oi_oa = body->oa;
582 rc = fa->fa_upcall(fa->fa_cookie, rc);
586 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
587 obd_enqueue_update_f upcall, void *cookie,
588 struct ptlrpc_request_set *rqset)
590 struct ptlrpc_request *req;
591 struct ost_body *body;
592 struct osc_fsync_args *fa;
596 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
600 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
601 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
603 ptlrpc_request_free(req);
607 /* overload the size and blocks fields in the oa with start/end */
608 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
610 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
612 osc_pack_capa(req, body, oinfo->oi_capa);
614 ptlrpc_request_set_replen(req);
615 req->rq_interpret_reply = osc_sync_interpret;
617 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
618 fa = ptlrpc_req_async_args(req);
620 fa->fa_upcall = upcall;
621 fa->fa_cookie = cookie;
623 if (rqset == PTLRPCD_SET)
624 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
626 ptlrpc_set_add_req(rqset, req);
631 /* Find and cancel locally any locks matched by @mode in the resource derived
632  * from @oa. Found locks are added to the @cancels list. Returns the number of
633  * locks added to the @cancels list. */
634 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
635 struct list_head *cancels,
636 ldlm_mode_t mode, __u64 lock_flags)
638 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
639 struct ldlm_res_id res_id;
640 struct ldlm_resource *res;
644 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
645  * export) but disabled through procfs (flag in NS).
647  * This is distinct from the case where ELC is not supported at all,
648  * in which we still want to cancel locks in advance and simply cancel
649  * them locally, without sending any RPC. */
650 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
653 ostid_build_res_name(&oa->o_oi, &res_id);
654 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
658 LDLM_RESOURCE_ADDREF(res);
659 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
660 lock_flags, 0, NULL);
661 LDLM_RESOURCE_DELREF(res);
662 ldlm_resource_putref(res);
666 static int osc_destroy_interpret(const struct lu_env *env,
667 struct ptlrpc_request *req, void *data,
670 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
672 atomic_dec(&cli->cl_destroy_in_flight);
673 wake_up(&cli->cl_destroy_waitq);
677 static int osc_can_send_destroy(struct client_obd *cli)
679 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
680 cli->cl_max_rpcs_in_flight) {
681 /* The destroy request can be sent */
684 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
685 cli->cl_max_rpcs_in_flight) {
687 * The counter has been modified between the two atomic operations. */
690 wake_up(&cli->cl_destroy_waitq);
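/*
 * Editorial note (not part of the original source): osc_can_send_destroy()
 * is an optimistic throttle on OST_DESTROY RPCs.  A caller first bumps
 * cl_destroy_in_flight; if the result is still within cl_max_rpcs_in_flight
 * the destroy may be sent right away.  Otherwise the caller backs the
 * counter off again and has to wait on cl_destroy_waitq; the wake_up()
 * above covers the window where another sender dropped the counter between
 * the increment and the decrement, so no waiter is left sleeping forever.
 */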
695 int osc_create(const struct lu_env *env, struct obd_export *exp,
696 struct obdo *oa, struct lov_stripe_md **ea,
697 struct obd_trans_info *oti)
704 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
706 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
707 oa->o_flags == OBD_FL_RECREATE_OBJS) {
708 RETURN(osc_real_create(exp, oa, ea, oti));
711 if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
712 RETURN(osc_real_create(exp, oa, ea, oti));
714 /* we should not get here anymore */
720 /* Destroy requests can always be async on the client, and we don't even really
721  * care about the return code since the client cannot do anything at all about
722  * a failed destroy.
723  * When the MDS is unlinking a filename, it saves the file objects into a
724  * recovery llog, and these object records are cancelled when the OST reports
725  * they were destroyed and sync'd to disk (i.e. transaction committed).
726  * If the client dies, or the OST is down when the object should be destroyed,
727  * the records are not cancelled, and when the OST next reconnects to the MDS,
728  * it will retrieve the llog unlink logs and then send the log cancellation
729  * cookies to the MDS after committing the destroy transactions. */
730 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
731 struct obdo *oa, struct lov_stripe_md *ea,
732 struct obd_trans_info *oti, struct obd_export *md_export,
735 struct client_obd *cli = &exp->exp_obd->u.cli;
736 struct ptlrpc_request *req;
737 struct ost_body *body;
738 struct list_head cancels = LIST_HEAD_INIT(cancels);
743 CDEBUG(D_INFO, "oa NULL\n");
747 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
748 LDLM_FL_DISCARD_DATA);
750 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
752 ldlm_lock_list_put(&cancels, l_bl_ast, count);
756 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
757 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
760 ptlrpc_request_free(req);
764 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
765 ptlrpc_at_set_req_timeout(req);
767 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
768 oa->o_lcookie = *oti->oti_logcookies;
769 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
771 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
773 osc_pack_capa(req, body, (struct obd_capa *)capa);
774 ptlrpc_request_set_replen(req);
776 /* If osc_destroy is destroying an unlink orphan (a request
777  * sent from MDT to OST), it should not be blocked here,
778  * because the process might be triggered by ptlrpcd, and
779  * it is not good to block a ptlrpcd thread (b=16006). */
780 if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
781 req->rq_interpret_reply = osc_destroy_interpret;
782 if (!osc_can_send_destroy(cli)) {
783 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
787 * Wait until the number of on-going destroy RPCs drops
788 * below cl_max_rpcs_in_flight.
790 l_wait_event_exclusive(cli->cl_destroy_waitq,
791 osc_can_send_destroy(cli), &lwi);
795 /* Do not wait for response */
796 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
800 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
803 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
805 LASSERT(!(oa->o_valid & bits));
808 client_obd_list_lock(&cli->cl_loi_list_lock);
809 oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
810 if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
811 cli->cl_dirty_max_pages)) {
812 CERROR("dirty %lu - %lu > dirty_max %lu\n",
813 cli->cl_dirty_pages, cli->cl_dirty_transit,
814 cli->cl_dirty_max_pages);
816 } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
817 atomic_long_read(&obd_dirty_transit_pages) >
818 (obd_max_dirty_pages + 1))) {
819 /* The atomic_read() and the atomic_inc() are
820  * not covered by a lock, thus they may safely race and trip
821  * this CERROR() unless we add in a small fudge factor (+1). */
822 CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n",
823 cli->cl_import->imp_obd->obd_name,
824 atomic_long_read(&obd_dirty_pages),
825 atomic_long_read(&obd_dirty_transit_pages),
826 obd_max_dirty_pages);
828 } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
830 CERROR("dirty %lu - dirty_max %lu too big???\n",
831 cli->cl_dirty_pages, cli->cl_dirty_max_pages);
834 unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
836 (cli->cl_max_rpcs_in_flight + 1);
837 oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
840 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
841 oa->o_dropped = cli->cl_lost_grant;
842 cli->cl_lost_grant = 0;
843 client_obd_list_unlock(&cli->cl_loi_list_lock);
844 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
845 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
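/*
 * Worked example (editorial addition, assuming the common defaults of
 * cl_max_pages_per_rpc = 256, 4 KiB pages and cl_max_rpcs_in_flight = 8):
 *
 *	max_in_flight = (256 << 12) * (8 + 1) = 9 MiB
 *
 * so o_undirty above announces roughly max(cl_dirty_max_pages in bytes,
 * 9 MiB) of additional dirty data the client may still cache; o_dirty,
 * o_grant and o_dropped are likewise only hints for the server's grant
 * accounting, piggy-backed on regular RPCs.
 */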
849 void osc_update_next_shrink(struct client_obd *cli)
851 cli->cl_next_shrink_grant =
852 cfs_time_shift(cli->cl_grant_shrink_interval);
853 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
854 cli->cl_next_shrink_grant);
857 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
859 client_obd_list_lock(&cli->cl_loi_list_lock);
860 cli->cl_avail_grant += grant;
861 client_obd_list_unlock(&cli->cl_loi_list_lock);
864 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
866 if (body->oa.o_valid & OBD_MD_FLGRANT) {
867 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
868 __osc_update_grant(cli, body->oa.o_grant);
872 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
873 obd_count keylen, void *key, obd_count vallen,
874 void *val, struct ptlrpc_request_set *set);
876 static int osc_shrink_grant_interpret(const struct lu_env *env,
877 struct ptlrpc_request *req,
880 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
881 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
882 struct ost_body *body;
885 __osc_update_grant(cli, oa->o_grant);
889 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
891 osc_update_grant(cli, body);
897 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
899 client_obd_list_lock(&cli->cl_loi_list_lock);
900 oa->o_grant = cli->cl_avail_grant / 4;
901 cli->cl_avail_grant -= oa->o_grant;
902 client_obd_list_unlock(&cli->cl_loi_list_lock);
903 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
904 oa->o_valid |= OBD_MD_FLFLAGS;
907 oa->o_flags |= OBD_FL_SHRINK_GRANT;
908 osc_update_next_shrink(cli);
911 /* Shrink the current grant, either from some large amount to enough for a
912 * full set of in-flight RPCs, or if we have already shrunk to that limit
913 * then to enough for a single RPC. This avoids keeping more grant than
914 * needed, and avoids shrinking the grant piecemeal. */
915 static int osc_shrink_grant(struct client_obd *cli)
917 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
918 (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
920 client_obd_list_lock(&cli->cl_loi_list_lock);
921 if (cli->cl_avail_grant <= target_bytes)
922 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
923 client_obd_list_unlock(&cli->cl_loi_list_lock);
925 return osc_shrink_grant_to_target(cli, target_bytes);
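/*
 * Worked example (editorial sketch, same assumptions as above: 1 MiB RPCs,
 * cl_max_rpcs_in_flight = 8): the first-stage target is (8 + 1) * 1 MiB =
 * 9 MiB.  Holding 64 MiB of grant we shrink to 9 MiB; if we already hold
 * 9 MiB or less, the target drops to a single RPC worth (1 MiB), and
 * osc_shrink_grant_to_target() below refuses to go lower than that, so the
 * grant is never given back piecemeal.
 */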
928 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
931 struct ost_body *body;
934 client_obd_list_lock(&cli->cl_loi_list_lock);
935 /* Don't shrink if we are already above or below the desired limit.
936  * We don't want to shrink below a single RPC, as that will negatively
937  * impact block allocation and long-term performance. */
938 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
939 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
941 if (target_bytes >= cli->cl_avail_grant) {
942 client_obd_list_unlock(&cli->cl_loi_list_lock);
945 client_obd_list_unlock(&cli->cl_loi_list_lock);
951 osc_announce_cached(cli, &body->oa, 0);
953 client_obd_list_lock(&cli->cl_loi_list_lock);
954 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
955 cli->cl_avail_grant = target_bytes;
956 client_obd_list_unlock(&cli->cl_loi_list_lock);
957 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
958 body->oa.o_valid |= OBD_MD_FLFLAGS;
959 body->oa.o_flags = 0;
961 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
962 osc_update_next_shrink(cli);
964 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
965 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
966 sizeof(*body), body, NULL);
968 __osc_update_grant(cli, body->oa.o_grant);
973 static int osc_should_shrink_grant(struct client_obd *client)
975 cfs_time_t time = cfs_time_current();
976 cfs_time_t next_shrink = client->cl_next_shrink_grant;
978 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
979 OBD_CONNECT_GRANT_SHRINK) == 0)
982 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
983 /* Get the current RPC size directly, instead of going via:
984 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
985 * Keep comment here so that it can be found by searching. */
986 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
988 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
989 client->cl_avail_grant > brw_size)
992 osc_update_next_shrink(client);
997 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
999 struct client_obd *client;
1001 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
1002 if (osc_should_shrink_grant(client))
1003 osc_shrink_grant(client);
1008 static int osc_add_shrink_grant(struct client_obd *client)
1012 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1014 osc_grant_shrink_grant_cb, NULL,
1015 &client->cl_grant_shrink_list);
1017 CERROR("add grant client %s error %d\n",
1018 client->cl_import->imp_obd->obd_name, rc);
1021 CDEBUG(D_CACHE, "add grant client %s \n",
1022 client->cl_import->imp_obd->obd_name);
1023 osc_update_next_shrink(client);
1027 static int osc_del_shrink_grant(struct client_obd *client)
1029 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1033 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1036 * ocd_grant is the total grant amount we expect to hold: if we have
1037 * been evicted, it's the new avail_grant amount, and cl_dirty_pages will
1038 * drop to 0 as in-flight RPCs fail out; otherwise, it's avail_grant +
1039 * dirty.
1041 * A race is tolerable here: if we're evicted, but imp_state has already
1042 * left the EVICTED state, then cl_dirty_pages must be 0 already.
1044 client_obd_list_lock(&cli->cl_loi_list_lock);
1045 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1046 cli->cl_avail_grant = ocd->ocd_grant;
1048 cli->cl_avail_grant = ocd->ocd_grant -
1049 (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
1051 if (cli->cl_avail_grant < 0) {
1052 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
1053 cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
1054 ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
1055 /* workaround for servers which do not have the patch from
1057 cli->cl_avail_grant = ocd->ocd_grant;
1060 /* determine the appropriate chunk size used by osc_extent. */
1061 cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
1062 client_obd_list_unlock(&cli->cl_loi_list_lock);
1064 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1065 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
1066 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1068 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1069 list_empty(&cli->cl_grant_shrink_list))
1070 osc_add_shrink_grant(cli);
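/*
 * Illustrative example (editorial addition): if the server granted
 * ocd_grant = 2 MiB at connect time and the client already holds 128 dirty
 * pages (512 KiB with 4 KiB pages), then on an ordinary reconnect
 * cl_avail_grant becomes 2 MiB - 512 KiB = 1.5 MiB; after an eviction the
 * dirty pages will be dropped as the in-flight RPCs fail out, so the full
 * 2 MiB is taken as available instead.
 */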
1073 /* We assume that the reason this OSC got a short read is that it read
1074  * beyond the end of a stripe object; i.e. Lustre is reading a sparse file
1075  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1076  * this stripe has never been written at or beyond this offset yet. */
1077 static void handle_short_read(int nob_read, obd_count page_count,
1078 struct brw_page **pga)
1083 /* skip bytes read OK */
1084 while (nob_read > 0) {
1085 LASSERT (page_count > 0);
1087 if (pga[i]->count > nob_read) {
1088 /* EOF inside this page */
1089 ptr = kmap(pga[i]->pg) +
1090 (pga[i]->off & ~CFS_PAGE_MASK);
1091 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1098 nob_read -= pga[i]->count;
1103 /* zero remaining pages */
1104 while (page_count-- > 0) {
1105 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1106 memset(ptr, 0, pga[i]->count);
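/*
 * Worked example (editorial sketch): suppose three 4096-byte pages were
 * requested (12288 bytes) but the OST only returned nob_read = 5000.  The
 * first page is complete, the second keeps its first 5000 - 4096 = 904
 * bytes and has the remaining 3192 bytes zeroed in place, and the third is
 * zeroed entirely, so the caller always sees fully initialized pages.
 */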
1112 static int check_write_rcs(struct ptlrpc_request *req,
1113 int requested_nob, int niocount,
1114 obd_count page_count, struct brw_page **pga)
1119 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1120 sizeof(*remote_rcs) *
1122 if (remote_rcs == NULL) {
1123 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1127 /* return error if any niobuf was in error */
1128 for (i = 0; i < niocount; i++) {
1129 if ((int)remote_rcs[i] < 0)
1130 return(remote_rcs[i]);
1132 if (remote_rcs[i] != 0) {
1133 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1134 i, remote_rcs[i], req);
1139 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1140 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1141 req->rq_bulk->bd_nob_transferred, requested_nob);
1148 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1150 if (p1->flag != p2->flag) {
1151 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1152 OBD_BRW_SYNC | OBD_BRW_ASYNC |
1153 OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
1155 /* warn if we try to combine flags that we don't know to be
1156 * safe to combine */
1157 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1158 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1159 "report this at https://jira.hpdd.intel.com/\n",
1160 p1->flag, p2->flag);
1165 return (p1->off + p1->count == p2->off);
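/*
 * Example (editorial addition): two brw_pages at file offsets 0 and 4096,
 * each with count == 4096 and compatible flags, satisfy
 * p1->off + p1->count == p2->off and are merged into one contiguous
 * niobuf_remote when the BRW request is assembled below; a page starting at
 * offset 12288 would open a new niobuf instead.
 */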
1168 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1169 struct brw_page **pga, int opc,
1170 cksum_type_t cksum_type)
1174 struct cfs_crypto_hash_desc *hdesc;
1175 unsigned int bufsize;
1177 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1179 LASSERT(pg_count > 0);
1181 hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1182 if (IS_ERR(hdesc)) {
1183 CERROR("Unable to initialize checksum hash %s\n",
1184 cfs_crypto_hash_name(cfs_alg));
1185 return PTR_ERR(hdesc);
1188 while (nob > 0 && pg_count > 0) {
1189 int count = pga[i]->count > nob ? nob : pga[i]->count;
1191 /* corrupt the data before we compute the checksum, to
1192 * simulate an OST->client data error */
1193 if (i == 0 && opc == OST_READ &&
1194 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1195 unsigned char *ptr = kmap(pga[i]->pg);
1196 int off = pga[i]->off & ~CFS_PAGE_MASK;
1198 memcpy(ptr + off, "bad1", min(4, nob));
1201 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1202 pga[i]->off & ~CFS_PAGE_MASK,
1204 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1205 (int)(pga[i]->off & ~CFS_PAGE_MASK));
1207 nob -= pga[i]->count;
1212 bufsize = sizeof(cksum);
1213 err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1215 /* For sending we only compute the wrong checksum instead
1216 * of corrupting the data so it is still correct on a redo */
1217 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1223 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1224 struct lov_stripe_md *lsm, obd_count page_count,
1225 struct brw_page **pga,
1226 struct ptlrpc_request **reqp,
1227 struct obd_capa *ocapa, int reserve,
1230 struct ptlrpc_request *req;
1231 struct ptlrpc_bulk_desc *desc;
1232 struct ost_body *body;
1233 struct obd_ioobj *ioobj;
1234 struct niobuf_remote *niobuf;
1235 int niocount, i, requested_nob, opc, rc;
1236 struct osc_brw_async_args *aa;
1237 struct req_capsule *pill;
1238 struct brw_page *pg_prev;
1241 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1242 RETURN(-ENOMEM); /* Recoverable */
1243 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1244 RETURN(-EINVAL); /* Fatal */
1246 if ((cmd & OBD_BRW_WRITE) != 0) {
1248 req = ptlrpc_request_alloc_pool(cli->cl_import,
1249 cli->cl_import->imp_rq_pool,
1250 &RQF_OST_BRW_WRITE);
1253 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1258 for (niocount = i = 1; i < page_count; i++) {
1259 if (!can_merge_pages(pga[i - 1], pga[i]))
1263 pill = &req->rq_pill;
1264 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1266 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1267 niocount * sizeof(*niobuf));
1268 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1270 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1272 ptlrpc_request_free(req);
1275 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1276 ptlrpc_at_set_req_timeout(req);
1277 /* Ask ptlrpc not to resend on EINPROGRESS since BRWs have their own resend logic. */
1279 req->rq_no_retry_einprogress = 1;
1281 desc = ptlrpc_prep_bulk_imp(req, page_count,
1282 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1283 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1287 GOTO(out, rc = -ENOMEM);
1288 /* NB request now owns desc and will free it when it gets freed */
1290 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1291 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1292 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1293 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1295 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1297 obdo_to_ioobj(oa, ioobj);
1298 ioobj->ioo_bufcnt = niocount;
1299 /* The high bits of ioo_max_brw tell the server the _maximum_ number of bulks
1300  * that might be sent for this request. The actual number is decided
1301  * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1302  * "max - 1", for compatibility with old clients that send "0", and also so
1303  * that the actual maximum is a power-of-two number, not one less. LU-1431 */
1304 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
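/*
 * Editorial example (the exact encoding is an assumption, see LU-1431):
 * if this request may use up to 4 bulk descriptors, ioobj_max_brw_set()
 * stores 4 - 1 = 3 in the high bits of ioo_max_brw; an old client leaves
 * the field 0, which the server decodes back as a maximum of 1 bulk, and
 * the "max - 1" form keeps the decoded maximum a power of two.
 */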
1305 osc_pack_capa(req, body, ocapa);
1306 LASSERT(page_count > 0);
1308 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1309 struct brw_page *pg = pga[i];
1310 int poff = pg->off & ~CFS_PAGE_MASK;
1312 LASSERT(pg->count > 0);
1313 /* make sure there is no gap in the middle of the page array */
1314 LASSERTF(page_count == 1 ||
1315 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1316 ergo(i > 0 && i < page_count - 1,
1317 poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
1318 ergo(i == page_count - 1, poff == 0)),
1319 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1320 i, page_count, pg, pg->off, pg->count);
1321 LASSERTF(i == 0 || pg->off > pg_prev->off,
1322 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1323 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1325 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1326 pg_prev->pg, page_private(pg_prev->pg),
1327 pg_prev->pg->index, pg_prev->off);
1328 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1329 (pg->flag & OBD_BRW_SRVLOCK));
1331 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1332 requested_nob += pg->count;
1334 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1336 niobuf->rnb_len += pg->count;
1338 niobuf->rnb_offset = pg->off;
1339 niobuf->rnb_len = pg->count;
1340 niobuf->rnb_flags = pg->flag;
1345 LASSERTF((void *)(niobuf - niocount) ==
1346 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1347 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1348 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1350 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1352 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1353 body->oa.o_valid |= OBD_MD_FLFLAGS;
1354 body->oa.o_flags = 0;
1356 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1359 if (osc_should_shrink_grant(cli))
1360 osc_shrink_grant_local(cli, &body->oa);
1362 /* size[REQ_REC_OFF] still sizeof (*body) */
1363 if (opc == OST_WRITE) {
1364 if (cli->cl_checksum &&
1365 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1366 /* store cl_cksum_type in a local variable since
1367 * it can be changed via lprocfs */
1368 cksum_type_t cksum_type = cli->cl_cksum_type;
1370 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1371 oa->o_flags &= OBD_FL_LOCAL_MASK;
1372 body->oa.o_flags = 0;
1374 body->oa.o_flags |= cksum_type_pack(cksum_type);
1375 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1376 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1380 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1382 /* save this in 'oa', too, for later checking */
1383 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1384 oa->o_flags |= cksum_type_pack(cksum_type);
1386 /* clear out the checksum flag, in case this is a
1387 * resend but cl_checksum is no longer set. b=11238 */
1388 oa->o_valid &= ~OBD_MD_FLCKSUM;
1390 oa->o_cksum = body->oa.o_cksum;
1391 /* 1 RC per niobuf */
1392 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1393 sizeof(__u32) * niocount);
1395 if (cli->cl_checksum &&
1396 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1397 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1398 body->oa.o_flags = 0;
1399 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1400 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1403 ptlrpc_request_set_replen(req);
1405 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1406 aa = ptlrpc_req_async_args(req);
1408 aa->aa_requested_nob = requested_nob;
1409 aa->aa_nio_count = niocount;
1410 aa->aa_page_count = page_count;
1414 INIT_LIST_HEAD(&aa->aa_oaps);
1415 if (ocapa && reserve)
1416 aa->aa_ocapa = capa_get(ocapa);
1419 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1420 CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1421 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1422 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1426 ptlrpc_req_finished(req);
1430 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1431 __u32 client_cksum, __u32 server_cksum, int nob,
1432 obd_count page_count, struct brw_page **pga,
1433 cksum_type_t client_cksum_type)
1437 cksum_type_t cksum_type;
1439 if (server_cksum == client_cksum) {
1440 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1444 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1446 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1449 if (cksum_type != client_cksum_type)
1450 msg = "the server did not use the checksum type specified in "
1451 "the original request - likely a protocol problem";
1452 else if (new_cksum == server_cksum)
1453 msg = "changed on the client after we checksummed it - "
1454 "likely false positive due to mmap IO (bug 11742)";
1455 else if (new_cksum == client_cksum)
1456 msg = "changed in transit before arrival at OST";
1458 msg = "changed in transit AND doesn't match the original - "
1459 "likely false positive due to mmap IO (bug 11742)";
1461 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1462 " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1463 msg, libcfs_nid2str(peer->nid),
1464 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1465 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1466 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1467 POSTID(&oa->o_oi), pga[0]->off,
1468 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1469 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1470 "client csum now %x\n", client_cksum, client_cksum_type,
1471 server_cksum, cksum_type, new_cksum);
1475 /* Note rc enters this function as number of bytes transferred */
1476 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1478 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1479 const lnet_process_id_t *peer =
1480 &req->rq_import->imp_connection->c_peer;
1481 struct client_obd *cli = aa->aa_cli;
1482 struct ost_body *body;
1483 __u32 client_cksum = 0;
1486 if (rc < 0 && rc != -EDQUOT) {
1487 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1491 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1492 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1494 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1498 /* set/clear over quota flag for a uid/gid */
1499 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1500 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1501 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1503 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1504 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1506 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1509 osc_update_grant(cli, body);
1514 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1515 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1517 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1519 CERROR("Unexpected +ve rc %d\n", rc);
1522 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1524 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1527 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1528 check_write_checksum(&body->oa, peer, client_cksum,
1529 body->oa.o_cksum, aa->aa_requested_nob,
1530 aa->aa_page_count, aa->aa_ppga,
1531 cksum_type_unpack(aa->aa_oa->o_flags)))
1534 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1535 aa->aa_page_count, aa->aa_ppga);
1539 /* The rest of this function executes only for OST_READs */
1541 /* if unwrap_bulk failed, return -EAGAIN to retry */
1542 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1544 GOTO(out, rc = -EAGAIN);
1546 if (rc > aa->aa_requested_nob) {
1547 CERROR("Unexpected rc %d (%d requested)\n", rc,
1548 aa->aa_requested_nob);
1552 if (rc != req->rq_bulk->bd_nob_transferred) {
1553 CERROR ("Unexpected rc %d (%d transferred)\n",
1554 rc, req->rq_bulk->bd_nob_transferred);
1558 if (rc < aa->aa_requested_nob)
1559 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1561 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1562 static int cksum_counter;
1563 __u32 server_cksum = body->oa.o_cksum;
1566 cksum_type_t cksum_type;
1568 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1569 body->oa.o_flags : 0);
1570 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1571 aa->aa_ppga, OST_READ,
1574 if (peer->nid == req->rq_bulk->bd_sender) {
1578 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1581 if (server_cksum != client_cksum) {
1582 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1583 "%s%s%s inode "DFID" object "DOSTID
1584 " extent ["LPU64"-"LPU64"]\n",
1585 req->rq_import->imp_obd->obd_name,
1586 libcfs_nid2str(peer->nid),
1588 body->oa.o_valid & OBD_MD_FLFID ?
1589 body->oa.o_parent_seq : (__u64)0,
1590 body->oa.o_valid & OBD_MD_FLFID ?
1591 body->oa.o_parent_oid : 0,
1592 body->oa.o_valid & OBD_MD_FLFID ?
1593 body->oa.o_parent_ver : 0,
1594 POSTID(&body->oa.o_oi),
1595 aa->aa_ppga[0]->off,
1596 aa->aa_ppga[aa->aa_page_count-1]->off +
1597 aa->aa_ppga[aa->aa_page_count-1]->count -
1599 CERROR("client %x, server %x, cksum_type %x\n",
1600 client_cksum, server_cksum, cksum_type);
1602 aa->aa_oa->o_cksum = client_cksum;
1606 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1609 } else if (unlikely(client_cksum)) {
1610 static int cksum_missed;
1613 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1614 CERROR("Checksum %u requested from %s but not sent\n",
1615 cksum_missed, libcfs_nid2str(peer->nid));
1621 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1622 aa->aa_oa, &body->oa);
1627 static int osc_brw_redo_request(struct ptlrpc_request *request,
1628 struct osc_brw_async_args *aa, int rc)
1630 struct ptlrpc_request *new_req;
1631 struct osc_brw_async_args *new_aa;
1632 struct osc_async_page *oap;
1635 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1636 "redo for recoverable error %d", rc);
1638 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1639 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1640 aa->aa_cli, aa->aa_oa,
1641 NULL /* lsm unused by osc currently */,
1642 aa->aa_page_count, aa->aa_ppga,
1643 &new_req, aa->aa_ocapa, 0, 1);
1647 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1648 if (oap->oap_request != NULL) {
1649 LASSERTF(request == oap->oap_request,
1650 "request %p != oap_request %p\n",
1651 request, oap->oap_request);
1652 if (oap->oap_interrupted) {
1653 ptlrpc_req_finished(new_req);
1658 /* The new request takes over pga and oaps from the old request.
1659  * Note that copying a list_head doesn't work; it must be moved (spliced). */
1661 new_req->rq_interpret_reply = request->rq_interpret_reply;
1662 new_req->rq_async_args = request->rq_async_args;
1663 new_req->rq_commit_cb = request->rq_commit_cb;
1664 /* Cap the resend delay to the current request timeout; this is similar to
1665  * what ptlrpc does (see after_reply()). */
1666 if (aa->aa_resends > new_req->rq_timeout)
1667 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1669 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1670 new_req->rq_generation_set = 1;
1671 new_req->rq_import_generation = request->rq_import_generation;
1673 new_aa = ptlrpc_req_async_args(new_req);
1675 INIT_LIST_HEAD(&new_aa->aa_oaps);
1676 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1677 INIT_LIST_HEAD(&new_aa->aa_exts);
1678 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
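/*
 * Editorial note: the two list_splice_init() calls above are what the
 * "copying a list_head doesn't work" comment refers to.  A plain structure
 * assignment such as
 *
 *	new_aa->aa_oaps = aa->aa_oaps;	// WRONG, illustrative only
 *
 * would leave the old head's neighbours still pointing at &aa->aa_oaps and
 * corrupt the list; splicing relinks the first and last entries onto the
 * new head and reinitializes the old one.
 */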
1679 new_aa->aa_resends = aa->aa_resends;
1681 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1682 if (oap->oap_request) {
1683 ptlrpc_req_finished(oap->oap_request);
1684 oap->oap_request = ptlrpc_request_addref(new_req);
1688 new_aa->aa_ocapa = aa->aa_ocapa;
1689 aa->aa_ocapa = NULL;
1691 /* XXX: This code will run into problems if we ever support adding
1692  * a series of BRW RPCs into a self-defined ptlrpc_request_set
1693  * and waiting for all of them to finish. We should inherit the request
1694  * set from the old request. */
1695 ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1697 DEBUG_REQ(D_INFO, new_req, "new request");
1702 * Ugh, we want disk allocation on the target to happen in offset order. We'll
1703 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1704 * fine for our small page arrays and doesn't require allocation. It's an
1705 * insertion sort that swaps elements that are strides apart, shrinking the
1706 * stride down until it's 1 and the array is sorted.
1708 static void sort_brw_pages(struct brw_page **array, int num)
1711 struct brw_page *tmp;
1715 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1720 for (i = stride ; i < num ; i++) {
1723 while (j >= stride && array[j - stride]->off > tmp->off) {
1724 array[j] = array[j - stride];
1729 } while (stride > 1);
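/*
 * Editorial example: the stride loop above walks Knuth's 3h+1 sequence
 * 1, 4, 13, 40, 121, ... up past the array size and then sorts with the
 * strides in decreasing order.  For num = 16 the passes use strides 13, 4
 * and 1; the final stride-1 pass is a plain insertion sort over an already
 * nearly-sorted array of brw_page pointers keyed on ->off.
 */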
1732 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1734 LASSERT(ppga != NULL);
1735 OBD_FREE(ppga, sizeof(*ppga) * count);
1738 static int brw_interpret(const struct lu_env *env,
1739 struct ptlrpc_request *req, void *data, int rc)
1741 struct osc_brw_async_args *aa = data;
1742 struct osc_extent *ext;
1743 struct osc_extent *tmp;
1744 struct client_obd *cli = aa->aa_cli;
1747 rc = osc_brw_fini_request(req, rc);
1748 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1749 /* When the server returns -EINPROGRESS, the client should always retry,
1750  * regardless of how many times the bulk has already been resent. */
1751 if (osc_recoverable_error(rc)) {
1752 if (req->rq_import_generation !=
1753 req->rq_import->imp_generation) {
1754 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1755 ""DOSTID", rc = %d.\n",
1756 req->rq_import->imp_obd->obd_name,
1757 POSTID(&aa->aa_oa->o_oi), rc);
1758 } else if (rc == -EINPROGRESS ||
1759 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1760 rc = osc_brw_redo_request(req, aa, rc);
1762 CERROR("%s: too many resent retries for object: "
1763 ""LPU64":"LPU64", rc = %d.\n",
1764 req->rq_import->imp_obd->obd_name,
1765 POSTID(&aa->aa_oa->o_oi), rc);
1770 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1775 capa_put(aa->aa_ocapa);
1776 aa->aa_ocapa = NULL;
1780 struct obdo *oa = aa->aa_oa;
1781 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1782 unsigned long valid = 0;
1783 struct cl_object *obj;
1784 struct osc_async_page *last;
1786 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1787 obj = osc2cl(last->oap_obj);
1789 cl_object_attr_lock(obj);
1790 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1791 attr->cat_blocks = oa->o_blocks;
1792 valid |= CAT_BLOCKS;
1794 if (oa->o_valid & OBD_MD_FLMTIME) {
1795 attr->cat_mtime = oa->o_mtime;
1798 if (oa->o_valid & OBD_MD_FLATIME) {
1799 attr->cat_atime = oa->o_atime;
1802 if (oa->o_valid & OBD_MD_FLCTIME) {
1803 attr->cat_ctime = oa->o_ctime;
1807 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1808 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1809 loff_t last_off = last->oap_count + last->oap_obj_off +
1812 /* Change file size if this is an out of quota or
1813 * direct IO write and it extends the file size */
1814 if (loi->loi_lvb.lvb_size < last_off) {
1815 attr->cat_size = last_off;
1818 /* Extend KMS if it's not a lockless write */
1819 if (loi->loi_kms < last_off &&
1820 oap2osc_page(last)->ops_srvlock == 0) {
1821 attr->cat_kms = last_off;
1827 cl_object_attr_set(env, obj, attr, valid);
1828 cl_object_attr_unlock(obj);
1830 OBDO_FREE(aa->aa_oa);
1832 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1833 osc_inc_unstable_pages(req);
1835 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1836 list_del_init(&ext->oe_link);
1837 osc_extent_finish(env, ext, 1, rc);
1839 LASSERT(list_empty(&aa->aa_exts));
1840 LASSERT(list_empty(&aa->aa_oaps));
1842 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1843 req->rq_bulk->bd_nob_transferred);
1844 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1845 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1847 client_obd_list_lock(&cli->cl_loi_list_lock);
1848 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1849 * is called so we know whether to go to sync BRWs or wait for more
1850 * RPCs to complete */
1851 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1852 cli->cl_w_in_flight--;
1854 cli->cl_r_in_flight--;
1855 osc_wake_cache_waiters(cli);
1856 client_obd_list_unlock(&cli->cl_loi_list_lock);
1858 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
1862 static void brw_commit(struct ptlrpc_request *req)
1864 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1865  * this callback being invoked via rq_commit_cb, we need to ensure
1866  * osc_dec_unstable_pages is still called. Otherwise unstable
1867  * pages may be leaked. */
1868 spin_lock(&req->rq_lock);
1869 if (likely(req->rq_unstable)) {
1870 req->rq_unstable = 0;
1871 spin_unlock(&req->rq_lock);
1873 osc_dec_unstable_pages(req);
1875 req->rq_committed = 1;
1876 spin_unlock(&req->rq_lock);
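/*
 * Editorial note (assumes osc_inc_unstable_pages() checks rq_committed
 * under the same rq_lock, which is not shown in this file): exactly one of
 * the two racing paths performs the unstable-page decrement.  Either the
 * interpret path set rq_unstable first, in which case the commit callback
 * above clears it and calls osc_dec_unstable_pages(), or the commit set
 * rq_committed first and the other path does the decrement itself, so the
 * count is never leaked.
 */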
1881 * Build an RPC from the list of extents @ext_list. The caller must ensure
1882  * that the total number of pages in this list does NOT exceed max_pages_per_rpc.
1883  * Extents in the list must be in OES_RPC state.
1885 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1886 struct list_head *ext_list, int cmd, pdl_policy_t pol)
1888 struct ptlrpc_request *req = NULL;
1889 struct osc_extent *ext;
1890 struct brw_page **pga = NULL;
1891 struct osc_brw_async_args *aa = NULL;
1892 struct obdo *oa = NULL;
1893 struct osc_async_page *oap;
1894 struct osc_async_page *tmp;
1895 struct cl_req *clerq = NULL;
1896 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
1898 struct cl_req_attr *crattr = NULL;
1899 obd_off starting_offset = OBD_OBJECT_EOF;
1900 obd_off ending_offset = 0;
1904 bool soft_sync = false;
1907 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
1910 LASSERT(!list_empty(ext_list));
1912 /* add pages into rpc_list to build BRW rpc */
1913 list_for_each_entry(ext, ext_list, oe_link) {
1914 LASSERT(ext->oe_state == OES_RPC);
1915 mem_tight |= ext->oe_memalloc;
1916 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1918 list_add_tail(&oap->oap_rpc_item, &rpc_list);
1919 if (starting_offset > oap->oap_obj_off)
1920 starting_offset = oap->oap_obj_off;
1922 LASSERT(oap->oap_page_off == 0);
1923 if (ending_offset < oap->oap_obj_off + oap->oap_count)
1924 ending_offset = oap->oap_obj_off +
1927 LASSERT(oap->oap_page_off + oap->oap_count ==
1932 soft_sync = osc_over_unstable_soft_limit(cli);
1934 mpflag = cfs_memory_pressure_get_and_set();
1936 OBD_ALLOC(crattr, sizeof(*crattr));
1938 GOTO(out, rc = -ENOMEM);
1940 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1942 GOTO(out, rc = -ENOMEM);
1946 GOTO(out, rc = -ENOMEM);
1949 list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1950 struct cl_page *page = oap2cl_page(oap);
1951 if (clerq == NULL) {
1952 clerq = cl_req_alloc(env, page, crt,
1953 1 /* only 1-object rpcs for now */);
1955 GOTO(out, rc = PTR_ERR(clerq));
1958 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1960 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1961 pga[i] = &oap->oap_brw_page;
1962 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1963 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1964 pga[i]->pg, page_index(oap->oap_page), oap,
1967 cl_req_page_add(env, clerq, page);
1970 /* always get the data for the obdo for the rpc */
1971 LASSERT(clerq != NULL);
1972 crattr->cra_oa = oa;
1973 cl_req_attr_set(env, clerq, crattr, ~0ULL);
1975 rc = cl_req_prep(env, clerq);
1977 CERROR("cl_req_prep failed: %d\n", rc);
1981 sort_brw_pages(pga, page_count);
1982 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
1983 pga, &req, crattr->cra_capa, 1, 0);
1985 CERROR("prep_req failed: %d\n", rc);
1989 req->rq_commit_cb = brw_commit;
1990 req->rq_interpret_reply = brw_interpret;
1993 req->rq_memalloc = 1;
1995 /* We need to update the timestamps after the request is built in case
1996  * we race with setattr (locally or in the queue at the OST). If the OST
1997  * gets the later setattr before the earlier BRW (as determined by the
1998  * request xid), the OST will not use the BRW timestamps. Sadly, there is
1999  * no obvious way to do this in a single call. bug 10150 */
2000 cl_req_attr_set(env, clerq, crattr,
2001 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2003 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2005 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2006 aa = ptlrpc_req_async_args(req);
2007 INIT_LIST_HEAD(&aa->aa_oaps);
2008 list_splice_init(&rpc_list, &aa->aa_oaps);
2009 INIT_LIST_HEAD(&aa->aa_exts);
2010 list_splice_init(ext_list, &aa->aa_exts);
2011 aa->aa_clerq = clerq;
2013 /* Queued sync pages can be torn down while the pages
2014  * are between the pending list and the RPC. */
2016 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2017 /* only one oap gets a request reference */
2020 if (oap->oap_interrupted && !req->rq_intr) {
2021 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2023 ptlrpc_mark_interrupted(req);
2027 tmp->oap_request = ptlrpc_request_addref(req);
2029 client_obd_list_lock(&cli->cl_loi_list_lock);
2030 starting_offset >>= PAGE_CACHE_SHIFT;
2031 if (cmd == OBD_BRW_READ) {
2032 cli->cl_r_in_flight++;
2033 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2034 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2035 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2036 starting_offset + 1);
2038 cli->cl_w_in_flight++;
2039 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2040 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2041 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2042 starting_offset + 1);
2044 client_obd_list_unlock(&cli->cl_loi_list_lock);
2046 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
2047 page_count, aa, cli->cl_r_in_flight,
2048 cli->cl_w_in_flight);
2050 /* XXX: Maybe the caller can check the RPC bulk descriptor to
2051 * see which CPU/NUMA node the majority of pages were allocated
2052 * on, and try to assign the async RPC to the CPU core
2053 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2055 * But on the other hand, we expect that multiple ptlrpcd
2056 * threads and the initial write sponsor can run in parallel,
2057 * especially when data checksumming is enabled, since that is a
2058 * CPU-bound operation that a single ptlrpcd thread cannot process in time.
2059 * So having more ptlrpcd threads share the BRW load
2060 * (with PDL_POLICY_ROUND) seems better.
2062 ptlrpcd_add_req(req, pol, -1);
2068 cfs_memory_pressure_restore(mpflag);
2070 if (crattr != NULL) {
2071 capa_put(crattr->cra_capa);
2072 OBD_FREE(crattr, sizeof(*crattr));
2076 LASSERT(req == NULL);
2081 OBD_FREE(pga, sizeof(*pga) * page_count);
2082 /* this should happen rarely and is pretty bad; it makes the
2083  * pending list not follow the dirty order */
2084 while (!list_empty(ext_list)) {
2085 ext = list_entry(ext_list->next, struct osc_extent,
2087 list_del_init(&ext->oe_link);
2088 osc_extent_finish(env, ext, 0, rc);
2090 if (clerq && !IS_ERR(clerq))
2091 cl_req_completion(env, clerq, rc);
2096 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2097 struct ldlm_enqueue_info *einfo)
2099 void *data = einfo->ei_cbdata;
2102 LASSERT(lock != NULL);
2103 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2104 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2105 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2106 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2108 lock_res_and_lock(lock);
2110 if (lock->l_ast_data == NULL)
2111 lock->l_ast_data = data;
2112 if (lock->l_ast_data == data)
2115 unlock_res_and_lock(lock);
2120 static int osc_set_data_with_check(struct lustre_handle *lockh,
2121 struct ldlm_enqueue_info *einfo)
2123 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2127 set = osc_set_lock_data_with_check(lock, einfo);
2128 LDLM_LOCK_PUT(lock);
2130 CERROR("lockh %p, data %p - client evicted?\n",
2131 lockh, einfo->ei_cbdata);
2135 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2136 ldlm_iterator_t replace, void *data)
2138 struct ldlm_res_id res_id;
2139 struct obd_device *obd = class_exp2obd(exp);
2141 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2142 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2146 /* find any ldlm lock of the inode in osc
2150 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2151 ldlm_iterator_t replace, void *data)
2153 struct ldlm_res_id res_id;
2154 struct obd_device *obd = class_exp2obd(exp);
2157 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2158 rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2159 if (rc == LDLM_ITER_STOP)
2161 if (rc == LDLM_ITER_CONTINUE)
2166 static int osc_enqueue_fini(struct ptlrpc_request *req,
2167 osc_enqueue_upcall_f upcall, void *cookie,
2168 struct lustre_handle *lockh, ldlm_mode_t mode,
2169 __u64 *flags, int agl, int errcode)
2171 bool intent = *flags & LDLM_FL_HAS_INTENT;
2175 /* The request was created before the ldlm_cli_enqueue() call. */
2176 if (intent && errcode == ELDLM_LOCK_ABORTED) {
2177 struct ldlm_reply *rep;
2179 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2180 LASSERT(rep != NULL);
2182 rep->lock_policy_res1 =
2183 ptlrpc_status_ntoh(rep->lock_policy_res1);
2184 if (rep->lock_policy_res1)
2185 errcode = rep->lock_policy_res1;
2187 *flags |= LDLM_FL_LVB_READY;
2188 } else if (errcode == ELDLM_OK) {
2189 *flags |= LDLM_FL_LVB_READY;
2192 /* Call the update callback. */
2193 rc = (*upcall)(cookie, lockh, errcode);
2195 /* release the reference taken in ldlm_cli_enqueue() */
2196 if (errcode == ELDLM_LOCK_MATCHED)
2198 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2199 ldlm_lock_decref(lockh, mode);
2204 static int osc_enqueue_interpret(const struct lu_env *env,
2205 struct ptlrpc_request *req,
2206 struct osc_enqueue_args *aa, int rc)
2208 struct ldlm_lock *lock;
2209 struct lustre_handle *lockh = &aa->oa_lockh;
2210 ldlm_mode_t mode = aa->oa_mode;
2211 struct ost_lvb *lvb = aa->oa_lvb;
2212 __u32 lvb_len = sizeof(*lvb);
2217 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2219 lock = ldlm_handle2lock(lockh);
2220 LASSERTF(lock != NULL,
2221 "lockh "LPX64", req %p, aa %p - client evicted?\n",
2222 lockh->cookie, req, aa);
2224 /* Take an additional reference so that a blocking AST that
2225 * ldlm_cli_enqueue_fini() might post for a failed lock is guaranteed
2226 * to arrive after an upcall has been executed by
2227 * osc_enqueue_fini(). */
2228 ldlm_lock_addref(lockh, mode);
2230 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2231 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2233 /* Let the CP AST grant the lock first. */
2234 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2237 LASSERT(aa->oa_lvb == NULL);
2238 LASSERT(aa->oa_flags == NULL);
2239 aa->oa_flags = &flags;
2242 /* Finish the lock-obtaining procedure. */
2243 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2244 aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2246 /* Finish the OSC-side processing. */
2247 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2248 aa->oa_flags, aa->oa_agl, rc);
2250 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2252 ldlm_lock_decref(lockh, mode);
2253 LDLM_LOCK_PUT(lock);
2257 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
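/*
 * Usage sketch (hypothetical caller, for illustration only): passing the
 * PTLRPCD_SET sentinel instead of a real request set makes osc_enqueue_base()
 * hand the RPC to a ptlrpcd thread rather than add it to a caller-owned set:
 *
 *	rc = osc_enqueue_base(exp, &res_id, &flags, &policy, &lvb, kms_valid,
 *			      upcall, cookie, &einfo, PTLRPCD_SET,
 *			      1, 0);	(async = 1, agl = 0)
 */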
2259 /* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
2260 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2261 * other synchronous requests; however, holding some locks while trying to
2262 * obtain others may take a considerable amount of time in case of OST failure,
2263 * and when other sync requests cannot get a lock released by a client, that
2264 * client is excluded from the cluster -- such scenarios make life difficult, so
2265 * release locks just after they are obtained. */
2266 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2267 __u64 *flags, ldlm_policy_data_t *policy,
2268 struct ost_lvb *lvb, int kms_valid,
2269 osc_enqueue_upcall_f upcall, void *cookie,
2270 struct ldlm_enqueue_info *einfo,
2271 struct ptlrpc_request_set *rqset, int async, int agl)
2273 struct obd_device *obd = exp->exp_obd;
2274 struct lustre_handle lockh = { 0 };
2275 struct ptlrpc_request *req = NULL;
2276 int intent = *flags & LDLM_FL_HAS_INTENT;
2277 __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
2282 /* Filesystem lock extents are extended to page boundaries so that
2283 * dealing with the page cache is a little smoother. */
2284 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2285 policy->l_extent.end |= ~CFS_PAGE_MASK;
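/*
 * Worked example of the rounding above (illustration only, assuming 4 KiB
 * pages, i.e. ~CFS_PAGE_MASK == 0xfff): a request for bytes
 * [0x1234, 0x5678] becomes [0x1000, 0x5fff] -- the start is rounded down to
 * its page boundary and the end is rounded up to the last byte of its page,
 * so the lock always covers whole pages.
 */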
2288 * kms is not valid when either the object is completely fresh (so that no
2289 * locks are cached), or the object was evicted. In the latter case a cached
2290 * lock cannot be used, because it would prime the inode state with a
2291 * potentially stale LVB.
2296 /* Next, search for already existing extent locks that will cover us */
2297 /* If we're trying to read, we also search for an existing PW lock. The
2298 * VFS and page cache already protect us locally, so lots of readers/
2299 * writers can share a single PW lock.
2301 * There are problems with conversion deadlocks, so instead of
2302 * converting a read lock to a write lock, we'll just enqueue a new
2305 * At some point we should cancel the read lock instead of making them
2306 * send us a blocking callback, but there are problems with canceling
2307 * locks out from other users right now, too. */
2308 mode = einfo->ei_mode;
2309 if (einfo->ei_mode == LCK_PR)
2311 mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2312 einfo->ei_type, policy, mode, &lockh, 0);
2314 struct ldlm_lock *matched;
2316 if (*flags & LDLM_FL_TEST_LOCK)
2319 matched = ldlm_handle2lock(&lockh);
2321 /* For AGL, if a matching lock already exists,
2322 * return early and inform the caller. */
2323 ldlm_lock_decref(&lockh, mode);
2324 LDLM_LOCK_PUT(matched);
2326 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2327 *flags |= LDLM_FL_LVB_READY;
2329 /* We already have a lock, and it's referenced. */
2330 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2332 ldlm_lock_decref(&lockh, mode);
2333 LDLM_LOCK_PUT(matched);
2336 ldlm_lock_decref(&lockh, mode);
2337 LDLM_LOCK_PUT(matched);
2342 if (*flags & LDLM_FL_TEST_LOCK)
2346 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2347 &RQF_LDLM_ENQUEUE_LVB);
2351 rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
2353 ptlrpc_request_free(req);
2357 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2359 ptlrpc_request_set_replen(req);
2362 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2363 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2365 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2366 sizeof(*lvb), LVB_T_OST, &lockh, async);
2369 struct osc_enqueue_args *aa;
2370 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2371 aa = ptlrpc_req_async_args(req);
2373 aa->oa_mode = einfo->ei_mode;
2374 aa->oa_type = einfo->ei_type;
2375 lustre_handle_copy(&aa->oa_lockh, &lockh);
2376 aa->oa_upcall = upcall;
2377 aa->oa_cookie = cookie;
2380 aa->oa_flags = flags;
2383 /* AGL essentially enqueues a DLM lock
2384 * in advance, so we don't care about the
2385 * result of the AGL enqueue. */
2387 aa->oa_flags = NULL;
2390 req->rq_interpret_reply =
2391 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2392 if (rqset == PTLRPCD_SET)
2393 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2395 ptlrpc_set_add_req(rqset, req);
2396 } else if (intent) {
2397 ptlrpc_req_finished(req);
2402 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2405 ptlrpc_req_finished(req);
2410 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2411 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2412 __u64 *flags, void *data, struct lustre_handle *lockh,
2415 struct obd_device *obd = exp->exp_obd;
2416 __u64 lflags = *flags;
2420 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2423 /* Filesystem lock extents are extended to page boundaries so that
2424 * dealing with the page cache is a little smoother */
2425 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2426 policy->l_extent.end |= ~CFS_PAGE_MASK;
2428 /* Next, search for already existing extent locks that will cover us */
2429 /* If we're trying to read, we also search for an existing PW lock. The
2430 * VFS and page cache already protect us locally, so lots of readers/
2431 * writers can share a single PW lock. */
2435 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2436 res_id, type, policy, rc, lockh, unref);
2439 if (!osc_set_data_with_check(lockh, data)) {
2440 if (!(lflags & LDLM_FL_TEST_LOCK))
2441 ldlm_lock_decref(lockh, rc);
2445 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2446 ldlm_lock_addref(lockh, LCK_PR);
2447 ldlm_lock_decref(lockh, LCK_PW);
2454 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2458 if (unlikely(mode == LCK_GROUP))
2459 ldlm_lock_decref_and_cancel(lockh, mode);
2461 ldlm_lock_decref(lockh, mode);
2466 static int osc_statfs_interpret(const struct lu_env *env,
2467 struct ptlrpc_request *req,
2468 struct osc_async_args *aa, int rc)
2470 struct obd_statfs *msfs;
2474 /* The request has in fact never been sent
2475 * due to issues at a higher level (LOV).
2476 * Exit immediately since the caller is
2477 * aware of the problem and takes care
2478 * of the clean up */
2481 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2482 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2488 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2490 GOTO(out, rc = -EPROTO);
2493 *aa->aa_oi->oi_osfs = *msfs;
2495 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2499 static int osc_statfs_async(struct obd_export *exp,
2500 struct obd_info *oinfo, __u64 max_age,
2501 struct ptlrpc_request_set *rqset)
2503 struct obd_device *obd = class_exp2obd(exp);
2504 struct ptlrpc_request *req;
2505 struct osc_async_args *aa;
2509 /* We could possibly pass max_age in the request (as an absolute
2510 * timestamp or a "seconds.usec ago") so the target can avoid doing
2511 * extra calls into the filesystem if that isn't necessary (e.g.
2512 * during mount that would help a bit). Having relative timestamps
2513 * is not so great if request processing is slow, while absolute
2514 * timestamps are not ideal because they need time synchronization. */
2515 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2519 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2521 ptlrpc_request_free(req);
2524 ptlrpc_request_set_replen(req);
2525 req->rq_request_portal = OST_CREATE_PORTAL;
2526 ptlrpc_at_set_req_timeout(req);
2528 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2529 /* procfs requests must not wait for the import to recover, to avoid a deadlock */
2530 req->rq_no_resend = 1;
2531 req->rq_no_delay = 1;
2534 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2535 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2536 aa = ptlrpc_req_async_args(req);
2539 ptlrpc_set_add_req(rqset, req);
2543 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2544 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2546 struct obd_device *obd = class_exp2obd(exp);
2547 struct obd_statfs *msfs;
2548 struct ptlrpc_request *req;
2549 struct obd_import *imp = NULL;
2553 /* Since the request might also come from lprocfs, we need to
2554 * sync this with client_disconnect_export() (Bug 15684). */
2555 down_read(&obd->u.cli.cl_sem);
2556 if (obd->u.cli.cl_import)
2557 imp = class_import_get(obd->u.cli.cl_import);
2558 up_read(&obd->u.cli.cl_sem);
2562 /* We could possibly pass max_age in the request (as an absolute
2563 * timestamp or a "seconds.usec ago") so the target can avoid doing
2564 * extra calls into the filesystem if that isn't necessary (e.g.
2565 * during mount that would help a bit). Having relative timestamps
2566 * is not so great if request processing is slow, while absolute
2567 * timestamps are not ideal because they need time synchronization. */
2568 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2570 class_import_put(imp);
2575 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2577 ptlrpc_request_free(req);
2580 ptlrpc_request_set_replen(req);
2581 req->rq_request_portal = OST_CREATE_PORTAL;
2582 ptlrpc_at_set_req_timeout(req);
2584 if (flags & OBD_STATFS_NODELAY) {
2585 /* procfs requests must not wait for the import to recover, to avoid a deadlock */
2586 req->rq_no_resend = 1;
2587 req->rq_no_delay = 1;
2590 rc = ptlrpc_queue_wait(req);
2594 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2596 GOTO(out, rc = -EPROTO);
2603 ptlrpc_req_finished(req);
2607 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2608 void *karg, void *uarg)
2610 struct obd_device *obd = exp->exp_obd;
2611 struct obd_ioctl_data *data = karg;
2615 if (!try_module_get(THIS_MODULE)) {
2616 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2617 module_name(THIS_MODULE));
2621 case OBD_IOC_CLIENT_RECOVER:
2622 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2623 data->ioc_inlbuf1, 0);
2627 case IOC_OSC_SET_ACTIVE:
2628 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2631 case OBD_IOC_POLL_QUOTACHECK:
2632 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2634 case OBD_IOC_PING_TARGET:
2635 err = ptlrpc_obd_ping(obd);
2638 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2639 cmd, current_comm());
2640 GOTO(out, err = -ENOTTY);
2643 module_put(THIS_MODULE);
2647 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2648 obd_count keylen, void *key, __u32 *vallen, void *val,
2649 struct lov_stripe_md *lsm)
2652 if (!vallen || !val)
2655 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2656 __u32 *stripe = val;
2657 *vallen = sizeof(*stripe);
2660 } else if (KEY_IS(KEY_LAST_ID)) {
2661 struct ptlrpc_request *req;
2666 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2667 &RQF_OST_GET_INFO_LAST_ID);
2671 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2672 RCL_CLIENT, keylen);
2673 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2675 ptlrpc_request_free(req);
2679 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2680 memcpy(tmp, key, keylen);
2682 req->rq_no_delay = req->rq_no_resend = 1;
2683 ptlrpc_request_set_replen(req);
2684 rc = ptlrpc_queue_wait(req);
2688 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
2690 GOTO(out, rc = -EPROTO);
2692 *((obd_id *)val) = *reply;
2694 ptlrpc_req_finished(req);
2696 } else if (KEY_IS(KEY_FIEMAP)) {
2697 struct ll_fiemap_info_key *fm_key =
2698 (struct ll_fiemap_info_key *)key;
2699 struct ldlm_res_id res_id;
2700 ldlm_policy_data_t policy;
2701 struct lustre_handle lockh;
2702 ldlm_mode_t mode = 0;
2703 struct ptlrpc_request *req;
2704 struct ll_user_fiemap *reply;
2708 if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
2711 policy.l_extent.start = fm_key->fiemap.fm_start &
2714 if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
2715 fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
2716 policy.l_extent.end = OBD_OBJECT_EOF;
2718 policy.l_extent.end = (fm_key->fiemap.fm_start +
2719 fm_key->fiemap.fm_length +
2720 PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
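/*
 * Worked example of the extent computation above (illustration only,
 * assuming 4 KiB pages): with fm_start = 0x1000 and fm_length = 0x1800 the
 * end becomes (0x1000 + 0x1800 + 0xfff) & ~0xfff = 0x3000, i.e. the range is
 * rounded up to a page boundary. The preceding check against
 * OBD_OBJECT_EOF - fm_length only guards that sum against overflowing past
 * OBD_OBJECT_EOF; if it would overflow, the end is clamped to OBD_OBJECT_EOF.
 */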
2722 ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
2723 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
2724 LDLM_FL_BLOCK_GRANTED |
2726 &res_id, LDLM_EXTENT, &policy,
2727 LCK_PR | LCK_PW, &lockh, 0);
2728 if (mode) { /* lock is cached on client */
2729 if (mode != LCK_PR) {
2730 ldlm_lock_addref(&lockh, LCK_PR);
2731 ldlm_lock_decref(&lockh, LCK_PW);
2733 } else { /* no cached lock, need to acquire the lock on the server side */
2734 fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
2735 fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
2739 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2740 &RQF_OST_GET_INFO_FIEMAP);
2742 GOTO(drop_lock, rc = -ENOMEM);
2744 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
2745 RCL_CLIENT, keylen);
2746 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2747 RCL_CLIENT, *vallen);
2748 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2749 RCL_SERVER, *vallen);
2751 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2753 ptlrpc_request_free(req);
2754 GOTO(drop_lock, rc);
2757 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
2758 memcpy(tmp, key, keylen);
2759 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2760 memcpy(tmp, val, *vallen);
2762 ptlrpc_request_set_replen(req);
2763 rc = ptlrpc_queue_wait(req);
2767 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2769 GOTO(fini_req, rc = -EPROTO);
2771 memcpy(val, reply, *vallen);
2773 ptlrpc_req_finished(req);
2776 ldlm_lock_decref(&lockh, LCK_PR);
2783 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2784 obd_count keylen, void *key, obd_count vallen,
2785 void *val, struct ptlrpc_request_set *set)
2787 struct ptlrpc_request *req;
2788 struct obd_device *obd = exp->exp_obd;
2789 struct obd_import *imp = class_exp2cliimp(exp);
2794 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2796 if (KEY_IS(KEY_CHECKSUM)) {
2797 if (vallen != sizeof(int))
2799 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2803 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2804 sptlrpc_conf_client_adapt(obd);
2808 if (KEY_IS(KEY_FLUSH_CTX)) {
2809 sptlrpc_import_flush_my_ctx(imp);
2813 if (KEY_IS(KEY_CACHE_SET)) {
2814 struct client_obd *cli = &obd->u.cli;
2816 LASSERT(cli->cl_cache == NULL); /* only once */
2817 cli->cl_cache = (struct cl_client_cache *)val;
2818 atomic_inc(&cli->cl_cache->ccc_users);
2819 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2821 /* add this osc to the shared cache's LRU list */
2822 LASSERT(list_empty(&cli->cl_lru_osc));
2823 spin_lock(&cli->cl_cache->ccc_lru_lock);
2824 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2825 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2830 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2831 struct client_obd *cli = &obd->u.cli;
2832 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2833 long target = *(long *)val;
2835 nr = osc_lru_shrink(env, cli, min(nr, target), true);
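/*
 * Worked example (illustration only): with 1000 pages on this client's LRU,
 * nr starts at 500 (half of the list); if the caller asked to reclaim
 * target = 300 pages, min(nr, target) = 300 pages are passed to
 * osc_lru_shrink(), so a single shrink request never drops more than half of
 * the local LRU nor more than the caller asked for.
 */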
2840 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2843 /* We pass all other commands directly to OST. Since nobody calls osc
2844 methods directly and everybody is supposed to go through LOV, we
2845 assume lov checked invalid values for us.
2846 The only recognised values so far are evict_by_nid and mds_conn.
2847 Even if something bad goes through, we'd get a -EINVAL from OST
2850 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2851 &RQF_OST_SET_GRANT_INFO :
2856 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2857 RCL_CLIENT, keylen);
2858 if (!KEY_IS(KEY_GRANT_SHRINK))
2859 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2860 RCL_CLIENT, vallen);
2861 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2863 ptlrpc_request_free(req);
2867 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2868 memcpy(tmp, key, keylen);
2869 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2872 memcpy(tmp, val, vallen);
2874 if (KEY_IS(KEY_GRANT_SHRINK)) {
2875 struct osc_grant_args *aa;
2878 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2879 aa = ptlrpc_req_async_args(req);
2882 ptlrpc_req_finished(req);
2885 *oa = ((struct ost_body *)val)->oa;
2887 req->rq_interpret_reply = osc_shrink_grant_interpret;
2890 ptlrpc_request_set_replen(req);
2891 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2892 LASSERT(set != NULL);
2893 ptlrpc_set_add_req(set, req);
2894 ptlrpc_check_set(NULL, set);
2896 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2901 static int osc_reconnect(const struct lu_env *env,
2902 struct obd_export *exp, struct obd_device *obd,
2903 struct obd_uuid *cluuid,
2904 struct obd_connect_data *data,
2907 struct client_obd *cli = &obd->u.cli;
2909 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2912 client_obd_list_lock(&cli->cl_loi_list_lock);
2913 data->ocd_grant = (cli->cl_avail_grant +
2914 (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
2915 2 * cli_brw_size(obd);
2916 lost_grant = cli->cl_lost_grant;
2917 cli->cl_lost_grant = 0;
2918 client_obd_list_unlock(&cli->cl_loi_list_lock);
2920 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2921 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2922 data->ocd_version, data->ocd_grant, lost_grant);
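/*
 * Worked example (illustration only, assuming 4 KiB pages): with
 * cl_avail_grant = 1 MiB and cl_dirty_pages = 256, the client asks the server
 * to restore 1 MiB + 256 * 4 KiB = 2 MiB of grant on reconnect. If both
 * values are zero, the GNU "?:" fallback requests 2 * cli_brw_size(obd)
 * instead, so a reconnecting client never starts with an empty grant.
 */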
2928 static int osc_disconnect(struct obd_export *exp)
2930 struct obd_device *obd = class_exp2obd(exp);
2931 struct llog_ctxt *ctxt;
2934 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
2936 if (obd->u.cli.cl_conn_count == 1) {
2937 /* Flush any remaining cancel messages out to the
2939 llog_sync(ctxt, exp, 0);
2941 llog_ctxt_put(ctxt);
2943 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
2947 rc = client_disconnect_export(exp);
2949 * Initially we put del_shrink_grant before disconnect_export, but it
2950 * causes the following problem if setup (connect) and cleanup
2951 * (disconnect) are tangled together.
2952 * connect p1 disconnect p2
2953 * ptlrpc_connect_import
2954 * ............... class_manual_cleanup
2957 * ptlrpc_connect_interrupt
2959 * add this client to shrink list
2961 * Bang! The pinger triggers the shrink.
2962 * So the osc should be removed from the shrink list only after we
2963 * are sure the import has been destroyed. BUG18662
2965 if (obd->u.cli.cl_import == NULL)
2966 osc_del_shrink_grant(&obd->u.cli);
2970 static int osc_import_event(struct obd_device *obd,
2971 struct obd_import *imp,
2972 enum obd_import_event event)
2974 struct client_obd *cli;
2978 LASSERT(imp->imp_obd == obd);
2981 case IMP_EVENT_DISCON: {
2983 client_obd_list_lock(&cli->cl_loi_list_lock);
2984 cli->cl_avail_grant = 0;
2985 cli->cl_lost_grant = 0;
2986 client_obd_list_unlock(&cli->cl_loi_list_lock);
2989 case IMP_EVENT_INACTIVE: {
2990 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2993 case IMP_EVENT_INVALIDATE: {
2994 struct ldlm_namespace *ns = obd->obd_namespace;
2998 env = cl_env_get(&refcheck);
3002 /* all pages go to failing rpcs due to the invalid
3004 osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3006 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3007 cl_env_put(env, &refcheck);
3012 case IMP_EVENT_ACTIVE: {
3013 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3016 case IMP_EVENT_OCD: {
3017 struct obd_connect_data *ocd = &imp->imp_connect_data;
3019 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3020 osc_init_grant(&obd->u.cli, ocd);
3023 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3024 imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3026 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3029 case IMP_EVENT_DEACTIVATE: {
3030 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3033 case IMP_EVENT_ACTIVATE: {
3034 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3038 CERROR("Unknown import event %d\n", event);
3045 * Determine whether the lock can be canceled before replaying the lock
3046 * during recovery, see bug16774 for detailed information.
3048 * \retval zero the lock can't be canceled
3049 * \retval other ok to cancel
3051 static int osc_cancel_weight(struct ldlm_lock *lock)
3054 * Cancel all unused and granted extent locks.
3056 if (lock->l_resource->lr_type == LDLM_EXTENT &&
3057 lock->l_granted_mode == lock->l_req_mode &&
3058 osc_ldlm_weigh_ast(lock) == 0)
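/*
 * Note added for clarity: this weigher is registered with the namespace via
 * ns_register_cancel() in osc_setup() below, so the LDLM code consults it
 * during recovery when deciding whether a granted, unused extent lock may be
 * canceled instead of replayed.
 */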
3064 static int brw_queue_work(const struct lu_env *env, void *data)
3066 struct client_obd *cli = data;
3068 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3070 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3074 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3076 struct client_obd *cli = &obd->u.cli;
3077 struct obd_type *type;
3082 rc = ptlrpcd_addref();
3086 rc = client_obd_setup(obd, lcfg);
3088 GOTO(out_ptlrpcd, rc);
3090 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3091 if (IS_ERR(handler))
3092 GOTO(out_client_setup, rc = PTR_ERR(handler));
3093 cli->cl_writeback_work = handler;
3095 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3096 if (IS_ERR(handler))
3097 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3098 cli->cl_lru_work = handler;
3100 rc = osc_quota_setup(obd);
3102 GOTO(out_ptlrpcd_work, rc);
3104 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3107 obd->obd_vars = lprocfs_osc_obd_vars;
3109 /* If this is true then both the client (osc) and the server (osp) are on
3110 * the same node. The osp layer, if loaded first, will register the osc proc
3111 * directory. In that case this obd_device will attach its proc
3112 * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
3113 type = class_search_type(LUSTRE_OSP_NAME);
3114 if (type && type->typ_procsym) {
3115 obd->obd_proc_entry = lprocfs_seq_register(obd->obd_name,
3117 obd->obd_vars, obd);
3118 if (IS_ERR(obd->obd_proc_entry)) {
3119 rc = PTR_ERR(obd->obd_proc_entry);
3120 CERROR("error %d setting up lprocfs for %s\n", rc,
3122 obd->obd_proc_entry = NULL;
3125 rc = lprocfs_obd_setup(obd);
3128 /* If the basic OSC proc tree construction succeeded then
3129 * let's do the rest. */
3131 lproc_osc_attach_seqstat(obd);
3132 sptlrpc_lprocfs_cliobd_attach(obd);
3133 ptlrpc_lprocfs_register_obd(obd);
3136 /* We need to allocate a few extra requests, because
3137 * brw_interpret() tries to create new requests before freeing
3138 * previous ones. Ideally we would reserve 2x max_rpcs_in_flight,
3139 * but that might waste too much RAM,
3140 * so +2 is just a guess that should still work. */
3141 cli->cl_import->imp_rq_pool =
3142 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3144 ptlrpc_add_rqs_to_pool);
3146 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3147 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3151 if (cli->cl_writeback_work != NULL) {
3152 ptlrpcd_destroy_work(cli->cl_writeback_work);
3153 cli->cl_writeback_work = NULL;
3155 if (cli->cl_lru_work != NULL) {
3156 ptlrpcd_destroy_work(cli->cl_lru_work);
3157 cli->cl_lru_work = NULL;
3160 client_obd_cleanup(obd);
3166 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3172 case OBD_CLEANUP_EARLY: {
3173 struct obd_import *imp;
3174 imp = obd->u.cli.cl_import;
3175 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3176 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3177 ptlrpc_deactivate_import(imp);
3178 spin_lock(&imp->imp_lock);
3179 imp->imp_pingable = 0;
3180 spin_unlock(&imp->imp_lock);
3183 case OBD_CLEANUP_EXPORTS: {
3184 struct client_obd *cli = &obd->u.cli;
3186 * For the echo client, the export may be on the zombie list; wait for
3187 * the zombie thread to cull it, because cli.cl_import will be
3188 * cleared in client_disconnect_export():
3189 * class_export_destroy() -> obd_cleanup() ->
3190 * echo_device_free() -> echo_client_cleanup() ->
3191 * obd_disconnect() -> osc_disconnect() ->
3192 * client_disconnect_export()
3194 obd_zombie_barrier();
3195 if (cli->cl_writeback_work) {
3196 ptlrpcd_destroy_work(cli->cl_writeback_work);
3197 cli->cl_writeback_work = NULL;
3199 if (cli->cl_lru_work) {
3200 ptlrpcd_destroy_work(cli->cl_lru_work);
3201 cli->cl_lru_work = NULL;
3203 obd_cleanup_client_import(obd);
3204 ptlrpc_lprocfs_unregister_obd(obd);
3205 lprocfs_obd_cleanup(obd);
3206 rc = obd_llog_finish(obd, 0);
3208 CERROR("failed to cleanup llogging subsystems\n");
3215 int osc_cleanup(struct obd_device *obd)
3217 struct client_obd *cli = &obd->u.cli;
3223 if (cli->cl_cache != NULL) {
3224 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3225 spin_lock(&cli->cl_cache->ccc_lru_lock);
3226 list_del_init(&cli->cl_lru_osc);
3227 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3228 cli->cl_lru_left = NULL;
3229 atomic_dec(&cli->cl_cache->ccc_users);
3230 cli->cl_cache = NULL;
3233 /* free memory of osc quota cache */
3234 osc_quota_cleanup(obd);
3236 rc = client_obd_cleanup(obd);
3242 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3244 int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
3245 return rc > 0 ? 0 : rc;
3248 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3250 return osc_process_config_base(obd, buf);
3253 struct obd_ops osc_obd_ops = {
3254 .o_owner = THIS_MODULE,
3255 .o_setup = osc_setup,
3256 .o_precleanup = osc_precleanup,
3257 .o_cleanup = osc_cleanup,
3258 .o_add_conn = client_import_add_conn,
3259 .o_del_conn = client_import_del_conn,
3260 .o_connect = client_connect_import,
3261 .o_reconnect = osc_reconnect,
3262 .o_disconnect = osc_disconnect,
3263 .o_statfs = osc_statfs,
3264 .o_statfs_async = osc_statfs_async,
3265 .o_unpackmd = osc_unpackmd,
3266 .o_create = osc_create,
3267 .o_destroy = osc_destroy,
3268 .o_getattr = osc_getattr,
3269 .o_getattr_async = osc_getattr_async,
3270 .o_setattr = osc_setattr,
3271 .o_setattr_async = osc_setattr_async,
3272 .o_change_cbdata = osc_change_cbdata,
3273 .o_find_cbdata = osc_find_cbdata,
3274 .o_iocontrol = osc_iocontrol,
3275 .o_get_info = osc_get_info,
3276 .o_set_info_async = osc_set_info_async,
3277 .o_import_event = osc_import_event,
3278 .o_process_config = osc_process_config,
3279 .o_quotactl = osc_quotactl,
3280 .o_quotacheck = osc_quotacheck,
3283 extern struct lu_kmem_descr osc_caches[];
3284 extern struct lock_class_key osc_ast_guard_class;
3286 int __init osc_init(void)
3288 bool enable_proc = true;
3289 struct obd_type *type;
3293 /* Print the address of _any_ initialized kernel symbol from this
3294 * module, to allow debugging with a gdb that doesn't support data
3295 * symbols from modules. */
3296 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3298 rc = lu_kmem_init(osc_caches);
3302 type = class_search_type(LUSTRE_OSP_NAME);
3303 if (type != NULL && type->typ_procsym != NULL)
3304 enable_proc = false;
3306 rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3307 LUSTRE_OSC_NAME, &osc_device_type);
3309 lu_kmem_fini(osc_caches);
3316 static void /*__exit*/ osc_exit(void)
3318 class_unregister_type(LUSTRE_OSC_NAME);
3319 lu_kmem_fini(osc_caches);
3322 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3323 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3324 MODULE_LICENSE("GPL");
3326 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);