4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Whamcloud, Inc.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_OSC
39 #include <libcfs/libcfs.h>
42 # include <liblustre.h>
45 #include <lustre_dlm.h>
46 #include <lustre_net.h>
47 #include <lustre/lustre_user.h>
48 #include <obd_cksum.h>
56 #include <lustre_ha.h>
57 #include <lprocfs_status.h>
58 #include <lustre_log.h>
59 #include <lustre_debug.h>
60 #include <lustre_param.h>
61 #include "osc_internal.h"
62 #include "osc_cl_internal.h"
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
65 static int brw_interpret(const struct lu_env *env,
66 struct ptlrpc_request *req, void *data, int rc);
67 int osc_cleanup(struct obd_device *obd);
69 /* Pack OSC object metadata for disk storage (LE byte order). */
70 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
71 struct lov_stripe_md *lsm)
76 lmm_size = sizeof(**lmmp);
81 OBD_FREE(*lmmp, lmm_size);
87 OBD_ALLOC(*lmmp, lmm_size);
93 LASSERT(lsm->lsm_object_id);
94 LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
95 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
96 (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
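/* A minimal sketch of why the explicit LE conversion matters: packmd and
 * unpackmd must be exact inverses under cpu_to_le64()/le64_to_cpu() so the
 * stored lmm is independent of host endianness.  osc_md_roundtrip_ok() is a
 * hypothetical self-check, not an existing helper: */
static inline int osc_md_roundtrip_ok(__u64 id)
{
	__u64 wire = cpu_to_le64(id);		/* what osc_packmd() stores */

	return le64_to_cpu(wire) == id;		/* what osc_unpackmd() reads */
}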
102 /* Unpack OSC object metadata from disk storage (LE byte order). */
103 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
104 struct lov_mds_md *lmm, int lmm_bytes)
107 struct obd_import *imp = class_exp2cliimp(exp);
111 if (lmm_bytes < sizeof (*lmm)) {
112 CERROR("lov_mds_md too small: %d, need %d\n",
113 lmm_bytes, (int)sizeof(*lmm));
116 /* XXX LOV_MAGIC etc check? */
118 if (lmm->lmm_object_id == 0) {
119 CERROR("lov_mds_md: zero lmm_object_id\n");
124 lsm_size = lov_stripe_md_size(1);
128 if (*lsmp != NULL && lmm == NULL) {
129 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
130 OBD_FREE(*lsmp, lsm_size);
136 OBD_ALLOC(*lsmp, lsm_size);
139 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
140 if ((*lsmp)->lsm_oinfo[0] == NULL) {
141 OBD_FREE(*lsmp, lsm_size);
144 loi_init((*lsmp)->lsm_oinfo[0]);
148 /* XXX zero *lsmp? */
149 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
150 (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
151 LASSERT((*lsmp)->lsm_object_id);
152 LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
156 (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
157 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
159 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165 struct ost_body *body, void *capa)
167 struct obd_capa *oc = (struct obd_capa *)capa;
168 struct lustre_capa *c;
173 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
176 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177 DEBUG_CAPA(D_SEC, c, "pack");
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181 struct obd_info *oinfo)
183 struct ost_body *body;
185 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
188 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
189 osc_pack_capa(req, body, oinfo->oi_capa);
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193 const struct req_msg_field *field,
197 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
199 /* the size is already calculated as sizeof(struct obd_capa) */
203 static int osc_getattr_interpret(const struct lu_env *env,
204 struct ptlrpc_request *req,
205 struct osc_async_args *aa, int rc)
207 struct ost_body *body;
213 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
215 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
216 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
218 /* This should really be sent by the OST */
219 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
220 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
222 CDEBUG(D_INFO, "can't unpack ost_body\n");
224 aa->aa_oi->oi_oa->o_valid = 0;
227 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
231 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
232 struct ptlrpc_request_set *set)
234 struct ptlrpc_request *req;
235 struct osc_async_args *aa;
239 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
243 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
244 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
246 ptlrpc_request_free(req);
250 osc_pack_req_body(req, oinfo);
252 ptlrpc_request_set_replen(req);
253 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
255 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
256 aa = ptlrpc_req_async_args(req);
259 ptlrpc_set_add_req(set, req);
263 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
264 struct obd_info *oinfo)
266 struct ptlrpc_request *req;
267 struct ost_body *body;
271 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
275 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
276 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
278 ptlrpc_request_free(req);
282 osc_pack_req_body(req, oinfo);
284 ptlrpc_request_set_replen(req);
286 rc = ptlrpc_queue_wait(req);
290 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
292 GOTO(out, rc = -EPROTO);
294 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
295 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
297 /* This should really be sent by the OST */
298 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
299 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
303 ptlrpc_req_finished(req);
307 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
308 struct obd_info *oinfo, struct obd_trans_info *oti)
310 struct ptlrpc_request *req;
311 struct ost_body *body;
315 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
317 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
321 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
322 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
324 ptlrpc_request_free(req);
328 osc_pack_req_body(req, oinfo);
330 ptlrpc_request_set_replen(req);
332 rc = ptlrpc_queue_wait(req);
336 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
338 GOTO(out, rc = -EPROTO);
340 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
344 ptlrpc_req_finished(req);
348 static int osc_setattr_interpret(const struct lu_env *env,
349 struct ptlrpc_request *req,
350 struct osc_setattr_args *sa, int rc)
352 struct ost_body *body;
358 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
360 GOTO(out, rc = -EPROTO);
362 lustre_get_wire_obdo(sa->sa_oa, &body->oa);
364 rc = sa->sa_upcall(sa->sa_cookie, rc);
368 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
369 struct obd_trans_info *oti,
370 obd_enqueue_update_f upcall, void *cookie,
371 struct ptlrpc_request_set *rqset)
373 struct ptlrpc_request *req;
374 struct osc_setattr_args *sa;
378 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
382 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
383 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
385 ptlrpc_request_free(req);
389 if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
390 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
392 osc_pack_req_body(req, oinfo);
394 ptlrpc_request_set_replen(req);
396 /* do the MDS->OST setattr asynchronously */
398 /* Do not wait for response. */
399 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
401 req->rq_interpret_reply =
402 (ptlrpc_interpterer_t)osc_setattr_interpret;
404 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
405 sa = ptlrpc_req_async_args(req);
406 sa->sa_oa = oinfo->oi_oa;
407 sa->sa_upcall = upcall;
408 sa->sa_cookie = cookie;
410 if (rqset == PTLRPCD_SET)
411 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
413 ptlrpc_set_add_req(rqset, req);
419 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
420 struct obd_trans_info *oti,
421 struct ptlrpc_request_set *rqset)
423 return osc_setattr_async_base(exp, oinfo, oti,
424 oinfo->oi_cb_up, oinfo, rqset);
427 int osc_real_create(struct obd_export *exp, struct obdo *oa,
428 struct lov_stripe_md **ea, struct obd_trans_info *oti)
430 struct ptlrpc_request *req;
431 struct ost_body *body;
432 struct lov_stripe_md *lsm;
441 rc = obd_alloc_memmd(exp, &lsm);
446 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
448 GOTO(out, rc = -ENOMEM);
450 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
452 ptlrpc_request_free(req);
456 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
458 lustre_set_wire_obdo(&body->oa, oa);
460 ptlrpc_request_set_replen(req);
462 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
463 oa->o_flags == OBD_FL_DELORPHAN) {
465 "delorphan from OST integration");
466 /* Don't resend the delorphan req */
467 req->rq_no_resend = req->rq_no_delay = 1;
470 rc = ptlrpc_queue_wait(req);
474 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
476 GOTO(out_req, rc = -EPROTO);
478 lustre_get_wire_obdo(oa, &body->oa);
480 /* This should really be sent by the OST */
481 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
482 oa->o_valid |= OBD_MD_FLBLKSZ;
484 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
485 * have valid lsm_oinfo data structs, so don't go touching that.
486 * This needs to be fixed in a big way.
488 lsm->lsm_object_id = oa->o_id;
489 lsm->lsm_object_seq = oa->o_seq;
493 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
495 if (oa->o_valid & OBD_MD_FLCOOKIE) {
496 if (!oti->oti_logcookies)
497 oti_alloc_cookies(oti, 1);
498 *oti->oti_logcookies = oa->o_lcookie;
502 CDEBUG(D_HA, "transno: "LPD64"\n",
503 lustre_msg_get_transno(req->rq_repmsg));
505 ptlrpc_req_finished(req);
508 obd_free_memmd(exp, &lsm);
512 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
513 obd_enqueue_update_f upcall, void *cookie,
514 struct ptlrpc_request_set *rqset)
516 struct ptlrpc_request *req;
517 struct osc_setattr_args *sa;
518 struct ost_body *body;
522 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
526 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
527 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
529 ptlrpc_request_free(req);
532 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
533 ptlrpc_at_set_req_timeout(req);
535 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
537 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
538 osc_pack_capa(req, body, oinfo->oi_capa);
540 ptlrpc_request_set_replen(req);
542 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
543 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
544 sa = ptlrpc_req_async_args(req);
545 sa->sa_oa = oinfo->oi_oa;
546 sa->sa_upcall = upcall;
547 sa->sa_cookie = cookie;
548 if (rqset == PTLRPCD_SET)
549 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
551 ptlrpc_set_add_req(rqset, req);
556 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
557 struct obd_info *oinfo, struct obd_trans_info *oti,
558 struct ptlrpc_request_set *rqset)
560 oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
561 oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
562 oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
563 return osc_punch_base(exp, oinfo,
564 oinfo->oi_cb_up, oinfo, rqset);
567 static int osc_sync_interpret(const struct lu_env *env,
568 struct ptlrpc_request *req,
571 struct osc_fsync_args *fa = arg;
572 struct ost_body *body;
578 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
580 CERROR ("can't unpack ost_body\n");
581 GOTO(out, rc = -EPROTO);
584 *fa->fa_oi->oi_oa = body->oa;
586 rc = fa->fa_upcall(fa->fa_cookie, rc);
590 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
591 obd_enqueue_update_f upcall, void *cookie,
592 struct ptlrpc_request_set *rqset)
594 struct ptlrpc_request *req;
595 struct ost_body *body;
596 struct osc_fsync_args *fa;
600 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
604 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
605 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
607 ptlrpc_request_free(req);
611 /* overload the size and blocks fields in the oa with start/end */
612 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
614 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
615 osc_pack_capa(req, body, oinfo->oi_capa);
617 ptlrpc_request_set_replen(req);
618 req->rq_interpret_reply = osc_sync_interpret;
620 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
621 fa = ptlrpc_req_async_args(req);
623 fa->fa_upcall = upcall;
624 fa->fa_cookie = cookie;
626 if (rqset == PTLRPCD_SET)
627 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
629 ptlrpc_set_add_req(rqset, req);
634 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
635 struct obd_info *oinfo, obd_size start, obd_size end,
636 struct ptlrpc_request_set *set)
641 CDEBUG(D_INFO, "oa NULL\n");
645 oinfo->oi_oa->o_size = start;
646 oinfo->oi_oa->o_blocks = end;
647 oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
649 RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
652 /* Find and cancel locally the locks matched by @mode in the resource found
653 * by @objid. Found locks are added to the @cancels list. Returns the number
654 * of locks added to the @cancels list. */
655 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
657 ldlm_mode_t mode, int lock_flags)
659 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
660 struct ldlm_res_id res_id;
661 struct ldlm_resource *res;
665 osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
666 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
670 LDLM_RESOURCE_ADDREF(res);
671 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
672 lock_flags, 0, NULL);
673 LDLM_RESOURCE_DELREF(res);
674 ldlm_resource_putref(res);
678 static int osc_destroy_interpret(const struct lu_env *env,
679 struct ptlrpc_request *req, void *data,
682 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
684 cfs_atomic_dec(&cli->cl_destroy_in_flight);
685 cfs_waitq_signal(&cli->cl_destroy_waitq);
689 static int osc_can_send_destroy(struct client_obd *cli)
691 if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
692 cli->cl_max_rpcs_in_flight) {
693 /* The destroy request can be sent */
696 if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
697 cli->cl_max_rpcs_in_flight) {
699 * The counter has been modified between the two atomic operations. */
702 cfs_waitq_signal(&cli->cl_destroy_waitq);
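/* The throttle above is "optimistically increment, then check; on failure
 * back out and re-check so a concurrent release is not lost".  A minimal
 * sketch of the same pattern against a bare counter (throttle_try_acquire()
 * and the wake callback are hypothetical names, not Lustre APIs): */
static int throttle_try_acquire(cfs_atomic_t *in_flight, int max,
				void (*wake)(void))
{
	if (cfs_atomic_inc_return(in_flight) <= max)
		return 1;			/* got a slot */
	/* Over the limit: back out.  If the counter dropped below the limit
	 * between the two atomic operations, a slot was freed concurrently,
	 * so wake any waiter that saw the counter at its peak. */
	if (cfs_atomic_dec_return(in_flight) < max)
		wake();
	return 0;
}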
707 /* Destroy requests can always be async on the client, and we don't even
708 * really care about the return code, since the client cannot do anything at
709 * all about a failed destroy.
710 * When the MDS is unlinking a filename, it saves the file objects into a
711 * recovery llog, and these object records are cancelled when the OST reports
712 * they were destroyed and sync'd to disk (i.e. the transaction committed).
713 * If the client dies, or the OST is down when the object should be destroyed,
714 * the records are not cancelled, and when the OST next reconnects to the MDS,
715 * it will retrieve the llog unlink logs and then send the log cancellation
716 * cookies to the MDS after committing the destroy transactions. */
717 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
718 struct obdo *oa, struct lov_stripe_md *ea,
719 struct obd_trans_info *oti, struct obd_export *md_export,
722 struct client_obd *cli = &exp->exp_obd->u.cli;
723 struct ptlrpc_request *req;
724 struct ost_body *body;
725 CFS_LIST_HEAD(cancels);
730 CDEBUG(D_INFO, "oa NULL\n");
734 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
735 LDLM_FL_DISCARD_DATA);
737 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
739 ldlm_lock_list_put(&cancels, l_bl_ast, count);
743 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
744 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
747 ptlrpc_request_free(req);
751 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
752 ptlrpc_at_set_req_timeout(req);
754 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
755 oa->o_lcookie = *oti->oti_logcookies;
756 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
758 lustre_set_wire_obdo(&body->oa, oa);
760 osc_pack_capa(req, body, (struct obd_capa *)capa);
761 ptlrpc_request_set_replen(req);
763 /* don't throttle destroy RPCs for the MDT */
764 if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
765 req->rq_interpret_reply = osc_destroy_interpret;
766 if (!osc_can_send_destroy(cli)) {
767 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
771 * Wait until the number of on-going destroy RPCs drops
772 * below cl_max_rpcs_in_flight.
774 l_wait_event_exclusive(cli->cl_destroy_waitq,
775 osc_can_send_destroy(cli), &lwi);
779 /* Do not wait for response */
780 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
784 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
787 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
789 LASSERT(!(oa->o_valid & bits));
792 client_obd_list_lock(&cli->cl_loi_list_lock);
793 oa->o_dirty = cli->cl_dirty;
794 if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
795 CERROR("dirty %lu - %lu > dirty_max %lu\n",
796 cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
798 } else if (cfs_atomic_read(&obd_dirty_pages) -
799 cfs_atomic_read(&obd_dirty_transit_pages) >
800 obd_max_dirty_pages + 1){
801 /* The cfs_atomic_read() and the cfs_atomic_inc() are
802 * not covered by a lock, so they may race and trip
803 * this CERROR() unless we add in a small fudge factor (+1). */
804 CERROR("dirty %d - %d > system dirty_max %d\n",
805 cfs_atomic_read(&obd_dirty_pages),
806 cfs_atomic_read(&obd_dirty_transit_pages),
807 obd_max_dirty_pages);
809 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
810 CERROR("dirty %lu - dirty_max %lu too big???\n",
811 cli->cl_dirty, cli->cl_dirty_max);
814 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
815 (cli->cl_max_rpcs_in_flight + 1);
816 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
818 oa->o_grant = cli->cl_avail_grant;
819 oa->o_dropped = cli->cl_lost_grant;
820 cli->cl_lost_grant = 0;
821 client_obd_list_unlock(&cli->cl_loi_list_lock);
822 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
823 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
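/* A worked restatement of the o_undirty cap above, as a pure function with
 * hypothetical parameter names.  E.g. with 4 KiB pages (page_shift == 12),
 * 256 pages per RPC and 8 RPCs in flight, the cap is
 * (256 << 12) * (8 + 1) = 9 MiB, unless dirty_max is larger: */
static inline long osc_undirty_cap(long dirty_max, long pages_per_rpc,
				   long rpcs_in_flight, int page_shift)
{
	long max_in_flight = (pages_per_rpc << page_shift) *
			     (rpcs_in_flight + 1);

	return dirty_max > max_in_flight ? dirty_max : max_in_flight;
}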
827 void osc_update_next_shrink(struct client_obd *cli)
829 cli->cl_next_shrink_grant =
830 cfs_time_shift(cli->cl_grant_shrink_interval);
831 CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
832 cli->cl_next_shrink_grant);
835 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
837 client_obd_list_lock(&cli->cl_loi_list_lock);
838 cli->cl_avail_grant += grant;
839 client_obd_list_unlock(&cli->cl_loi_list_lock);
842 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
844 if (body->oa.o_valid & OBD_MD_FLGRANT) {
845 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
846 __osc_update_grant(cli, body->oa.o_grant);
850 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
851 obd_count keylen, void *key, obd_count vallen,
852 void *val, struct ptlrpc_request_set *set);
854 static int osc_shrink_grant_interpret(const struct lu_env *env,
855 struct ptlrpc_request *req,
858 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
859 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
860 struct ost_body *body;
863 __osc_update_grant(cli, oa->o_grant);
867 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
869 osc_update_grant(cli, body);
875 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
877 client_obd_list_lock(&cli->cl_loi_list_lock);
878 oa->o_grant = cli->cl_avail_grant / 4;
879 cli->cl_avail_grant -= oa->o_grant;
880 client_obd_list_unlock(&cli->cl_loi_list_lock);
881 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
882 oa->o_valid |= OBD_MD_FLFLAGS;
885 oa->o_flags |= OBD_FL_SHRINK_GRANT;
886 osc_update_next_shrink(cli);
889 /* Shrink the current grant, either from some large amount to enough for a
890 * full set of in-flight RPCs, or, if we have already shrunk to that limit,
891 * then to enough for a single RPC. This avoids keeping more grant than
892 * needed, and avoids shrinking the grant piecemeal. */
893 static int osc_shrink_grant(struct client_obd *cli)
895 long target = (cli->cl_max_rpcs_in_flight + 1) *
896 cli->cl_max_pages_per_rpc;
898 client_obd_list_lock(&cli->cl_loi_list_lock);
899 if (cli->cl_avail_grant <= target)
900 target = cli->cl_max_pages_per_rpc;
901 client_obd_list_unlock(&cli->cl_loi_list_lock);
903 return osc_shrink_grant_to_target(cli, target);
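/* A minimal sketch of the two-step shrink policy above (grant_shrink_target()
 * is a hypothetical name; units follow cl_avail_grant): from a large grant,
 * shrink to a full pipeline's worth; once already at that limit, shrink to a
 * single RPC's worth: */
static inline long grant_shrink_target(long avail, long per_rpc,
				       long max_rpcs_in_flight)
{
	long target = (max_rpcs_in_flight + 1) * per_rpc;

	if (avail <= target)		/* already at the pipeline limit */
		target = per_rpc;	/* so drop to one RPC's worth */
	return target;
}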
906 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
909 struct ost_body *body;
912 client_obd_list_lock(&cli->cl_loi_list_lock);
913 /* Don't shrink if we are already above or below the desired limit.
914 * We don't want to shrink below a single RPC, as that will negatively
915 * impact block allocation and long-term performance. */
916 if (target < cli->cl_max_pages_per_rpc)
917 target = cli->cl_max_pages_per_rpc;
919 if (target >= cli->cl_avail_grant) {
920 client_obd_list_unlock(&cli->cl_loi_list_lock);
923 client_obd_list_unlock(&cli->cl_loi_list_lock);
929 osc_announce_cached(cli, &body->oa, 0);
931 client_obd_list_lock(&cli->cl_loi_list_lock);
932 body->oa.o_grant = cli->cl_avail_grant - target;
933 cli->cl_avail_grant = target;
934 client_obd_list_unlock(&cli->cl_loi_list_lock);
935 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
936 body->oa.o_valid |= OBD_MD_FLFLAGS;
937 body->oa.o_flags = 0;
939 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
940 osc_update_next_shrink(cli);
942 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
943 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
944 sizeof(*body), body, NULL);
946 __osc_update_grant(cli, body->oa.o_grant);
951 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
952 static int osc_should_shrink_grant(struct client_obd *client)
954 cfs_time_t time = cfs_time_current();
955 cfs_time_t next_shrink = client->cl_next_shrink_grant;
957 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
958 OBD_CONNECT_GRANT_SHRINK) == 0)
961 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
962 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
963 client->cl_avail_grant > GRANT_SHRINK_LIMIT)
966 osc_update_next_shrink(client);
971 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
973 struct client_obd *client;
975 cfs_list_for_each_entry(client, &item->ti_obd_list,
976 cl_grant_shrink_list) {
977 if (osc_should_shrink_grant(client))
978 osc_shrink_grant(client);
983 static int osc_add_shrink_grant(struct client_obd *client)
987 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
989 osc_grant_shrink_grant_cb, NULL,
990 &client->cl_grant_shrink_list);
992 CERROR("add grant client %s error %d\n",
993 client->cl_import->imp_obd->obd_name, rc);
996 CDEBUG(D_CACHE, "add grant client %s\n",
997 client->cl_import->imp_obd->obd_name);
998 osc_update_next_shrink(client);
1002 static int osc_del_shrink_grant(struct client_obd *client)
1004 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1008 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1011 * ocd_grant is the total grant amount we're expected to hold: if we've
1012 * been evicted, it's the new avail_grant amount, and cl_dirty will drop
1013 * to 0 as in-flight RPCs fail out; otherwise, it's avail_grant + dirty.
1015 * A race is tolerable here: if we're evicted, but imp_state has already
1016 * left the EVICTED state, then cl_dirty must be 0 already.
1018 client_obd_list_lock(&cli->cl_loi_list_lock);
1019 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1020 cli->cl_avail_grant = ocd->ocd_grant;
1022 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1024 if (cli->cl_avail_grant < 0) {
1025 CWARN("%s: available grant < 0, the OSS is probably not running"
1026 " with patch from bug20278 (%ld) \n",
1027 cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
1028 /* workaround for 1.6 servers which do not have
1029 * the patch from bug20278 */
1030 cli->cl_avail_grant = ocd->ocd_grant;
1033 client_obd_list_unlock(&cli->cl_loi_list_lock);
1035 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
1036 cli->cl_import->imp_obd->obd_name,
1037 cli->cl_avail_grant, cli->cl_lost_grant);
1039 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1040 cfs_list_empty(&cli->cl_grant_shrink_list))
1041 osc_add_shrink_grant(cli);
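/* A worked example of the cl_avail_grant computation above, assuming
 * hypothetical numbers: if the connect reply carries ocd_grant = 2 MiB and
 * the client still tracks cl_dirty = 512 KiB of cached writes, a normal
 * (re)connect leaves 2 MiB - 512 KiB = 1.5 MiB available, since the dirty
 * pages will consume their share when flushed.  After an eviction the dirty
 * cache fails out with the in-flight RPCs, so the full 2 MiB is available: */
static inline long osc_avail_grant_example(long ocd_grant, long cl_dirty,
					   int evicted)
{
	return evicted ? ocd_grant : ocd_grant - cl_dirty;
}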
1044 /* We assume that the reason this OSC got a short read is that it read
1045 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
1046 * via the LOV, and it _knows_ it's reading inside the file; it's just that
1047 * this stripe never got written at or beyond this offset yet. */
1048 static void handle_short_read(int nob_read, obd_count page_count,
1049 struct brw_page **pga)
1054 /* skip bytes read OK */
1055 while (nob_read > 0) {
1056 LASSERT (page_count > 0);
1058 if (pga[i]->count > nob_read) {
1059 /* EOF inside this page */
1060 ptr = cfs_kmap(pga[i]->pg) +
1061 (pga[i]->off & ~CFS_PAGE_MASK);
1062 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1063 cfs_kunmap(pga[i]->pg);
1069 nob_read -= pga[i]->count;
1074 /* zero remaining pages */
1075 while (page_count-- > 0) {
1076 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1077 memset(ptr, 0, pga[i]->count);
1078 cfs_kunmap(pga[i]->pg);
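/* A self-contained restatement of the short-read fixup above, operating on a
 * flat buffer instead of kmapped pages (short_read_zero_tail() is a
 * hypothetical helper): everything past the nob_read bytes the OST actually
 * returned is zeroed, so the sparse tail reads back as a hole. */
static void short_read_zero_tail(char *buf, int buf_len, int nob_read)
{
	if (nob_read < buf_len)
		memset(buf + nob_read, 0, buf_len - nob_read);
}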
1083 static int check_write_rcs(struct ptlrpc_request *req,
1084 int requested_nob, int niocount,
1085 obd_count page_count, struct brw_page **pga)
1090 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1091 sizeof(*remote_rcs) *
1093 if (remote_rcs == NULL) {
1094 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1098 /* return error if any niobuf was in error */
1099 for (i = 0; i < niocount; i++) {
1100 if ((int)remote_rcs[i] < 0)
1101 return(remote_rcs[i]);
1103 if (remote_rcs[i] != 0) {
1104 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1105 i, remote_rcs[i], req);
1110 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1111 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1112 req->rq_bulk->bd_nob_transferred, requested_nob);
1119 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1121 if (p1->flag != p2->flag) {
1122 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1123 OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1125 /* warn if we try to combine flags that we don't know to be
1126 * safe to combine */
1127 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1128 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1129 "report this at http://bugs.whamcloud.com/\n",
1130 p1->flag, p2->flag);
1135 return (p1->off + p1->count == p2->off);
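/* A usage sketch for can_merge_pages(), assuming 4 KiB pages: byte-contiguous
 * fragments collapse into one niobuf, while a gap (or incompatible flags
 * outside the ignored mask) starts a new one.  The example itself is
 * hypothetical: */
static int can_merge_pages_example(void)
{
	static struct brw_page a = { .off = 0,        .count = 4096 };
	static struct brw_page b = { .off = 4096,     .count = 4096 };
	static struct brw_page c = { .off = 3 * 4096, .count = 4096 };

	return can_merge_pages(&a, &b) &&	/* contiguous: merges */
	       !can_merge_pages(&b, &c);	/* gap at 8192: does not */
}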
1138 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1139 struct brw_page **pga, int opc,
1140 cksum_type_t cksum_type)
1145 LASSERT (pg_count > 0);
1146 cksum = init_checksum(cksum_type);
1147 while (nob > 0 && pg_count > 0) {
1148 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1149 int off = pga[i]->off & ~CFS_PAGE_MASK;
1150 int count = pga[i]->count > nob ? nob : pga[i]->count;
1152 /* corrupt the data before we compute the checksum, to
1153 * simulate an OST->client data error */
1154 if (i == 0 && opc == OST_READ &&
1155 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1156 memcpy(ptr + off, "bad1", min(4, nob));
1157 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1158 cfs_kunmap(pga[i]->pg);
1159 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1162 nob -= pga[i]->count;
1166 /* For sending, we only compute a wrong checksum instead
1167 * of corrupting the data, so it is still correct on a redo */
1168 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1171 return fini_checksum(cksum, cksum_type);
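/* A self-contained sketch of the incremental checksumming loop above, using
 * a trivial additive checksum over a flat buffer split into chunks; the
 * running cksum value plays the role of the init_checksum()/
 * compute_checksum()/fini_checksum() state: */
static __u32 checksum_chunks(const unsigned char *buf, int nob, int chunk)
{
	__u32 cksum = 0;			/* init_checksum() */

	while (nob > 0) {
		int count = nob < chunk ? nob : chunk;
		int i;

		for (i = 0; i < count; i++)	/* compute_checksum() */
			cksum += buf[i];
		buf += count;
		nob -= count;
	}
	return cksum;				/* fini_checksum() */
}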
1174 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1175 struct lov_stripe_md *lsm, obd_count page_count,
1176 struct brw_page **pga,
1177 struct ptlrpc_request **reqp,
1178 struct obd_capa *ocapa, int reserve,
1181 struct ptlrpc_request *req;
1182 struct ptlrpc_bulk_desc *desc;
1183 struct ost_body *body;
1184 struct obd_ioobj *ioobj;
1185 struct niobuf_remote *niobuf;
1186 int niocount, i, requested_nob, opc, rc;
1187 struct osc_brw_async_args *aa;
1188 struct req_capsule *pill;
1189 struct brw_page *pg_prev;
1192 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1193 RETURN(-ENOMEM); /* Recoverable */
1194 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1195 RETURN(-EINVAL); /* Fatal */
1197 if ((cmd & OBD_BRW_WRITE) != 0) {
1199 req = ptlrpc_request_alloc_pool(cli->cl_import,
1200 cli->cl_import->imp_rq_pool,
1201 &RQF_OST_BRW_WRITE);
1204 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1209 for (niocount = i = 1; i < page_count; i++) {
1210 if (!can_merge_pages(pga[i - 1], pga[i]))
1214 pill = &req->rq_pill;
1215 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1217 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1218 niocount * sizeof(*niobuf));
1219 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1221 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1223 ptlrpc_request_free(req);
1226 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1227 ptlrpc_at_set_req_timeout(req);
1229 if (opc == OST_WRITE)
1230 desc = ptlrpc_prep_bulk_imp(req, page_count,
1231 BULK_GET_SOURCE, OST_BULK_PORTAL);
1233 desc = ptlrpc_prep_bulk_imp(req, page_count,
1234 BULK_PUT_SINK, OST_BULK_PORTAL);
1237 GOTO(out, rc = -ENOMEM);
1238 /* NB request now owns desc and will free it when it gets freed */
1240 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1241 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1242 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1243 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1245 lustre_set_wire_obdo(&body->oa, oa);
1247 obdo_to_ioobj(oa, ioobj);
1248 ioobj->ioo_bufcnt = niocount;
1249 osc_pack_capa(req, body, ocapa);
1250 LASSERT (page_count > 0);
1252 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1253 struct brw_page *pg = pga[i];
1254 int poff = pg->off & ~CFS_PAGE_MASK;
1256 LASSERT(pg->count > 0);
1257 /* make sure there is no gap in the middle of page array */
1258 LASSERTF(page_count == 1 ||
1259 (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
1260 ergo(i > 0 && i < page_count - 1,
1261 poff == 0 && pg->count == CFS_PAGE_SIZE) &&
1262 ergo(i == page_count - 1, poff == 0)),
1263 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1264 i, page_count, pg, pg->off, pg->count);
1266 LASSERTF(i == 0 || pg->off > pg_prev->off,
1267 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1268 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1270 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1271 pg_prev->pg, page_private(pg_prev->pg),
1272 pg_prev->pg->index, pg_prev->off);
1274 LASSERTF(i == 0 || pg->off > pg_prev->off,
1275 "i %d p_c %u\n", i, page_count);
1277 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1278 (pg->flag & OBD_BRW_SRVLOCK));
1280 ptlrpc_prep_bulk_page(desc, pg->pg, poff, pg->count);
1281 requested_nob += pg->count;
1283 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1285 niobuf->len += pg->count;
1287 niobuf->offset = pg->off;
1288 niobuf->len = pg->count;
1289 niobuf->flags = pg->flag;
1294 LASSERTF((void *)(niobuf - niocount) ==
1295 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1296 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1297 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1299 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1301 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1302 body->oa.o_valid |= OBD_MD_FLFLAGS;
1303 body->oa.o_flags = 0;
1305 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1308 if (osc_should_shrink_grant(cli))
1309 osc_shrink_grant_local(cli, &body->oa);
1311 /* size[REQ_REC_OFF] still sizeof (*body) */
1312 if (opc == OST_WRITE) {
1313 if (cli->cl_checksum &&
1314 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1315 /* store cl_cksum_type in a local variable since
1316 * it can be changed via lprocfs */
1317 cksum_type_t cksum_type = cli->cl_cksum_type;
1319 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1320 oa->o_flags &= OBD_FL_LOCAL_MASK;
1321 body->oa.o_flags = 0;
1323 body->oa.o_flags |= cksum_type_pack(cksum_type);
1324 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1325 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1329 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1331 /* save this in 'oa', too, for later checking */
1332 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1333 oa->o_flags |= cksum_type_pack(cksum_type);
1335 /* clear out the checksum flag, in case this is a
1336 * resend but cl_checksum is no longer set. b=11238 */
1337 oa->o_valid &= ~OBD_MD_FLCKSUM;
1339 oa->o_cksum = body->oa.o_cksum;
1340 /* 1 RC per niobuf */
1341 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1342 sizeof(__u32) * niocount);
1344 if (cli->cl_checksum &&
1345 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1346 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1347 body->oa.o_flags = 0;
1348 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1349 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1352 ptlrpc_request_set_replen(req);
1354 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1355 aa = ptlrpc_req_async_args(req);
1357 aa->aa_requested_nob = requested_nob;
1358 aa->aa_nio_count = niocount;
1359 aa->aa_page_count = page_count;
1363 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1364 if (ocapa && reserve)
1365 aa->aa_ocapa = capa_get(ocapa);
1371 ptlrpc_req_finished(req);
1375 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1376 __u32 client_cksum, __u32 server_cksum, int nob,
1377 obd_count page_count, struct brw_page **pga,
1378 cksum_type_t client_cksum_type)
1382 cksum_type_t cksum_type;
1384 if (server_cksum == client_cksum) {
1385 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1389 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1391 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1394 if (cksum_type != client_cksum_type)
1395 msg = "the server did not use the checksum type specified in "
1396 "the original request - likely a protocol problem";
1397 else if (new_cksum == server_cksum)
1398 msg = "changed on the client after we checksummed it - "
1399 "likely false positive due to mmap IO (bug 11742)";
1400 else if (new_cksum == client_cksum)
1401 msg = "changed in transit before arrival at OST";
1403 msg = "changed in transit AND doesn't match the original - "
1404 "likely false positive due to mmap IO (bug 11742)";
1406 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1407 " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1408 msg, libcfs_nid2str(peer->nid),
1409 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1410 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1411 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1413 oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1415 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1416 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1417 "client csum now %x\n", client_cksum, client_cksum_type,
1418 server_cksum, cksum_type, new_cksum);
1422 /* Note: rc enters this function as the number of bytes transferred */
1423 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1425 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1426 const lnet_process_id_t *peer =
1427 &req->rq_import->imp_connection->c_peer;
1428 struct client_obd *cli = aa->aa_cli;
1429 struct ost_body *body;
1430 __u32 client_cksum = 0;
1433 if (rc < 0 && rc != -EDQUOT) {
1434 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1438 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1439 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1441 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1445 /* set/clear over quota flag for a uid/gid */
1446 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1447 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1448 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1450 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1451 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1453 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1456 osc_update_grant(cli, body);
1461 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1462 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1464 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1466 CERROR("Unexpected +ve rc %d\n", rc);
1469 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1471 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1474 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1475 check_write_checksum(&body->oa, peer, client_cksum,
1476 body->oa.o_cksum, aa->aa_requested_nob,
1477 aa->aa_page_count, aa->aa_ppga,
1478 cksum_type_unpack(aa->aa_oa->o_flags)))
1481 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1482 aa->aa_page_count, aa->aa_ppga);
1486 /* The rest of this function executes only for OST_READs */
1488 /* if unwrap_bulk failed, return -EAGAIN to retry */
1489 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1491 GOTO(out, rc = -EAGAIN);
1493 if (rc > aa->aa_requested_nob) {
1494 CERROR("Unexpected rc %d (%d requested)\n", rc,
1495 aa->aa_requested_nob);
1499 if (rc != req->rq_bulk->bd_nob_transferred) {
1500 CERROR ("Unexpected rc %d (%d transferred)\n",
1501 rc, req->rq_bulk->bd_nob_transferred);
1505 if (rc < aa->aa_requested_nob)
1506 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1508 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1509 static int cksum_counter;
1510 __u32 server_cksum = body->oa.o_cksum;
1513 cksum_type_t cksum_type;
1515 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1516 body->oa.o_flags : 0);
1517 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1518 aa->aa_ppga, OST_READ,
1521 if (peer->nid == req->rq_bulk->bd_sender) {
1525 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1528 if (server_cksum == ~0 && rc > 0) {
1529 CERROR("Protocol error: server %s set the 'checksum' "
1530 "bit, but didn't send a checksum. Not fatal, "
1531 "but please notify on http://bugs.whamcloud.com/\n",
1532 libcfs_nid2str(peer->nid));
1533 } else if (server_cksum != client_cksum) {
1534 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1535 "%s%s%s inode "DFID" object "
1536 LPU64"/"LPU64" extent "
1537 "["LPU64"-"LPU64"]\n",
1538 req->rq_import->imp_obd->obd_name,
1539 libcfs_nid2str(peer->nid),
1541 body->oa.o_valid & OBD_MD_FLFID ?
1542 body->oa.o_parent_seq : (__u64)0,
1543 body->oa.o_valid & OBD_MD_FLFID ?
1544 body->oa.o_parent_oid : 0,
1545 body->oa.o_valid & OBD_MD_FLFID ?
1546 body->oa.o_parent_ver : 0,
1548 body->oa.o_valid & OBD_MD_FLGROUP ?
1549 body->oa.o_seq : (__u64)0,
1550 aa->aa_ppga[0]->off,
1551 aa->aa_ppga[aa->aa_page_count-1]->off +
1552 aa->aa_ppga[aa->aa_page_count-1]->count -
1554 CERROR("client %x, server %x, cksum_type %x\n",
1555 client_cksum, server_cksum, cksum_type);
1557 aa->aa_oa->o_cksum = client_cksum;
1561 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1564 } else if (unlikely(client_cksum)) {
1565 static int cksum_missed;
1568 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1569 CERROR("Checksum %u requested from %s but not sent\n",
1570 cksum_missed, libcfs_nid2str(peer->nid));
1576 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1581 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1582 struct lov_stripe_md *lsm,
1583 obd_count page_count, struct brw_page **pga,
1584 struct obd_capa *ocapa)
1586 struct ptlrpc_request *req;
1589 int generation, resends = 0;
1590 struct l_wait_info lwi;
1594 cfs_waitq_init(&waitq);
1595 generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1598 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1599 page_count, pga, &req, ocapa, 0, resends);
1604 req->rq_generation_set = 1;
1605 req->rq_import_generation = generation;
1606 req->rq_sent = cfs_time_current_sec() + resends;
1609 rc = ptlrpc_queue_wait(req);
1611 if (rc == -ETIMEDOUT && req->rq_resend) {
1612 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1613 ptlrpc_req_finished(req);
1617 rc = osc_brw_fini_request(req, rc);
1619 ptlrpc_req_finished(req);
1620 /* When the server returns -EINPROGRESS, the client should always retry,
1621 * regardless of how many times the bulk has already been resent. */
1622 if (osc_recoverable_error(rc)) {
1624 if (rc != -EINPROGRESS &&
1625 !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1626 CERROR("%s: too many resend retries for object: "
1627 ""LPU64":"LPU64", rc = %d.\n",
1628 exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
1632 exp->exp_obd->u.cli.cl_import->imp_generation) {
1633 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1634 ""LPU64":"LPU64", rc = %d.\n",
1635 exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
1639 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1641 l_wait_event(waitq, 0, &lwi);
1646 if (rc == -EAGAIN || rc == -EINPROGRESS)
1651 int osc_brw_redo_request(struct ptlrpc_request *request,
1652 struct osc_brw_async_args *aa)
1654 struct ptlrpc_request *new_req;
1655 struct ptlrpc_request_set *set = request->rq_set;
1656 struct osc_brw_async_args *new_aa;
1657 struct osc_async_page *oap;
1661 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1663 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1664 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1665 aa->aa_cli, aa->aa_oa,
1666 NULL /* lsm unused by osc currently */,
1667 aa->aa_page_count, aa->aa_ppga,
1668 &new_req, aa->aa_ocapa, 0, 1);
1672 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1674 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1675 if (oap->oap_request != NULL) {
1676 LASSERTF(request == oap->oap_request,
1677 "request %p != oap_request %p\n",
1678 request, oap->oap_request);
1679 if (oap->oap_interrupted) {
1680 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1681 ptlrpc_req_finished(new_req);
1686 /* The new request takes over the pga and oaps from the old request.
1687 * Note that copying a list_head doesn't work; it needs to be moved... */
1689 new_req->rq_interpret_reply = request->rq_interpret_reply;
1690 new_req->rq_async_args = request->rq_async_args;
1691 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1692 new_req->rq_generation_set = 1;
1693 new_req->rq_import_generation = request->rq_import_generation;
1695 new_aa = ptlrpc_req_async_args(new_req);
1697 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1698 cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1699 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1701 cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1702 if (oap->oap_request) {
1703 ptlrpc_req_finished(oap->oap_request);
1704 oap->oap_request = ptlrpc_request_addref(new_req);
1708 new_aa->aa_ocapa = aa->aa_ocapa;
1709 aa->aa_ocapa = NULL;
1711 /* Using ptlrpc_set_add_req() is safe here because interpret functions
1712 * run in check_set context. The only path by which another thread can
1713 * reach this request and return -EINTR is protected by
1714 * cl_loi_list_lock. */
1715 ptlrpc_set_add_req(set, new_req);
1717 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1719 DEBUG_REQ(D_INFO, new_req, "new request");
1724 * Ugh, we want disk allocation on the target to happen in offset order. We'll
1725 * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
1726 * fine for our small page arrays and doesn't require allocation. It's an
1727 * insertion sort that swaps elements that are strides apart, shrinking the
1728 * stride down until it's 1 and the array is sorted.
1730 static void sort_brw_pages(struct brw_page **array, int num)
1733 struct brw_page *tmp;
1737 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1742 for (i = stride ; i < num ; i++) {
1745 while (j >= stride && array[j - stride]->off > tmp->off) {
1746 array[j] = array[j - stride];
1751 } while (stride > 1);
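/* The same shellsort on plain integers, as a self-contained sketch to make
 * the control flow easy to follow: strides come from Knuth's sequence
 * h = 3h + 1 (1, 4, 13, 40, ...), and each pass is an insertion sort over
 * elements one stride apart: */
static void shellsort_ints(int *a, int n)
{
	int stride, i, j, tmp;

	for (stride = 1; stride < n; stride = stride * 3 + 1)
		;
	do {
		stride /= 3;
		for (i = stride; i < n; i++) {
			tmp = a[i];
			for (j = i; j >= stride && a[j - stride] > tmp;
			     j -= stride)
				a[j] = a[j - stride];
			a[j] = tmp;
		}
	} while (stride > 1);
}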
1754 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1760 LASSERT (pages > 0);
1761 offset = pg[i]->off & ~CFS_PAGE_MASK;
1765 if (pages == 0) /* that's all */
1768 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1769 return count; /* doesn't end on page boundary */
1772 offset = pg[i]->off & ~CFS_PAGE_MASK;
1773 if (offset != 0) /* doesn't start on page boundary */
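/* A worked example for max_unfragmented_pages(), assuming 4 KiB pages: only
 * the first fragment may start mid-page and only the last may end mid-page;
 * every interior fragment must cover a whole page.  The run below is fully
 * usable, so the whole count is returned (the function and fields are as
 * above; the example itself is hypothetical): */
static obd_count max_unfragmented_example(void)
{
	static struct brw_page p0 = { .off = 1024, .count = 3072 };
	static struct brw_page p1 = { .off = 4096, .count = 4096 };
	static struct brw_page p2 = { .off = 8192, .count = 1024 };
	struct brw_page *pga[] = { &p0, &p1, &p2 };

	return max_unfragmented_pages(pga, 3);	/* -> 3 */
}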
1780 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1782 struct brw_page **ppga;
1785 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1789 for (i = 0; i < count; i++)
1794 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1796 LASSERT(ppga != NULL);
1797 OBD_FREE(ppga, sizeof(*ppga) * count);
1800 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1801 obd_count page_count, struct brw_page *pga,
1802 struct obd_trans_info *oti)
1804 struct obdo *saved_oa = NULL;
1805 struct brw_page **ppga, **orig;
1806 struct obd_import *imp = class_exp2cliimp(exp);
1807 struct client_obd *cli;
1808 int rc, page_count_orig;
1811 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1812 cli = &imp->imp_obd->u.cli;
1814 if (cmd & OBD_BRW_CHECK) {
1815 /* The caller just wants to know if there's a chance that this
1816 * I/O can succeed */
1818 if (imp->imp_invalid)
1823 /* test_brw with a failed create can trip this, maybe others. */
1824 LASSERT(cli->cl_max_pages_per_rpc);
1828 orig = ppga = osc_build_ppga(pga, page_count);
1831 page_count_orig = page_count;
1833 sort_brw_pages(ppga, page_count);
1834 while (page_count) {
1835 obd_count pages_per_brw;
1837 if (page_count > cli->cl_max_pages_per_rpc)
1838 pages_per_brw = cli->cl_max_pages_per_rpc;
1840 pages_per_brw = page_count;
1842 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1844 if (saved_oa != NULL) {
1845 /* restore previously saved oa */
1846 *oinfo->oi_oa = *saved_oa;
1847 } else if (page_count > pages_per_brw) {
1848 /* save a copy of oa (brw will clobber it) */
1849 OBDO_ALLOC(saved_oa);
1850 if (saved_oa == NULL)
1851 GOTO(out, rc = -ENOMEM);
1852 *saved_oa = *oinfo->oi_oa;
1855 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1856 pages_per_brw, ppga, oinfo->oi_capa);
1861 page_count -= pages_per_brw;
1862 ppga += pages_per_brw;
1866 osc_release_ppga(orig, page_count_orig);
1868 if (saved_oa != NULL)
1869 OBDO_FREE(saved_oa);
1874 static int brw_interpret(const struct lu_env *env,
1875 struct ptlrpc_request *req, void *data, int rc)
1877 struct osc_brw_async_args *aa = data;
1878 struct osc_async_page *oap, *tmp;
1879 struct client_obd *cli;
1882 rc = osc_brw_fini_request(req, rc);
1883 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1884 /* When the server returns -EINPROGRESS, the client should always retry,
1885 * regardless of how many times the bulk has already been resent. */
1886 if (osc_recoverable_error(rc)) {
1887 if (req->rq_import_generation !=
1888 req->rq_import->imp_generation) {
1889 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1890 ""LPU64":"LPU64", rc = %d.\n",
1891 req->rq_import->imp_obd->obd_name,
1892 aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
1893 } else if (rc == -EINPROGRESS ||
1894 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1895 rc = osc_brw_redo_request(req, aa);
1897 CERROR("%s: too many resend retries for object: "
1898 ""LPU64":"LPU64", rc = %d.\n",
1899 req->rq_import->imp_obd->obd_name,
1900 aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
1905 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1910 capa_put(aa->aa_ocapa);
1911 aa->aa_ocapa = NULL;
1915 client_obd_list_lock(&cli->cl_loi_list_lock);
1917 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1918 * is called so we know whether to go to sync BRWs or wait for more
1919 * RPCs to complete */
1920 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1921 cli->cl_w_in_flight--;
1923 cli->cl_r_in_flight--;
1925 /* the caller may re-use the oap after the completion call so
1926 * we need to clean it up a little */
1927 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
1929 cfs_list_del_init(&oap->oap_rpc_item);
1930 osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
1932 OBDO_FREE(aa->aa_oa);
1934 osc_wake_cache_waiters(cli);
1935 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
1936 client_obd_list_unlock(&cli->cl_loi_list_lock);
1938 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1939 req->rq_bulk->bd_nob_transferred);
1940 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1941 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1946 /* The trickiest part of this function is that it will return with
1947 * cli->cl_loi_list_lock held.
1949 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1950 cfs_list_t *rpc_list, int page_count, int cmd,
1953 struct ptlrpc_request *req = NULL;
1954 struct brw_page **pga = NULL;
1955 struct osc_brw_async_args *aa = NULL;
1956 struct obdo *oa = NULL;
1957 struct osc_async_page *oap;
1958 struct osc_async_page *tmp;
1959 struct cl_req *clerq = NULL;
1960 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1961 struct ldlm_lock *lock = NULL;
1962 struct cl_req_attr crattr;
1963 int i, rc, mpflag = 0;
1966 LASSERT(!cfs_list_empty(rpc_list));
1968 if (cmd & OBD_BRW_MEMALLOC)
1969 mpflag = cfs_memory_pressure_get_and_set();
1971 memset(&crattr, 0, sizeof crattr);
1972 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1974 GOTO(out, rc = -ENOMEM);
1978 GOTO(out, rc = -ENOMEM);
1981 cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
1982 struct cl_page *page = osc_oap2cl_page(oap);
1983 if (clerq == NULL) {
1984 clerq = cl_req_alloc(env, page, crt,
1985 1 /* only 1-object rpcs for
1988 GOTO(out, rc = PTR_ERR(clerq));
1989 lock = oap->oap_ldlm_lock;
1991 pga[i] = &oap->oap_brw_page;
1992 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1993 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1994 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
1996 cl_req_page_add(env, clerq, page);
1999 /* always get the data for the obdo for the rpc */
2000 LASSERT(clerq != NULL);
2002 crattr.cra_capa = NULL;
2003 memset(crattr.cra_jobid, 0, JOBSTATS_JOBID_SIZE);
2004 cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2006 oa->o_handle = lock->l_remote_handle;
2007 oa->o_valid |= OBD_MD_FLHANDLE;
2010 rc = cl_req_prep(env, clerq);
2012 CERROR("cl_req_prep failed: %d\n", rc);
2016 sort_brw_pages(pga, page_count);
2017 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2018 pga, &req, crattr.cra_capa, 1, 0);
2020 CERROR("prep_req failed: %d\n", rc);
2024 req->rq_interpret_reply = brw_interpret;
2025 if (cmd & OBD_BRW_MEMALLOC)
2026 req->rq_memalloc = 1;
2028 /* Need to update the timestamps after the request is built in case
2029 * we race with setattr (locally or in the queue at the OST). If the OST
2030 * gets the later setattr before the earlier BRW (as determined by the
2031 * request xid), the OST will not use the BRW timestamps. Sadly, there is
2032 * no obvious way to do this in a single call. bug 10150 */
2033 cl_req_attr_set(env, clerq, &crattr,
2034 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2036 lustre_msg_set_jobid(req->rq_reqmsg, crattr.cra_jobid);
2038 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2039 aa = ptlrpc_req_async_args(req);
2040 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2041 cfs_list_splice(rpc_list, &aa->aa_oaps);
2042 CFS_INIT_LIST_HEAD(rpc_list);
2043 aa->aa_clerq = clerq;
2045 if (cmd & OBD_BRW_MEMALLOC)
2046 cfs_memory_pressure_restore(mpflag);
2048 capa_put(crattr.cra_capa);
2050 LASSERT(req == NULL);
2055 OBD_FREE(pga, sizeof(*pga) * page_count);
2056 /* This should happen rarely and is pretty bad: it makes the
2057 * pending list stop following the dirty order */
2058 client_obd_list_lock(&cli->cl_loi_list_lock);
2059 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2060 cfs_list_del_init(&oap->oap_rpc_item);
2062 /* queued sync pages can be torn down while the pages
2063 * are in transit between the pending list and the rpc */
2064 if (oap->oap_interrupted) {
2065 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2066 osc_ap_completion(env, cli, NULL, oap, 0,
2070 osc_ap_completion(env, cli, NULL, oap, 0, rc);
2072 if (clerq && !IS_ERR(clerq))
2073 cl_req_completion(env, clerq, rc);
2075 struct osc_async_page *tmp = NULL;
2077 /* queued sync pages can be torn down while the pages
2078 * are in transit between the pending list and the rpc */
2079 LASSERT(aa != NULL);
2080 client_obd_list_lock(&cli->cl_loi_list_lock);
2081 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2082 /* only one oap gets a request reference */
2085 if (oap->oap_interrupted && !req->rq_intr) {
2086 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2088 ptlrpc_mark_interrupted(req);
2092 tmp->oap_request = ptlrpc_request_addref(req);
2094 DEBUG_REQ(D_INODE,req, "%d pages, aa %p. now %dr/%dw in flight",
2095 page_count, aa, cli->cl_r_in_flight,
2096 cli->cl_w_in_flight);
2098 /* XXX: Maybe the caller can check the RPC bulk descriptor to
2099 * see which CPU/NUMA node the majority of pages were allocated
2100 * on, and try to assign the async RPC to the CPU core
2101 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2103 * But on the other hand, we expect that multiple ptlrpcd
2104 * threads and the initial write sponsor can run in parallel,
2105 * especially when data checksumming is enabled, which is a CPU-bound
2106 * operation that a single ptlrpcd thread cannot process in time. So
2107 * more ptlrpcd threads sharing the BRW load
2108 * (with PDL_POLICY_ROUND) seems better.
2110 ptlrpcd_add_req(req, pol, -1);
2115 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2116 struct ldlm_enqueue_info *einfo)
2118 void *data = einfo->ei_cbdata;
2121 LASSERT(lock != NULL);
2122 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2123 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2124 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2125 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2127 lock_res_and_lock(lock);
2128 cfs_spin_lock(&osc_ast_guard);
2130 if (lock->l_ast_data == NULL)
2131 lock->l_ast_data = data;
2132 if (lock->l_ast_data == data)
2135 cfs_spin_unlock(&osc_ast_guard);
2136 unlock_res_and_lock(lock);
2141 static int osc_set_data_with_check(struct lustre_handle *lockh,
2142 struct ldlm_enqueue_info *einfo)
2144 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2148 set = osc_set_lock_data_with_check(lock, einfo);
2149 LDLM_LOCK_PUT(lock);
2151 CERROR("lockh %p, data %p - client evicted?\n",
2152 lockh, einfo->ei_cbdata);
2156 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2157 ldlm_iterator_t replace, void *data)
2159 struct ldlm_res_id res_id;
2160 struct obd_device *obd = class_exp2obd(exp);
2162 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
2163 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2167 /* Find any ldlm lock of the inode in osc: return 0 if not found, 1 if found, < 0 on error. */
2171 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2172 ldlm_iterator_t replace, void *data)
2174 struct ldlm_res_id res_id;
2175 struct obd_device *obd = class_exp2obd(exp);
2178 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
2179 rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2180 if (rc == LDLM_ITER_STOP)
2182 if (rc == LDLM_ITER_CONTINUE)
2187 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2188 obd_enqueue_update_f upcall, void *cookie,
2189 int *flags, int agl, int rc)
2191 int intent = *flags & LDLM_FL_HAS_INTENT;
2195 /* The request was created before ldlm_cli_enqueue call. */
2196 if (rc == ELDLM_LOCK_ABORTED) {
2197 struct ldlm_reply *rep;
2198 rep = req_capsule_server_get(&req->rq_pill,
2199                              &RMF_DLM_REP);
2201 LASSERT(rep != NULL);
2202 if (rep->lock_policy_res1)
2203 rc = rep->lock_policy_res1;
2207 if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2208     (rc == 0)) {
2209 *flags |= LDLM_FL_LVB_READY;
2210 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2211 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2214 /* Call the update callback. */
2215 rc = (*upcall)(cookie, rc);
2219 static int osc_enqueue_interpret(const struct lu_env *env,
2220 struct ptlrpc_request *req,
2221 struct osc_enqueue_args *aa, int rc)
2223 struct ldlm_lock *lock;
2224 struct lustre_handle handle;
2226 struct ost_lvb *lvb;
2228 int *flags = aa->oa_flags;
2230 /* Make a local copy of a lock handle and a mode, because aa->oa_*
2231 * might be freed anytime after lock upcall has been called. */
2232 lustre_handle_copy(&handle, aa->oa_lockh);
2233 mode = aa->oa_ei->ei_mode;
2235 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2236  * be valid. */
2237 lock = ldlm_handle2lock(&handle);
2239 /* Take an additional reference so that a blocking AST that
2240 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2241 * to arrive after an upcall has been executed by
2242 * osc_enqueue_fini(). */
2243 ldlm_lock_addref(&handle, mode);
2245 /* Let the CP AST grant the lock first. */
2246 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2248 if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2249         lvb = NULL;
2250         lvb_len = 0;
2251 } else {
2252         lvb = aa->oa_lvb;
2253         lvb_len = sizeof(*aa->oa_lvb);
2254 }
2256 /* Complete the lock acquisition procedure. */
2257 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2258 mode, flags, lvb, lvb_len, &handle, rc);
2259 /* Complete osc stuff. */
2260 rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2261 flags, aa->oa_agl, rc);
2263 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2265 /* Release the lock for an async request. */
2266 if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2267 /*
2268  * Releases a reference taken by ldlm_cli_enqueue(), if it is
2269  * not already released by
2270  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2271  */
2272 ldlm_lock_decref(&handle, mode);
2274 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2275 aa->oa_lockh, req, aa);
2276 ldlm_lock_decref(&handle, mode);
2277 LDLM_LOCK_PUT(lock);
2281 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2282 struct lov_oinfo *loi, int flags,
2283 struct ost_lvb *lvb, __u32 mode, int rc)
2285 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2287 if (rc == ELDLM_OK) {
2290 LASSERT(lock != NULL);
2291 loi->loi_lvb = *lvb;
2292 tmp = loi->loi_lvb.lvb_size;
2293 /* Extend KMS up to the end of this lock and no further.
2294  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
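/* Worked example (illustrative): with a granted extent lock on
 * [0, 4095] and lvb_size = 10000, tmp is clamped to 4095 + 1 = 4096,
 * so the known minimum size only grows up to the end of the lock. */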
2295 if (tmp > lock->l_policy_data.l_extent.end)
2296 tmp = lock->l_policy_data.l_extent.end + 1;
2297 if (tmp >= loi->loi_kms) {
2298 LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2299 ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2300 loi_kms_set(loi, tmp);
2302 LDLM_DEBUG(lock, "lock acquired, setting rss="
2303 LPU64"; leaving kms="LPU64", end="LPU64,
2304 loi->loi_lvb.lvb_size, loi->loi_kms,
2305 lock->l_policy_data.l_extent.end);
2307 ldlm_lock_allow_match(lock);
2308 } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2309 LASSERT(lock != NULL);
2310 loi->loi_lvb = *lvb;
2311 ldlm_lock_allow_match(lock);
2312 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2313 " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2319 ldlm_lock_fail_match(lock);
2321 LDLM_LOCK_PUT(lock);
2324 EXPORT_SYMBOL(osc_update_enqueue);
2326 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2328 /* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
2329  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2330  * other synchronous requests, but holding some locks while trying to obtain
2331  * others may take a considerable amount of time in case of OST failure; and
2332  * when a client does not release a lock that other sync requests are waiting
2333  * on, the client is excluded from the cluster -- such scenarios make life
2334  * difficult, so release locks just after they are obtained. */
2335 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2336 int *flags, ldlm_policy_data_t *policy,
2337 struct ost_lvb *lvb, int kms_valid,
2338 obd_enqueue_update_f upcall, void *cookie,
2339 struct ldlm_enqueue_info *einfo,
2340 struct lustre_handle *lockh,
2341 struct ptlrpc_request_set *rqset, int async, int agl)
2343 struct obd_device *obd = exp->exp_obd;
2344 struct ptlrpc_request *req = NULL;
2345 int intent = *flags & LDLM_FL_HAS_INTENT;
2346 int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2351 /* Filesystem lock extents are extended to page boundaries so that
2352 * dealing with the page cache is a little smoother. */
2353 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2354 policy->l_extent.end |= ~CFS_PAGE_MASK;
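/* Worked example (illustrative, assuming 4096-byte pages, i.e.
 * ~CFS_PAGE_MASK == 4095): a byte range [5000, 6000] becomes
 * [4096, 8191] -- the start is rounded down and the end rounded up
 * to the enclosing page boundaries. */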
2357  * kms is not valid when either the object is completely fresh (so that no
2358  * locks are cached), or the object was evicted. In the latter case a cached
2359  * lock cannot be used, because it would prime the inode state with a
2360  * potentially stale LVB.
2365 /* Next, search for already existing extent locks that will cover us */
2366 /* If we're trying to read, we also search for an existing PW lock. The
2367 * VFS and page cache already protect us locally, so lots of readers/
2368 * writers can share a single PW lock.
2370 * There are problems with conversion deadlocks, so instead of
2371 * converting a read lock to a write lock, we'll just enqueue a new
2374 * At some point we should cancel the read lock instead of making them
2375 * send us a blocking callback, but there are problems with canceling
2376 * locks out from other users right now, too. */
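/* Illustrative effect: for a PR request, the match mode below becomes
 * LCK_PR | LCK_PW, so ldlm_lock_match() can satisfy the read from an
 * already cached PW lock without sending a new enqueue RPC. */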
2377 mode = einfo->ei_mode;
2378 if (einfo->ei_mode == LCK_PR)
2379         mode |= LCK_PW;
2380 mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2381 einfo->ei_type, policy, mode, lockh, 0);
2383 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2385 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2386 /* For AGL, if enqueue RPC is sent but the lock is not
2387  * granted, then skip processing this stripe.
2388 * Return -ECANCELED to tell the caller. */
2389 ldlm_lock_decref(lockh, mode);
2390 LDLM_LOCK_PUT(matched);
2392 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2393 *flags |= LDLM_FL_LVB_READY;
2394 /* addref the lock only if this is not an async request and a PW
2395  * lock was matched whereas we asked for PR. */
2396 if (!rqset && einfo->ei_mode != mode)
2397 ldlm_lock_addref(lockh, LCK_PR);
2399 /* I would like to be able to ASSERT here that
2400 * rss <= kms, but I can't, for reasons which
2401 * are explained in lov_enqueue() */
2404 /* We already have a lock, and it's referenced */
2405 (*upcall)(cookie, ELDLM_OK);
2407 if (einfo->ei_mode != mode)
2408 ldlm_lock_decref(lockh, LCK_PW);
2410 /* For async requests, decref the lock. */
2411 ldlm_lock_decref(lockh, einfo->ei_mode);
2412 LDLM_LOCK_PUT(matched);
2415 ldlm_lock_decref(lockh, mode);
2416 LDLM_LOCK_PUT(matched);
2422 CFS_LIST_HEAD(cancels);
2423 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2424 &RQF_LDLM_ENQUEUE_LVB);
2428 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2430 ptlrpc_request_free(req);
2434 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2435                      sizeof(*lvb));
2436 ptlrpc_request_set_replen(req);
2439 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2440 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2442 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2443 sizeof(*lvb), lockh, async);
2446 struct osc_enqueue_args *aa;
2447 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2448 aa = ptlrpc_req_async_args(req);
2451 aa->oa_flags = flags;
2452 aa->oa_upcall = upcall;
2453 aa->oa_cookie = cookie;
2455 aa->oa_lockh = lockh;
2458 req->rq_interpret_reply =
2459 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2460 if (rqset == PTLRPCD_SET)
2461 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2463 ptlrpc_set_add_req(rqset, req);
2464 } else if (intent) {
2465 ptlrpc_req_finished(req);
2470 rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2472 ptlrpc_req_finished(req);
2477 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2478 struct ldlm_enqueue_info *einfo,
2479 struct ptlrpc_request_set *rqset)
2481 struct ldlm_res_id res_id;
2485 osc_build_res_name(oinfo->oi_md->lsm_object_id,
2486 oinfo->oi_md->lsm_object_seq, &res_id);
2488 rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2489 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2490 oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2491 oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2492 rqset, rqset != NULL, 0);
2496 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2497 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2498 int *flags, void *data, struct lustre_handle *lockh,
2501 struct obd_device *obd = exp->exp_obd;
2502 int lflags = *flags;
2506 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2509 /* Filesystem lock extents are extended to page boundaries so that
2510 * dealing with the page cache is a little smoother */
2511 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2512 policy->l_extent.end |= ~CFS_PAGE_MASK;
2514 /* Next, search for already existing extent locks that will cover us */
2515 /* If we're trying to read, we also search for an existing PW lock. The
2516 * VFS and page cache already protect us locally, so lots of readers/
2517 * writers can share a single PW lock. */
2521 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2522 res_id, type, policy, rc, lockh, unref);
2525 if (!osc_set_data_with_check(lockh, data)) {
2526 if (!(lflags & LDLM_FL_TEST_LOCK))
2527 ldlm_lock_decref(lockh, rc);
2531 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2532 ldlm_lock_addref(lockh, LCK_PR);
2533 ldlm_lock_decref(lockh, LCK_PW);
2540 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2544 if (unlikely(mode == LCK_GROUP))
2545 ldlm_lock_decref_and_cancel(lockh, mode);
2547 ldlm_lock_decref(lockh, mode);
2552 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2553 __u32 mode, struct lustre_handle *lockh)
2556 RETURN(osc_cancel_base(lockh, mode));
2559 static int osc_cancel_unused(struct obd_export *exp,
2560 struct lov_stripe_md *lsm,
2561 ldlm_cancel_flags_t flags,
2564 struct obd_device *obd = class_exp2obd(exp);
2565 struct ldlm_res_id res_id, *resp = NULL;
2568 resp = osc_build_res_name(lsm->lsm_object_id,
2569 lsm->lsm_object_seq, &res_id);
2572 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2575 static int osc_statfs_interpret(const struct lu_env *env,
2576 struct ptlrpc_request *req,
2577 struct osc_async_args *aa, int rc)
2579 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
2580 struct obd_statfs *msfs;
2585 /* The request has in fact never been sent
2586  * due to issues at a higher level (LOV).
2587  * Exit immediately since the caller is
2588  * aware of the problem and takes care
2589  * of the cleanup. */
2592 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2593 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2599 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2601 GOTO(out, rc = -EPROTO);
2604 /* Reinitialize the RDONLY and DEGRADED flags at the client
2605 * on each statfs, so they don't stay set permanently. */
2606 cfs_spin_lock(&cli->cl_oscc.oscc_lock);
2608 if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
2609 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
2610 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
2611 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
2613 if (unlikely(msfs->os_state & OS_STATE_READONLY))
2614 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
2615 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
2616 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
2618 /* Add a bit of hysteresis so this flag isn't continually flapping,
2619 * and ensure that new files don't get extremely fragmented due to
2620 * only a small amount of available space in the filesystem.
2621 * We want to set the NOSPC flag when there is less than ~0.1% free
2622 * and clear it when there is at least ~0.2% free space, so:
2623  *                   avail < ~0.1% max          max = avail + used
2624  *            1025 * avail < avail + used       used = blocks - free
2625  *            1024 * avail < used
2626  *            1024 * avail < blocks - free
2627  *                   avail < ((blocks - free) >> 10)
2629  * On a very large disk, say 16TB, 0.1% will be 16 GB. We don't want to
2630  * lose that amount of space, so in those cases we report no space left
2631  * if there is less than 1 GB left. */
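/* Worked example (illustrative): with os_blocks = 2^30 and
 * os_bfree = 2^29, used = min((2^30 - 2^29) >> 10, 2^30) = 2^19
 * blocks, so NOSPC is set once os_bavail < 2^19 (or os_ffree < 32)
 * and cleared only when os_bavail > 2^20 and os_ffree > 64; the
 * factor-of-two gap between the thresholds is the hysteresis. */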
2632 used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
2633 if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
2634 ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
2635 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
2636 else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
2637 (msfs->os_ffree > 64) &&
2638 (msfs->os_bavail > (used << 1)))) {
2639 cli->cl_oscc.oscc_flags &= ~(OSCC_FLAG_NOSPC |
2640 OSCC_FLAG_NOSPC_BLK);
2643 if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
2644 (msfs->os_bavail < used)))
2645 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC_BLK;
2647 cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
2649 *aa->aa_oi->oi_osfs = *msfs;
2651 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2655 static int osc_statfs_async(struct obd_export *exp,
2656 struct obd_info *oinfo, __u64 max_age,
2657 struct ptlrpc_request_set *rqset)
2659 struct obd_device *obd = class_exp2obd(exp);
2660 struct ptlrpc_request *req;
2661 struct osc_async_args *aa;
2665 /* We could possibly pass max_age in the request (as an absolute
2666 * timestamp or a "seconds.usec ago") so the target can avoid doing
2667 * extra calls into the filesystem if that isn't necessary (e.g.
2668 * during mount that would help a bit). Having relative timestamps
2669 * is not so great if request processing is slow, while absolute
2670 * timestamps are not ideal because they need time synchronization. */
2671 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2675 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2677 ptlrpc_request_free(req);
2680 ptlrpc_request_set_replen(req);
2681 req->rq_request_portal = OST_CREATE_PORTAL;
2682 ptlrpc_at_set_req_timeout(req);
2684 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2685 /* statfs from procfs should not be delayed or resent, to avoid deadlock */
2686 req->rq_no_resend = 1;
2687 req->rq_no_delay = 1;
2690 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2691 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2692 aa = ptlrpc_req_async_args(req);
2695 ptlrpc_set_add_req(rqset, req);
2699 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2700 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2702 struct obd_device *obd = class_exp2obd(exp);
2703 struct obd_statfs *msfs;
2704 struct ptlrpc_request *req;
2705 struct obd_import *imp = NULL;
2709 /* Since the request might also come from lprocfs, we need to
2710  * sync this with client_disconnect_export (Bug15684). */
2711 cfs_down_read(&obd->u.cli.cl_sem);
2712 if (obd->u.cli.cl_import)
2713 imp = class_import_get(obd->u.cli.cl_import);
2714 cfs_up_read(&obd->u.cli.cl_sem);
2718 /* We could possibly pass max_age in the request (as an absolute
2719 * timestamp or a "seconds.usec ago") so the target can avoid doing
2720 * extra calls into the filesystem if that isn't necessary (e.g.
2721 * during mount that would help a bit). Having relative timestamps
2722 * is not so great if request processing is slow, while absolute
2723 * timestamps are not ideal because they need time synchronization. */
2724 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2726 class_import_put(imp);
2731 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2733 ptlrpc_request_free(req);
2736 ptlrpc_request_set_replen(req);
2737 req->rq_request_portal = OST_CREATE_PORTAL;
2738 ptlrpc_at_set_req_timeout(req);
2740 if (flags & OBD_STATFS_NODELAY) {
2741 /* statfs from procfs should not be delayed or resent, to avoid deadlock */
2742 req->rq_no_resend = 1;
2743 req->rq_no_delay = 1;
2746 rc = ptlrpc_queue_wait(req);
2750 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2752 GOTO(out, rc = -EPROTO);
2759 ptlrpc_req_finished(req);
2763 /* Retrieve object striping information.
2765  * @lump is a pointer to an in-core struct with lmm_ost_count indicating
2766 * the maximum number of OST indices which will fit in the user buffer.
2767 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2769 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2771 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2772 struct lov_user_md_v3 lum, *lumk;
2773 struct lov_user_ost_data_v1 *lmm_objects;
2774 int rc = 0, lum_size;
2780 /* we only need the header part from user space to get lmm_magic and
2781  * lmm_stripe_count (the header part is common to v1 and v3) */
2782 lum_size = sizeof(struct lov_user_md_v1);
2783 if (cfs_copy_from_user(&lum, lump, lum_size))
2786 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2787 (lum.lmm_magic != LOV_USER_MAGIC_V3))
2790 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2791 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2792 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2793 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2795 /* we can use lov_mds_md_size() to compute lum_size
2796 * because lov_user_md_vX and lov_mds_md_vX have the same size */
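/* Illustrative sizing (assuming the usual struct layouts, a 32-byte
 * v1 header plus 24-byte lov_user_ost_data_v1 entries):
 * lov_mds_md_size(1, LOV_USER_MAGIC_V1) = 32 + 1 * 24 = 56 bytes. */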
2797 if (lum.lmm_stripe_count > 0) {
2798 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2799 OBD_ALLOC(lumk, lum_size);
2803 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2804 lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2806 lmm_objects = &(lumk->lmm_objects[0]);
2807 lmm_objects->l_object_id = lsm->lsm_object_id;
2809 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2813 lumk->lmm_object_id = lsm->lsm_object_id;
2814 lumk->lmm_object_seq = lsm->lsm_object_seq;
2815 lumk->lmm_stripe_count = 1;
2817 if (cfs_copy_to_user(lump, lumk, lum_size))
2821 OBD_FREE(lumk, lum_size);
2827 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2828 void *karg, void *uarg)
2830 struct obd_device *obd = exp->exp_obd;
2831 struct obd_ioctl_data *data = karg;
2835 if (!cfs_try_module_get(THIS_MODULE)) {
2836 CERROR("Can't get module. Is it alive?");
2840 case OBD_IOC_LOV_GET_CONFIG: {
2842 struct lov_desc *desc;
2843 struct obd_uuid uuid;
2847 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2848 GOTO(out, err = -EINVAL);
2850 data = (struct obd_ioctl_data *)buf;
2852 if (sizeof(*desc) > data->ioc_inllen1) {
2853 obd_ioctl_freedata(buf, len);
2854 GOTO(out, err = -EINVAL);
2857 if (data->ioc_inllen2 < sizeof(uuid)) {
2858 obd_ioctl_freedata(buf, len);
2859 GOTO(out, err = -EINVAL);
2862 desc = (struct lov_desc *)data->ioc_inlbuf1;
2863 desc->ld_tgt_count = 1;
2864 desc->ld_active_tgt_count = 1;
2865 desc->ld_default_stripe_count = 1;
2866 desc->ld_default_stripe_size = 0;
2867 desc->ld_default_stripe_offset = 0;
2868 desc->ld_pattern = 0;
2869 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2871 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2873 err = cfs_copy_to_user((void *)uarg, buf, len);
2876 obd_ioctl_freedata(buf, len);
2879 case LL_IOC_LOV_SETSTRIPE:
2880 err = obd_alloc_memmd(exp, karg);
2884 case LL_IOC_LOV_GETSTRIPE:
2885 err = osc_getstripe(karg, uarg);
2887 case OBD_IOC_CLIENT_RECOVER:
2888 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2889 data->ioc_inlbuf1, 0);
2893 case IOC_OSC_SET_ACTIVE:
2894 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2897 case OBD_IOC_POLL_QUOTACHECK:
2898 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2900 case OBD_IOC_PING_TARGET:
2901 err = ptlrpc_obd_ping(obd);
2904 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2905 cmd, cfs_curproc_comm());
2906 GOTO(out, err = -ENOTTY);
2909 cfs_module_put(THIS_MODULE);
2913 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2914 obd_count keylen, void *key, __u32 *vallen, void *val,
2915 struct lov_stripe_md *lsm)
2918 if (!vallen || !val)
2921 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2922 __u32 *stripe = val;
2923 *vallen = sizeof(*stripe);
2926 } else if (KEY_IS(KEY_LAST_ID)) {
2927 struct ptlrpc_request *req;
2932 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2933 &RQF_OST_GET_INFO_LAST_ID);
2937 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2938 RCL_CLIENT, keylen);
2939 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2941 ptlrpc_request_free(req);
2945 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2946 memcpy(tmp, key, keylen);
2948 req->rq_no_delay = req->rq_no_resend = 1;
2949 ptlrpc_request_set_replen(req);
2950 rc = ptlrpc_queue_wait(req);
2954 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
2956 GOTO(out, rc = -EPROTO);
2958 *((obd_id *)val) = *reply;
2960 ptlrpc_req_finished(req);
2962 } else if (KEY_IS(KEY_FIEMAP)) {
2963 struct ptlrpc_request *req;
2964 struct ll_user_fiemap *reply;
2968 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2969 &RQF_OST_GET_INFO_FIEMAP);
2973 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
2974 RCL_CLIENT, keylen);
2975 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2976 RCL_CLIENT, *vallen);
2977 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2978 RCL_SERVER, *vallen);
2980 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2982 ptlrpc_request_free(req);
2986 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
2987 memcpy(tmp, key, keylen);
2988 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2989 memcpy(tmp, val, *vallen);
2991 ptlrpc_request_set_replen(req);
2992 rc = ptlrpc_queue_wait(req);
2996 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2998 GOTO(out1, rc = -EPROTO);
3000 memcpy(val, reply, *vallen);
3002 ptlrpc_req_finished(req);
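/* Illustrative caller (hypothetical, mirroring this function's
 * signature): a fiemap lookup from an upper layer would look like
 *
 *	rc = obd_get_info(env, exp, sizeof(KEY_FIEMAP), KEY_FIEMAP,
 *	                  &vallen, fiemap, lsm);
 *
 * where fiemap points at a struct ll_user_fiemap buffer large enough
 * for the requested extent count, used for both input and reply. */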
3010 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
3012 struct llog_ctxt *ctxt;
3016 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3018 rc = llog_initiator_connect(ctxt);
3019 llog_ctxt_put(ctxt);
3021 /* XXX return an error? skip setting below flags? */
3024 cfs_spin_lock(&imp->imp_lock);
3025 imp->imp_server_timeout = 1;
3026 imp->imp_pingable = 1;
3027 cfs_spin_unlock(&imp->imp_lock);
3028 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3033 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3034 struct ptlrpc_request *req,
3041 RETURN(osc_setinfo_mds_connect_import(req->rq_import));
3044 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3045 obd_count keylen, void *key, obd_count vallen,
3046 void *val, struct ptlrpc_request_set *set)
3048 struct ptlrpc_request *req;
3049 struct obd_device *obd = exp->exp_obd;
3050 struct obd_import *imp = class_exp2cliimp(exp);
3055 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3057 if (KEY_IS(KEY_NEXT_ID)) {
3059 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3061 if (vallen != sizeof(obd_id))
3066 if (vallen != sizeof(obd_id))
3069 /* avoid a race between allocating a new object and setting the next
3070  * id from the ll_sync thread */
3071 cfs_spin_lock(&oscc->oscc_lock);
3072 new_val = *((obd_id*)val) + 1;
3073 if (new_val > oscc->oscc_next_id)
3074 oscc->oscc_next_id = new_val;
3075 cfs_spin_unlock(&oscc->oscc_lock);
3076 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3077 exp->exp_obd->obd_name,
3078 obd->u.cli.cl_oscc.oscc_next_id);
3083 if (KEY_IS(KEY_CHECKSUM)) {
3084 if (vallen != sizeof(int))
3086 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3090 if (KEY_IS(KEY_SPTLRPC_CONF)) {
3091 sptlrpc_conf_client_adapt(obd);
3095 if (KEY_IS(KEY_FLUSH_CTX)) {
3096 sptlrpc_import_flush_my_ctx(imp);
3100 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3103 /* We pass all other commands directly to OST. Since nobody calls osc
3104    methods directly and everybody is supposed to go through LOV, we
3105    assume lov checked invalid values for us.
3106    The only recognised values so far are evict_by_nid and mds_conn.
3107    Even if something bad goes through, we'd get a -EINVAL from OST
3108    anyway. */
3110 if (KEY_IS(KEY_GRANT_SHRINK))
3111 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
3113 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
3118 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3119 RCL_CLIENT, keylen);
3120 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3121 RCL_CLIENT, vallen);
3122 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3124 ptlrpc_request_free(req);
3128 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3129 memcpy(tmp, key, keylen);
3130 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
3131 memcpy(tmp, val, vallen);
3133 if (KEY_IS(KEY_MDS_CONN)) {
3134 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3136 oscc->oscc_oa.o_seq = (*(__u32 *)val);
3137 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3138 LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
3139 req->rq_no_delay = req->rq_no_resend = 1;
3140 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3141 } else if (KEY_IS(KEY_GRANT_SHRINK)) {
3142 struct osc_grant_args *aa;
3145 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3146 aa = ptlrpc_req_async_args(req);
3149 ptlrpc_req_finished(req);
3152 *oa = ((struct ost_body *)val)->oa;
3154 req->rq_interpret_reply = osc_shrink_grant_interpret;
3157 ptlrpc_request_set_replen(req);
3158 if (!KEY_IS(KEY_GRANT_SHRINK)) {
3159 LASSERT(set != NULL);
3160 ptlrpc_set_add_req(set, req);
3161 ptlrpc_check_set(NULL, set);
3163 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3169 static struct llog_operations osc_size_repl_logops = {
3170 lop_cancel: llog_obd_repl_cancel
3173 static struct llog_operations osc_mds_ost_orig_logops;
3175 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3176 struct obd_device *tgt, struct llog_catid *catid)
3181 rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
3182 &catid->lci_logid, &osc_mds_ost_orig_logops);
3184 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3188 rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
3189 NULL, &osc_size_repl_logops);
3191 struct llog_ctxt *ctxt =
3192 llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3195 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3200 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
3201 obd->obd_name, tgt->obd_name, catid, rc);
3202 CERROR("logid "LPX64":0x%x\n",
3203 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3208 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3209 struct obd_device *disk_obd, int *index)
3211 struct llog_catid catid;
3212 static char name[32] = CATLIST;
3216 LASSERT(olg == &obd->obd_olg);
3218 cfs_mutex_lock(&olg->olg_cat_processing);
3219 rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
3221 CERROR("rc: %d\n", rc);
3225 CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
3226 obd->obd_name, *index, catid.lci_logid.lgl_oid,
3227 catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen);
3229 rc = __osc_llog_init(obd, olg, disk_obd, &catid);
3231 CERROR("rc: %d\n", rc);
3235 rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
3237 CERROR("rc: %d\n", rc);
3242 cfs_mutex_unlock(&olg->olg_cat_processing);
3247 static int osc_llog_finish(struct obd_device *obd, int count)
3249 struct llog_ctxt *ctxt;
3250 int rc = 0, rc2 = 0;
3253 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3255 rc = llog_cleanup(ctxt);
3257 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3259 rc2 = llog_cleanup(ctxt);
3266 static int osc_reconnect(const struct lu_env *env,
3267 struct obd_export *exp, struct obd_device *obd,
3268 struct obd_uuid *cluuid,
3269 struct obd_connect_data *data,
3272 struct client_obd *cli = &obd->u.cli;
3274 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3277 client_obd_list_lock(&cli->cl_loi_list_lock);
3278 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3279 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
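/* Illustrative arithmetic: the GNU "?:" above means "request the grant
 * we already account for (avail + dirty) if non-zero, else two full
 * RPCs worth".  Assuming cl_max_pages_per_rpc = 256 and 4 KiB pages,
 * the fallback request is 2 * 256 << 12 = 2 MiB of grant. */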
3280 lost_grant = cli->cl_lost_grant;
3281 cli->cl_lost_grant = 0;
3282 client_obd_list_unlock(&cli->cl_loi_list_lock);
3284 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3285 "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
3286 cli->cl_avail_grant, cli->cl_dirty, lost_grant);
3287 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3288 " ocd_grant: %d\n", data->ocd_connect_flags,
3289 data->ocd_version, data->ocd_grant);
3295 static int osc_disconnect(struct obd_export *exp)
3297 struct obd_device *obd = class_exp2obd(exp);
3298 struct llog_ctxt *ctxt;
3301 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3303 if (obd->u.cli.cl_conn_count == 1) {
3304 /* Flush any remaining cancel messages out to the
3306 llog_sync(ctxt, exp);
3308 llog_ctxt_put(ctxt);
3310 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3314 rc = client_disconnect_export(exp);
3316 * Initially we put del_shrink_grant before disconnect_export, but it
3317 * causes the following problem if setup (connect) and cleanup
3318 * (disconnect) are tangled together.
3319 * connect p1 disconnect p2
3320 * ptlrpc_connect_import
3321 * ............... class_manual_cleanup
3324 * ptlrpc_connect_interrupt
3326 * add this client to shrink list
3328  *                               Bang! the pinger triggers the shrink.
3329  * So the osc should be disconnected from the shrink list after we
3330  * are sure the import has been destroyed. BUG18662
3332 if (obd->u.cli.cl_import == NULL)
3333 osc_del_shrink_grant(&obd->u.cli);
3337 static int osc_import_event(struct obd_device *obd,
3338 struct obd_import *imp,
3339 enum obd_import_event event)
3341 struct client_obd *cli;
3345 LASSERT(imp->imp_obd == obd);
3348 case IMP_EVENT_DISCON: {
3349 /* Only do this on the MDS OSCs */
3350 if (imp->imp_server_timeout) {
3351 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3353 cfs_spin_lock(&oscc->oscc_lock);
3354 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3355 cfs_spin_unlock(&oscc->oscc_lock);
3358 client_obd_list_lock(&cli->cl_loi_list_lock);
3359 cli->cl_avail_grant = 0;
3360 cli->cl_lost_grant = 0;
3361 client_obd_list_unlock(&cli->cl_loi_list_lock);
3364 case IMP_EVENT_INACTIVE: {
3365 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3368 case IMP_EVENT_INVALIDATE: {
3369 struct ldlm_namespace *ns = obd->obd_namespace;
3373 env = cl_env_get(&refcheck);
3377 client_obd_list_lock(&cli->cl_loi_list_lock);
3378 /* all pages go to failing RPCs due to the invalid
3379  * import */
3380 osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3381 client_obd_list_unlock(&cli->cl_loi_list_lock);
3383 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3384 cl_env_put(env, &refcheck);
3389 case IMP_EVENT_ACTIVE: {
3390 /* Only do this on the MDS OSCs */
3391 if (imp->imp_server_timeout) {
3392 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3394 cfs_spin_lock(&oscc->oscc_lock);
3395 oscc->oscc_flags &= ~(OSCC_FLAG_NOSPC |
3396 OSCC_FLAG_NOSPC_BLK);
3397 cfs_spin_unlock(&oscc->oscc_lock);
3399 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3402 case IMP_EVENT_OCD: {
3403 struct obd_connect_data *ocd = &imp->imp_connect_data;
3405 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3406 osc_init_grant(&obd->u.cli, ocd);
3409 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3410 imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3412 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3415 case IMP_EVENT_DEACTIVATE: {
3416 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3419 case IMP_EVENT_ACTIVATE: {
3420 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3424 CERROR("Unknown import event %d\n", event);
3431 * Determine whether the lock can be canceled before replaying the lock
3432 * during recovery, see bug16774 for detailed information.
3434 * \retval zero the lock can't be canceled
3435 * \retval other ok to cancel
3437 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3439 check_res_locked(lock->l_resource);
3442  * Cancel all unused extent locks in granted mode LCK_PR or LCK_CR.
3444 * XXX as a future improvement, we can also cancel unused write lock
3445 * if it doesn't have dirty data and active mmaps.
3447 if (lock->l_resource->lr_type == LDLM_EXTENT &&
3448 (lock->l_granted_mode == LCK_PR ||
3449 lock->l_granted_mode == LCK_CR) &&
3450 (osc_dlm_lock_pageref(lock) == 0))
3456 static int brw_queue_work(const struct lu_env *env, void *data)
3458 struct client_obd *cli = data;
3460 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3462 client_obd_list_lock(&cli->cl_loi_list_lock);
3463 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3464 client_obd_list_unlock(&cli->cl_loi_list_lock);
3468 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3470 struct client_obd *cli = &obd->u.cli;
3475 rc = ptlrpcd_addref();
3479 rc = client_obd_setup(obd, lcfg);
3482 handler = ptlrpcd_alloc_work(cli->cl_import,
3483 brw_queue_work, cli);
3484 if (!IS_ERR(handler))
3485         cli->cl_writeback_work = handler;
3486 else
3487         rc = PTR_ERR(handler);
3491 struct lprocfs_static_vars lvars = { 0 };
3493 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3494 lprocfs_osc_init_vars(&lvars);
3495 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3496 lproc_osc_attach_seqstat(obd);
3497 sptlrpc_lprocfs_cliobd_attach(obd);
3498 ptlrpc_lprocfs_register_obd(obd);
3502 /* We need to allocate a few more requests, because
3503    brw_interpret tries to create new requests before freeing
3504    previous ones. Ideally we want to have 2x max_rpcs_in_flight
3505    reserved, but I am afraid that might be too much wasted RAM
3506    in fact, so 2 is just my guess and should still work. */
3507 cli->cl_import->imp_rq_pool =
3508 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3510 ptlrpc_add_rqs_to_pool);
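/* Illustrative sizing: assuming the default cl_max_rpcs_in_flight of
 * 8, the pool is created with 8 + 2 = 10 preallocated requests. */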
3512 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3514 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3522 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3528 case OBD_CLEANUP_EARLY: {
3529 struct obd_import *imp;
3530 imp = obd->u.cli.cl_import;
3531 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3532 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3533 ptlrpc_deactivate_import(imp);
3534 cfs_spin_lock(&imp->imp_lock);
3535 imp->imp_pingable = 0;
3536 cfs_spin_unlock(&imp->imp_lock);
3539 case OBD_CLEANUP_EXPORTS: {
3540 struct client_obd *cli = &obd->u.cli;
3541 /*
3542  * for the echo client, the export may be on the zombie list; wait for
3543  * the zombie thread to cull it, because cli.cl_import will be
3544  * cleared in client_disconnect_export():
3545 * class_export_destroy() -> obd_cleanup() ->
3546 * echo_device_free() -> echo_client_cleanup() ->
3547 * obd_disconnect() -> osc_disconnect() ->
3548 * client_disconnect_export()
3550 obd_zombie_barrier();
3551 if (cli->cl_writeback_work) {
3552 ptlrpcd_destroy_work(cli->cl_writeback_work);
3553 cli->cl_writeback_work = NULL;
3555 obd_cleanup_client_import(obd);
3556 ptlrpc_lprocfs_unregister_obd(obd);
3557 lprocfs_obd_cleanup(obd);
3558 rc = obd_llog_finish(obd, 0);
3560 CERROR("failed to cleanup llogging subsystems\n");
3567 int osc_cleanup(struct obd_device *obd)
3573 /* free memory of osc quota cache */
3574 osc_quota_cleanup(obd);
3576 rc = client_obd_cleanup(obd);
3582 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3584 struct lprocfs_static_vars lvars = { 0 };
3587 lprocfs_osc_init_vars(&lvars);
3589 switch (lcfg->lcfg_command) {
3591 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3601 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3603 return osc_process_config_base(obd, buf);
3606 struct obd_ops osc_obd_ops = {
3607 .o_owner = THIS_MODULE,
3608 .o_setup = osc_setup,
3609 .o_precleanup = osc_precleanup,
3610 .o_cleanup = osc_cleanup,
3611 .o_add_conn = client_import_add_conn,
3612 .o_del_conn = client_import_del_conn,
3613 .o_connect = client_connect_import,
3614 .o_reconnect = osc_reconnect,
3615 .o_disconnect = osc_disconnect,
3616 .o_statfs = osc_statfs,
3617 .o_statfs_async = osc_statfs_async,
3618 .o_packmd = osc_packmd,
3619 .o_unpackmd = osc_unpackmd,
3620 .o_precreate = osc_precreate,
3621 .o_create = osc_create,
3622 .o_create_async = osc_create_async,
3623 .o_destroy = osc_destroy,
3624 .o_getattr = osc_getattr,
3625 .o_getattr_async = osc_getattr_async,
3626 .o_setattr = osc_setattr,
3627 .o_setattr_async = osc_setattr_async,
3629 .o_punch = osc_punch,
3631 .o_enqueue = osc_enqueue,
3632 .o_change_cbdata = osc_change_cbdata,
3633 .o_find_cbdata = osc_find_cbdata,
3634 .o_cancel = osc_cancel,
3635 .o_cancel_unused = osc_cancel_unused,
3636 .o_iocontrol = osc_iocontrol,
3637 .o_get_info = osc_get_info,
3638 .o_set_info_async = osc_set_info_async,
3639 .o_import_event = osc_import_event,
3640 .o_llog_init = osc_llog_init,
3641 .o_llog_finish = osc_llog_finish,
3642 .o_process_config = osc_process_config,
3643 .o_quotactl = osc_quotactl,
3644 .o_quotacheck = osc_quotacheck,
3645 .o_quota_adjust_qunit = osc_quota_adjust_qunit,
3648 extern struct lu_kmem_descr osc_caches[];
3649 extern cfs_spinlock_t osc_ast_guard;
3650 extern cfs_lock_class_key_t osc_ast_guard_class;
3652 int __init osc_init(void)
3654 struct lprocfs_static_vars lvars = { 0 };
3658 /* print an address of _any_ initialized kernel symbol from this
3659 * module, to allow debugging with gdb that doesn't support data
3660 * symbols from modules. */
3661 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3663 rc = lu_kmem_init(osc_caches);
3665 lprocfs_osc_init_vars(&lvars);
3668 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3669 LUSTRE_OSC_NAME, &osc_device_type);
3671 lu_kmem_fini(osc_caches);
3675 cfs_spin_lock_init(&osc_ast_guard);
3676 cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3678 osc_mds_ost_orig_logops = llog_lvfs_ops;
3679 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3680 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3681 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3682 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3688 static void /*__exit*/ osc_exit(void)
3691 class_unregister_type(LUSTRE_OSC_NAME);
3692 lu_kmem_fini(osc_caches);
3695 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3696 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3697 MODULE_LICENSE("GPL");
3699 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);