4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_OSC
39 #include <libcfs/libcfs.h>
42 # include <liblustre.h>
45 #include <lustre_dlm.h>
46 #include <lustre_net.h>
47 #include <lustre/lustre_user.h>
48 #include <obd_cksum.h>
56 #include <lustre_ha.h>
57 #include <lprocfs_status.h>
58 #include <lustre_log.h>
59 #include <lustre_debug.h>
60 #include <lustre_param.h>
61 #include <lustre_fid.h>
62 #include "osc_internal.h"
63 #include "osc_cl_internal.h"
/* Prototypes declared ahead of use; the matching definitions are not among
 * the lines visible in this part of the file.  osc_cleanup() is declared
 * without static, unlike the two helpers above it. */
65 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
66 static int brw_interpret(const struct lu_env *env,
67 struct ptlrpc_request *req, void *data, int rc);
68 int osc_cleanup(struct obd_device *obd);
/* osc_packmd(): fill the on-disk struct lov_mds_md at *lmmp from @lsm.
 * The size used for both OBD_ALLOC and OBD_FREE of *lmmp is sizeof(**lmmp).
 * lsm_object_id is asserted non-zero and lsm_object_seq is checked with
 * LASSERT_SEQ_IS_MDT before both are stored through cpu_to_le64().
 * NOTE(review): the conditions selecting the free/alloc paths and the
 * return values are not visible in this view. */
70 /* Pack OSC object metadata for disk storage (LE byte order). */
71 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
72 struct lov_stripe_md *lsm)
77 lmm_size = sizeof(**lmmp);
82 OBD_FREE(*lmmp, lmm_size);
88 OBD_ALLOC(*lmmp, lmm_size);
94 LASSERT(lsm->lsm_object_id);
95 LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
96 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
97 (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
/* osc_unpackmd(): build the in-memory stripe descriptor *lsmp from the
 * on-disk @lmm of @lmm_bytes bytes.
 *  - @lmm smaller than sizeof(*lmm), or with a zero lmm_object_id, is
 *    reported with CERROR.
 *  - The descriptor is sized for a single stripe (lov_stripe_md_size(1)) and
 *    carries one struct lov_oinfo in lsm_oinfo[0]; when *lsmp exists and
 *    @lmm is NULL both allocations are released.
 *  - Object id and sequence are converted with le64_to_cpu().
 *  - lsm_maxbytes comes from the import's ocd_maxbytes when the connect flags
 *    contain OBD_CONNECT_MAXBYTES; LUSTRE_STRIPE_MAXBYTES is the other value
 *    assigned (presumably the else branch - the line is not visible).
 * NOTE(review): lmm_bytes is an int compared against sizeof(), so a negative
 * value is converted to a large unsigned number and passes the size check. */
103 /* Unpack OSC object metadata from disk storage (LE byte order). */
104 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
105 struct lov_mds_md *lmm, int lmm_bytes)
108 struct obd_import *imp = class_exp2cliimp(exp);
112 if (lmm_bytes < sizeof (*lmm)) {
113 CERROR("lov_mds_md too small: %d, need %d\n",
114 lmm_bytes, (int)sizeof(*lmm));
117 /* XXX LOV_MAGIC etc check? */
119 if (lmm->lmm_object_id == 0) {
120 CERROR("lov_mds_md: zero lmm_object_id\n");
125 lsm_size = lov_stripe_md_size(1);
129 if (*lsmp != NULL && lmm == NULL) {
130 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
131 OBD_FREE(*lsmp, lsm_size);
137 OBD_ALLOC(*lsmp, lsm_size);
140 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
141 if ((*lsmp)->lsm_oinfo[0] == NULL) {
142 OBD_FREE(*lsmp, lsm_size);
145 loi_init((*lsmp)->lsm_oinfo[0]);
149 /* XXX zero *lsmp? */
150 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
151 (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
152 LASSERT((*lsmp)->lsm_object_id);
153 LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
157 (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
158 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
160 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/* osc_pack_capa(): attach a capability to an outgoing request.
 * @capa is treated as a struct obd_capa *.  The RMF_CAPA1 client buffer of
 * the request capsule is fetched, OBD_MD_FLOSSCAPA is set in body->oa.o_valid
 * and the capability is traced with DEBUG_CAPA.
 * NOTE(review): the step that copies the capability into the buffer, and any
 * early return for a NULL @capa, are not visible in this view. */
165 static inline void osc_pack_capa(struct ptlrpc_request *req,
166 struct ost_body *body, void *capa)
168 struct obd_capa *oc = (struct obd_capa *)capa;
169 struct lustre_capa *c;
174 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
177 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
178 DEBUG_CAPA(D_SEC, c, "pack");
/* osc_pack_req_body(): fill the RMF_OST_BODY buffer of @req from @oinfo.
 * The obdo in oinfo->oi_oa is converted to wire form with
 * lustre_set_wire_obdo() and oinfo->oi_capa is packed via osc_pack_capa(). */
181 static inline void osc_pack_req_body(struct ptlrpc_request *req,
182 struct obd_info *oinfo)
184 struct ost_body *body;
186 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
189 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
190 osc_pack_capa(req, body, oinfo->oi_capa);
/* osc_set_capa_size(): adjust the client-side size of capability @field in
 * the request capsule before the request is packed.  The visible call sets
 * the size to 0 (presumably when no capability is supplied - the condition
 * and the third parameter are not visible here); otherwise the size already
 * computed for the field is left alone, as the comment below states. */
193 static inline void osc_set_capa_size(struct ptlrpc_request *req,
194 const struct req_msg_field *field,
198 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
200 /* it is already calculated as sizeof struct obd_capa */
/* osc_getattr_interpret(): reply callback installed by osc_getattr_async().
 * When the reply's RMF_OST_BODY can be fetched, its obdo is copied into
 * aa->aa_oi->oi_oa, o_blksize is forced to DT_MAX_BRW_SIZE and
 * OBD_MD_FLBLKSZ is set.  When it cannot be unpacked a debug message is
 * logged and o_valid is cleared.  The caller's oi_cb_up is then invoked with
 * the resulting rc.
 * NOTE(review): the synchronous osc_getattr() sets o_blksize from
 * cli_brw_size(exp->exp_obd) instead of DT_MAX_BRW_SIZE. */
204 static int osc_getattr_interpret(const struct lu_env *env,
205 struct ptlrpc_request *req,
206 struct osc_async_args *aa, int rc)
208 struct ost_body *body;
214 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
216 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
217 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
219 /* This should really be sent by the OST */
220 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
221 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
223 CDEBUG(D_INFO, "can't unpack ost_body\n");
225 aa->aa_oi->oi_oa->o_valid = 0;
228 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* osc_getattr_async(): queue an OST_GETATTR request on @set.
 * Allocates an RQF_OST_GETATTR request on the export's client import, sizes
 * the capability field, packs the request (freeing it if packing fails),
 * fills the body from @oinfo and installs osc_getattr_interpret() as the
 * reply callback.  The per-request async args area is obtained with
 * ptlrpc_req_async_args(); CLASSERT checks at build time that it is large
 * enough for struct osc_async_args. */
232 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
233 struct ptlrpc_request_set *set)
235 struct ptlrpc_request *req;
236 struct osc_async_args *aa;
240 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
244 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
245 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
247 ptlrpc_request_free(req);
251 osc_pack_req_body(req, oinfo);
253 ptlrpc_request_set_replen(req);
254 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
256 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
257 aa = ptlrpc_req_async_args(req);
260 ptlrpc_set_add_req(set, req);
/* osc_getattr(): synchronous OST_GETATTR.
 * Builds the request like osc_getattr_async() but waits for the reply with
 * ptlrpc_queue_wait().  A reply without an RMF_OST_BODY yields -EPROTO.
 * On success the reply obdo is copied into oinfo->oi_oa, o_blksize is set
 * from cli_brw_size(exp->exp_obd) and OBD_MD_FLBLKSZ is flagged.  The
 * request is released with ptlrpc_req_finished(). */
264 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
265 struct obd_info *oinfo)
267 struct ptlrpc_request *req;
268 struct ost_body *body;
272 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
276 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
277 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
279 ptlrpc_request_free(req);
283 osc_pack_req_body(req, oinfo);
285 ptlrpc_request_set_replen(req);
287 rc = ptlrpc_queue_wait(req);
291 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
293 GOTO(out, rc = -EPROTO);
295 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
296 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
298 oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
299 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
303 ptlrpc_req_finished(req);
/* osc_setattr(): synchronous OST_SETATTR.
 * Requires OBD_MD_FLGROUP to be set in oinfo->oi_oa->o_valid (asserted).
 * Packs the request from @oinfo, waits for the reply with
 * ptlrpc_queue_wait(), returns -EPROTO when the reply carries no
 * RMF_OST_BODY and otherwise copies the reply obdo back into oinfo->oi_oa.
 * @env and @oti are not referenced in the visible lines. */
307 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
308 struct obd_info *oinfo, struct obd_trans_info *oti)
310 struct ptlrpc_request *req;
311 struct ost_body *body;
315 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
317 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
321 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
322 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
324 ptlrpc_request_free(req);
328 osc_pack_req_body(req, oinfo);
330 ptlrpc_request_set_replen(req);
332 rc = ptlrpc_queue_wait(req);
336 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
338 GOTO(out, rc = -EPROTO);
340 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
344 ptlrpc_req_finished(req);
/* osc_setattr_interpret(): reply callback shared by the async setattr and
 * punch paths.  A reply without RMF_OST_BODY becomes -EPROTO; otherwise the
 * reply obdo is copied into sa->sa_oa.  The stored upcall is then called
 * with its cookie and the resulting rc. */
348 static int osc_setattr_interpret(const struct lu_env *env,
349 struct ptlrpc_request *req,
350 struct osc_setattr_args *sa, int rc)
352 struct ost_body *body;
358 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
360 GOTO(out, rc = -EPROTO);
362 lustre_get_wire_obdo(sa->sa_oa, &body->oa);
364 rc = sa->sa_upcall(sa->sa_cookie, rc);
/* osc_setattr_async_base(): send OST_SETATTR without blocking the caller.
 * When @oti is given and OBD_MD_FLCOOKIE is set, the first llog cookie from
 * @oti is copied into the obdo before it is packed.  Two submission paths
 * are visible:
 *  - fire-and-forget: the request is handed to ptlrpcd with no reply
 *    callback (the selecting condition is not visible; presumably a NULL
 *    @rqset - TODO confirm);
 *  - tracked: osc_setattr_interpret() is installed, @upcall/@cookie and the
 *    obdo are stored in the request's async args, and the request goes to
 *    ptlrpcd when @rqset is PTLRPCD_SET or is added to @rqset otherwise. */
368 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
369 struct obd_trans_info *oti,
370 obd_enqueue_update_f upcall, void *cookie,
371 struct ptlrpc_request_set *rqset)
373 struct ptlrpc_request *req;
374 struct osc_setattr_args *sa;
378 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
382 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
383 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
385 ptlrpc_request_free(req);
389 if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
390 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
392 osc_pack_req_body(req, oinfo);
394 ptlrpc_request_set_replen(req);
396 /* do mds to ost setattr asynchronously */
398 /* Do not wait for response. */
399 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
401 req->rq_interpret_reply =
402 (ptlrpc_interpterer_t)osc_setattr_interpret;
404 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
405 sa = ptlrpc_req_async_args(req);
406 sa->sa_oa = oinfo->oi_oa;
407 sa->sa_upcall = upcall;
408 sa->sa_cookie = cookie;
410 if (rqset == PTLRPCD_SET)
411 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
413 ptlrpc_set_add_req(rqset, req);
/* osc_setattr_async(): thin wrapper around osc_setattr_async_base() that
 * uses oinfo->oi_cb_up as the completion upcall and @oinfo itself as the
 * upcall cookie. */
419 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
420 struct obd_trans_info *oti,
421 struct ptlrpc_request_set *rqset)
423 return osc_setattr_async_base(exp, oinfo, oti,
424 oinfo->oi_cb_up, oinfo, rqset);
/* osc_real_create(): synchronous OST_CREATE for the object described by @oa.
 * A stripe descriptor is obtained with obd_alloc_memmd() (the condition is
 * not visible) and released with obd_free_memmd() on the cleanup path.
 * A request whose o_flags equal OBD_FL_DELORPHAN (with OBD_MD_FLFLAGS valid)
 * is marked rq_no_resend and rq_no_delay.  After ptlrpc_queue_wait() a reply
 * without RMF_OST_BODY yields -EPROTO; otherwise the reply obdo is copied
 * into @oa, o_blksize is set from cli_brw_size(), and the new object id and
 * sequence are stored in the stripe descriptor.  The reply transno is saved
 * in @oti, and with OBD_MD_FLCOOKIE set the llog cookie is copied into
 * oti->oti_logcookies (allocated on demand with oti_alloc_cookies()). */
427 int osc_real_create(struct obd_export *exp, struct obdo *oa,
428 struct lov_stripe_md **ea, struct obd_trans_info *oti)
430 struct ptlrpc_request *req;
431 struct ost_body *body;
432 struct lov_stripe_md *lsm;
441 rc = obd_alloc_memmd(exp, &lsm);
446 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
448 GOTO(out, rc = -ENOMEM);
450 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
452 ptlrpc_request_free(req);
456 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
458 lustre_set_wire_obdo(&body->oa, oa);
460 ptlrpc_request_set_replen(req);
462 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
463 oa->o_flags == OBD_FL_DELORPHAN) {
465 "delorphan from OST integration");
466 /* Don't resend the delorphan req */
467 req->rq_no_resend = req->rq_no_delay = 1;
470 rc = ptlrpc_queue_wait(req);
474 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
476 GOTO(out_req, rc = -EPROTO);
478 lustre_get_wire_obdo(oa, &body->oa);
480 oa->o_blksize = cli_brw_size(exp->exp_obd);
481 oa->o_valid |= OBD_MD_FLBLKSZ;
483 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
484 * have valid lsm_oinfo data structs, so don't go touching that.
485 * This needs to be fixed in a big way.
487 lsm->lsm_object_id = oa->o_id;
488 lsm->lsm_object_seq = oa->o_seq;
492 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
494 if (oa->o_valid & OBD_MD_FLCOOKIE) {
495 if (!oti->oti_logcookies)
496 oti_alloc_cookies(oti, 1);
497 *oti->oti_logcookies = oa->o_lcookie;
501 CDEBUG(D_HA, "transno: "LPD64"\n",
502 lustre_msg_get_transno(req->rq_repmsg));
504 ptlrpc_req_finished(req);
507 obd_free_memmd(exp, &lsm);
/* osc_punch_base(): send an asynchronous OST_PUNCH for oinfo->oi_oa.
 * The request is directed to OST_IO_PORTAL and its timeout is refreshed with
 * ptlrpc_at_set_req_timeout().  The body and capability are packed from
 * @oinfo, osc_setattr_interpret() is installed as the reply callback with
 * @upcall/@cookie stored in the async args, and the request is handed to
 * ptlrpcd when @rqset is PTLRPCD_SET or added to @rqset otherwise. */
511 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
512 obd_enqueue_update_f upcall, void *cookie,
513 struct ptlrpc_request_set *rqset)
515 struct ptlrpc_request *req;
516 struct osc_setattr_args *sa;
517 struct ost_body *body;
521 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
525 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
526 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
528 ptlrpc_request_free(req);
531 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
532 ptlrpc_at_set_req_timeout(req);
534 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
536 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
537 osc_pack_capa(req, body, oinfo->oi_capa);
539 ptlrpc_request_set_replen(req);
541 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
542 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
543 sa = ptlrpc_req_async_args(req);
544 sa->sa_oa = oinfo->oi_oa;
545 sa->sa_upcall = upcall;
546 sa->sa_cookie = cookie;
547 if (rqset == PTLRPCD_SET)
548 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
550 ptlrpc_set_add_req(rqset, req);
/* osc_punch(): encode the extent from oinfo->oi_policy into the obdo
 * (o_size = extent start, o_blocks = extent end, both flagged valid) and
 * submit it through osc_punch_base() with oinfo->oi_cb_up as the upcall and
 * @oinfo as its cookie.  @env and @oti are not used in the visible lines. */
555 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
556 struct obd_info *oinfo, struct obd_trans_info *oti,
557 struct ptlrpc_request_set *rqset)
559 oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
560 oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
561 oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
562 return osc_punch_base(exp, oinfo,
563 oinfo->oi_cb_up, oinfo, rqset);
/* osc_sync_interpret(): reply callback installed by osc_sync_base().
 * A reply without RMF_OST_BODY is logged and becomes -EPROTO; otherwise the
 * reply obdo is copied into fa->fa_oi->oi_oa.  The stored upcall is then
 * invoked with its cookie and rc.
 * NOTE(review): the obdo is copied by plain struct assignment here, while
 * the other reply handlers in this file use lustre_get_wire_obdo(). */
566 static int osc_sync_interpret(const struct lu_env *env,
567 struct ptlrpc_request *req,
570 struct osc_fsync_args *fa = arg;
571 struct ost_body *body;
577 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
579 CERROR ("can't unpack ost_body\n");
580 GOTO(out, rc = -EPROTO);
583 *fa->fa_oi->oi_oa = body->oa;
585 rc = fa->fa_upcall(fa->fa_cookie, rc);
/* osc_sync_base(): send an asynchronous OST_SYNC for oinfo->oi_oa.
 * The obdo (whose size/blocks fields carry the sync range, per the comment
 * below) and the capability are packed, osc_sync_interpret() is installed as
 * the reply callback with @upcall/@cookie stored in the async args, and the
 * request goes to ptlrpcd when @rqset is PTLRPCD_SET or is added to @rqset
 * otherwise. */
589 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
590 obd_enqueue_update_f upcall, void *cookie,
591 struct ptlrpc_request_set *rqset)
593 struct ptlrpc_request *req;
594 struct ost_body *body;
595 struct osc_fsync_args *fa;
599 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
603 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
604 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
606 ptlrpc_request_free(req);
610 /* overload the size and blocks fields in the oa with start/end */
611 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
613 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
614 osc_pack_capa(req, body, oinfo->oi_capa);
616 ptlrpc_request_set_replen(req);
617 req->rq_interpret_reply = osc_sync_interpret;
619 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
620 fa = ptlrpc_req_async_args(req);
622 fa->fa_upcall = upcall;
623 fa->fa_cookie = cookie;
625 if (rqset == PTLRPCD_SET)
626 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
628 ptlrpc_set_add_req(rqset, req);
/* osc_sync(): store the byte range [@start, @end] in the obdo (o_size =
 * start, o_blocks = end, both flagged valid) and submit it via
 * osc_sync_base() with oinfo->oi_cb_up as upcall and @oinfo as cookie.
 * A debug message "oa NULL" is logged on a path whose condition is not
 * visible in this view. */
633 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
634 struct obd_info *oinfo, obd_size start, obd_size end,
635 struct ptlrpc_request_set *set)
640 CDEBUG(D_INFO, "oa NULL\n");
644 oinfo->oi_oa->o_size = start;
645 oinfo->oi_oa->o_blocks = end;
646 oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
648 RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
/* The resource is looked up in the export's namespace under the name built
 * from oa->o_oi; it is pinned with LDLM_RESOURCE_ADDREF around
 * ldlm_cancel_resource_local() and the reference from ldlm_resource_get()
 * is dropped afterwards. */
651 /* Find and cancel locally locks matched by @mode in the resource found by
652 * @objid. Found locks are added into @cancel list. Returns the amount of
653 * locks added to @cancels list. */
654 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
656 ldlm_mode_t mode, int lock_flags)
658 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
659 struct ldlm_res_id res_id;
660 struct ldlm_resource *res;
664 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
665 * export) but disabled through procfs (flag in NS).
667 * This distinguishes from a case when ELC is not supported originally,
668 * when we still want to cancel locks in advance and just cancel them
669 * locally, without sending any RPC. */
670 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
673 ostid_build_res_name(&oa->o_oi, &res_id);
674 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
678 LDLM_RESOURCE_ADDREF(res);
679 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
680 lock_flags, 0, NULL);
681 LDLM_RESOURCE_DELREF(res);
682 ldlm_resource_putref(res);
/* osc_destroy_interpret(): reply callback for destroy RPCs.  Drops the
 * in-flight destroy counter of the request's client obd and wakes a waiter
 * on cl_destroy_waitq so that a throttled osc_destroy() can proceed. */
686 static int osc_destroy_interpret(const struct lu_env *env,
687 struct ptlrpc_request *req, void *data,
690 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
692 cfs_atomic_dec(&cli->cl_destroy_in_flight);
693 cfs_waitq_signal(&cli->cl_destroy_waitq);
/* osc_can_send_destroy(): try to reserve a slot for one more destroy RPC.
 * cl_destroy_in_flight is incremented first; if the new value does not
 * exceed cl_max_rpcs_in_flight the slot is kept.  Otherwise the increment is
 * undone, and if the counter is then below the limit another waiter is
 * signalled because the counter changed between the two atomic operations.
 * NOTE(review): the return statements are not visible in this view. */
697 static int osc_can_send_destroy(struct client_obd *cli)
699 if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
700 cli->cl_max_rpcs_in_flight) {
701 /* The destroy request can be sent */
704 if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
705 cli->cl_max_rpcs_in_flight) {
707 * The counter has been modified between the two atomic
710 cfs_waitq_signal(&cli->cl_destroy_waitq);
/* osc_create(): object creation entry point.  Requires OBD_MD_FLGROUP in
 * oa->o_valid (asserted).  Requests whose o_flags equal
 * OBD_FL_RECREATE_OBJS, and requests whose sequence is not an MDT sequence,
 * are forwarded to osc_real_create().  Per the comment below, any other
 * request is not expected to reach this function any more. */
715 int osc_create(const struct lu_env *env, struct obd_export *exp,
716 struct obdo *oa, struct lov_stripe_md **ea,
717 struct obd_trans_info *oti)
724 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
726 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
727 oa->o_flags == OBD_FL_RECREATE_OBJS) {
728 RETURN(osc_real_create(exp, oa, ea, oti));
731 if (!fid_seq_is_mdt(oa->o_seq))
732 RETURN(osc_real_create(exp, oa, ea, oti));
734 /* we should not get here anymore */
/* osc_destroy(): send an OST_DESTROY for @oa without waiting for the reply.
 * Unused PW locks on the object are first collected for cancellation with
 * LDLM_FL_DISCARD_DATA and handed to ldlm_prep_elc_req(); they are released
 * with ldlm_lock_list_put() on a failure path.  The request uses
 * OST_IO_PORTAL.  Unless OBD_FL_DELORPHAN is set in o_flags, the number of
 * concurrent destroys is throttled through osc_can_send_destroy(), waiting
 * on cl_destroy_waitq when no slot is available. */
740 /* Destroy requests can be async always on the client, and we don't even really
741 * care about the return code since the client cannot do anything at all about
743 * When the MDS is unlinking a filename, it saves the file objects into a
744 * recovery llog, and these object records are cancelled when the OST reports
745 * they were destroyed and sync'd to disk (i.e. transaction committed).
746 * If the client dies, or the OST is down when the object should be destroyed,
747 * the records are not cancelled, and when the OST reconnects to the MDS next,
748 * it will retrieve the llog unlink logs and then sends the log cancellation
749 * cookies to the MDS after committing destroy transactions. */
750 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
751 struct obdo *oa, struct lov_stripe_md *ea,
752 struct obd_trans_info *oti, struct obd_export *md_export,
755 struct client_obd *cli = &exp->exp_obd->u.cli;
756 struct ptlrpc_request *req;
757 struct ost_body *body;
758 CFS_LIST_HEAD(cancels);
763 CDEBUG(D_INFO, "oa NULL\n");
767 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
768 LDLM_FL_DISCARD_DATA);
770 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
772 ldlm_lock_list_put(&cancels, l_bl_ast, count);
776 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
777 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
780 ptlrpc_request_free(req);
784 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
785 ptlrpc_at_set_req_timeout(req);
787 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
788 oa->o_lcookie = *oti->oti_logcookies;
789 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
791 lustre_set_wire_obdo(&body->oa, oa);
793 osc_pack_capa(req, body, (struct obd_capa *)capa);
794 ptlrpc_request_set_replen(req);
796 /* If osc_destroy is for destroying the unlink orphan,
797 * sent from MDT to OST, which should not be blocked here,
798 * because the process might be triggered by ptlrpcd, and
799 * it is not good to block ptlrpcd thread (b=16006)*/
800 if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
801 req->rq_interpret_reply = osc_destroy_interpret;
802 if (!osc_can_send_destroy(cli)) {
803 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
807 * Wait until the number of on-going destroy RPCs drops
808 * under max_rpc_in_flight
810 l_wait_event_exclusive(cli->cl_destroy_waitq,
811 osc_can_send_destroy(cli), &lwi);
815 /* Do not wait for response */
816 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
/* osc_announce_cached(): fill @oa with this client's cache and grant state.
 * @oa must not yet have OBD_MD_FLBLOCKS or OBD_MD_FLGRANT set (asserted).
 * Under cl_loi_list_lock:
 *  - o_dirty is taken from cl_dirty;
 *  - three sanity checks on the dirty accounting (per-client, system-wide
 *    and dirty_max - dirty overflow) log a CERROR when they trip;
 *  - otherwise o_undirty is the larger of cl_dirty_max and the amount needed
 *    for (cl_max_rpcs_in_flight + 1) RPCs of cl_max_pages_per_rpc pages;
 *  - o_grant is cl_avail_grant + cl_reserved_grant;
 *  - o_dropped reports cl_lost_grant, which is then reset to 0.
 * NOTE(review): the value stored in o_undirty on the error branches is not
 * visible in this view. */
820 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
823 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
825 LASSERT(!(oa->o_valid & bits));
828 client_obd_list_lock(&cli->cl_loi_list_lock);
829 oa->o_dirty = cli->cl_dirty;
830 if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
831 cli->cl_dirty_max)) {
832 CERROR("dirty %lu - %lu > dirty_max %lu\n",
833 cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
835 } else if (unlikely(cfs_atomic_read(&obd_dirty_pages) -
836 cfs_atomic_read(&obd_dirty_transit_pages) >
837 (long)(obd_max_dirty_pages + 1))) {
838 /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
839 * not covered by a lock thus they may safely race and trip
840 * this CERROR() unless we add in a small fudge factor (+1). */
841 CERROR("dirty %d - %d > system dirty_max %d\n",
842 cfs_atomic_read(&obd_dirty_pages),
843 cfs_atomic_read(&obd_dirty_transit_pages),
844 obd_max_dirty_pages);
846 } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
847 CERROR("dirty %lu - dirty_max %lu too big???\n",
848 cli->cl_dirty, cli->cl_dirty_max);
851 long max_in_flight = (cli->cl_max_pages_per_rpc <<
853 (cli->cl_max_rpcs_in_flight + 1);
854 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
856 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
857 oa->o_dropped = cli->cl_lost_grant;
858 cli->cl_lost_grant = 0;
859 client_obd_list_unlock(&cli->cl_loi_list_lock);
860 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
861 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* osc_update_next_shrink(): schedule the next grant shrink for @cli by
 * setting cl_next_shrink_grant to the current time shifted by
 * cl_grant_shrink_interval, and log the new deadline. */
865 void osc_update_next_shrink(struct client_obd *cli)
867 cli->cl_next_shrink_grant =
868 cfs_time_shift(cli->cl_grant_shrink_interval);
869 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
870 cli->cl_next_shrink_grant);
/* __osc_update_grant(): add @grant to cl_avail_grant while holding
 * cl_loi_list_lock. */
873 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
875 client_obd_list_lock(&cli->cl_loi_list_lock);
876 cli->cl_avail_grant += grant;
877 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* osc_update_grant(): if the reply @body carries a grant (OBD_MD_FLGRANT set
 * in o_valid), log it and add body->oa.o_grant to the client's available
 * grant via __osc_update_grant(). */
880 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
882 if (body->oa.o_valid & OBD_MD_FLGRANT) {
883 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
884 __osc_update_grant(cli, body->oa.o_grant);
/* Prototype only: osc_set_info_async() is called by
 * osc_shrink_grant_to_target() below; its definition is not among the lines
 * visible here. */
888 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
889 obd_count keylen, void *key, obd_count vallen,
890 void *val, struct ptlrpc_request_set *set);
/* osc_shrink_grant_interpret(): reply callback for a grant-shrink request.
 * The obdo kept in the osc_grant_args holds the amount that was offered
 * back; one visible path returns it to cl_avail_grant through
 * __osc_update_grant() (presumably when the request failed - the condition
 * is not visible).  Otherwise the reply body is fetched and any grant it
 * carries is applied with osc_update_grant(). */
892 static int osc_shrink_grant_interpret(const struct lu_env *env,
893 struct ptlrpc_request *req,
896 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
897 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
898 struct ost_body *body;
901 __osc_update_grant(cli, oa->o_grant);
905 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
907 osc_update_grant(cli, body);
/* osc_shrink_grant_local(): give back a quarter of the available grant.
 * Under cl_loi_list_lock, @oa->o_grant is set to cl_avail_grant / 4 and that
 * amount is subtracted from cl_avail_grant.  OBD_MD_FLFLAGS is made valid if
 * it was not already, OBD_FL_SHRINK_GRANT is set in o_flags, and the next
 * shrink time is rescheduled. */
913 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
915 client_obd_list_lock(&cli->cl_loi_list_lock);
916 oa->o_grant = cli->cl_avail_grant / 4;
917 cli->cl_avail_grant -= oa->o_grant;
918 client_obd_list_unlock(&cli->cl_loi_list_lock);
919 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
920 oa->o_valid |= OBD_MD_FLFLAGS;
923 oa->o_flags |= OBD_FL_SHRINK_GRANT;
924 osc_update_next_shrink(cli);
927 /* Shrink the current grant, either from some large amount to enough for a
928 * full set of in-flight RPCs, or if we have already shrunk to that limit
929 * then to enough for a single RPC. This avoids keeping more grant than
930 * needed, and avoids shrinking the grant piecemeal. */
931 static int osc_shrink_grant(struct client_obd *cli)
933 long target = (cli->cl_max_rpcs_in_flight + 1) *
934 cli->cl_max_pages_per_rpc;
936 client_obd_list_lock(&cli->cl_loi_list_lock);
937 if (cli->cl_avail_grant <= target)
938 target = cli->cl_max_pages_per_rpc;
939 client_obd_list_unlock(&cli->cl_loi_list_lock);
941 return osc_shrink_grant_to_target(cli, target);
/* osc_shrink_grant_to_target(): return grant above @target to the server.
 * Under cl_loi_list_lock @target is raised to at least cl_max_pages_per_rpc
 * and nothing is done when it is not below cl_avail_grant.  Otherwise the
 * cache state is announced in the body's obdo, o_grant is set to the excess
 * (cl_avail_grant - target), cl_avail_grant is lowered to @target,
 * OBD_FL_SHRINK_GRANT is flagged, the next shrink time is rescheduled and
 * the body is sent with osc_set_info_async() under KEY_GRANT_SHRINK.  One
 * visible path adds o_grant back to the local grant (presumably when the
 * send failed - the condition is not visible).
 * NOTE(review): @target is subtracted from cl_avail_grant, so it is in the
 * same units as the grant, yet the lower clamp compares it with
 * cl_max_pages_per_rpc, a page count; osc_should_shrink_grant() converts
 * pages to bytes with << CFS_PAGE_SHIFT before a similar comparison. */
944 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
947 struct ost_body *body;
950 client_obd_list_lock(&cli->cl_loi_list_lock);
951 /* Don't shrink if we are already above or below the desired limit
952 * We don't want to shrink below a single RPC, as that will negatively
953 * impact block allocation and long-term performance. */
954 if (target < cli->cl_max_pages_per_rpc)
955 target = cli->cl_max_pages_per_rpc;
957 if (target >= cli->cl_avail_grant) {
958 client_obd_list_unlock(&cli->cl_loi_list_lock);
961 client_obd_list_unlock(&cli->cl_loi_list_lock);
967 osc_announce_cached(cli, &body->oa, 0);
969 client_obd_list_lock(&cli->cl_loi_list_lock);
970 body->oa.o_grant = cli->cl_avail_grant - target;
971 cli->cl_avail_grant = target;
972 client_obd_list_unlock(&cli->cl_loi_list_lock);
973 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
974 body->oa.o_valid |= OBD_MD_FLFLAGS;
975 body->oa.o_flags = 0;
977 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
978 osc_update_next_shrink(cli);
980 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
981 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
982 sizeof(*body), body, NULL);
984 __osc_update_grant(cli, body->oa.o_grant);
/* osc_should_shrink_grant(): decide whether @client is due for a grant
 * shrink.  The server must have negotiated OBD_CONNECT_GRANT_SHRINK.  Once
 * the current time is within 5 ticks of cl_next_shrink_grant, a shrink is
 * warranted when the import is in LUSTRE_IMP_FULL state and cl_avail_grant
 * exceeds one RPC's worth of bytes; otherwise the next shrink time is
 * pushed out with osc_update_next_shrink().
 * NOTE(review): the return statements are not visible in this view. */
989 static int osc_should_shrink_grant(struct client_obd *client)
991 cfs_time_t time = cfs_time_current();
992 cfs_time_t next_shrink = client->cl_next_shrink_grant;
994 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
995 OBD_CONNECT_GRANT_SHRINK) == 0)
998 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
999 /* Get the current RPC size directly, instead of going via:
1000 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
1001 * Keep comment here so that it can be found by searching. */
1002 int brw_size = client->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
1004 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1005 client->cl_avail_grant > brw_size)
1008 osc_update_next_shrink(client);
/* osc_grant_shrink_grant_cb(): timeout callback.  Walks every client_obd
 * linked on item->ti_obd_list through cl_grant_shrink_list and shrinks the
 * grant of those for which osc_should_shrink_grant() says so.  @data is not
 * used in the visible lines. */
1013 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1015 struct client_obd *client;
1017 cfs_list_for_each_entry(client, &item->ti_obd_list,
1018 cl_grant_shrink_list) {
1019 if (osc_should_shrink_grant(client))
1020 osc_shrink_grant(client);
/* osc_add_shrink_grant(): register @client for periodic grant shrinking.
 * A timeout client is added with the interval cl_grant_shrink_interval and
 * osc_grant_shrink_grant_cb() as callback, linked via cl_grant_shrink_list.
 * Failure is reported with CERROR; on success the first shrink time is
 * scheduled with osc_update_next_shrink(). */
1025 static int osc_add_shrink_grant(struct client_obd *client)
1029 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1031 osc_grant_shrink_grant_cb, NULL,
1032 &client->cl_grant_shrink_list);
1034 CERROR("add grant client %s error %d\n",
1035 client->cl_import->imp_obd->obd_name, rc);
1038 CDEBUG(D_CACHE, "add grant client %s \n",
1039 client->cl_import->imp_obd->obd_name);
1040 osc_update_next_shrink(client);
/* osc_del_shrink_grant(): unregister @client from periodic grant shrinking
 * by removing its cl_grant_shrink_list entry via
 * ptlrpc_del_timeout_client() (remaining arguments not visible here). */
1044 static int osc_del_shrink_grant(struct client_obd *client)
1046 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/* osc_init_grant(): initialise grant accounting from connect data @ocd.
 * Under cl_loi_list_lock, cl_avail_grant becomes ocd_grant when the import
 * is in LUSTRE_IMP_EVICTED state and ocd_grant - cl_dirty otherwise; a
 * negative result is reported with CWARN and replaced by ocd_grant.
 * cl_chunkbits is set to the larger of CFS_PAGE_SHIFT and ocd_blocksize.
 * After unlocking, the client is registered for periodic grant shrinking if
 * the server supports OBD_CONNECT_GRANT_SHRINK and the client is not yet on
 * a shrink list. */
1050 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1053 * ocd_grant is the total grant amount we're expect to hold: if we've
1054 * been evicted, it's the new avail_grant amount, cl_dirty will drop
1055 * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1057 * race is tolerable here: if we're evicted, but imp_state already
1058 * left EVICTED state, then cl_dirty must be 0 already.
1060 client_obd_list_lock(&cli->cl_loi_list_lock);
1061 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1062 cli->cl_avail_grant = ocd->ocd_grant;
1064 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1066 if (cli->cl_avail_grant < 0) {
1067 CWARN("%s: available grant < 0, the OSS is probably not running"
1068 " with patch from bug20278 (%ld) \n",
1069 cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
1070 /* workaround for 1.6 servers which do not have
1071 * the patch from bug20278 */
1072 cli->cl_avail_grant = ocd->ocd_grant;
1075 /* determine the appropriate chunk size used by osc_extent. */
1076 cli->cl_chunkbits = max_t(int, CFS_PAGE_SHIFT, ocd->ocd_blocksize);
1077 client_obd_list_unlock(&cli->cl_loi_list_lock);
1079 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1080 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
1081 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1083 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1084 cfs_list_empty(&cli->cl_grant_shrink_list))
1085 osc_add_shrink_grant(cli);
/* handle_short_read(): zero-fill the part of the page array that a short
 * read did not cover.  Pages fully covered by @nob_read are skipped; in the
 * page where the data ends, the bytes after the last one read are cleared;
 * every remaining page has its pga[i]->count bytes cleared.  Each page is
 * mapped with cfs_kmap() for the memset and unmapped afterwards. */
1088 /* We assume that the reason this OSC got a short read is because it read
1089 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1090 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1091 * this stripe never got written at or beyond this stripe offset yet. */
1092 static void handle_short_read(int nob_read, obd_count page_count,
1093 struct brw_page **pga)
1098 /* skip bytes read OK */
1099 while (nob_read > 0) {
1100 LASSERT (page_count > 0);
1102 if (pga[i]->count > nob_read) {
1103 /* EOF inside this page */
1104 ptr = cfs_kmap(pga[i]->pg) +
1105 (pga[i]->off & ~CFS_PAGE_MASK);
1106 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1107 cfs_kunmap(pga[i]->pg);
1113 nob_read -= pga[i]->count;
1118 /* zero remaining pages */
1119 while (page_count-- > 0) {
1120 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1121 memset(ptr, 0, pga[i]->count);
1122 cfs_kunmap(pga[i]->pg);
/* check_write_rcs(): validate the per-niobuf return codes of a BRW_WRITE
 * reply.  The RMF_RCS vector is fetched from the reply; a missing or short
 * vector is logged.  The first negative entry among the @niocount codes is
 * returned as the error, a non-zero positive entry is logged as invalid,
 * and finally the number of bytes the bulk actually transferred is compared
 * with @requested_nob, with a CERROR on mismatch.
 * NOTE(review): the values returned on the logged paths are not visible. */
1127 static int check_write_rcs(struct ptlrpc_request *req,
1128 int requested_nob, int niocount,
1129 obd_count page_count, struct brw_page **pga)
1134 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1135 sizeof(*remote_rcs) *
1137 if (remote_rcs == NULL) {
1138 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1142 /* return error if any niobuf was in error */
1143 for (i = 0; i < niocount; i++) {
1144 if ((int)remote_rcs[i] < 0)
1145 return(remote_rcs[i]);
1147 if (remote_rcs[i] != 0) {
1148 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1149 i, remote_rcs[i], req);
1154 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1155 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1156 req->rq_bulk->bd_nob_transferred, requested_nob);
/* can_merge_pages(): tell whether @p2 can share a remote niobuf with @p1.
 * When the flags differ, a warning is printed if they differ in any bit
 * outside the set known to be safe to combine (FROM_GRANT, NOCACHE, SYNC,
 * ASYNC, NOQUOTA).  The visible return reports whether @p2 starts exactly
 * where @p1 ends (p1->off + p1->count == p2->off).
 * NOTE(review): the value returned when the flags differ is not visible. */
1163 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1165 if (p1->flag != p2->flag) {
1166 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1167 OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1169 /* warn if we try to combine flags that we don't know to be
1170 * safe to combine */
1171 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1172 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1173 "report this at http://bugs.whamcloud.com/\n",
1174 p1->flag, p2->flag);
1179 return (p1->off + p1->count == p2->off);
/* osc_checksum_bulk(): checksum the first @nob bytes of the page array.
 * The hash algorithm is derived from @cksum_type with cksum_obd2cfs() and a
 * hash descriptor is created with cfs_crypto_hash_init().  Each page
 * contributes min(pga[i]->count, remaining nob) bytes starting at its
 * in-page offset.  Fault injection: for OST_READ the first page's data can
 * be overwritten with "bad1" before hashing, and for OST_WRITE the computed
 * checksum (not the data) can be falsified so a resend is still correct.
 * NOTE(review): on hash-init failure PTR_ERR(hdesc), a negative errno, is
 * returned through the obd_count return type (presumably unsigned), so a
 * caller cannot tell it apart from a checksum value. */
1182 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1183 struct brw_page **pga, int opc,
1184 cksum_type_t cksum_type)
1188 struct cfs_crypto_hash_desc *hdesc;
1189 unsigned int bufsize;
1191 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1193 LASSERT(pg_count > 0);
1195 hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1196 if (IS_ERR(hdesc)) {
1197 CERROR("Unable to initialize checksum hash %s\n",
1198 cfs_crypto_hash_name(cfs_alg));
1199 return PTR_ERR(hdesc);
1202 while (nob > 0 && pg_count > 0) {
1203 int count = pga[i]->count > nob ? nob : pga[i]->count;
1205 /* corrupt the data before we compute the checksum, to
1206 * simulate an OST->client data error */
1207 if (i == 0 && opc == OST_READ &&
1208 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1209 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1210 int off = pga[i]->off & ~CFS_PAGE_MASK;
1211 memcpy(ptr + off, "bad1", min(4, nob));
1212 cfs_kunmap(pga[i]->pg);
1214 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1215 pga[i]->off & ~CFS_PAGE_MASK,
1217 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1218 (int)(pga[i]->off & ~CFS_PAGE_MASK), cksum);
1220 nob -= pga[i]->count;
1226 err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1229 cfs_crypto_hash_final(hdesc, NULL, NULL);
1231 /* For sending we only compute the wrong checksum instead
1232 * of corrupting the data so it is still correct on a redo */
1233 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/*
 * Prepare a bulk read or write RPC covering the page_count pages in pga.
 *
 * Visible steps: honour two fault-injection points, allocate the request
 * (the write format comes from the import request pool, the read format from
 * ptlrpc_request_alloc()), size and pack the capsule, create the bulk
 * descriptor, add every page to it, fill the remote niobufs, and store the
 * per-request state in the async args.  For writes with cli->cl_checksum set
 * and no bulk security flavor, the bulk checksum is computed here and the
 * checksum flags are mirrored into oa for later checking.
 *
 * cmd         the OBD_BRW_WRITE bit selects the write request format
 * cli         client obd supplying the import and the checksum settings
 * oa          obdo copied into the wire body; its checksum fields are updated
 * lsm         not referenced in the visible lines
 * page_count  number of entries in pga, asserted to be > 0
 * pga         pages, asserted to be in strictly increasing offset order
 * reqp        request out-parameter (its assignment is not visible here)
 * ocapa       capability packed into the request; an extra reference is
 *             stored in the async args when both ocapa and reserve are set
 *
 * Returns -ENOMEM or -EINVAL from the fault-injection points and -ENOMEM on
 * the bulk descriptor failure path; the other return paths are not visible.
 *
 * NOTE(review): callers in this file pass a tenth argument after reserve;
 * its declaration, and the condition guarding OBD_FL_RECOV_RESEND, are not
 * visible here.  The comment opened at original line 1293 has no visible
 * terminator.
 */
1239 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1240 struct lov_stripe_md *lsm, obd_count page_count,
1241 struct brw_page **pga,
1242 struct ptlrpc_request **reqp,
1243 struct obd_capa *ocapa, int reserve,
1246 struct ptlrpc_request *req;
1247 struct ptlrpc_bulk_desc *desc;
1248 struct ost_body *body;
1249 struct obd_ioobj *ioobj;
1250 struct niobuf_remote *niobuf;
1251 int niocount, i, requested_nob, opc, rc;
1252 struct osc_brw_async_args *aa;
1253 struct req_capsule *pill;
1254 struct brw_page *pg_prev;
1257 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1258 RETURN(-ENOMEM); /* Recoverable */
1259 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1260 RETURN(-EINVAL); /* Fatal */
1262 if ((cmd & OBD_BRW_WRITE) != 0) {
1264 req = ptlrpc_request_alloc_pool(cli->cl_import,
1265 cli->cl_import->imp_rq_pool,
1266 &RQF_OST_BRW_WRITE);
1269 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* niocount starts at 1 and is derived from the page pairs that cannot be
 * merged; it sizes the remote niobuf array below (the increment itself is
 * not visible here) */
1274 for (niocount = i = 1; i < page_count; i++) {
1275 if (!can_merge_pages(pga[i - 1], pga[i]))
1279 pill = &req->rq_pill;
1280 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1282 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1283 niocount * sizeof(*niobuf));
1284 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1286 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1288 ptlrpc_request_free(req);
1291 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1292 ptlrpc_at_set_req_timeout(req);
1293 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1295 req->rq_no_retry_einprogress = 1;
1297 desc = ptlrpc_prep_bulk_imp(req, page_count,
1298 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1299 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1303 GOTO(out, rc = -ENOMEM);
1304 /* NB request now owns desc and will free it when it gets freed */
1306 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1307 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1308 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1309 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1311 lustre_set_wire_obdo(&body->oa, oa);
1313 obdo_to_ioobj(oa, ioobj);
1314 ioobj->ioo_bufcnt = niocount;
1315 /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1316 * that might be sent for this request. The actual number is decided
1317 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1318 * "max - 1" for old client compatibility sending "0", and also so the
1319 * actual maximum is a power-of-two number, not one less. LU-1431 */
1320 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1321 osc_pack_capa(req, body, ocapa);
1322 LASSERT(page_count > 0);
/* per page: check the layout assertions, add the page to the bulk
 * descriptor, accumulate requested_nob, and either extend the current
 * niobuf or start a new one */
1324 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1325 struct brw_page *pg = pga[i];
1326 int poff = pg->off & ~CFS_PAGE_MASK;
1328 LASSERT(pg->count > 0);
1329 /* make sure there is no gap in the middle of page array */
1330 LASSERTF(page_count == 1 ||
1331 (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
1332 ergo(i > 0 && i < page_count - 1,
1333 poff == 0 && pg->count == CFS_PAGE_SIZE) &&
1334 ergo(i == page_count - 1, poff == 0)),
1335 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1336 i, page_count, pg, pg->off, pg->count);
1338 LASSERTF(i == 0 || pg->off > pg_prev->off,
1339 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1340 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1342 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1343 pg_prev->pg, page_private(pg_prev->pg),
1344 pg_prev->pg->index, pg_prev->off);
1346 LASSERTF(i == 0 || pg->off > pg_prev->off,
1347 "i %d p_c %u\n", i, page_count);
1349 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1350 (pg->flag & OBD_BRW_SRVLOCK));
1352 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1353 requested_nob += pg->count;
1355 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1357 niobuf->len += pg->count;
1359 niobuf->offset = pg->off;
1360 niobuf->len = pg->count;
1361 niobuf->flags = pg->flag;
/* after the loop niobuf must sit exactly niocount entries past the start of
 * the RMF_NIOBUF_REMOTE buffer */
1366 LASSERTF((void *)(niobuf - niocount) ==
1367 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1368 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1369 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1371 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1373 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1374 body->oa.o_valid |= OBD_MD_FLFLAGS;
1375 body->oa.o_flags = 0;
1377 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1380 if (osc_should_shrink_grant(cli))
1381 osc_shrink_grant_local(cli, &body->oa);
1383 /* size[REQ_REC_OFF] still sizeof (*body) */
1384 if (opc == OST_WRITE) {
1385 if (cli->cl_checksum &&
1386 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1387 /* store cl_cksum_type in a local variable since
1388 * it can be changed via lprocfs */
1389 cksum_type_t cksum_type = cli->cl_cksum_type;
1391 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1392 oa->o_flags &= OBD_FL_LOCAL_MASK;
1393 body->oa.o_flags = 0;
1395 body->oa.o_flags |= cksum_type_pack(cksum_type);
1396 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1397 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1401 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1403 /* save this in 'oa', too, for later checking */
1404 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1405 oa->o_flags |= cksum_type_pack(cksum_type);
1407 /* clear out the checksum flag, in case this is a
1408 * resend but cl_checksum is no longer set. b=11238 */
1409 oa->o_valid &= ~OBD_MD_FLCKSUM;
1411 oa->o_cksum = body->oa.o_cksum;
1412 /* 1 RC per niobuf */
1413 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1414 sizeof(__u32) * niocount);
1416 if (cli->cl_checksum &&
1417 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1418 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1419 body->oa.o_flags = 0;
1420 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1421 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1424 ptlrpc_request_set_replen(req);
/* the async args live inside the request; the compile-time assert checks
 * that they fit in rq_async_args */
1426 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1427 aa = ptlrpc_req_async_args(req);
1429 aa->aa_requested_nob = requested_nob;
1430 aa->aa_nio_count = niocount;
1431 aa->aa_page_count = page_count;
1435 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1436 if (ocapa && reserve)
1437 aa->aa_ocapa = capa_get(ocapa);
1443 ptlrpc_req_finished(req);
/*
 * Compare the write checksum returned by the server with the one the client
 * sent and report any mismatch.
 *
 * When the two values are equal the checksum is traced as confirmed.
 * Otherwise the checksum is recomputed over pga with osc_checksum_bulk() and
 * the three values select a diagnostic: the server used a different checksum
 * type, the data changed on the client after checksumming, the data changed
 * in transit, or both.  The mismatch is reported with LCONSOLE_ERROR_MSG and
 * the raw values with CERROR.
 *
 * oa                 reply obdo; supplies the server checksum type flags and
 *                    the object identity used in the message
 * peer               peer whose nid is printed
 * client_cksum       checksum computed when the request was built
 * server_cksum       checksum returned by the server
 * nob, page_count, pga  data to re-checksum
 * client_cksum_type  checksum type the client used in the request
 *
 * The return statements are not visible here; the caller in this file tests
 * the result as a truth value.
 */
1447 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1448 __u32 client_cksum, __u32 server_cksum, int nob,
1449 obd_count page_count, struct brw_page **pga,
1450 cksum_type_t client_cksum_type)
1454 cksum_type_t cksum_type;
1456 if (server_cksum == client_cksum) {
1457 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1461 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1463 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
/* classify the mismatch from the checksum types and the three checksums */
1466 if (cksum_type != client_cksum_type)
1467 msg = "the server did not use the checksum type specified in "
1468 "the original request - likely a protocol problem";
1469 else if (new_cksum == server_cksum)
1470 msg = "changed on the client after we checksummed it - "
1471 "likely false positive due to mmap IO (bug 11742)";
1472 else if (new_cksum == client_cksum)
1473 msg = "changed in transit before arrival at OST";
1475 msg = "changed in transit AND doesn't match the original - "
1476 "likely false positive due to mmap IO (bug 11742)";
1478 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1479 " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1480 msg, libcfs_nid2str(peer->nid),
1481 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1482 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1483 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1485 oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1487 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1488 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1489 "client csum now %x\n", client_cksum, client_cksum_type,
1490 server_cksum, cksum_type, new_cksum);
/*
 * Post-process the reply of a bulk read/write RPC.
 *
 * A negative rc other than -EDQUOT is logged as a failed request.  Otherwise
 * the reply body is unpacked, quota flags are applied for writes that carry
 * them, and osc_update_grant() is called.  Writes then unwrap the bulk,
 * verify the write checksum when one was sent, and call check_write_rcs().
 * Reads unwrap the bulk, validate rc against the requested and transferred
 * byte counts, handle short reads, and verify the read checksum when the
 * server supplied one.  lustre_get_wire_obdo() copies the reply obdo into
 * aa->aa_oa.
 *
 * The return statements and jump targets are not visible in these lines.
 */
1494 /* Note rc enters this function as number of bytes transferred */
1495 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1497 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1498 const lnet_process_id_t *peer =
1499 &req->rq_import->imp_connection->c_peer;
1500 struct client_obd *cli = aa->aa_cli;
1501 struct ost_body *body;
1502 __u32 client_cksum = 0;
1505 if (rc < 0 && rc != -EDQUOT) {
1506 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1510 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1511 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1513 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1517 /* set/clear over quota flag for a uid/gid */
1518 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1519 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1520 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1522 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1523 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1525 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1528 osc_update_grant(cli, body);
1533 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1534 client_cksum = aa->aa_oa->o_cksum; /* save for later */
/* write path: rc is not expected to be positive here */
1536 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1538 CERROR("Unexpected +ve rc %d\n", rc);
1541 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1543 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1546 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1547 check_write_checksum(&body->oa, peer, client_cksum,
1548 body->oa.o_cksum, aa->aa_requested_nob,
1549 aa->aa_page_count, aa->aa_ppga,
1550 cksum_type_unpack(aa->aa_oa->o_flags)))
1553 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1554 aa->aa_page_count, aa->aa_ppga);
1558 /* The rest of this function executes only for OST_READs */
1560 /* if unwrap_bulk failed, return -EAGAIN to retry */
1561 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1563 GOTO(out, rc = -EAGAIN);
/* rc is now a byte count: it may not exceed what was requested and must
 * equal what the bulk layer reports as transferred */
1565 if (rc > aa->aa_requested_nob) {
1566 CERROR("Unexpected rc %d (%d requested)\n", rc,
1567 aa->aa_requested_nob);
1571 if (rc != req->rq_bulk->bd_nob_transferred) {
1572 CERROR ("Unexpected rc %d (%d transferred)\n",
1573 rc, req->rq_bulk->bd_nob_transferred);
1577 if (rc < aa->aa_requested_nob)
1578 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
/* the server sent a checksum: recompute it locally over the received pages
 * using the checksum type from the reply flags */
1580 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1581 static int cksum_counter;
1582 __u32 server_cksum = body->oa.o_cksum;
1585 cksum_type_t cksum_type;
1587 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1588 body->oa.o_flags : 0);
1589 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1590 aa->aa_ppga, OST_READ,
1593 if (peer->nid == req->rq_bulk->bd_sender) {
1597 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1600 if (server_cksum == ~0 && rc > 0) {
1601 CERROR("Protocol error: server %s set the 'checksum' "
1602 "bit, but didn't send a checksum. Not fatal, "
1603 "but please notify on http://bugs.whamcloud.com/\n",
1604 libcfs_nid2str(peer->nid));
1605 } else if (server_cksum != client_cksum) {
1606 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1607 "%s%s%s inode "DFID" object "
1608 LPU64"/"LPU64" extent "
1609 "["LPU64"-"LPU64"]\n",
1610 req->rq_import->imp_obd->obd_name,
1611 libcfs_nid2str(peer->nid),
1613 body->oa.o_valid & OBD_MD_FLFID ?
1614 body->oa.o_parent_seq : (__u64)0,
1615 body->oa.o_valid & OBD_MD_FLFID ?
1616 body->oa.o_parent_oid : 0,
1617 body->oa.o_valid & OBD_MD_FLFID ?
1618 body->oa.o_parent_ver : 0,
1620 body->oa.o_valid & OBD_MD_FLGROUP ?
1621 body->oa.o_seq : (__u64)0,
1622 aa->aa_ppga[0]->off,
1623 aa->aa_ppga[aa->aa_page_count-1]->off +
1624 aa->aa_ppga[aa->aa_page_count-1]->count -
1626 CERROR("client %x, server %x, cksum_type %x\n",
1627 client_cksum, server_cksum, cksum_type);
1629 aa->aa_oa->o_cksum = client_cksum;
1633 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1636 } else if (unlikely(client_cksum)) {
1637 static int cksum_missed;
/* (x & -x) == x holds only when x is zero or a power of two, so the message
 * below is printed only for those values of cksum_missed */
1640 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1641 CERROR("Checksum %u requested from %s but not sent\n",
1642 cksum_missed, libcfs_nid2str(peer->nid));
1648 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
/*
 * Issue one synchronous bulk RPC for the pages in pga and wait for it.
 *
 * The import generation is sampled on entry.  The request is built with
 * osc_brw_prep_request(), stamped with that generation and a send time of
 * now + resends seconds, and sent with ptlrpc_queue_wait().  A bulk timeout
 * with rq_resend set drops the request; otherwise the reply goes through
 * osc_brw_fini_request().  For recoverable errors the function logs when the
 * resend limit is reached (never for -EINPROGRESS) or when the import
 * generation changed, and otherwise waits for resends seconds on a private
 * wait queue.
 *
 * NOTE(review): the retry jump, the resends increment and the return
 * statements are not visible in these lines.
 */
1653 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1654 struct lov_stripe_md *lsm,
1655 obd_count page_count, struct brw_page **pga,
1656 struct obd_capa *ocapa)
1658 struct ptlrpc_request *req;
1661 int generation, resends = 0;
1662 struct l_wait_info lwi;
1666 cfs_waitq_init(&waitq);
1667 generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1670 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1671 page_count, pga, &req, ocapa, 0, resends);
/* pin the request to the generation sampled on entry */
1676 req->rq_generation_set = 1;
1677 req->rq_import_generation = generation;
1678 req->rq_sent = cfs_time_current_sec() + resends;
1681 rc = ptlrpc_queue_wait(req);
1683 if (rc == -ETIMEDOUT && req->rq_resend) {
1684 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1685 ptlrpc_req_finished(req);
1689 rc = osc_brw_fini_request(req, rc);
1691 ptlrpc_req_finished(req);
1692 /* When server return -EINPROGRESS, client should always retry
1693 * regardless of the number of times the bulk was resent already.*/
1694 if (osc_recoverable_error(rc)) {
1696 if (rc != -EINPROGRESS &&
1697 !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1698 CERROR("%s: too many resend retries for object: "
1699 ""LPU64":"LPU64", rc = %d.\n",
1700 exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
1704 exp->exp_obd->u.cli.cl_import->imp_generation) {
1705 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1706 ""LPU64":"LPU64", rc = %d.\n",
1707 exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
/* the wait condition is the constant 0, so this only ends on the timeout or
 * an interrupt */
1711 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1713 l_wait_event(waitq, 0, &lwi);
1718 if (rc == -EAGAIN || rc == -EINPROGRESS)
/*
 * Rebuild and requeue a bulk RPC after a recoverable error.
 *
 * A fresh request is prepared from the state in aa (client, obdo, page array
 * and capa of the old request).  If an oap that references the old request
 * has been interrupted, the new request is released.  Otherwise the new
 * request takes over the interpret callback, the async args, the import
 * generation, the oap and extent lists and the capa pointer; every oap
 * request reference is moved from the old request to the new one; and the
 * new request is handed to ptlrpcd with PDL_POLICY_SAME.
 *
 * request  the request that failed
 * aa       async args of that request
 * rc       the recoverable error, used for logging before being overwritten
 *
 * The return statements are not visible in these lines.
 */
1723 static int osc_brw_redo_request(struct ptlrpc_request *request,
1724 struct osc_brw_async_args *aa, int rc)
1726 struct ptlrpc_request *new_req;
1727 struct osc_brw_async_args *new_aa;
1728 struct osc_async_page *oap;
1731 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1732 "redo for recoverable error %d", rc);
1734 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1735 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1736 aa->aa_cli, aa->aa_oa,
1737 NULL /* lsm unused by osc currently */,
1738 aa->aa_page_count, aa->aa_ppga,
1739 &new_req, aa->aa_ocapa, 0, 1);
/* every oap holding a request must hold the old one; an interrupted oap
 * causes the new request to be released */
1743 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1744 if (oap->oap_request != NULL) {
1745 LASSERTF(request == oap->oap_request,
1746 "request %p != oap_request %p\n",
1747 request, oap->oap_request);
1748 if (oap->oap_interrupted) {
1749 ptlrpc_req_finished(new_req);
1754 /* New request takes over pga and oaps from old request.
1755 * Note that copying a list_head doesn't work, need to move it... */
1757 new_req->rq_interpret_reply = request->rq_interpret_reply;
1758 new_req->rq_async_args = request->rq_async_args;
1759 /* cap resend delay to the current request timeout, this is similar to
1760 * what ptlrpc does (see after_reply()) */
1761 if (aa->aa_resends > new_req->rq_timeout)
1762 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1764 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1765 new_req->rq_generation_set = 1;
1766 new_req->rq_import_generation = request->rq_import_generation;
1768 new_aa = ptlrpc_req_async_args(new_req);
/* the list heads were copied by value above, so re-initialise them and
 * splice the entries over from the old args */
1770 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1771 cfs_list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1772 CFS_INIT_LIST_HEAD(&new_aa->aa_exts);
1773 cfs_list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1774 new_aa->aa_resends = aa->aa_resends;
/* swap each oap request reference from the old request to the new one */
1776 cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1777 if (oap->oap_request) {
1778 ptlrpc_req_finished(oap->oap_request);
1779 oap->oap_request = ptlrpc_request_addref(new_req);
/* hand the capa pointer to the new args and clear it in the old ones */
1783 new_aa->aa_ocapa = aa->aa_ocapa;
1784 aa->aa_ocapa = NULL;
1786 /* XXX: This code will run into problem if we're going to support
1787 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1788 * and wait for all of them to be finished. We should inherit request
1789 * set from old request. */
1790 ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1792 DEBUG_REQ(D_INFO, new_req, "new request");
1797 * Ugh, we want disk allocation on the target to happen in offset order. We'll
1798 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1799 * fine for our small page arrays and doesn't require allocation. It's an
1800 * insertion sort that swaps elements that are strides apart, shrinking the
1801 * stride down until it's '1' and the array is sorted.
/*
 * Sort array[0..num-1] in place by ascending ->off.
 *
 * The first loop grows stride through the sequence 1, 4, 13, ... (3 * stride
 * + 1) until it reaches or exceeds num.  The do/while then runs a strided
 * insertion pass: an entry whose predecessor at distance stride has a larger
 * offset is shifted towards the front.  The pass repeats until the stride is
 * down to 1; the statement that shrinks the stride is not visible here.
 *
 * array  page pointers to reorder
 * num    number of entries in array
 */
1803 static void sort_brw_pages(struct brw_page **array, int num)
1806 struct brw_page *tmp;
1810 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1815 for (i = stride ; i < num ; i++) {
1818 while (j >= stride && array[j - stride]->off > tmp->off) {
1819 array[j] = array[j - stride];
1824 } while (stride > 1);
/*
 * Limit a run of pages to a prefix that has no gap inside it.
 *
 * Judging by the visible checks, the scan stops and returns the running
 * count when a page does not end on a page boundary (offset + count <
 * CFS_PAGE_SIZE) or when the following page does not start on one (offset !=
 * 0), and also when all pages have been consumed.
 *
 * pg     page array, inspected from index i
 * pages  number of candidate pages, asserted to be > 0
 *
 * NOTE(review): the loop construct, the updates of i, count and pages, and
 * most return statements are not visible in these lines.
 */
1827 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1833 LASSERT (pages > 0);
1834 offset = pg[i]->off & ~CFS_PAGE_MASK;
1838 if (pages == 0) /* that's all */
1841 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1842 return count; /* doesn't end on page boundary */
1845 offset = pg[i]->off & ~CFS_PAGE_MASK;
1846 if (offset != 0) /* doesn't start on page boundary */
/*
 * Allocate an array of count brw_page pointers with OBD_ALLOC and loop over
 * its count slots.
 *
 * NOTE(review): the loop body and the return statements are not visible
 * here; presumably each slot is pointed at the matching element of pga and
 * the array is returned -- confirm.  The array is sized as sizeof(*ppga) *
 * count, which is the size osc_release_ppga() passes to OBD_FREE.
 */
1853 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1855 struct brw_page **ppga;
1858 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1862 for (i = 0; i < count; i++)
/*
 * Free a brw_page pointer array of count entries.
 *
 * ppga   array to free; asserted to be non-NULL
 * count  number of entries, used to compute the size passed to OBD_FREE
 */
1867 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1869 LASSERT(ppga != NULL);
1870 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * Synchronous bulk I/O entry point: split pga into RPC-sized pieces and send
 * each piece with osc_brw_internal().
 *
 * With OBD_BRW_CHECK set in cmd the caller only asks whether the I/O has a
 * chance to succeed, and the import invalid flag is tested.  Otherwise a
 * pointer array over pga is built and sorted, and each loop iteration sends
 * at most cli->cl_max_pages_per_rpc pages, further limited by
 * max_unfragmented_pages().  Because a BRW overwrites *oinfo->oi_oa, a copy
 * is saved the first time more pages remain than fit in one RPC and is
 * restored before every later RPC.  The pointer array and the saved obdo are
 * released on the way out.
 *
 * cmd         OBD_BRW_* flags
 * exp         export whose import supplies the client obd
 * oinfo       carries the obdo, stripe md and capa for the RPCs
 * page_count  number of entries in pga
 * pga         pages to transfer
 * oti         not referenced in the visible lines
 *
 * Returns -ENOMEM when the obdo copy cannot be allocated; the other return
 * paths are not visible in these lines.
 */
1873 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1874 obd_count page_count, struct brw_page *pga,
1875 struct obd_trans_info *oti)
1877 struct obdo *saved_oa = NULL;
1878 struct brw_page **ppga, **orig;
1879 struct obd_import *imp = class_exp2cliimp(exp);
1880 struct client_obd *cli;
1881 int rc, page_count_orig;
1884 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1885 cli = &imp->imp_obd->u.cli;
1887 if (cmd & OBD_BRW_CHECK) {
1888 /* The caller just wants to know if there's a chance that this
1889 * I/O can succeed */
1891 if (imp->imp_invalid)
1896 /* test_brw with a failed create can trip this, maybe others. */
1897 LASSERT(cli->cl_max_pages_per_rpc);
/* orig and page_count_orig remember the allocation for the final release;
 * ppga and page_count are advanced by the loop below */
1901 orig = ppga = osc_build_ppga(pga, page_count);
1904 page_count_orig = page_count;
1906 sort_brw_pages(ppga, page_count);
1907 while (page_count) {
1908 obd_count pages_per_brw;
1910 if (page_count > cli->cl_max_pages_per_rpc)
1911 pages_per_brw = cli->cl_max_pages_per_rpc;
1913 pages_per_brw = page_count;
1915 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1917 if (saved_oa != NULL) {
1918 /* restore previously saved oa */
1919 *oinfo->oi_oa = *saved_oa;
1920 } else if (page_count > pages_per_brw) {
1921 /* save a copy of oa (brw will clobber it) */
1922 OBDO_ALLOC(saved_oa);
1923 if (saved_oa == NULL)
1924 GOTO(out, rc = -ENOMEM);
1925 *saved_oa = *oinfo->oi_oa;
1928 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1929 pages_per_brw, ppga, oinfo->oi_capa);
1934 page_count -= pages_per_brw;
1935 ppga += pages_per_brw;
1939 osc_release_ppga(orig, page_count_orig);
1941 if (saved_oa != NULL)
1942 OBDO_FREE(saved_oa);
/*
 * Reply interpreter for asynchronous bulk RPCs built by osc_build_rpc().
 *
 * Runs osc_brw_fini_request() first.  For a recoverable error it either logs
 * a resend across an eviction (import generation changed), requeues the
 * I/O through osc_brw_redo_request() (always for -EINPROGRESS, otherwise
 * while client_should_resend() allows it), or logs that the retry limit was
 * hit.  After that it drops the capa reference, finishes every extent on
 * aa->aa_exts, copies blocks/mtime/atime/ctime from aa->aa_oa into the
 * cl_object attributes, frees the obdo and the page pointer array, completes
 * the cl_req, decrements the read or write in-flight counter under
 * cl_loi_list_lock, wakes cache waiters and unplugs further I/O.
 *
 * env   lu environment
 * req   the completed request
 * data  the osc_brw_async_args of the request
 * rc    completion status passed on to osc_brw_fini_request()
 *
 * NOTE(review): the return statements and several conditions (for example
 * the one guarding the attribute update) are not visible in these lines.
 */
1947 static int brw_interpret(const struct lu_env *env,
1948 struct ptlrpc_request *req, void *data, int rc)
1950 struct osc_brw_async_args *aa = data;
1951 struct osc_extent *ext;
1952 struct osc_extent *tmp;
1953 struct cl_object *obj = NULL;
1954 struct client_obd *cli = aa->aa_cli;
1957 rc = osc_brw_fini_request(req, rc);
1958 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1959 /* When server return -EINPROGRESS, client should always retry
1960 * regardless of the number of times the bulk was resent already. */
1961 if (osc_recoverable_error(rc)) {
1962 if (req->rq_import_generation !=
1963 req->rq_import->imp_generation) {
1964 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1965 ""LPU64":"LPU64", rc = %d.\n",
1966 req->rq_import->imp_obd->obd_name,
1967 aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
1968 } else if (rc == -EINPROGRESS ||
1969 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1970 rc = osc_brw_redo_request(req, aa, rc);
1972 CERROR("%s: too many resent retries for object: "
1973 ""LPU64":"LPU64", rc = %d.\n",
1974 req->rq_import->imp_obd->obd_name,
1975 aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
1980 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1985 capa_put(aa->aa_ocapa);
1986 aa->aa_ocapa = NULL;
/* finish every extent carried by this RPC; on success the cl_object is
 * taken from the first extent seen */
1989 cfs_list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1990 if (obj == NULL && rc == 0) {
1991 obj = osc2cl(ext->oe_obj);
1995 cfs_list_del_init(&ext->oe_link);
1996 osc_extent_finish(env, ext, 1, rc);
1998 LASSERT(cfs_list_empty(&aa->aa_exts));
1999 LASSERT(cfs_list_empty(&aa->aa_oaps));
/* collect the attributes that aa_oa marks valid and apply them to the
 * object under the attribute lock */
2002 struct obdo *oa = aa->aa_oa;
2003 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2004 unsigned long valid = 0;
2007 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2008 attr->cat_blocks = oa->o_blocks;
2009 valid |= CAT_BLOCKS;
2011 if (oa->o_valid & OBD_MD_FLMTIME) {
2012 attr->cat_mtime = oa->o_mtime;
2015 if (oa->o_valid & OBD_MD_FLATIME) {
2016 attr->cat_atime = oa->o_atime;
2019 if (oa->o_valid & OBD_MD_FLCTIME) {
2020 attr->cat_ctime = oa->o_ctime;
2024 cl_object_attr_lock(obj);
2025 cl_object_attr_set(env, obj, attr, valid);
2026 cl_object_attr_unlock(obj);
2028 cl_object_put(env, obj);
2030 OBDO_FREE(aa->aa_oa);
/* report a negative rc as is, otherwise the number of bytes transferred */
2032 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2033 req->rq_bulk->bd_nob_transferred);
2034 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2035 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
2037 client_obd_list_lock(&cli->cl_loi_list_lock);
2038 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2039 * is called so we know whether to go to sync BRWs or wait for more
2040 * RPCs to complete */
2041 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2042 cli->cl_w_in_flight--;
2044 cli->cl_r_in_flight--;
2045 osc_wake_cache_waiters(cli);
2046 client_obd_list_unlock(&cli->cl_loi_list_lock);
2048 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2053 * Build an RPC from the list of extents @ext_list. The caller must ensure
2054 * that the total number of pages in this list does NOT exceed the max pages
2055 * per RPC. Extents in the list must be in OES_RPC state.
/*
 * Assemble one asynchronous BRW RPC from the extents on ext_list and queue
 * it on ptlrpcd.
 *
 * Every page of every extent is moved onto a local list while the lowest
 * starting offset and highest ending offset are tracked and the memalloc
 * flags of the extents are or-ed together.  A brw_page pointer array is
 * allocated and filled from the oaps, a cl_req is created from the first
 * page, request attributes are set, the pages are sorted by offset, and
 * osc_brw_prep_request() builds the request with brw_interpret as its reply
 * callback.  On success the oap and extent lists are spliced into the async
 * args, one oap receives a reference on the request, the in-flight counters
 * and histograms are updated under cl_loi_list_lock, and the request is
 * added with policy pol.  On failure the page array is freed, every extent
 * is finished with the error, and the cl_req is completed with rc.
 *
 * env       lu environment
 * cli       client obd the RPC is sent from
 * ext_list  extents to send; asserted non-empty, each in OES_RPC state
 * cmd       OBD_BRW_WRITE bit selects a write, otherwise a read
 * pol       ptlrpcd policy passed to ptlrpcd_add_req()
 *
 * NOTE(review): the obdo allocation, the page_count increment, several
 * conditions and the return statements are not visible in these lines.  The
 * comments opened at original lines 2116 and 2216 have no visible
 * terminator.
 */
2057 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2058 cfs_list_t *ext_list, int cmd, pdl_policy_t pol)
2060 struct ptlrpc_request *req = NULL;
2061 struct osc_extent *ext;
2062 CFS_LIST_HEAD(rpc_list);
2063 struct brw_page **pga = NULL;
2064 struct osc_brw_async_args *aa = NULL;
2065 struct obdo *oa = NULL;
2066 struct osc_async_page *oap;
2067 struct osc_async_page *tmp;
2068 struct cl_req *clerq = NULL;
2069 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2070 struct ldlm_lock *lock = NULL;
2071 struct cl_req_attr crattr;
2072 obd_off starting_offset = OBD_OBJECT_EOF;
2073 obd_off ending_offset = 0;
2074 int i, rc, mpflag = 0, mem_tight = 0, page_count = 0;
2077 LASSERT(!cfs_list_empty(ext_list));
2079 /* add pages into rpc_list to build BRW rpc */
2080 cfs_list_for_each_entry(ext, ext_list, oe_link) {
2081 LASSERT(ext->oe_state == OES_RPC);
2082 mem_tight |= ext->oe_memalloc;
2083 cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2085 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2086 if (starting_offset > oap->oap_obj_off)
2087 starting_offset = oap->oap_obj_off;
2089 LASSERT(oap->oap_page_off == 0);
2090 if (ending_offset < oap->oap_obj_off + oap->oap_count)
2091 ending_offset = oap->oap_obj_off +
2094 LASSERT(oap->oap_page_off + oap->oap_count ==
2100 mpflag = cfs_memory_pressure_get_and_set();
2102 memset(&crattr, 0, sizeof crattr);
2103 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2105 GOTO(out, rc = -ENOMEM);
2109 GOTO(out, rc = -ENOMEM);
/* fill pga from the collected oaps; the cl_req is allocated from the first
 * page visited */
2112 cfs_list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2113 struct cl_page *page = oap2cl_page(oap);
2114 if (clerq == NULL) {
2115 clerq = cl_req_alloc(env, page, crt,
2116 1 /* only 1-object rpcs for
2119 GOTO(out, rc = PTR_ERR(clerq));
2120 lock = oap->oap_ldlm_lock;
2123 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2124 pga[i] = &oap->oap_brw_page;
2125 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2126 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2127 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2129 cl_req_page_add(env, clerq, page);
2132 /* always get the data for the obdo for the rpc */
2133 LASSERT(clerq != NULL);
2135 crattr.cra_capa = NULL;
2136 memset(crattr.cra_jobid, 0, JOBSTATS_JOBID_SIZE);
2137 cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2139 oa->o_handle = lock->l_remote_handle;
2140 oa->o_valid |= OBD_MD_FLHANDLE;
2143 rc = cl_req_prep(env, clerq);
2145 CERROR("cl_req_prep failed: %d\n", rc);
2149 sort_brw_pages(pga, page_count);
2150 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2151 pga, &req, crattr.cra_capa, 1, 0);
2153 CERROR("prep_req failed: %d\n", rc);
2157 req->rq_interpret_reply = brw_interpret;
2159 req->rq_memalloc = 1;
2161 /* Need to update the timestamps after the request is built in case
2162 * we race with setattr (locally or in queue at OST). If OST gets
2163 * later setattr before earlier BRW (as determined by the request xid),
2164 * the OST will not use BRW timestamps. Sadly, there is no obvious
2165 * way to do this in a single call. bug 10150 */
2166 cl_req_attr_set(env, clerq, &crattr,
2167 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2169 lustre_msg_set_jobid(req->rq_reqmsg, crattr.cra_jobid);
/* move the collected pages and the extents into the async args of the
 * request */
2171 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2172 aa = ptlrpc_req_async_args(req);
2173 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2174 cfs_list_splice_init(&rpc_list, &aa->aa_oaps);
2175 CFS_INIT_LIST_HEAD(&aa->aa_exts);
2176 cfs_list_splice_init(ext_list, &aa->aa_exts);
2177 aa->aa_clerq = clerq;
2179 /* queued sync pages can be torn down while the pages
2180 * were between the pending list and the rpc */
2182 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2183 /* only one oap gets a request reference */
2186 if (oap->oap_interrupted && !req->rq_intr) {
2187 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2189 ptlrpc_mark_interrupted(req);
2193 tmp->oap_request = ptlrpc_request_addref(req);
/* account the new RPC in the in-flight counters and the page, rpc and
 * offset histograms; starting_offset is converted to a page index first */
2195 client_obd_list_lock(&cli->cl_loi_list_lock);
2196 starting_offset >>= CFS_PAGE_SHIFT;
2197 if (cmd == OBD_BRW_READ) {
2198 cli->cl_r_in_flight++;
2199 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2200 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2201 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2202 starting_offset + 1);
2204 cli->cl_w_in_flight++;
2205 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2206 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2207 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2208 starting_offset + 1);
2210 client_obd_list_unlock(&cli->cl_loi_list_lock);
2212 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2213 page_count, aa, cli->cl_r_in_flight,
2214 cli->cl_w_in_flight);
2216 /* XXX: Maybe the caller can check the RPC bulk descriptor to
2217 * see which CPU/NUMA node the majority of pages were allocated
2218 * on, and try to assign the async RPC to the CPU core
2219 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2221 * But on the other hand, we expect that multiple ptlrpcd
2222 * threads and the initial write sponsor can run in parallel,
2223 * especially when data checksum is enabled, which is CPU-bound
2224 * operation and single ptlrpcd thread cannot process in time.
2225 * So more ptlrpcd threads sharing BRW load
2226 * (with PDL_POLICY_ROUND) seems better.
2228 ptlrpcd_add_req(req, pol, -1);
2234 cfs_memory_pressure_restore(mpflag);
2236 capa_put(crattr.cra_capa);
2238 LASSERT(req == NULL);
2243 OBD_FREE(pga, sizeof(*pga) * page_count);
2244 /* this should happen rarely and is pretty bad, it makes the
2245 * pending list not follow the dirty order */
2246 while (!cfs_list_empty(ext_list)) {
2247 ext = cfs_list_entry(ext_list->next, struct osc_extent,
2249 cfs_list_del_init(&ext->oe_link);
2250 osc_extent_finish(env, ext, 0, rc);
2252 if (clerq && !IS_ERR(clerq))
2253 cl_req_completion(env, clerq, rc);
/*
 * Attach einfo->ei_cbdata to a lock as its AST data, unless other data is
 * already attached.
 *
 * The lock callbacks (blocking, completion, glimpse) and the resource type
 * are asserted to match einfo.  Under the resource/lock lock and the
 * osc_ast_guard spinlock, l_ast_data is set to the new data when it is NULL,
 * and is then compared with the data.
 *
 * lock   lock to update; asserted to be non-NULL
 * einfo  enqueue info supplying the data and the expected callbacks
 *
 * NOTE(review): the statement executed when l_ast_data equals data, and the
 * return statement, are not visible here; the caller stores the result in a
 * variable named set.
 */
2258 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2259 struct ldlm_enqueue_info *einfo)
2261 void *data = einfo->ei_cbdata;
2264 LASSERT(lock != NULL);
2265 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2266 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2267 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2268 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2270 lock_res_and_lock(lock);
2271 spin_lock(&osc_ast_guard);
2273 if (lock->l_ast_data == NULL)
2274 lock->l_ast_data = data;
2275 if (lock->l_ast_data == data)
2278 spin_unlock(&osc_ast_guard);
2279 unlock_res_and_lock(lock);
/*
 * Handle-based variant of osc_set_lock_data_with_check(): resolve lockh to
 * a lock, attach einfo->ei_cbdata through that helper, and drop the
 * reference obtained from ldlm_handle2lock().
 *
 * NOTE(review): the condition that selects between the helper call and the
 * CERROR path is not visible here; presumably the error is logged when the
 * handle no longer resolves to a lock -- confirm.  The return statement is
 * not visible either.
 */
2284 static int osc_set_data_with_check(struct lustre_handle *lockh,
2285 struct ldlm_enqueue_info *einfo)
2287 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2291 set = osc_set_lock_data_with_check(lock, einfo);
2292 LDLM_LOCK_PUT(lock);
2294 CERROR("lockh %p, data %p - client evicted?\n",
2295 lockh, einfo->ei_cbdata);
/*
 * Call ldlm_resource_iterate() with the iterator replace and the argument
 * data on the resource named after the object id of lsm, in the namespace of
 * the obd device behind exp.
 *
 * The result of ldlm_resource_iterate() is not used in the visible lines;
 * the return statement is not visible here.
 */
2299 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2300 ldlm_iterator_t replace, void *data)
2302 struct ldlm_res_id res_id;
2303 struct obd_device *obd = class_exp2obd(exp);
2305 ostid_build_res_name(&lsm->lsm_object_oid, &res_id);
2306 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
/*
 * Call ldlm_resource_iterate() with the iterator replace and the argument
 * data on the resource named after the object id of lsm, and translate the
 * iterator result: LDLM_ITER_STOP and LDLM_ITER_CONTINUE are each mapped to
 * a value that is not visible here.
 *
 * NOTE(review): the comment opened on the next line has no visible
 * terminator, and the return statements are not visible.
 */
2310 /* find any ldlm lock of the inode in osc
2314 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2315 ldlm_iterator_t replace, void *data)
2317 struct ldlm_res_id res_id;
2318 struct obd_device *obd = class_exp2obd(exp);
2321 ostid_build_res_name(&lsm->lsm_object_oid, &res_id);
2322 rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2323 if (rc == LDLM_ITER_STOP)
2325 if (rc == LDLM_ITER_CONTINUE)
/*
 * Finish the OSC side of a lock enqueue and invoke the caller upcall.
 *
 * In the visible ELDLM_LOCK_ABORTED branch the server reply is fetched and a
 * non-zero lock_policy_res1 replaces rc.  When an intent enqueue was aborted
 * and agl is 0, or on a second condition that is not visible here,
 * LDLM_FL_LVB_READY is set in *flags and the LVB size, blocks and mtime are
 * traced.  The value returned by (*upcall)(cookie, rc) becomes the new rc.
 *
 * req     enqueue request whose reply is inspected
 * lvb     lock value block that is traced
 * upcall  completion callback
 * cookie  first argument passed to upcall
 * flags   in/out LDLM flags; LDLM_FL_HAS_INTENT is sampled on entry
 * agl     non-zero suppresses the intent-abort case above
 * rc      enqueue status on entry
 *
 * The return statement is not visible in these lines.
 */
2330 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2331 obd_enqueue_update_f upcall, void *cookie,
2332 __u64 *flags, int agl, int rc)
2334 int intent = *flags & LDLM_FL_HAS_INTENT;
2338 /* The request was created before ldlm_cli_enqueue call. */
2339 if (rc == ELDLM_LOCK_ABORTED) {
2340 struct ldlm_reply *rep;
2341 rep = req_capsule_server_get(&req->rq_pill,
2344 LASSERT(rep != NULL);
2345 if (rep->lock_policy_res1)
2346 rc = rep->lock_policy_res1;
2350 if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2352 *flags |= LDLM_FL_LVB_READY;
2353 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2354 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2357 /* Call the update callback. */
2358 rc = (*upcall)(cookie, rc);
/*
 * Reply interpreter for an asynchronous lock enqueue.
 *
 * The lock handle and mode are copied out of aa first, because aa->oa_* may
 * be freed once the upcall has run.  The handle is resolved to a lock and an
 * extra reference is taken so that a blocking AST for a failed lock cannot
 * arrive before the upcall.  The enqueue is completed with
 * ldlm_cli_enqueue_fini() and the OSC part with osc_enqueue_fini().  If the
 * handle is still in use and rc is ELDLM_OK, the reference taken by
 * ldlm_cli_enqueue() is released; the extra reference and the lock
 * reference taken here are then dropped.
 *
 * env  lu environment (not referenced in the visible lines)
 * req  the enqueue request
 * aa   enqueue arguments saved when the request was sent
 * rc   request status
 *
 * NOTE(review): the lvb and lvb_len selection for the AGL case and the
 * return statement are only partly visible.  The comment opened at original
 * line 2378 has no visible terminator.
 */
2362 static int osc_enqueue_interpret(const struct lu_env *env,
2363 struct ptlrpc_request *req,
2364 struct osc_enqueue_args *aa, int rc)
2366 struct ldlm_lock *lock;
2367 struct lustre_handle handle;
2369 struct ost_lvb *lvb;
2371 __u64 *flags = aa->oa_flags;
2373 /* Make a local copy of a lock handle and a mode, because aa->oa_*
2374 * might be freed anytime after lock upcall has been called. */
2375 lustre_handle_copy(&handle, aa->oa_lockh);
2376 mode = aa->oa_ei->ei_mode;
2378 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2380 lock = ldlm_handle2lock(&handle);
2382 /* Take an additional reference so that a blocking AST that
2383 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2384 * to arrive after an upcall has been executed by
2385 * osc_enqueue_fini(). */
2386 ldlm_lock_addref(&handle, mode);
2388 /* Let CP AST to grant the lock first. */
2389 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2391 if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2396 lvb_len = sizeof(*aa->oa_lvb);
2399 /* Complete obtaining the lock procedure. */
2400 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2401 mode, flags, lvb, lvb_len, &handle, rc);
2402 /* Complete osc stuff. */
2403 rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2404 flags, aa->oa_agl, rc);
2406 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2408 /* Release the lock for async request. */
2409 if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2411 * Releases a reference taken by ldlm_cli_enqueue(), if it is
2412 * not already released by
2413 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2415 ldlm_lock_decref(&handle, mode);
2417 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2418 aa->oa_lockh, req, aa);
2419 ldlm_lock_decref(&handle, mode);
2420 LDLM_LOCK_PUT(lock);
/*
 * Update the per-stripe lov_oinfo after an enqueue has completed.
 *
 * rc == ELDLM_OK: the LVB is copied into loi.  A candidate kms is taken from
 * the LVB size, limited to the lock extent end + 1; it is installed with
 * loi_kms_set() only if it is not below the current kms.  The lock is then
 * allowed to be matched.
 * rc == ELDLM_LOCK_ABORTED with LDLM_FL_HAS_INTENT in flags (traced as a
 * glimpse): the LVB is copied and matching is allowed, kms is left alone.
 * On a further path whose condition is not visible here, matching is failed
 * for the lock with ldlm_lock_fail_match().
 *
 * The reference obtained from ldlm_handle2lock() is dropped with
 * LDLM_LOCK_PUT().
 *
 * lov_lockhp  handle of the lock that was enqueued
 * loi         stripe info to update
 * flags       LDLM flags of the enqueue
 * lvb         lock value block returned for the lock
 * mode        not referenced in the visible lines
 * rc          enqueue result
 */
2424 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2425 struct lov_oinfo *loi, int flags,
2426 struct ost_lvb *lvb, __u32 mode, int rc)
2428 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2430 if (rc == ELDLM_OK) {
2433 LASSERT(lock != NULL);
2434 loi->loi_lvb = *lvb;
2435 tmp = loi->loi_lvb.lvb_size;
2436 /* Extend KMS up to the end of this lock and no further
2437 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2438 if (tmp > lock->l_policy_data.l_extent.end)
2439 tmp = lock->l_policy_data.l_extent.end + 1;
2440 if (tmp >= loi->loi_kms) {
2441 LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2442 ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2443 loi_kms_set(loi, tmp);
2445 LDLM_DEBUG(lock, "lock acquired, setting rss="
2446 LPU64"; leaving kms="LPU64", end="LPU64,
2447 loi->loi_lvb.lvb_size, loi->loi_kms,
2448 lock->l_policy_data.l_extent.end);
2450 ldlm_lock_allow_match(lock);
2451 } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2452 LASSERT(lock != NULL);
2453 loi->loi_lvb = *lvb;
2454 ldlm_lock_allow_match(lock);
2455 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2456 " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2462 ldlm_lock_fail_match(lock);
2464 LDLM_LOCK_PUT(lock);
2467 EXPORT_SYMBOL(osc_update_enqueue);
/* Sentinel request-set value. osc_enqueue_base() compares its rqset argument
 * against this pointer and, when they are equal, hands the request to
 * ptlrpcd_add_req() instead of calling ptlrpc_set_add_req() on it. */
2469 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2471 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2472 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2473 * other synchronous requests, however keeping some locks and trying to obtain
2474 * others may take a considerable amount of time in a case of ost failure; and
2475 * when other sync requests do not get released lock from a client, the client
2476 * is excluded from the cluster -- such scenarios make the life difficult, so
2477 * release locks just after they are obtained. */
/* Obtain an extent lock on res_id, reusing a cached lock when possible.
 *
 * Visible flow:
 *  1. policy->l_extent is widened to page boundaries (modifies *policy).
 *  2. ldlm_lock_match() is tried with *flags plus LDLM_FL_LVB_READY (the
 *     extra flag is omitted for AGL requests, agl != 0).
 *  3. On a match:
 *     - AGL request and the matched lock lacks LDLM_FL_LVB_READY: the
 *       match reference is dropped (the in-code comment says -ECANCELED is
 *       returned to the caller).
 *     - osc_set_lock_data_with_check() accepts the lock: LDLM_FL_LVB_READY
 *       is set in *flags, the upcall is invoked with ELDLM_OK, and lock
 *       references are adjusted for a PW lock matched for a PR request and
 *       for async requests.
 *     - otherwise the match reference is dropped.
 *  4. With no usable match, an RQF_LDLM_ENQUEUE_LVB request is allocated
 *     and prepared, LDLM_FL_BLOCK_GRANTED is cleared from *flags, and
 *     ldlm_cli_enqueue() is called. Async requests get their
 *     osc_enqueue_args filled in and osc_enqueue_interpret installed as
 *     the reply callback, then go to ptlrpcd (rqset == PTLRPCD_SET) or are
 *     added to rqset. Otherwise osc_enqueue_fini() completes the enqueue.
 *
 * flags is in/out. lockh receives the lock handle.
 * NOTE(review): several branch conditions, declarations (rc, mode) and
 * return statements sit on lines not visible in this excerpt, including
 * the use of kms_valid; treat the summary above as a reading aid and
 * confirm details against the full file. */
2478 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2479 __u64 *flags, ldlm_policy_data_t *policy,
2480 struct ost_lvb *lvb, int kms_valid,
2481 obd_enqueue_update_f upcall, void *cookie,
2482 struct ldlm_enqueue_info *einfo,
2483 struct lustre_handle *lockh,
2484 struct ptlrpc_request_set *rqset, int async, int agl)
2486 struct obd_device *obd = exp->exp_obd;
2487 struct ptlrpc_request *req = NULL;
2488 int intent = *flags & LDLM_FL_HAS_INTENT;
2489 int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2494 /* Filesystem lock extents are extended to page boundaries so that
2495 * dealing with the page cache is a little smoother. */
2496 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2497 policy->l_extent.end |= ~CFS_PAGE_MASK;
2500 * kms is not valid when either object is completely fresh (so that no
2501 * locks are cached), or object was evicted. In the latter case cached
2502 * lock cannot be used, because it would prime inode state with
2503 * potentially stale LVB.
2508 /* Next, search for already existing extent locks that will cover us */
2509 /* If we're trying to read, we also search for an existing PW lock. The
2510 * VFS and page cache already protect us locally, so lots of readers/
2511 * writers can share a single PW lock.
2513 * There are problems with conversion deadlocks, so instead of
2514 * converting a read lock to a write lock, we'll just enqueue a new
2517 * At some point we should cancel the read lock instead of making them
2518 * send us a blocking callback, but there are problems with canceling
2519 * locks out from other users right now, too. */
2520 mode = einfo->ei_mode;
2521 if (einfo->ei_mode == LCK_PR)
2523 mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2524 einfo->ei_type, policy, mode, lockh, 0);
2526 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2528 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2529 /* For AGL, if enqueue RPC is sent but the lock is not
2530 * granted, then skip to process this stripe.
2531 * Return -ECANCELED to tell the caller. */
2532 ldlm_lock_decref(lockh, mode);
2533 LDLM_LOCK_PUT(matched);
2535 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2536 *flags |= LDLM_FL_LVB_READY;
2537 /* addref the lock only if not async requests and PW
2538 * lock is matched whereas we asked for PR. */
2539 if (!rqset && einfo->ei_mode != mode)
2540 ldlm_lock_addref(lockh, LCK_PR);
2542 /* I would like to be able to ASSERT here that
2543 * rss <= kms, but I can't, for reasons which
2544 * are explained in lov_enqueue() */
2547 /* We already have a lock, and it's referenced.
2549 * At this point, the cl_lock::cll_state is CLS_QUEUING,
2550 * AGL upcall may change it to CLS_HELD directly. */
2551 (*upcall)(cookie, ELDLM_OK);
2553 if (einfo->ei_mode != mode)
2554 ldlm_lock_decref(lockh, LCK_PW);
2556 /* For async requests, decref the lock. */
2557 ldlm_lock_decref(lockh, einfo->ei_mode);
2558 LDLM_LOCK_PUT(matched);
2561 ldlm_lock_decref(lockh, mode);
2562 LDLM_LOCK_PUT(matched);
2568 CFS_LIST_HEAD(cancels);
2569 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2570 &RQF_LDLM_ENQUEUE_LVB);
2574 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2576 ptlrpc_request_free(req);
2580 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2582 ptlrpc_request_set_replen(req);
2585 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2586 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2588 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2589 sizeof(*lvb), LVB_T_OST, lockh, async);
2592 struct osc_enqueue_args *aa;
2593 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2594 aa = ptlrpc_req_async_args(req);
2597 aa->oa_flags = flags;
2598 aa->oa_upcall = upcall;
2599 aa->oa_cookie = cookie;
2601 aa->oa_lockh = lockh;
2604 req->rq_interpret_reply =
2605 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2606 if (rqset == PTLRPCD_SET)
2607 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2609 ptlrpc_set_add_req(rqset, req);
2610 } else if (intent) {
2611 ptlrpc_req_finished(req);
2616 rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2618 ptlrpc_req_finished(req);
/* Enqueue entry point installed as .o_enqueue in osc_obd_ops.
 *
 * Builds the DLM resource name from oinfo->oi_md->lsm_object_oid and
 * forwards to osc_enqueue_base(), passing the first stripe's LVB and
 * loi_kms_valid, oinfo->oi_cb_up as the upcall and oinfo itself as its
 * cookie. The request is asynchronous exactly when rqset is non-NULL, and
 * agl is always passed as 0. */
2623 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2624 struct ldlm_enqueue_info *einfo,
2625 struct ptlrpc_request_set *rqset)
2627 struct ldlm_res_id res_id;
2631 ostid_build_res_name(&oinfo->oi_md->lsm_object_oid, &res_id);
2632 rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2633 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2634 oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2635 oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2636 rqset, rqset != NULL, 0);
/* Search the namespace of exp's device for an existing lock that covers
 * policy's extent, without sending an enqueue.
 *
 * The extent in *policy is first widened to page boundaries (this modifies
 * the caller's policy). ldlm_lock_match() is then called with the caller's
 * flags; its result is kept in rc and is used below as the matched mode.
 * If osc_set_data_with_check() refuses the matched lock, the match
 * reference is dropped unless LDLM_FL_TEST_LOCK was requested. When the
 * matched mode differs from the requested mode (and this is not a test
 * match), a PR reference is taken and the PW one is released.
 *
 * NOTE(review): the last parameter (unref, passed through to
 * ldlm_lock_match()), the initial value of rc, the OBD_FAIL_OSC_MATCH
 * outcome and the return statements are on lines not visible in this
 * excerpt. */
2640 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2641 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2642 int *flags, void *data, struct lustre_handle *lockh,
2645 struct obd_device *obd = exp->exp_obd;
2646 int lflags = *flags;
2650 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2653 /* Filesystem lock extents are extended to page boundaries so that
2654 * dealing with the page cache is a little smoother */
2655 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2656 policy->l_extent.end |= ~CFS_PAGE_MASK;
2658 /* Next, search for already existing extent locks that will cover us */
2659 /* If we're trying to read, we also search for an existing PW lock. The
2660 * VFS and page cache already protect us locally, so lots of readers/
2661 * writers can share a single PW lock. */
2665 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2666 res_id, type, policy, rc, lockh, unref);
2669 if (!osc_set_data_with_check(lockh, data)) {
2670 if (!(lflags & LDLM_FL_TEST_LOCK))
2671 ldlm_lock_decref(lockh, rc);
2675 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2676 ldlm_lock_addref(lockh, LCK_PR);
2677 ldlm_lock_decref(lockh, LCK_PW);
/* Release one reference of the given mode on the lock behind lockh.
 * LCK_GROUP locks go through ldlm_lock_decref_and_cancel(); the plain
 * ldlm_lock_decref() below is presumably the else branch (the line between
 * the two calls is not visible in this excerpt - confirm). */
2684 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2688 if (unlikely(mode == LCK_GROUP))
2689 ldlm_lock_decref_and_cancel(lockh, mode);
2691 ldlm_lock_decref(lockh, mode);
/* Cancel entry point installed as .o_cancel in osc_obd_ops. It returns
 * whatever osc_cancel_base(lockh, mode) returns; exp and md are not used in
 * the visible code. */
2696 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2697 __u32 mode, struct lustre_handle *lockh)
2700 RETURN(osc_cancel_base(lockh, mode));
/* Cancel unused locks in the namespace of exp's device by calling
 * ldlm_cli_cancel_unused() with the caller's flags and opaque pointer.
 *
 * resp is initialised to NULL and res_id is built from
 * lsm->lsm_object_oid.
 * NOTE(review): the guard around the lsm dereference and the assignment
 * that points resp at res_id are on lines not visible in this excerpt;
 * presumably a NULL lsm leaves resp NULL - confirm against the full
 * file. */
2703 static int osc_cancel_unused(struct obd_export *exp,
2704 struct lov_stripe_md *lsm,
2705 ldlm_cancel_flags_t flags,
2708 struct obd_device *obd = class_exp2obd(exp);
2709 struct ldlm_res_id res_id, *resp = NULL;
2712 ostid_build_res_name(&lsm->lsm_object_oid, &res_id);
2716 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
/* Reply callback for asynchronous OST_STATFS requests; installed as
 * rq_interpret_reply by osc_statfs_async().
 *
 * The RMF_OBD_STATFS reply buffer is copied into *aa->aa_oi->oi_osfs and
 * the caller's oi_cb_up callback is invoked with the resulting rc, which
 * becomes the return value of the visible final assignment. -EPROTO is
 * set on the path where the reply buffer is presumably missing (the check
 * itself is on a line not visible here).
 * -ENOTCONN and -EAGAIN are special-cased when OBD_STATFS_NODELAY is set in
 * oi_flags; the action taken is on a line not visible in this excerpt. */
2719 static int osc_statfs_interpret(const struct lu_env *env,
2720 struct ptlrpc_request *req,
2721 struct osc_async_args *aa, int rc)
2723 struct obd_statfs *msfs;
2727 /* The request has in fact never been sent
2728 * due to issues at a higher level (LOV).
2729 * Exit immediately since the caller is
2730 * aware of the problem and takes care
2731 * of the clean up */
2734 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2735 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2741 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2743 GOTO(out, rc = -EPROTO);
2746 *aa->aa_oi->oi_osfs = *msfs;
2748 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an OST_STATFS request on rqset without waiting for the reply;
 * installed as .o_statfs_async in osc_obd_ops.
 *
 * The request is allocated on the device's client import, packed, sent to
 * OST_CREATE_PORTAL and given osc_statfs_interpret() as its reply callback.
 * With OBD_STATFS_NODELAY in oinfo->oi_flags it is marked no-resend and
 * no-delay. max_age is not used in the visible code; the in-body comment
 * explains why it is not sent.
 * NOTE(review): allocation and pack failure handling, and the
 * initialisation of the async args, are on lines not visible here. */
2752 static int osc_statfs_async(struct obd_export *exp,
2753 struct obd_info *oinfo, __u64 max_age,
2754 struct ptlrpc_request_set *rqset)
2756 struct obd_device *obd = class_exp2obd(exp);
2757 struct ptlrpc_request *req;
2758 struct osc_async_args *aa;
2762 /* We could possibly pass max_age in the request (as an absolute
2763 * timestamp or a "seconds.usec ago") so the target can avoid doing
2764 * extra calls into the filesystem if that isn't necessary (e.g.
2765 * during mount that would help a bit). Having relative timestamps
2766 * is not so great if request processing is slow, while absolute
2767 * timestamps are not ideal because they need time synchronization. */
2768 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2772 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2774 ptlrpc_request_free(req);
2777 ptlrpc_request_set_replen(req);
2778 req->rq_request_portal = OST_CREATE_PORTAL;
2779 ptlrpc_at_set_req_timeout(req);
2781 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2782 /* procfs requests must not block waiting for statfs, to avoid deadlock */
2783 req->rq_no_resend = 1;
2784 req->rq_no_delay = 1;
2787 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2788 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2789 aa = ptlrpc_req_async_args(req);
2792 ptlrpc_set_add_req(rqset, req);
/* Synchronous statfs; installed as .o_statfs in osc_obd_ops.
 *
 * A reference on the client import is taken under the read side of cl_sem
 * (see the Bug 15684 comment below) and dropped with class_import_put()
 * once the request has been allocated on it. The OST_STATFS request is
 * sent to OST_CREATE_PORTAL and waited for with ptlrpc_queue_wait(); with
 * OBD_STATFS_NODELAY in flags it is marked no-resend and no-delay. The
 * request is released with ptlrpc_req_finished().
 * NOTE(review): the handling of a NULL import, of allocation and pack
 * failures, and the copy of the reply into osfs are on lines not visible
 * in this excerpt. max_age is not used in the visible code. */
2796 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2797 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2799 struct obd_device *obd = class_exp2obd(exp);
2800 struct obd_statfs *msfs;
2801 struct ptlrpc_request *req;
2802 struct obd_import *imp = NULL;
2806 /* Since the request might also come from lprocfs, we need to
2807 * sync this with client_disconnect_export (Bug 15684). */
2808 down_read(&obd->u.cli.cl_sem);
2809 if (obd->u.cli.cl_import)
2810 imp = class_import_get(obd->u.cli.cl_import);
2811 up_read(&obd->u.cli.cl_sem);
2815 /* We could possibly pass max_age in the request (as an absolute
2816 * timestamp or a "seconds.usec ago") so the target can avoid doing
2817 * extra calls into the filesystem if that isn't necessary (e.g.
2818 * during mount that would help a bit). Having relative timestamps
2819 * is not so great if request processing is slow, while absolute
2820 * timestamps are not ideal because they need time synchronization. */
2821 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2823 class_import_put(imp);
2828 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2830 ptlrpc_request_free(req);
2833 ptlrpc_request_set_replen(req);
2834 req->rq_request_portal = OST_CREATE_PORTAL;
2835 ptlrpc_at_set_req_timeout(req);
2837 if (flags & OBD_STATFS_NODELAY) {
2838 /* procfs requests must not block waiting for statfs, to avoid deadlock */
2839 req->rq_no_resend = 1;
2840 req->rq_no_delay = 1;
2843 rc = ptlrpc_queue_wait(req);
2847 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2849 GOTO(out, rc = -EPROTO);
2856 ptlrpc_req_finished(req);
2860 /* Retrieve object striping information.
2862 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2863 * the maximum number of OST indices which will fit in the user buffer.
2864 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
/* Copy single-stripe layout information for lsm into the user buffer lump.
 *
 * Steps visible here:
 *  - the v1-sized header is copied in from user space to read lmm_magic
 *    and lmm_stripe_count;
 *  - only LOV_USER_MAGIC_V1 and LOV_USER_MAGIC_V3 are accepted;
 *  - if the caller's lmm_stripe_count is > 0, a buffer sized by
 *    lov_mds_md_size() is allocated and its first object slot receives
 *    lsm->lsm_object_id (the slot's location depends on the magic);
 *    otherwise only a zero-object header size is used;
 *  - lmm_object_id, lmm_object_seq and lmm_stripe_count = 1 are filled in
 *    and lum_size bytes are copied back to lump.
 * NOTE(review): the error returns, the assignment of lumk in the
 * zero-stripe case and the guard around OBD_FREE() are on lines not
 * visible in this excerpt - confirm against the full file. */
2866 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2868 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2869 struct lov_user_md_v3 lum, *lumk;
2870 struct lov_user_ost_data_v1 *lmm_objects;
2871 int rc = 0, lum_size;
2877 /* we only need the header part from user space to get lmm_magic and
2878 * lmm_stripe_count, (the header part is common to v1 and v3) */
2879 lum_size = sizeof(struct lov_user_md_v1);
2880 if (cfs_copy_from_user(&lum, lump, lum_size))
2883 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2884 (lum.lmm_magic != LOV_USER_MAGIC_V3))
2887 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2888 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2889 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2890 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2892 /* we can use lov_mds_md_size() to compute lum_size
2893 * because lov_user_md_vX and lov_mds_md_vX have the same size */
2894 if (lum.lmm_stripe_count > 0) {
2895 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2896 OBD_ALLOC(lumk, lum_size);
2900 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2901 lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2903 lmm_objects = &(lumk->lmm_objects[0]);
2904 lmm_objects->l_object_id = lsm->lsm_object_id;
2906 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2910 lumk->lmm_object_id = lsm->lsm_object_id;
2911 lumk->lmm_object_seq = lsm->lsm_object_seq;
2912 lumk->lmm_stripe_count = 1;
2914 if (cfs_copy_to_user(lump, lumk, lum_size))
2918 OBD_FREE(lumk, lum_size);
/* ioctl dispatcher; installed as .o_iocontrol in osc_obd_ops.
 *
 * A module reference is taken with cfs_try_module_get() on entry and
 * released with cfs_module_put() at the end. Commands handled in the
 * visible code:
 *  - OBD_IOC_LOV_GET_CONFIG: re-reads the ioctl data from uarg, requires
 *    inline buffer 1 to hold a struct lov_desc and inline buffer 2 to hold
 *    a struct obd_uuid, fills in a one-target descriptor plus this
 *    device's uuid, and copies the buffer back to user space;
 *  - LL_IOC_LOV_SETSTRIPE: obd_alloc_memmd(exp, karg);
 *  - LL_IOC_LOV_GETSTRIPE: osc_getstripe(karg, uarg);
 *  - OBD_IOC_CLIENT_RECOVER: ptlrpc_recover_import() on the client import;
 *  - IOC_OSC_SET_ACTIVE: ptlrpc_set_import_active();
 *  - OBD_IOC_POLL_QUOTACHECK: osc_quota_poll_check();
 *  - OBD_IOC_PING_TARGET: ptlrpc_obd_ping();
 *  - anything else: -ENOTTY.
 * NOTE(review): the CERROR() message below has no trailing newline, unlike
 * the other CERROR/CDEBUG messages in this file; consider adding one.
 * NOTE(review): the switch statement, break statements and the final return
 * are on lines not visible in this excerpt. */
2924 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2925 void *karg, void *uarg)
2927 struct obd_device *obd = exp->exp_obd;
2928 struct obd_ioctl_data *data = karg;
2932 if (!cfs_try_module_get(THIS_MODULE)) {
2933 CERROR("Can't get module. Is it alive?");
2937 case OBD_IOC_LOV_GET_CONFIG: {
2939 struct lov_desc *desc;
2940 struct obd_uuid uuid;
2944 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2945 GOTO(out, err = -EINVAL);
2947 data = (struct obd_ioctl_data *)buf;
2949 if (sizeof(*desc) > data->ioc_inllen1) {
2950 obd_ioctl_freedata(buf, len);
2951 GOTO(out, err = -EINVAL);
2954 if (data->ioc_inllen2 < sizeof(uuid)) {
2955 obd_ioctl_freedata(buf, len);
2956 GOTO(out, err = -EINVAL);
2959 desc = (struct lov_desc *)data->ioc_inlbuf1;
2960 desc->ld_tgt_count = 1;
2961 desc->ld_active_tgt_count = 1;
2962 desc->ld_default_stripe_count = 1;
2963 desc->ld_default_stripe_size = 0;
2964 desc->ld_default_stripe_offset = 0;
2965 desc->ld_pattern = 0;
2966 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2968 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2970 err = cfs_copy_to_user((void *)uarg, buf, len);
2973 obd_ioctl_freedata(buf, len);
2976 case LL_IOC_LOV_SETSTRIPE:
2977 err = obd_alloc_memmd(exp, karg);
2981 case LL_IOC_LOV_GETSTRIPE:
2982 err = osc_getstripe(karg, uarg);
2984 case OBD_IOC_CLIENT_RECOVER:
2985 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2986 data->ioc_inlbuf1, 0);
2990 case IOC_OSC_SET_ACTIVE:
2991 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2994 case OBD_IOC_POLL_QUOTACHECK:
2995 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2997 case OBD_IOC_PING_TARGET:
2998 err = ptlrpc_obd_ping(obd);
3001 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3002 cmd, cfs_curproc_comm());
3003 GOTO(out, err = -ENOTTY);
3006 cfs_module_put(THIS_MODULE);
/* Key/value query; installed as .o_get_info in osc_obd_ops.
 *
 * vallen and val must both be non-NULL (the result of a failed check is on
 * a line not visible here). Keys handled in the visible code:
 *  - KEY_LOCK_TO_STRIPE: *vallen is set to sizeof(__u32); the value stored
 *    through val is on a line not visible in this excerpt;
 *  - KEY_LAST_ID: sends a synchronous OST_GET_INFO request carrying the
 *    key, marked no-delay and no-resend, and copies the obd_id reply into
 *    val;
 *  - KEY_FIEMAP: sends a synchronous OST_GET_INFO request carrying both
 *    the key and *vallen bytes of val, then copies *vallen bytes of the
 *    reply back into val.
 * -EPROTO is set on the paths where the reply buffer is presumably missing
 * (the checks themselves are not visible here). Both RPC paths release the
 * request with ptlrpc_req_finished().
 * NOTE(review): handling of unknown keys and all return statements are on
 * lines not visible in this excerpt. lsm and env are not used in the
 * visible code. */
3010 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
3011 obd_count keylen, void *key, __u32 *vallen, void *val,
3012 struct lov_stripe_md *lsm)
3015 if (!vallen || !val)
3018 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3019 __u32 *stripe = val;
3020 *vallen = sizeof(*stripe);
3023 } else if (KEY_IS(KEY_LAST_ID)) {
3024 struct ptlrpc_request *req;
3029 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3030 &RQF_OST_GET_INFO_LAST_ID);
3034 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3035 RCL_CLIENT, keylen);
3036 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3038 ptlrpc_request_free(req);
3042 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3043 memcpy(tmp, key, keylen);
3045 req->rq_no_delay = req->rq_no_resend = 1;
3046 ptlrpc_request_set_replen(req);
3047 rc = ptlrpc_queue_wait(req);
3051 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3053 GOTO(out, rc = -EPROTO);
3055 *((obd_id *)val) = *reply;
3057 ptlrpc_req_finished(req);
3059 } else if (KEY_IS(KEY_FIEMAP)) {
3060 struct ptlrpc_request *req;
3061 struct ll_user_fiemap *reply;
3065 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3066 &RQF_OST_GET_INFO_FIEMAP);
3070 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3071 RCL_CLIENT, keylen);
3072 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3073 RCL_CLIENT, *vallen);
3074 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3075 RCL_SERVER, *vallen);
3077 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3079 ptlrpc_request_free(req);
3083 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3084 memcpy(tmp, key, keylen);
3085 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3086 memcpy(tmp, val, *vallen);
3088 ptlrpc_request_set_replen(req);
3089 rc = ptlrpc_queue_wait(req);
3093 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3095 GOTO(out1, rc = -EPROTO);
3097 memcpy(val, reply, *vallen);
3099 ptlrpc_req_finished(req);
/* Key/value setter; installed as .o_set_info_async in osc_obd_ops.
 *
 * Keys handled locally in the visible code:
 *  - KEY_CHECKSUM: vallen is compared with sizeof(int); cl_checksum is set
 *    to 1 or 0 from the int value;
 *  - KEY_SPTLRPC_CONF: sptlrpc_conf_client_adapt(obd);
 *  - KEY_FLUSH_CTX: sptlrpc_import_flush_my_ctx(imp);
 *  - KEY_CACHE_SET: installs val as cli->cl_cache (asserted to happen only
 *    once), bumps ccc_users, points cl_lru_left at the cache's counter and
 *    links this client into ccc_lru under ccc_lru_lock;
 *  - KEY_CACHE_LRU_SHRINK: calls osc_lru_shrink() with the smaller of half
 *    of cl_lru_in_list and the int target in val.
 * Every other key is packed into an OST_SET_INFO request. For
 * KEY_GRANT_SHRINK the request uses RQF_OST_SET_GRANT_INFO, gets
 * osc_shrink_grant_interpret() as its reply callback and is handed to
 * ptlrpcd. All other keys require a non-NULL set; the request is added to
 * it and ptlrpc_check_set() is called.
 * NOTE(review): the return statements for each branch, the allocation
 * failure handling and the setup of the grant-shrink async args are on
 * lines not visible in this excerpt. */
3107 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3108 obd_count keylen, void *key, obd_count vallen,
3109 void *val, struct ptlrpc_request_set *set)
3111 struct ptlrpc_request *req;
3112 struct obd_device *obd = exp->exp_obd;
3113 struct obd_import *imp = class_exp2cliimp(exp);
3118 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3120 if (KEY_IS(KEY_CHECKSUM)) {
3121 if (vallen != sizeof(int))
3123 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3127 if (KEY_IS(KEY_SPTLRPC_CONF)) {
3128 sptlrpc_conf_client_adapt(obd);
3132 if (KEY_IS(KEY_FLUSH_CTX)) {
3133 sptlrpc_import_flush_my_ctx(imp);
3137 if (KEY_IS(KEY_CACHE_SET)) {
3138 struct client_obd *cli = &obd->u.cli;
3140 LASSERT(cli->cl_cache == NULL); /* only once */
3141 cli->cl_cache = (struct cl_client_cache *)val;
3142 cfs_atomic_inc(&cli->cl_cache->ccc_users);
3143 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
3145 /* add this osc into entity list */
3146 LASSERT(cfs_list_empty(&cli->cl_lru_osc));
3147 spin_lock(&cli->cl_cache->ccc_lru_lock);
3148 cfs_list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
3149 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3154 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3155 struct client_obd *cli = &obd->u.cli;
3156 int nr = cfs_atomic_read(&cli->cl_lru_in_list) >> 1;
3157 int target = *(int *)val;
3159 nr = osc_lru_shrink(cli, min(nr, target));
3164 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3167 /* We pass all other commands directly to OST. Since nobody calls osc
3168 methods directly and everybody is supposed to go through LOV, we
3169 assume lov checked invalid values for us.
3170 The only recognised values so far are evict_by_nid and mds_conn.
3171 Even if something bad goes through, we'd get a -EINVAL from OST
3174 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3175 &RQF_OST_SET_GRANT_INFO :
3180 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3181 RCL_CLIENT, keylen);
3182 if (!KEY_IS(KEY_GRANT_SHRINK))
3183 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3184 RCL_CLIENT, vallen);
3185 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3187 ptlrpc_request_free(req);
3191 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3192 memcpy(tmp, key, keylen);
3193 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3196 memcpy(tmp, val, vallen);
3198 if (KEY_IS(KEY_GRANT_SHRINK)) {
3199 struct osc_grant_args *aa;
3202 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3203 aa = ptlrpc_req_async_args(req);
3206 ptlrpc_req_finished(req);
3209 *oa = ((struct ost_body *)val)->oa;
3211 req->rq_interpret_reply = osc_shrink_grant_interpret;
3214 ptlrpc_request_set_replen(req);
3215 if (!KEY_IS(KEY_GRANT_SHRINK)) {
3216 LASSERT(set != NULL);
3217 ptlrpc_set_add_req(set, req);
3218 ptlrpc_check_set(NULL, set);
3220 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
/* llog initialisation hook; installed as .o_llog_init in osc_obd_ops.
 * The only visible body content is the comment marking it as legacy code;
 * the value it returns is on a line not visible in this excerpt. */
3226 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3227 struct obd_device *disk_obd, int *index)
3229 /* this code is not supposed to be used with LOD/OSP
3230 * to be removed soon */
/* llog teardown hook; installed as .o_llog_finish in osc_obd_ops.
 *
 * Looks up the LLOG_MDS_OST_ORIG_CTXT context, closes its catalog handle
 * and cleans the context up, then looks up LLOG_SIZE_REPL_CTXT and cleans
 * that up too.
 * NOTE(review): the checks guarding each context (presumably NULL tests)
 * and the return value are on lines not visible in this excerpt. count is
 * not used in the visible code. */
3235 static int osc_llog_finish(struct obd_device *obd, int count)
3237 struct llog_ctxt *ctxt;
3241 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3243 llog_cat_close(NULL, ctxt->loc_handle);
3244 llog_cleanup(NULL, ctxt);
3247 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3249 llog_cleanup(NULL, ctxt);
/* Reconnect hook; installed as .o_reconnect in osc_obd_ops.
 *
 * When data is non-NULL and carries OBD_CONNECT_GRANT, the grant to request
 * is computed under cl_loi_list_lock: cl_avail_grant + cl_dirty, or
 * 2 * cli_brw_size(obd) when that sum is zero. In the same critical section
 * cl_lost_grant is saved for the debug message and reset to 0.
 * NOTE(review): the final parameter, the lost_grant declaration and the
 * return value are on lines not visible in this excerpt. env, exp and
 * cluuid are not used in the visible code. */
3253 static int osc_reconnect(const struct lu_env *env,
3254 struct obd_export *exp, struct obd_device *obd,
3255 struct obd_uuid *cluuid,
3256 struct obd_connect_data *data,
3259 struct client_obd *cli = &obd->u.cli;
3261 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3264 client_obd_list_lock(&cli->cl_loi_list_lock);
3265 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3266 2 * cli_brw_size(obd);
3267 lost_grant = cli->cl_lost_grant;
3268 cli->cl_lost_grant = 0;
3269 client_obd_list_unlock(&cli->cl_loi_list_lock);
3271 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3272 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3273 data->ocd_version, data->ocd_grant, lost_grant);
/* Disconnect hook; installed as .o_disconnect in osc_obd_ops.
 *
 * If an LLOG_SIZE_REPL_CTXT context exists, it is synced with llog_sync()
 * when this is the last connection (cl_conn_count == 1) and then released
 * with llog_ctxt_put(). The export is then disconnected with
 * client_disconnect_export(). Only after that, and only if the import is
 * already gone (cl_import == NULL), is the client removed from the grant
 * shrink list; the long comment below explains the ordering (BUG18662).
 * NOTE(review): the test on ctxt and the return statement are on lines not
 * visible in this excerpt. */
3279 static int osc_disconnect(struct obd_export *exp)
3281 struct obd_device *obd = class_exp2obd(exp);
3282 struct llog_ctxt *ctxt;
3285 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3287 if (obd->u.cli.cl_conn_count == 1) {
3288 /* Flush any remaining cancel messages out to the
3290 llog_sync(ctxt, exp, 0);
3292 llog_ctxt_put(ctxt);
3294 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3298 rc = client_disconnect_export(exp);
3300 * Initially we put del_shrink_grant before disconnect_export, but it
3301 * causes the following problem if setup (connect) and cleanup
3302 * (disconnect) are tangled together.
3303 * connect p1 disconnect p2
3304 * ptlrpc_connect_import
3305 * ............... class_manual_cleanup
3308 * ptlrpc_connect_interrupt
3310 * add this client to shrink list
3312 * Bang! pinger trigger the shrink.
3313 * So the osc should be disconnected from the shrink list, after we
3314 * are sure the import has been destroyed. BUG18662
3316 if (obd->u.cli.cl_import == NULL)
3317 osc_del_shrink_grant(&obd->u.cli);
/* Import state-change callback; installed as .o_import_event in
 * osc_obd_ops. imp must belong to obd (asserted).
 *
 * Events handled in the visible code:
 *  - IMP_EVENT_DISCON: zeroes cl_avail_grant and cl_lost_grant under
 *    cl_loi_list_lock;
 *  - IMP_EVENT_INACTIVE / ACTIVE / DEACTIVATE / ACTIVATE: forwarded to the
 *    observer via obd_notify_observer() with the matching OBD_NOTIFY_*
 *    code;
 *  - IMP_EVENT_INVALIDATE: obtains a cl environment, unplugs pending I/O
 *    with osc_io_unplug(), cleans up the namespace with LDLM_FL_LOCAL_ONLY
 *    and releases the environment;
 *  - IMP_EVENT_OCD: initialises grant from the connect data when
 *    OBD_CONNECT_GRANT is set, switches the request portal to
 *    OST_REQUEST_PORTAL when OBD_CONNECT_REQPORTAL is set, and notifies the
 *    observer with OBD_NOTIFY_OCD;
 *  - any other event is reported with CERROR.
 * NOTE(review): the switch statement, the assignment of cli, error
 * handling for cl_env_get() and the return are on lines not visible in
 * this excerpt. */
3321 static int osc_import_event(struct obd_device *obd,
3322 struct obd_import *imp,
3323 enum obd_import_event event)
3325 struct client_obd *cli;
3329 LASSERT(imp->imp_obd == obd);
3332 case IMP_EVENT_DISCON: {
3334 client_obd_list_lock(&cli->cl_loi_list_lock);
3335 cli->cl_avail_grant = 0;
3336 cli->cl_lost_grant = 0;
3337 client_obd_list_unlock(&cli->cl_loi_list_lock);
3340 case IMP_EVENT_INACTIVE: {
3341 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3344 case IMP_EVENT_INVALIDATE: {
3345 struct ldlm_namespace *ns = obd->obd_namespace;
3349 env = cl_env_get(&refcheck);
3353 /* all pages go to failing rpcs due to the invalid
3355 osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3357 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3358 cl_env_put(env, &refcheck);
3363 case IMP_EVENT_ACTIVE: {
3364 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3367 case IMP_EVENT_OCD: {
3368 struct obd_connect_data *ocd = &imp->imp_connect_data;
3370 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3371 osc_init_grant(&obd->u.cli, ocd);
3374 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3375 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3377 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3380 case IMP_EVENT_DEACTIVATE: {
3381 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3384 case IMP_EVENT_ACTIVATE: {
3385 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3389 CERROR("Unknown import event %d\n", event);
3396 * Determine whether the lock can be canceled before replaying the lock
3397 * during recovery, see bug16774 for detailed information.
3399 * \retval zero the lock can't be canceled
3400 * \retval other ok to cancel
/* Callback registered with ns_register_cancel() in osc_setup().
 * Checks, after check_res_locked() on the lock's resource, whether the
 * lock is an LDLM_EXTENT lock granted in LCK_PR or LCK_CR mode with no page
 * references (osc_dlm_lock_pageref() == 0). The values returned for the
 * two outcomes are on lines not visible in this excerpt. */
3402 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3404 check_res_locked(lock->l_resource);
3407 * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3409 * XXX as a future improvement, we can also cancel unused write lock
3410 * if it doesn't have dirty data and active mmaps.
3412 if (lock->l_resource->lr_type == LDLM_EXTENT &&
3413 (lock->l_granted_mode == LCK_PR ||
3414 lock->l_granted_mode == LCK_CR) &&
3415 (osc_dlm_lock_pageref(lock) == 0))
/* Writeback work handler passed to ptlrpcd_alloc_work() in osc_setup().
 * data is the client_obd; the handler logs and calls osc_io_unplug() on it
 * with PDL_POLICY_SAME. The return value is on a line not visible in this
 * excerpt. */
3421 static int brw_queue_work(const struct lu_env *env, void *data)
3423 struct client_obd *cli = data;
3425 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3427 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
/* Device setup; installed as .o_setup in osc_obd_ops.
 *
 * Order of operations in the visible code:
 *  1. take a ptlrpcd reference (ptlrpcd_addref);
 *  2. generic client setup (client_obd_setup);
 *  3. allocate the writeback work item that runs brw_queue_work();
 *  4. set up the quota cache (osc_quota_setup);
 *  5. set the grant shrink interval and register the lprocfs entries;
 *  6. create the import's request pool with cl_max_rpcs_in_flight + 2
 *     entries (see the comment below for the reasoning);
 *  7. initialise the grant shrink list and register
 *     osc_cancel_for_recovery() with the namespace.
 * Failures unwind through the out_ptlrpcd_work, out_client_setup and
 * out_ptlrpcd labels named in the GOTO calls.
 * NOTE(review): the labels themselves, the error checks after steps 1, 2
 * and 4, and the return statements are on lines not visible in this
 * excerpt. */
3431 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3433 struct lprocfs_static_vars lvars = { 0 };
3434 struct client_obd *cli = &obd->u.cli;
3439 rc = ptlrpcd_addref();
3443 rc = client_obd_setup(obd, lcfg);
3445 GOTO(out_ptlrpcd, rc);
3447 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3448 if (IS_ERR(handler))
3449 GOTO(out_client_setup, rc = PTR_ERR(handler));
3450 cli->cl_writeback_work = handler;
3452 rc = osc_quota_setup(obd);
3454 GOTO(out_ptlrpcd_work, rc);
3456 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3457 lprocfs_osc_init_vars(&lvars);
3458 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3459 lproc_osc_attach_seqstat(obd);
3460 sptlrpc_lprocfs_cliobd_attach(obd);
3461 ptlrpc_lprocfs_register_obd(obd);
3464 /* We need to allocate a few requests more, because
3465 * brw_interpret tries to create new requests before freeing
3466 * previous ones, Ideally we want to have 2x max_rpcs_in_flight
3467 * reserved, but I'm afraid that might be too much wasted RAM
3468 * in fact, so 2 is just my guess and still should work. */
3469 cli->cl_import->imp_rq_pool =
3470 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3472 ptlrpc_add_rqs_to_pool);
3474 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3475 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3479 ptlrpcd_destroy_work(handler);
3481 client_obd_cleanup(obd);
/* Staged pre-cleanup; installed as .o_precleanup in osc_obd_ops.
 *
 *  - OBD_CLEANUP_EARLY: deactivates the import and clears imp_pingable
 *    under imp_lock.
 *  - OBD_CLEANUP_EXPORTS: waits on obd_zombie_barrier() (see the comment
 *    below about echo clients), destroys the writeback work item if one is
 *    set, cleans up the client import, unregisters lprocfs entries and
 *    finishes llog handling; a CERROR is logged on the path that is
 *    presumably taken when obd_llog_finish() fails (the check is not
 *    visible here).
 * NOTE(review): the switch statement and the return are on lines not
 * visible in this excerpt. */
3487 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3493 case OBD_CLEANUP_EARLY: {
3494 struct obd_import *imp;
3495 imp = obd->u.cli.cl_import;
3496 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3497 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3498 ptlrpc_deactivate_import(imp);
3499 spin_lock(&imp->imp_lock);
3500 imp->imp_pingable = 0;
3501 spin_unlock(&imp->imp_lock);
3504 case OBD_CLEANUP_EXPORTS: {
3505 struct client_obd *cli = &obd->u.cli;
3507 * for echo client, export may be on zombie list, wait for
3508 * zombie thread to cull it, because cli.cl_import will be
3509 * cleared in client_disconnect_export():
3510 * class_export_destroy() -> obd_cleanup() ->
3511 * echo_device_free() -> echo_client_cleanup() ->
3512 * obd_disconnect() -> osc_disconnect() ->
3513 * client_disconnect_export()
3515 obd_zombie_barrier();
3516 if (cli->cl_writeback_work) {
3517 ptlrpcd_destroy_work(cli->cl_writeback_work);
3518 cli->cl_writeback_work = NULL;
3520 obd_cleanup_client_import(obd);
3521 ptlrpc_lprocfs_unregister_obd(obd);
3522 lprocfs_obd_cleanup(obd);
3523 rc = obd_llog_finish(obd, 0);
3525 CERROR("failed to cleanup llogging subsystems\n");
/* Final device cleanup; installed as .o_cleanup in osc_obd_ops.
 *
 * If a client cache was attached (cl_cache != NULL), this client is
 * unlinked from the cache's LRU list under ccc_lru_lock, cl_lru_left is
 * cleared, the cache's user count is decremented and cl_cache is reset.
 * The quota cache is then freed and client_obd_cleanup() is run.
 * NOTE(review): the return statement and anything done after
 * client_obd_cleanup() are on lines not visible in this excerpt. */
3532 int osc_cleanup(struct obd_device *obd)
3534 struct client_obd *cli = &obd->u.cli;
3540 if (cli->cl_cache != NULL) {
3541 LASSERT(cfs_atomic_read(&cli->cl_cache->ccc_users) > 0);
3542 spin_lock(&cli->cl_cache->ccc_lru_lock);
3543 cfs_list_del_init(&cli->cl_lru_osc);
3544 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3545 cli->cl_lru_left = NULL;
3546 cfs_atomic_dec(&cli->cl_cache->ccc_users);
3547 cli->cl_cache = NULL;
3550 /* free memory of osc quota cache */
3551 osc_quota_cleanup(obd);
3553 rc = client_obd_cleanup(obd);
/* Apply a configuration record to the device. The lprocfs variable table
 * is initialised and, depending on lcfg->lcfg_command, the record is
 * handed to class_process_proc_param() with the PARAM_OSC prefix.
 * NOTE(review): the case labels, the remaining arguments of
 * class_process_proc_param() and the return are on lines not visible in
 * this excerpt. */
3559 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3561 struct lprocfs_static_vars lvars = { 0 };
3564 lprocfs_osc_init_vars(&lvars);
3566 switch (lcfg->lcfg_command) {
3568 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
/* Adapter installed as .o_process_config in osc_obd_ops: passes buf to
 * osc_process_config_base() as the configuration record; len is unused. */
3578 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3580 return osc_process_config_base(obd, buf);
/* OSC method table. It is registered under LUSTRE_OSC_NAME by
 * class_register_type() in osc_init(). Connection management reuses the
 * generic client_* helpers; the remaining entries point at the osc_*
 * implementations in this file. */
3583 struct obd_ops osc_obd_ops = {
3584 .o_owner = THIS_MODULE,
3585 .o_setup = osc_setup,
3586 .o_precleanup = osc_precleanup,
3587 .o_cleanup = osc_cleanup,
3588 .o_add_conn = client_import_add_conn,
3589 .o_del_conn = client_import_del_conn,
3590 .o_connect = client_connect_import,
3591 .o_reconnect = osc_reconnect,
3592 .o_disconnect = osc_disconnect,
3593 .o_statfs = osc_statfs,
3594 .o_statfs_async = osc_statfs_async,
3595 .o_packmd = osc_packmd,
3596 .o_unpackmd = osc_unpackmd,
3597 .o_create = osc_create,
3598 .o_destroy = osc_destroy,
3599 .o_getattr = osc_getattr,
3600 .o_getattr_async = osc_getattr_async,
3601 .o_setattr = osc_setattr,
3602 .o_setattr_async = osc_setattr_async,
3604 .o_punch = osc_punch,
3606 .o_enqueue = osc_enqueue,
3607 .o_change_cbdata = osc_change_cbdata,
3608 .o_find_cbdata = osc_find_cbdata,
3609 .o_cancel = osc_cancel,
3610 .o_cancel_unused = osc_cancel_unused,
3611 .o_iocontrol = osc_iocontrol,
3612 .o_get_info = osc_get_info,
3613 .o_set_info_async = osc_set_info_async,
3614 .o_import_event = osc_import_event,
3615 .o_llog_init = osc_llog_init,
3616 .o_llog_finish = osc_llog_finish,
3617 .o_process_config = osc_process_config,
3618 .o_quotactl = osc_quotactl,
3619 .o_quotacheck = osc_quotacheck,
/* Objects defined outside this section: the kmem cache descriptors set up
 * and torn down by osc_init() and osc_exit(), and the AST guard spinlock
 * with its lockdep class, both initialised in osc_init(). */
3622 extern struct lu_kmem_descr osc_caches[];
3623 extern spinlock_t osc_ast_guard;
3624 extern struct lock_class_key osc_ast_guard_class;
/* Module initialisation: creates the kmem caches described by osc_caches,
 * registers the OSC device type with its method table and lprocfs module
 * variables, and initialises the osc_ast_guard spinlock and its lockdep
 * class. lu_kmem_fini() is called on the path that is presumably taken
 * when class_register_type() fails (the check is not visible here).
 * NOTE(review): the error checks and return statements are on lines not
 * visible in this excerpt. */
3626 int __init osc_init(void)
3628 struct lprocfs_static_vars lvars = { 0 };
3632 /* print an address of _any_ initialized kernel symbol from this
3633 * module, to allow debugging with gdb that doesn't support data
3634 * symbols from modules.*/
3635 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3637 rc = lu_kmem_init(osc_caches);
3639 lprocfs_osc_init_vars(&lvars);
3641 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3642 LUSTRE_OSC_NAME, &osc_device_type);
3644 lu_kmem_fini(osc_caches);
3648 spin_lock_init(&osc_ast_guard);
3649 lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
/* Module teardown: unregisters the OSC device type and destroys the kmem
 * caches, undoing the two steps performed by osc_init(). */
3655 static void /*__exit*/ osc_exit(void)
3657 class_unregister_type(LUSTRE_OSC_NAME);
3658 lu_kmem_fini(osc_caches);
/* Module metadata. cfs_module() is given osc_init and osc_exit, presumably
 * as the module's load and unload hooks - confirm against the libcfs
 * definition of the macro. */
3661 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3662 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3663 MODULE_LICENSE("GPL");
3665 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);