4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Whamcloud, Inc.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_OSC
39 #include <libcfs/libcfs.h>
42 # include <liblustre.h>
45 #include <lustre_dlm.h>
46 #include <lustre_net.h>
47 #include <lustre/lustre_user.h>
48 #include <obd_cksum.h>
56 #include <lustre_ha.h>
57 #include <lprocfs_status.h>
58 #include <lustre_log.h>
59 #include <lustre_debug.h>
60 #include <lustre_param.h>
61 #include "osc_internal.h"
62 #include "osc_cl_internal.h"
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
65 static int brw_interpret(const struct lu_env *env,
66 struct ptlrpc_request *req, void *data, int rc);
67 int osc_cleanup(struct obd_device *obd);
69 /* Pack OSC object metadata for disk storage (LE byte order). */
/*
 * Serialize the in-memory lov_stripe_md into the on-disk lov_mds_md
 * pointed to by *lmmp.  Depending on arguments (checks elided in this
 * excerpt) *lmmp is either freed or allocated here; object id/seq are
 * byte-swapped to little-endian with cpu_to_le64().
 */
70 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
71 struct lov_stripe_md *lsm)
76 lmm_size = sizeof(**lmmp);
81 OBD_FREE(*lmmp, lmm_size);
87 OBD_ALLOC(*lmmp, lmm_size);
/* An object must have a non-zero id and an MDT-range sequence. */
93 LASSERT(lsm->lsm_object_id);
94 LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
95 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
96 (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
102 /* Unpack OSC object metadata from disk storage (LE byte order). */
/*
 * Inverse of osc_packmd(): build an in-memory lov_stripe_md (*lsmp)
 * from the wire/disk lov_mds_md.  A NULL lmm with an existing *lsmp
 * means "free the lsm".  Ids are converted from little-endian with
 * le64_to_cpu().  (Some branch/return lines are elided in this view.)
 */
103 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
104 struct lov_mds_md *lmm, int lmm_bytes)
107 struct obd_import *imp = class_exp2cliimp(exp);
/* Reject a buffer too small to hold even one lov_mds_md. */
111 if (lmm_bytes < sizeof (*lmm)) {
112 CERROR("lov_mds_md too small: %d, need %d\n",
113 lmm_bytes, (int)sizeof(*lmm));
116 /* XXX LOV_MAGIC etc check? */
118 if (lmm->lmm_object_id == 0) {
119 CERROR("lov_mds_md: zero lmm_object_id\n");
/* Single-stripe md: OSC always sees exactly one stripe. */
124 lsm_size = lov_stripe_md_size(1);
/* Free path: caller passed an lsm but no source lmm. */
128 if (*lsmp != NULL && lmm == NULL) {
129 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
130 OBD_FREE(*lsmp, lsm_size);
136 OBD_ALLOC(*lsmp, lsm_size);
139 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
/* Unwind the lsm allocation if the oinfo allocation failed. */
140 if ((*lsmp)->lsm_oinfo[0] == NULL) {
141 OBD_FREE(*lsmp, lsm_size);
144 loi_init((*lsmp)->lsm_oinfo[0]);
148 /* XXX zero *lsmp? */
149 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
150 (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
151 LASSERT((*lsmp)->lsm_object_id);
152 LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
/* Use the server-advertised max object size when the import
 * negotiated OBD_CONNECT_MAXBYTES; otherwise fall back to the
 * compile-time default. */
156 (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
157 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
159 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/*
 * Copy a capability (if any) into the RMF_CAPA1 slot of the request
 * capsule and mark its presence in oa.o_valid.  The NULL-capa early
 * return is elided in this view.
 */
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165 struct ost_body *body, void *capa)
167 struct obd_capa *oc = (struct obd_capa *)capa;
168 struct lustre_capa *c;
173 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
176 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177 DEBUG_CAPA(D_SEC, c, "pack");
/*
 * Fill the OST_BODY field of an already-packed request from oinfo:
 * copy the obdo to wire format and attach the capability.
 */
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181 struct obd_info *oinfo)
183 struct ost_body *body;
185 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
188 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
189 osc_pack_capa(req, body, oinfo->oi_capa);
/*
 * Size the capability field of the request capsule before packing:
 * zero bytes when no capability is supplied (the non-NULL branch is
 * elided here; the capsule default is sizeof(struct obd_capa)).
 */
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193 const struct req_msg_field *field,
197 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
199 /* it is already calculated as sizeof struct obd_capa */
/*
 * Reply interpreter for async OST_GETATTR: unpack the returned obdo
 * into the caller's oinfo and invoke the oi_cb_up completion callback.
 * On unpack failure the attribute valid mask is cleared.
 */
203 static int osc_getattr_interpret(const struct lu_env *env,
204 struct ptlrpc_request *req,
205 struct osc_async_args *aa, int rc)
207 struct ost_body *body;
213 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
215 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
216 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
218 /* This should really be sent by the OST */
219 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
220 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
222 CDEBUG(D_INFO, "can't unpack ost_body\n");
/* Unpack failed: no attribute in oa is trustworthy. */
224 aa->aa_oi->oi_oa->o_valid = 0;
227 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Queue an asynchronous OST_GETATTR on the given request set.  The
 * reply is handled by osc_getattr_interpret() which up-calls
 * oinfo->oi_cb_up.
 */
231 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
232 struct ptlrpc_request_set *set)
234 struct ptlrpc_request *req;
235 struct osc_async_args *aa;
239 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
243 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
244 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
246 ptlrpc_request_free(req);
250 osc_pack_req_body(req, oinfo);
252 ptlrpc_request_set_replen(req);
253 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
/* The per-request async-args scratch space must fit our args. */
255 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
256 aa = ptlrpc_req_async_args(req);
259 ptlrpc_set_add_req(set, req);
/*
 * Synchronous OST_GETATTR: send the request, wait for the reply, and
 * copy the returned attributes into oinfo->oi_oa.
 */
263 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
264 struct obd_info *oinfo)
266 struct ptlrpc_request *req;
267 struct ost_body *body;
271 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
275 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
276 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
278 ptlrpc_request_free(req);
282 osc_pack_req_body(req, oinfo);
284 ptlrpc_request_set_replen(req);
286 rc = ptlrpc_queue_wait(req);
290 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
292 GOTO(out, rc = -EPROTO);
294 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
295 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
297 /* This should really be sent by the OST */
298 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
299 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
303 ptlrpc_req_finished(req);
/*
 * Synchronous OST_SETATTR: push the attributes in oinfo->oi_oa to the
 * OST and read back the server's view of the obdo.  Callers must have
 * the group field valid (asserted below).
 */
307 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
308 struct obd_info *oinfo, struct obd_trans_info *oti)
310 struct ptlrpc_request *req;
311 struct ost_body *body;
315 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
317 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
321 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
322 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
324 ptlrpc_request_free(req);
328 osc_pack_req_body(req, oinfo);
330 ptlrpc_request_set_replen(req);
332 rc = ptlrpc_queue_wait(req);
336 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
338 GOTO(out, rc = -EPROTO);
340 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
344 ptlrpc_req_finished(req);
/*
 * Reply interpreter for async OST_SETATTR/OST_PUNCH: copy the updated
 * obdo back into sa->sa_oa and run the caller-supplied upcall with the
 * final rc.
 */
348 static int osc_setattr_interpret(const struct lu_env *env,
349 struct ptlrpc_request *req,
350 struct osc_setattr_args *sa, int rc)
352 struct ost_body *body;
358 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
360 GOTO(out, rc = -EPROTO);
362 lustre_get_wire_obdo(sa->sa_oa, &body->oa);
364 rc = sa->sa_upcall(sa->sa_cookie, rc);
/*
 * Send an OST_SETATTR asynchronously.  When called from the MDS path
 * with no rqset the request is fire-and-forget via ptlrpcd; otherwise
 * osc_setattr_interpret() delivers the result to @upcall(@cookie).
 * A log cookie from oti is copied into the obdo when present.
 */
368 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
369 struct obd_trans_info *oti,
370 obd_enqueue_update_f upcall, void *cookie,
371 struct ptlrpc_request_set *rqset)
373 struct ptlrpc_request *req;
374 struct osc_setattr_args *sa;
378 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
382 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
383 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
385 ptlrpc_request_free(req);
/* Carry the llog cancel cookie along when the caller provided one. */
389 if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
390 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
392 osc_pack_req_body(req, oinfo);
394 ptlrpc_request_set_replen(req);
396 /* do mds to ost setattr asynchronously */
398 /* Do not wait for response. */
399 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
401 req->rq_interpret_reply =
402 (ptlrpc_interpterer_t)osc_setattr_interpret;
404 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
405 sa = ptlrpc_req_async_args(req);
406 sa->sa_oa = oinfo->oi_oa;
407 sa->sa_upcall = upcall;
408 sa->sa_cookie = cookie;
/* PTLRPCD_SET is a sentinel meaning "let ptlrpcd drive the request". */
410 if (rqset == PTLRPCD_SET)
411 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
413 ptlrpc_set_add_req(rqset, req);
/*
 * Thin OBD-method wrapper: async setattr with oinfo's own oi_cb_up as
 * the completion upcall and oinfo itself as its cookie.
 */
419 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
420 struct obd_trans_info *oti,
421 struct ptlrpc_request_set *rqset)
423 return osc_setattr_async_base(exp, oinfo, oti,
424 oinfo->oi_cb_up, oinfo, rqset);
/*
 * Synchronous OST_CREATE.  Allocates a temporary single-stripe lsm
 * when the caller did not supply one, sends the create, then copies
 * the server-assigned id/seq back into the obdo and lsm.  DELORPHAN
 * requests are marked no-resend/no-delay.  Transno and log cookie are
 * propagated into oti when requested.
 */
427 int osc_real_create(struct obd_export *exp, struct obdo *oa,
428 struct lov_stripe_md **ea, struct obd_trans_info *oti)
430 struct ptlrpc_request *req;
431 struct ost_body *body;
432 struct lov_stripe_md *lsm;
441 rc = obd_alloc_memmd(exp, &lsm);
446 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
448 GOTO(out, rc = -ENOMEM);
450 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
452 ptlrpc_request_free(req);
456 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
458 lustre_set_wire_obdo(&body->oa, oa);
460 ptlrpc_request_set_replen(req);
462 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
463 oa->o_flags == OBD_FL_DELORPHAN) {
465 "delorphan from OST integration");
466 /* Don't resend the delorphan req */
467 req->rq_no_resend = req->rq_no_delay = 1;
470 rc = ptlrpc_queue_wait(req);
474 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
476 GOTO(out_req, rc = -EPROTO);
478 lustre_get_wire_obdo(oa, &body->oa);
480 /* This should really be sent by the OST */
481 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
482 oa->o_valid |= OBD_MD_FLBLKSZ;
484 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
485 * have valid lsm_oinfo data structs, so don't go touching that.
486 * This needs to be fixed in a big way.
488 lsm->lsm_object_id = oa->o_id;
489 lsm->lsm_object_seq = oa->o_seq;
/* Record the server transaction number for recovery ordering. */
493 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
495 if (oa->o_valid & OBD_MD_FLCOOKIE) {
496 if (!oti->oti_logcookies)
497 oti_alloc_cookies(oti, 1);
498 *oti->oti_logcookies = oa->o_lcookie;
502 CDEBUG(D_HA, "transno: "LPD64"\n",
503 lustre_msg_get_transno(req->rq_repmsg));
505 ptlrpc_req_finished(req);
/* Error path: release the lsm we allocated above. */
508 obd_free_memmd(exp, &lsm);
/*
 * Send an asynchronous OST_PUNCH (truncate a byte range on the OST).
 * Start/end are carried in the obdo size/blocks fields (set by the
 * caller, see osc_punch()).  Completion goes through
 * osc_setattr_interpret() to @upcall(@cookie).
 */
512 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
513 obd_enqueue_update_f upcall, void *cookie,
514 struct ptlrpc_request_set *rqset)
516 struct ptlrpc_request *req;
517 struct osc_setattr_args *sa;
518 struct ost_body *body;
522 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
526 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
527 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
529 ptlrpc_request_free(req);
532 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
533 ptlrpc_at_set_req_timeout(req);
535 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
537 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
538 osc_pack_capa(req, body, oinfo->oi_capa);
540 ptlrpc_request_set_replen(req);
542 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
543 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
544 sa = ptlrpc_req_async_args(req);
545 sa->sa_oa = oinfo->oi_oa;
546 sa->sa_upcall = upcall;
547 sa->sa_cookie = cookie;
/* PTLRPCD_SET sentinel: hand the request to a ptlrpcd thread. */
548 if (rqset == PTLRPCD_SET)
549 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
551 ptlrpc_set_add_req(rqset, req);
/*
 * OBD-method wrapper for punch: encode the extent [start, end] into
 * the obdo's size/blocks fields (the wire convention for OST_PUNCH)
 * and delegate to osc_punch_base() with oinfo's own callback.
 */
556 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
557 struct obd_info *oinfo, struct obd_trans_info *oti,
558 struct ptlrpc_request_set *rqset)
560 oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
561 oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
562 oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
563 return osc_punch_base(exp, oinfo,
564 oinfo->oi_cb_up, oinfo, rqset);
/*
 * Reply interpreter for async OST_SYNC: copy the returned obdo into
 * the caller's oinfo and invoke the fsync upcall with the final rc.
 */
567 static int osc_sync_interpret(const struct lu_env *env,
568 struct ptlrpc_request *req,
571 struct osc_fsync_args *fa = arg;
572 struct ost_body *body;
578 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
580 CERROR ("can't unpack ost_body\n");
581 GOTO(out, rc = -EPROTO);
584 *fa->fa_oi->oi_oa = body->oa;
586 rc = fa->fa_upcall(fa->fa_cookie, rc);
/*
 * Send an asynchronous OST_SYNC for the range encoded in the obdo's
 * size/blocks fields (set by osc_sync()).  Completion is delivered
 * through osc_sync_interpret() to @upcall(@cookie).
 */
590 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
591 obd_enqueue_update_f upcall, void *cookie,
592 struct ptlrpc_request_set *rqset)
594 struct ptlrpc_request *req;
595 struct ost_body *body;
596 struct osc_fsync_args *fa;
600 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
604 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
605 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
607 ptlrpc_request_free(req);
611 /* overload the size and blocks fields in the oa with start/end */
612 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
614 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
615 osc_pack_capa(req, body, oinfo->oi_capa);
617 ptlrpc_request_set_replen(req);
618 req->rq_interpret_reply = osc_sync_interpret;
620 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
621 fa = ptlrpc_req_async_args(req);
623 fa->fa_upcall = upcall;
624 fa->fa_cookie = cookie;
/* PTLRPCD_SET sentinel: hand the request to a ptlrpcd thread. */
626 if (rqset == PTLRPCD_SET)
627 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
629 ptlrpc_set_add_req(rqset, req);
/*
 * OBD-method wrapper for sync: encode [start, end] into the obdo's
 * size/blocks fields and delegate to osc_sync_base() with oinfo's
 * own callback.  (The NULL-oa guard's return line is elided here.)
 */
634 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
635 struct obd_info *oinfo, obd_size start, obd_size end,
636 struct ptlrpc_request_set *set)
641 CDEBUG(D_INFO, "oa NULL\n");
645 oinfo->oi_oa->o_size = start;
646 oinfo->oi_oa->o_blocks = end;
647 oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
649 RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
652 /* Find and cancel locally locks matched by @mode in the resource found by
653 * @objid. Found locks are added into @cancel list. Returns the amount of
654 * locks added to @cancels list. */
655 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
657 ldlm_mode_t mode, int lock_flags)
659 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
660 struct ldlm_res_id res_id;
661 struct ldlm_resource *res;
/* Build the resource name from the object id/seq and look it up
 * locally (no new resource is created: last args are 0, 0). */
665 osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
666 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
670 LDLM_RESOURCE_ADDREF(res);
671 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
672 lock_flags, 0, NULL);
673 LDLM_RESOURCE_DELREF(res);
674 ldlm_resource_putref(res);
/*
 * Completion handler for throttled OST_DESTROY RPCs: drop the
 * in-flight counter and wake any sender blocked in osc_destroy().
 */
678 static int osc_destroy_interpret(const struct lu_env *env,
679 struct ptlrpc_request *req, void *data,
682 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
684 cfs_atomic_dec(&cli->cl_destroy_in_flight);
685 cfs_waitq_signal(&cli->cl_destroy_waitq);
/*
 * Reserve a slot for a destroy RPC, bounded by cl_max_rpcs_in_flight.
 * On failure the increment is rolled back; if the counter changed in
 * between, waiters are re-signalled to avoid a lost wakeup.
 */
689 static int osc_can_send_destroy(struct client_obd *cli)
691 if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
692 cli->cl_max_rpcs_in_flight) {
693 /* The destroy request can be sent */
696 if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
697 cli->cl_max_rpcs_in_flight) {
699 * The counter has been modified between the two atomic
702 cfs_waitq_signal(&cli->cl_destroy_waitq);
707 /* Destroy requests can be async always on the client, and we don't even really
708 * care about the return code since the client cannot do anything at all about
710 * When the MDS is unlinking a filename, it saves the file objects into a
711 * recovery llog, and these object records are cancelled when the OST reports
712 * they were destroyed and sync'd to disk (i.e. transaction committed).
713 * If the client dies, or the OST is down when the object should be destroyed,
714 * the records are not cancelled, and when the OST reconnects to the MDS next,
715 * it will retrieve the llog unlink logs and then sends the log cancellation
716 * cookies to the MDS after committing destroy transactions. */
717 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
718 struct obdo *oa, struct lov_stripe_md *ea,
719 struct obd_trans_info *oti, struct obd_export *md_export,
722 struct client_obd *cli = &exp->exp_obd->u.cli;
723 struct ptlrpc_request *req;
724 struct ost_body *body;
725 CFS_LIST_HEAD(cancels);
730 CDEBUG(D_INFO, "oa NULL\n");
/* Cancel our own PW locks on the object first (early lock
 * cancellation) so cached dirty data is discarded, not written. */
734 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
735 LDLM_FL_DISCARD_DATA);
737 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* Allocation failed: put back the lock refs gathered above. */
739 ldlm_lock_list_put(&cancels, l_bl_ast, count);
743 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
744 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
747 ptlrpc_request_free(req);
751 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
752 ptlrpc_at_set_req_timeout(req);
754 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
755 oa->o_lcookie = *oti->oti_logcookies;
756 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
758 lustre_set_wire_obdo(&body->oa, oa);
760 osc_pack_capa(req, body, (struct obd_capa *)capa);
761 ptlrpc_request_set_replen(req);
763 /* If osc_destory is for destroying the unlink orphan,
764 * sent from MDT to OST, which should not be blocked here,
765 * because the process might be triggered by ptlrpcd, and
766 * it is not good to block ptlrpcd thread (b=16006)*/
767 if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
768 req->rq_interpret_reply = osc_destroy_interpret;
769 if (!osc_can_send_destroy(cli)) {
770 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
774 * Wait until the number of on-going destroy RPCs drops
775 * under max_rpc_in_flight
777 l_wait_event_exclusive(cli->cl_destroy_waitq,
778 osc_can_send_destroy(cli), &lwi);
782 /* Do not wait for response */
783 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
/*
 * Report client-side cache state (dirty bytes, desired undirty room,
 * available + reserved grant, lost grant) in the outgoing obdo so the
 * OST can manage its grant accounting.  All fields are sampled under
 * cl_loi_list_lock; the CERROR branches are sanity checks only.
 */
787 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
790 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
792 LASSERT(!(oa->o_valid & bits));
795 client_obd_list_lock(&cli->cl_loi_list_lock);
796 oa->o_dirty = cli->cl_dirty;
797 if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
798 CERROR("dirty %lu - %lu > dirty_max %lu\n",
799 cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
801 } else if (cfs_atomic_read(&obd_dirty_pages) -
802 cfs_atomic_read(&obd_dirty_transit_pages) >
803 obd_max_dirty_pages + 1){
804 /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
805 * not covered by a lock thus they may safely race and trip
806 * this CERROR() unless we add in a small fudge factor (+1). */
807 CERROR("dirty %d - %d > system dirty_max %d\n",
808 cfs_atomic_read(&obd_dirty_pages),
809 cfs_atomic_read(&obd_dirty_transit_pages),
810 obd_max_dirty_pages);
812 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
813 CERROR("dirty %lu - dirty_max %lu too big???\n",
814 cli->cl_dirty, cli->cl_dirty_max);
/* Ask for enough undirty room to keep max_rpcs_in_flight + 1
 * full-sized RPCs busy, or dirty_max, whichever is larger. */
817 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
818 (cli->cl_max_rpcs_in_flight + 1);
819 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
821 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
822 oa->o_dropped = cli->cl_lost_grant;
823 cli->cl_lost_grant = 0;
824 client_obd_list_unlock(&cli->cl_loi_list_lock);
825 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
826 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Schedule the next grant-shrink attempt cl_grant_shrink_interval
 * seconds from now. */
830 void osc_update_next_shrink(struct client_obd *cli)
832 cli->cl_next_shrink_grant =
833 cfs_time_shift(cli->cl_grant_shrink_interval);
834 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
835 cli->cl_next_shrink_grant);
/* Add @grant bytes to the client's available grant, under the
 * cl_loi_list_lock that protects grant accounting. */
838 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
840 client_obd_list_lock(&cli->cl_loi_list_lock);
841 cli->cl_avail_grant += grant;
842 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Absorb any extra grant the server returned in an RPC reply body. */
845 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
847 if (body->oa.o_valid & OBD_MD_FLGRANT) {
848 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
849 __osc_update_grant(cli, body->oa.o_grant);
853 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
854 obd_count keylen, void *key, obd_count vallen,
855 void *val, struct ptlrpc_request_set *set);
/*
 * Completion handler for a grant-shrink set_info RPC.  On failure the
 * grant we tried to give back (oa->o_grant) is restored locally; on
 * success the reply body's grant field is absorbed instead.
 */
857 static int osc_shrink_grant_interpret(const struct lu_env *env,
858 struct ptlrpc_request *req,
861 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
862 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
863 struct ost_body *body;
/* RPC failed: take back the grant we offered to return. */
866 __osc_update_grant(cli, oa->o_grant);
870 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
872 osc_update_grant(cli, body);
/*
 * Piggyback a grant return on an outgoing BRW: give back a quarter of
 * the available grant via oa->o_grant, flag it with
 * OBD_FL_SHRINK_GRANT, and reschedule the next shrink.
 */
878 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
880 client_obd_list_lock(&cli->cl_loi_list_lock);
881 oa->o_grant = cli->cl_avail_grant / 4;
882 cli->cl_avail_grant -= oa->o_grant;
883 client_obd_list_unlock(&cli->cl_loi_list_lock);
884 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
885 oa->o_valid |= OBD_MD_FLFLAGS;
888 oa->o_flags |= OBD_FL_SHRINK_GRANT;
889 osc_update_next_shrink(cli);
892 /* Shrink the current grant, either from some large amount to enough for a
893 * full set of in-flight RPCs, or if we have already shrunk to that limit
894 * then to enough for a single RPC. This avoids keeping more grant than
895 * needed, and avoids shrinking the grant piecemeal. */
896 static int osc_shrink_grant(struct client_obd *cli)
/* Target (in pages): one full set of concurrent RPCs, plus one. */
898 long target = (cli->cl_max_rpcs_in_flight + 1) *
899 cli->cl_max_pages_per_rpc;
901 client_obd_list_lock(&cli->cl_loi_list_lock);
/* Already at or below the full-set target: shrink to one RPC. */
902 if (cli->cl_avail_grant <= target)
903 target = cli->cl_max_pages_per_rpc;
904 client_obd_list_unlock(&cli->cl_loi_list_lock);
906 return osc_shrink_grant_to_target(cli, target);
/*
 * Return grant above @target to the server via a KEY_GRANT_SHRINK
 * set_info RPC.  Clamps @target to at least one RPC's worth, no-ops
 * when nothing would be returned, and restores the grant locally if
 * the RPC setup fails (see osc_shrink_grant_interpret for the async
 * failure path).
 */
909 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
912 struct ost_body *body;
915 client_obd_list_lock(&cli->cl_loi_list_lock);
916 /* Don't shrink if we are already above or below the desired limit
917 * We don't want to shrink below a single RPC, as that will negatively
918 * impact block allocation and long-term performance. */
919 if (target < cli->cl_max_pages_per_rpc)
920 target = cli->cl_max_pages_per_rpc;
922 if (target >= cli->cl_avail_grant) {
923 client_obd_list_unlock(&cli->cl_loi_list_lock);
926 client_obd_list_unlock(&cli->cl_loi_list_lock);
932 osc_announce_cached(cli, &body->oa, 0);
934 client_obd_list_lock(&cli->cl_loi_list_lock);
935 body->oa.o_grant = cli->cl_avail_grant - target;
936 cli->cl_avail_grant = target;
937 client_obd_list_unlock(&cli->cl_loi_list_lock);
938 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
939 body->oa.o_valid |= OBD_MD_FLFLAGS;
940 body->oa.o_flags = 0;
942 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
943 osc_update_next_shrink(cli);
945 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
946 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
947 sizeof(*body), body, NULL);
/* On immediate failure, put the grant back locally. */
949 __osc_update_grant(cli, body->oa.o_grant);
954 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
/*
 * Decide whether it is time to return grant: the server must support
 * OBD_CONNECT_GRANT_SHRINK, the shrink interval must have (nearly,
 * within 5 ticks) elapsed, the import must be FULL, and we must hold
 * more than GRANT_SHRINK_LIMIT.
 */
955 static int osc_should_shrink_grant(struct client_obd *client)
957 cfs_time_t time = cfs_time_current();
958 cfs_time_t next_shrink = client->cl_next_shrink_grant;
960 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
961 OBD_CONNECT_GRANT_SHRINK) == 0)
964 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
965 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
966 client->cl_avail_grant > GRANT_SHRINK_LIMIT)
/* Not eligible this round: reschedule the next check. */
969 osc_update_next_shrink(client);
/*
 * Periodic timeout callback: walk every client on the grant-shrink
 * list and shrink grant for those that qualify.
 */
974 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
976 struct client_obd *client;
978 cfs_list_for_each_entry(client, &item->ti_obd_list,
979 cl_grant_shrink_list) {
980 if (osc_should_shrink_grant(client))
981 osc_shrink_grant(client);
/*
 * Register this client with the periodic grant-shrink timeout so
 * osc_grant_shrink_grant_cb() will consider it, and prime the first
 * shrink deadline.
 */
986 static int osc_add_shrink_grant(struct client_obd *client)
990 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
992 osc_grant_shrink_grant_cb, NULL,
993 &client->cl_grant_shrink_list);
995 CERROR("add grant client %s error %d\n",
996 client->cl_import->imp_obd->obd_name, rc);
999 CDEBUG(D_CACHE, "add grant client %s \n",
1000 client->cl_import->imp_obd->obd_name);
1001 osc_update_next_shrink(client);
/* Unregister this client from the periodic grant-shrink timeout. */
1005 static int osc_del_shrink_grant(struct client_obd *client)
1007 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/*
 * Initialize grant state from the connect reply: compute avail_grant
 * from ocd_grant (accounting for dirty pages unless we were evicted),
 * work around pre-bug20278 servers that can report too little, set
 * the extent chunk size from the server block size, and enroll in
 * periodic grant shrinking when the server supports it.
 */
1011 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1014 * ocd_grant is the total grant amount we're expect to hold: if we've
1015 * been evicted, it's the new avail_grant amount, cl_dirty will drop
1016 * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1018 * race is tolerable here: if we're evicted, but imp_state already
1019 * left EVICTED state, then cl_dirty must be 0 already.
1021 client_obd_list_lock(&cli->cl_loi_list_lock);
1022 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1023 cli->cl_avail_grant = ocd->ocd_grant;
1025 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1027 if (cli->cl_avail_grant < 0) {
1028 CWARN("%s: available grant < 0, the OSS is probably not running"
1029 " with patch from bug20278 (%ld) \n",
1030 cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
1031 /* workaround for 1.6 servers which do not have
1032 * the patch from bug20278 */
1033 cli->cl_avail_grant = ocd->ocd_grant;
1036 /* determine the appropriate chunk size used by osc_extent. */
1037 cli->cl_chunkbits = max_t(int, CFS_PAGE_SHIFT, ocd->ocd_blocksize);
1038 client_obd_list_unlock(&cli->cl_loi_list_lock);
1040 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1041 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
1042 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1044 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1045 cfs_list_empty(&cli->cl_grant_shrink_list))
1046 osc_add_shrink_grant(cli);
1049 /* We assume that the reason this OSC got a short read is because it read
1050 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1051 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1052 * this stripe never got written at or beyond this stripe offset yet. */
/*
 * Zero-fill the portion of the page array beyond nob_read bytes: skip
 * fully-read pages, clear the tail of a partially-read page, then zero
 * every remaining page.  Pages are kmapped only for the memset.
 */
1053 static void handle_short_read(int nob_read, obd_count page_count,
1054 struct brw_page **pga)
1059 /* skip bytes read OK */
1060 while (nob_read > 0) {
1061 LASSERT (page_count > 0);
1063 if (pga[i]->count > nob_read) {
1064 /* EOF inside this page */
1065 ptr = cfs_kmap(pga[i]->pg) +
1066 (pga[i]->off & ~CFS_PAGE_MASK);
1067 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1068 cfs_kunmap(pga[i]->pg);
1074 nob_read -= pga[i]->count;
1079 /* zero remaining pages */
1080 while (page_count-- > 0) {
1081 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1082 memset(ptr, 0, pga[i]->count);
1083 cfs_kunmap(pga[i]->pg);
/*
 * Validate the per-niobuf return-code vector in a BRW_WRITE reply:
 * the vector must be present and correctly sized, every rc must be
 * zero (negative => propagate, positive => protocol error), and the
 * bulk transfer must have moved exactly requested_nob bytes.
 */
1088 static int check_write_rcs(struct ptlrpc_request *req,
1089 int requested_nob, int niocount,
1090 obd_count page_count, struct brw_page **pga)
1095 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1096 sizeof(*remote_rcs) *
1098 if (remote_rcs == NULL) {
1099 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1103 /* return error if any niobuf was in error */
1104 for (i = 0; i < niocount; i++) {
1105 if ((int)remote_rcs[i] < 0)
1106 return(remote_rcs[i]);
/* A non-zero, non-negative rc is a protocol violation. */
1108 if (remote_rcs[i] != 0) {
1109 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1110 i, remote_rcs[i], req);
1115 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1116 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1117 req->rq_bulk->bd_nob_transferred, requested_nob);
/*
 * Two brw_pages can be merged into one remote niobuf when they are
 * byte-contiguous and their flags differ only in bits known to be
 * safe to combine (grant/cache/sync mode and quota bits).
 */
1124 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1126 if (p1->flag != p2->flag) {
1127 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1128 OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1130 /* warn if we try to combine flags that we don't know to be
1131 * safe to combine */
1132 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1133 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1134 "report this at http://bugs.whamcloud.com/\n",
1135 p1->flag, p2->flag);
/* Mergeable iff p2 starts exactly where p1 ends. */
1140 return (p1->off + p1->count == p2->off);
/*
 * Compute the bulk-data checksum over the first @nob bytes of the
 * page array using the requested checksum type.  Contains two fault
 * injection points: corrupting received READ data and returning a
 * deliberately wrong checksum on WRITE (without touching the data,
 * so a resend is still correct).
 */
1143 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1144 struct brw_page **pga, int opc,
1145 cksum_type_t cksum_type)
1150 LASSERT (pg_count > 0);
1151 cksum = init_checksum(cksum_type);
1152 while (nob > 0 && pg_count > 0) {
1153 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1154 int off = pga[i]->off & ~CFS_PAGE_MASK;
1155 int count = pga[i]->count > nob ? nob : pga[i]->count;
1157 /* corrupt the data before we compute the checksum, to
1158 * simulate an OST->client data error */
1159 if (i == 0 && opc == OST_READ &&
1160 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1161 memcpy(ptr + off, "bad1", min(4, nob));
1162 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1163 cfs_kunmap(pga[i]->pg);
1164 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1167 nob -= pga[i]->count;
1171 /* For sending we only compute the wrong checksum instead
1172 * of corrupting the data so it is still correct on a redo */
1173 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1176 return fini_checksum(cksum, cksum_type);
1179 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1180 struct lov_stripe_md *lsm, obd_count page_count,
1181 struct brw_page **pga,
1182 struct ptlrpc_request **reqp,
1183 struct obd_capa *ocapa, int reserve,
1186 struct ptlrpc_request *req;
1187 struct ptlrpc_bulk_desc *desc;
1188 struct ost_body *body;
1189 struct obd_ioobj *ioobj;
1190 struct niobuf_remote *niobuf;
1191 int niocount, i, requested_nob, opc, rc;
1192 struct osc_brw_async_args *aa;
1193 struct req_capsule *pill;
1194 struct brw_page *pg_prev;
1197 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1198 RETURN(-ENOMEM); /* Recoverable */
1199 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1200 RETURN(-EINVAL); /* Fatal */
1202 if ((cmd & OBD_BRW_WRITE) != 0) {
1204 req = ptlrpc_request_alloc_pool(cli->cl_import,
1205 cli->cl_import->imp_rq_pool,
1206 &RQF_OST_BRW_WRITE);
1209 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1214 for (niocount = i = 1; i < page_count; i++) {
1215 if (!can_merge_pages(pga[i - 1], pga[i]))
1219 pill = &req->rq_pill;
1220 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1222 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1223 niocount * sizeof(*niobuf));
1224 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1226 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1228 ptlrpc_request_free(req);
1231 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1232 ptlrpc_at_set_req_timeout(req);
1234 if (opc == OST_WRITE)
1235 desc = ptlrpc_prep_bulk_imp(req, page_count,
1236 BULK_GET_SOURCE, OST_BULK_PORTAL);
1238 desc = ptlrpc_prep_bulk_imp(req, page_count,
1239 BULK_PUT_SINK, OST_BULK_PORTAL);
1242 GOTO(out, rc = -ENOMEM);
1243 /* NB request now owns desc and will free it when it gets freed */
1245 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1246 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1247 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1248 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1250 lustre_set_wire_obdo(&body->oa, oa);
1252 obdo_to_ioobj(oa, ioobj);
1253 ioobj->ioo_bufcnt = niocount;
1254 osc_pack_capa(req, body, ocapa);
1255 LASSERT (page_count > 0);
1257 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1258 struct brw_page *pg = pga[i];
1259 int poff = pg->off & ~CFS_PAGE_MASK;
1261 LASSERT(pg->count > 0);
1262 /* make sure there is no gap in the middle of page array */
1263 LASSERTF(page_count == 1 ||
1264 (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
1265 ergo(i > 0 && i < page_count - 1,
1266 poff == 0 && pg->count == CFS_PAGE_SIZE) &&
1267 ergo(i == page_count - 1, poff == 0)),
1268 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1269 i, page_count, pg, pg->off, pg->count);
1271 LASSERTF(i == 0 || pg->off > pg_prev->off,
1272 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1273 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1275 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1276 pg_prev->pg, page_private(pg_prev->pg),
1277 pg_prev->pg->index, pg_prev->off);
1279 LASSERTF(i == 0 || pg->off > pg_prev->off,
1280 "i %d p_c %u\n", i, page_count);
1282 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1283 (pg->flag & OBD_BRW_SRVLOCK));
1285 ptlrpc_prep_bulk_page(desc, pg->pg, poff, pg->count);
1286 requested_nob += pg->count;
1288 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1290 niobuf->len += pg->count;
1292 niobuf->offset = pg->off;
1293 niobuf->len = pg->count;
1294 niobuf->flags = pg->flag;
1299 LASSERTF((void *)(niobuf - niocount) ==
1300 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1301 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1302 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1304 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1306 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1307 body->oa.o_valid |= OBD_MD_FLFLAGS;
1308 body->oa.o_flags = 0;
1310 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1313 if (osc_should_shrink_grant(cli))
1314 osc_shrink_grant_local(cli, &body->oa);
1316 /* size[REQ_REC_OFF] still sizeof (*body) */
1317 if (opc == OST_WRITE) {
1318 if (cli->cl_checksum &&
1319 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1320 /* store cl_cksum_type in a local variable since
1321 * it can be changed via lprocfs */
1322 cksum_type_t cksum_type = cli->cl_cksum_type;
1324 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1325 oa->o_flags &= OBD_FL_LOCAL_MASK;
1326 body->oa.o_flags = 0;
1328 body->oa.o_flags |= cksum_type_pack(cksum_type);
1329 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1330 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1334 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1336 /* save this in 'oa', too, for later checking */
1337 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1338 oa->o_flags |= cksum_type_pack(cksum_type);
1340 /* clear out the checksum flag, in case this is a
1341 * resend but cl_checksum is no longer set. b=11238 */
1342 oa->o_valid &= ~OBD_MD_FLCKSUM;
1344 oa->o_cksum = body->oa.o_cksum;
1345 /* 1 RC per niobuf */
1346 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1347 sizeof(__u32) * niocount);
1349 if (cli->cl_checksum &&
1350 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1351 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1352 body->oa.o_flags = 0;
1353 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1354 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1357 ptlrpc_request_set_replen(req);
1359 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1360 aa = ptlrpc_req_async_args(req);
1362 aa->aa_requested_nob = requested_nob;
1363 aa->aa_nio_count = niocount;
1364 aa->aa_page_count = page_count;
1368 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1369 if (ocapa && reserve)
1370 aa->aa_ocapa = capa_get(ocapa);
1376 ptlrpc_req_finished(req);
/*
 * Verify the checksum the server returned for an OST_WRITE against the one
 * the client computed when the pages were sent.  On a match, log at D_PAGE
 * and report success.  On a mismatch, recompute the bulk checksum locally
 * (with the checksum type the server actually used) to classify where the
 * data changed, then log a console error with FID/object/extent details.
 */
1380 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1381 __u32 client_cksum, __u32 server_cksum, int nob,
1382 obd_count page_count, struct brw_page **pga,
1383 cksum_type_t client_cksum_type)
1387 cksum_type_t cksum_type;
/* Fast path: server agrees with what we computed at send time. */
1389 if (server_cksum == client_cksum) {
1390 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* Mismatch: recompute with the server's checksum type, taken from
 * oa->o_flags only when OBD_MD_FLFLAGS says the flags are valid. */
1394 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1396 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
/* Classify the failure for the console message below. */
1399 if (cksum_type != client_cksum_type)
1400 msg = "the server did not use the checksum type specified in "
1401 "the original request - likely a protocol problem";
1402 else if (new_cksum == server_cksum)
1403 msg = "changed on the client after we checksummed it - "
1404 "likely false positive due to mmap IO (bug 11742)";
1405 else if (new_cksum == client_cksum)
1406 msg = "changed in transit before arrival at OST";
1408 msg = "changed in transit AND doesn't match the original - "
1409 "likely false positive due to mmap IO (bug 11742)";
/* FID / object / group fields in the obdo are only meaningful when the
 * matching o_valid bit is set, hence the conditionals below. */
1411 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1412 " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1413 msg, libcfs_nid2str(peer->nid),
1414 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1415 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1416 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1418 oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1420 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1421 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1422 "client csum now %x\n", client_cksum, client_cksum_type,
1423 server_cksum, cksum_type, new_cksum);
1427 /* Note rc enters this function as number of bytes transferred */
/*
 * Post-process a completed BRW RPC: unpack the reply body, update quota
 * and grant state, verify bulk integrity/checksums, and (for reads)
 * handle short reads.  Returns 0 or a negative errno; -EAGAIN asks the
 * caller to retry the whole bulk transfer.
 */
1428 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1430 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1431 const lnet_process_id_t *peer =
1432 &req->rq_import->imp_connection->c_peer;
1433 struct client_obd *cli = aa->aa_cli;
1434 struct ost_body *body;
1435 __u32 client_cksum = 0;
/* -EDQUOT still carries a valid reply we must process (quota flags). */
1438 if (rc < 0 && rc != -EDQUOT) {
1439 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1443 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1444 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1446 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1450 /* set/clear over quota flag for a uid/gid */
1451 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1452 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1453 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1455 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1456 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1458 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
/* Refresh the client's cached grant from the server's reply. */
1461 osc_update_grant(cli, body);
1466 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1467 client_cksum = aa->aa_oa->o_cksum; /* save for later */
/* Write path: verify per-niobuf RCs and the write checksum. */
1469 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1471 CERROR("Unexpected +ve rc %d\n", rc);
1474 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1476 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1479 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1480 check_write_checksum(&body->oa, peer, client_cksum,
1481 body->oa.o_cksum, aa->aa_requested_nob,
1482 aa->aa_page_count, aa->aa_ppga,
1483 cksum_type_unpack(aa->aa_oa->o_flags)))
1486 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1487 aa->aa_page_count, aa->aa_ppga);
1491 /* The rest of this function executes only for OST_READs */
1493 /* if unwrap_bulk failed, return -EAGAIN to retry */
1494 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1496 GOTO(out, rc = -EAGAIN);
/* Sanity-check the byte count the server claims to have transferred. */
1498 if (rc > aa->aa_requested_nob) {
1499 CERROR("Unexpected rc %d (%d requested)\n", rc,
1500 aa->aa_requested_nob);
1504 if (rc != req->rq_bulk->bd_nob_transferred) {
1505 CERROR ("Unexpected rc %d (%d transferred)\n",
1506 rc, req->rq_bulk->bd_nob_transferred);
/* A short read is legal (e.g. EOF); zero-fill the remainder. */
1510 if (rc < aa->aa_requested_nob)
1511 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
/* Read checksum verification, when the server supplied one. */
1513 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1514 static int cksum_counter;
1515 __u32 server_cksum = body->oa.o_cksum;
1518 cksum_type_t cksum_type;
1520 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1521 body->oa.o_flags : 0);
1522 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1523 aa->aa_ppga, OST_READ,
/* Distinguish direct replies from ones relayed via an LNET router. */
1526 if (peer->nid == req->rq_bulk->bd_sender) {
1530 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1533 if (server_cksum == ~0 && rc > 0) {
1534 CERROR("Protocol error: server %s set the 'checksum' "
1535 "bit, but didn't send a checksum. Not fatal, "
1536 "but please notify on http://bugs.whamcloud.com/\n",
1537 libcfs_nid2str(peer->nid));
1538 } else if (server_cksum != client_cksum) {
1539 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1540 "%s%s%s inode "DFID" object "
1541 LPU64"/"LPU64" extent "
1542 "["LPU64"-"LPU64"]\n",
1543 req->rq_import->imp_obd->obd_name,
1544 libcfs_nid2str(peer->nid),
1546 body->oa.o_valid & OBD_MD_FLFID ?
1547 body->oa.o_parent_seq : (__u64)0,
1548 body->oa.o_valid & OBD_MD_FLFID ?
1549 body->oa.o_parent_oid : 0,
1550 body->oa.o_valid & OBD_MD_FLFID ?
1551 body->oa.o_parent_ver : 0,
1553 body->oa.o_valid & OBD_MD_FLGROUP ?
1554 body->oa.o_seq : (__u64)0,
1555 aa->aa_ppga[0]->off,
1556 aa->aa_ppga[aa->aa_page_count-1]->off +
1557 aa->aa_ppga[aa->aa_page_count-1]->count -
1559 CERROR("client %x, server %x, cksum_type %x\n",
1560 client_cksum, server_cksum, cksum_type);
1562 aa->aa_oa->o_cksum = client_cksum;
1566 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* We asked for a checksum but the server did not send one. */
1569 } else if (unlikely(client_cksum)) {
1570 static int cksum_missed;
/* Rate-limit: log only when cksum_missed is a power of two. */
1573 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1574 CERROR("Checksum %u requested from %s but not sent\n",
1575 cksum_missed, libcfs_nid2str(peer->nid));
/* Copy the (possibly updated) obdo from the wire back to the caller. */
1581 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
/*
 * Issue one synchronous BRW RPC and wait for it to complete, resending
 * on recoverable errors.  -EINPROGRESS from the server is always retried
 * (regardless of the resend budget); other recoverable errors are bounded
 * by client_should_resend() and by import-generation (eviction) checks.
 */
1586 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1587 struct lov_stripe_md *lsm,
1588 obd_count page_count, struct brw_page **pga,
1589 struct obd_capa *ocapa)
1591 struct ptlrpc_request *req;
1594 int generation, resends = 0;
1595 struct l_wait_info lwi;
1599 cfs_waitq_init(&waitq);
/* Remember the import generation so a resend can detect eviction. */
1600 generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1603 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1604 page_count, pga, &req, ocapa, 0, resends);
1609 req->rq_generation_set = 1;
1610 req->rq_import_generation = generation;
/* Delay each resend by 'resends' seconds (simple linear backoff). */
1611 req->rq_sent = cfs_time_current_sec() + resends;
1614 rc = ptlrpc_queue_wait(req);
1616 if (rc == -ETIMEDOUT && req->rq_resend) {
1617 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1618 ptlrpc_req_finished(req);
1622 rc = osc_brw_fini_request(req, rc);
1624 ptlrpc_req_finished(req);
1625 /* When server return -EINPROGRESS, client should always retry
1626 * regardless of the number of times the bulk was resent already.*/
1627 if (osc_recoverable_error(rc)) {
1629 if (rc != -EINPROGRESS &&
1630 !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1631 CERROR("%s: too many resend retries for object: "
1632 ""LPU64":"LPU64", rc = %d.\n",
1633 exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
/* Import generation changed: client was evicted mid-resend. */
1637 exp->exp_obd->u.cli.cl_import->imp_generation) {
1638 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1639 ""LPU64":"LPU64", rc = %d.\n",
1640 exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
/* Sleep before retrying; timeout grows with the resend count. */
1644 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1646 l_wait_event(waitq, 0, &lwi);
1651 if (rc == -EAGAIN || rc == -EINPROGRESS)
/*
 * Rebuild and re-queue a failed BRW RPC after a recoverable error.
 * A brand-new request is prepared from the async args of the old one;
 * the new request then takes ownership of the page array, the oap list,
 * the extent list and the capability.  Bails out (freeing the new
 * request) if any attached page was interrupted.
 */
1656 int osc_brw_redo_request(struct ptlrpc_request *request,
1657 struct osc_brw_async_args *aa)
1659 struct ptlrpc_request *new_req;
1660 struct osc_brw_async_args *new_aa;
1661 struct osc_async_page *oap;
1665 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
/* Re-prepare the RPC with the same direction, pages and capa. */
1667 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1668 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1669 aa->aa_cli, aa->aa_oa,
1670 NULL /* lsm unused by osc currently */,
1671 aa->aa_page_count, aa->aa_ppga,
1672 &new_req, aa->aa_ocapa, 0, 1);
/* Every oap still referencing a request must reference the old one. */
1676 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1677 if (oap->oap_request != NULL) {
1678 LASSERTF(request == oap->oap_request,
1679 "request %p != oap_request %p\n",
1680 request, oap->oap_request);
1681 if (oap->oap_interrupted) {
1682 ptlrpc_req_finished(new_req);
1687 /* New request takes over pga and oaps from old request.
1688 * Note that copying a list_head doesn't work, need to move it... */
1690 new_req->rq_interpret_reply = request->rq_interpret_reply;
1691 new_req->rq_async_args = request->rq_async_args;
1692 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1693 new_req->rq_generation_set = 1;
1694 new_req->rq_import_generation = request->rq_import_generation;
1696 new_aa = ptlrpc_req_async_args(new_req);
/* Splice (not copy) the oap and extent lists onto the new args. */
1698 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1699 cfs_list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1700 CFS_INIT_LIST_HEAD(&new_aa->aa_exts);
1701 cfs_list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
/* Re-point each oap's request reference at the new request. */
1703 cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1704 if (oap->oap_request) {
1705 ptlrpc_req_finished(oap->oap_request);
1706 oap->oap_request = ptlrpc_request_addref(new_req);
/* Transfer capability ownership to the new async args. */
1710 new_aa->aa_ocapa = aa->aa_ocapa;
1711 aa->aa_ocapa = NULL;
1713 /* XXX: This code will run into problem if we're going to support
1714 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1715 * and wait for all of them to be finished. We should inherit request
1716 * set from old request. */
1717 ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1719 DEBUG_REQ(D_INFO, new_req, "new request");
1724 * ugh, we want disk allocation on the target to happen in offset order. we'll
1725 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1726 * fine for our small page arrays and doesn't require allocation. it's an
1727 * insertion sort that swaps elements that are strides apart, shrinking the
1728 * stride down until it's '1' and the array is sorted.
/* Sort @array of @num brw_page pointers in ascending ->off order. */
1730 static void sort_brw_pages(struct brw_page **array, int num)
1733 struct brw_page *tmp;
/* Grow the stride with the 3h+1 sequence, then shrink it per pass. */
1737 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1742 for (i = stride ; i < num ; i++) {
/* Insertion step: shift larger elements up by one stride. */
1745 while (j >= stride && array[j - stride]->off > tmp->off) {
1746 array[j] = array[j - stride];
1751 } while (stride > 1);
/*
 * Return the largest prefix of @pg (at most @pages entries) that forms a
 * single contiguous, page-aligned run: the run may end early at a page
 * that doesn't finish on a page boundary, or before a page that doesn't
 * start on one.
 */
1754 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1760 LASSERT (pages > 0);
/* Offset of the first page within its CFS page. */
1761 offset = pg[i]->off & ~CFS_PAGE_MASK;
1765 if (pages == 0) /* that's all */
1768 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1769 return count; /* doesn't end on page boundary */
1772 offset = pg[i]->off & ~CFS_PAGE_MASK;
1773 if (offset != 0) /* doesn't start on page boundary */
/*
 * Build an array of pointers into the caller's contiguous brw_page
 * array @pga, so the pointers can be sorted/split without moving the
 * pages themselves.  Freed with osc_release_ppga().
 */
1780 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1782 struct brw_page **ppga;
1785 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1789 for (i = 0; i < count; i++)
/* Free a pointer array allocated by osc_build_ppga() (pointers only,
 * never the underlying brw_page structures). */
1794 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1796 LASSERT(ppga != NULL);
1797 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * Synchronous BRW entry point: sorts the pages by offset, then splits the
 * I/O into chunks bounded by cl_max_pages_per_rpc and by page-alignment
 * (max_unfragmented_pages), issuing one osc_brw_internal() call per chunk.
 * The obdo is saved/restored around each chunk because the RPC clobbers it.
 */
1800 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1801 obd_count page_count, struct brw_page *pga,
1802 struct obd_trans_info *oti)
1804 struct obdo *saved_oa = NULL;
1805 struct brw_page **ppga, **orig;
1806 struct obd_import *imp = class_exp2cliimp(exp);
1807 struct client_obd *cli;
1808 int rc, page_count_orig;
1811 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1812 cli = &imp->imp_obd->u.cli;
1814 if (cmd & OBD_BRW_CHECK) {
1815 /* The caller just wants to know if there's a chance that this
1816 * I/O can succeed */
1818 if (imp->imp_invalid)
1823 /* test_brw with a failed create can trip this, maybe others. */
1824 LASSERT(cli->cl_max_pages_per_rpc);
/* Work on a sortable pointer array; keep 'orig' for the final free. */
1828 orig = ppga = osc_build_ppga(pga, page_count);
1831 page_count_orig = page_count;
1833 sort_brw_pages(ppga, page_count);
1834 while (page_count) {
1835 obd_count pages_per_brw;
/* Chunk size: RPC limit first, then contiguity constraint. */
1837 if (page_count > cli->cl_max_pages_per_rpc)
1838 pages_per_brw = cli->cl_max_pages_per_rpc;
1840 pages_per_brw = page_count;
1842 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1844 if (saved_oa != NULL) {
1845 /* restore previously saved oa */
1846 *oinfo->oi_oa = *saved_oa;
1847 } else if (page_count > pages_per_brw) {
1848 /* save a copy of oa (brw will clobber it) */
1849 OBDO_ALLOC(saved_oa);
1850 if (saved_oa == NULL)
1851 GOTO(out, rc = -ENOMEM);
1852 *saved_oa = *oinfo->oi_oa;
1855 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1856 pages_per_brw, ppga, oinfo->oi_capa);
/* Advance past the chunk just transferred. */
1861 page_count -= pages_per_brw;
1862 ppga += pages_per_brw;
1866 osc_release_ppga(orig, page_count_orig);
1868 if (saved_oa != NULL)
1869 OBDO_FREE(saved_oa);
/*
 * Reply interpreter for asynchronous BRW RPCs.  Finishes the request via
 * osc_brw_fini_request(), resends on recoverable errors (always for
 * -EINPROGRESS), completes all attached extents, pushes returned object
 * attributes into the cl_object, updates in-flight counters and wakes
 * cache waiters, then re-plugs the I/O queue.
 */
1874 static int brw_interpret(const struct lu_env *env,
1875 struct ptlrpc_request *req, void *data, int rc)
1877 struct osc_brw_async_args *aa = data;
1878 struct osc_extent *ext;
1879 struct osc_extent *tmp;
1880 struct cl_object *obj = NULL;
1881 struct client_obd *cli = aa->aa_cli;
1884 rc = osc_brw_fini_request(req, rc);
1885 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1886 /* When server return -EINPROGRESS, client should always retry
1887 * regardless of the number of times the bulk was resent already. */
1888 if (osc_recoverable_error(rc)) {
/* Import generation changed under us: eviction, don't resend. */
1889 if (req->rq_import_generation !=
1890 req->rq_import->imp_generation) {
1891 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1892 ""LPU64":"LPU64", rc = %d.\n",
1893 req->rq_import->imp_obd->obd_name,
1894 aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
1895 } else if (rc == -EINPROGRESS ||
1896 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1897 rc = osc_brw_redo_request(req, aa);
1899 CERROR("%s: too many resent retries for object: "
1900 ""LPU64":"LPU64", rc = %d.\n",
1901 req->rq_import->imp_obd->obd_name,
1902 aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
1907 else if (rc == -EAGAIN || rc == -EINPROGRESS)
/* Release the capability reference held for this RPC. */
1912 capa_put(aa->aa_ocapa);
1913 aa->aa_ocapa = NULL;
/* Complete every extent attached to this RPC. */
1916 cfs_list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1917 if (obj == NULL && rc == 0) {
1918 obj = osc2cl(ext->oe_obj);
1922 cfs_list_del_init(&ext->oe_link);
1923 osc_extent_finish(env, ext, 1, rc);
1925 LASSERT(cfs_list_empty(&aa->aa_exts));
1926 LASSERT(cfs_list_empty(&aa->aa_oaps));
/* On success, propagate the attributes the server returned to the
 * cl_object (only fields flagged valid in the obdo). */
1929 struct obdo *oa = aa->aa_oa;
1930 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1931 unsigned long valid = 0;
1934 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1935 attr->cat_blocks = oa->o_blocks;
1936 valid |= CAT_BLOCKS;
1938 if (oa->o_valid & OBD_MD_FLMTIME) {
1939 attr->cat_mtime = oa->o_mtime;
1942 if (oa->o_valid & OBD_MD_FLATIME) {
1943 attr->cat_atime = oa->o_atime;
1946 if (oa->o_valid & OBD_MD_FLCTIME) {
1947 attr->cat_ctime = oa->o_ctime;
1951 cl_object_attr_lock(obj);
1952 cl_object_attr_set(env, obj, attr, valid);
1953 cl_object_attr_unlock(obj);
1955 cl_object_put(env, obj);
1957 OBDO_FREE(aa->aa_oa);
1959 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1960 req->rq_bulk->bd_nob_transferred);
1961 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1962 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1964 client_obd_list_lock(&cli->cl_loi_list_lock);
1965 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1966 * is called so we know whether to go to sync BRWs or wait for more
1967 * RPCs to complete */
1968 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1969 cli->cl_w_in_flight--;
1971 cli->cl_r_in_flight--;
1972 osc_wake_cache_waiters(cli);
1973 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Kick the queue so more RPCs can be built now that one finished. */
1975 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
1980 * Build an RPC by the list of extent @ext_list. The caller must ensure
1981 * that the total pages in this list are NOT over max pages per RPC.
1982 * Extents in the list must be in OES_RPC state.
1984 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1985 cfs_list_t *ext_list, int cmd, pdl_policy_t pol)
1987 struct ptlrpc_request *req = NULL;
1988 struct osc_extent *ext;
1989 CFS_LIST_HEAD(rpc_list);
1990 struct brw_page **pga = NULL;
1991 struct osc_brw_async_args *aa = NULL;
1992 struct obdo *oa = NULL;
1993 struct osc_async_page *oap;
1994 struct osc_async_page *tmp;
1995 struct cl_req *clerq = NULL;
1996 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1997 struct ldlm_lock *lock = NULL;
1998 struct cl_req_attr crattr;
1999 obd_off starting_offset = OBD_OBJECT_EOF;
2000 obd_off ending_offset = 0;
2001 int i, rc, mpflag = 0, mem_tight = 0, page_count = 0;
2004 LASSERT(!cfs_list_empty(ext_list));
2006 /* add pages into rpc_list to build BRW rpc */
2007 cfs_list_for_each_entry(ext, ext_list, oe_link) {
2008 LASSERT(ext->oe_state == OES_RPC);
2009 mem_tight |= ext->oe_memalloc;
2010 cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2012 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
/* Track the overall [starting_offset, ending_offset) span for stats. */
2013 if (starting_offset > oap->oap_obj_off)
2014 starting_offset = oap->oap_obj_off;
2016 LASSERT(oap->oap_page_off == 0);
2017 if (ending_offset < oap->oap_obj_off + oap->oap_count)
2018 ending_offset = oap->oap_obj_off +
2021 LASSERT(oap->oap_page_off + oap->oap_count ==
/* Under memory pressure, mark the whole RPC memalloc. */
2027 mpflag = cfs_memory_pressure_get_and_set();
2029 memset(&crattr, 0, sizeof crattr);
2030 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2032 GOTO(out, rc = -ENOMEM);
2036 GOTO(out, rc = -ENOMEM);
/* Fill the page array and attach every page to the cl_req. */
2039 cfs_list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2040 struct cl_page *page = oap2cl_page(oap);
2041 if (clerq == NULL) {
2042 clerq = cl_req_alloc(env, page, crt,
2043 1 /* only 1-object rpcs for
2046 GOTO(out, rc = PTR_ERR(clerq));
2047 lock = oap->oap_ldlm_lock;
2050 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2051 pga[i] = &oap->oap_brw_page;
2052 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2053 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2054 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2056 cl_req_page_add(env, clerq, page);
2059 /* always get the data for the obdo for the rpc */
2060 LASSERT(clerq != NULL);
2062 crattr.cra_capa = NULL;
2063 memset(crattr.cra_jobid, 0, JOBSTATS_JOBID_SIZE);
2064 cl_req_attr_set(env, clerq, &crattr, ~0ULL);
/* Pass the DLM lock handle so the server can match the client lock. */
2066 oa->o_handle = lock->l_remote_handle;
2067 oa->o_valid |= OBD_MD_FLHANDLE;
2070 rc = cl_req_prep(env, clerq);
2072 CERROR("cl_req_prep failed: %d\n", rc);
/* Server-side allocation prefers offset order; sort before packing. */
2076 sort_brw_pages(pga, page_count);
2077 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2078 pga, &req, crattr.cra_capa, 1, 0);
2080 CERROR("prep_req failed: %d\n", rc);
2084 req->rq_interpret_reply = brw_interpret;
2086 req->rq_memalloc = 1;
2088 /* Need to update the timestamps after the request is built in case
2089 * we race with setattr (locally or in queue at OST). If OST gets
2090 * later setattr before earlier BRW (as determined by the request xid),
2091 * the OST will not use BRW timestamps. Sadly, there is no obvious
2092 * way to do this in a single call. bug 10150 */
2093 cl_req_attr_set(env, clerq, &crattr,
2094 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2096 lustre_msg_set_jobid(req->rq_reqmsg, crattr.cra_jobid);
/* Hand the oap and extent lists over to the request's async args. */
2098 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2099 aa = ptlrpc_req_async_args(req);
2100 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2101 cfs_list_splice_init(&rpc_list, &aa->aa_oaps);
2102 CFS_INIT_LIST_HEAD(&aa->aa_exts);
2103 cfs_list_splice_init(ext_list, &aa->aa_exts);
2104 aa->aa_clerq = clerq;
2106 /* queued sync pages can be torn down while the pages
2107 * were between the pending list and the rpc */
2109 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2110 /* only one oap gets a request reference */
2113 if (oap->oap_interrupted && !req->rq_intr) {
2114 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2116 ptlrpc_mark_interrupted(req);
2120 tmp->oap_request = ptlrpc_request_addref(req);
/* Account the RPC in-flight and update the lprocfs histograms. */
2122 client_obd_list_lock(&cli->cl_loi_list_lock);
2123 starting_offset >>= CFS_PAGE_SHIFT;
2124 if (cmd == OBD_BRW_READ) {
2125 cli->cl_r_in_flight++;
2126 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2127 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2128 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2129 starting_offset + 1);
2131 cli->cl_w_in_flight++;
2132 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2133 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2134 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2135 starting_offset + 1);
2137 client_obd_list_unlock(&cli->cl_loi_list_lock);
2139 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2140 page_count, aa, cli->cl_r_in_flight,
2141 cli->cl_w_in_flight);
2143 /* XXX: Maybe the caller can check the RPC bulk descriptor to
2144 * see which CPU/NUMA node the majority of pages were allocated
2145 * on, and try to assign the async RPC to the CPU core
2146 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2148 * But on the other hand, we expect that multiple ptlrpcd
2149 * threads and the initial write sponsor can run in parallel,
2150 * especially when data checksum is enabled, which is CPU-bound
2151 * operation and single ptlrpcd thread cannot process in time.
2152 * So more ptlrpcd threads sharing BRW load
2153 * (with PDL_POLICY_ROUND) seems better.
2155 ptlrpcd_add_req(req, pol, -1);
2161 cfs_memory_pressure_restore(mpflag);
2163 capa_put(crattr.cra_capa);
2165 LASSERT(req == NULL);
2170 OBD_FREE(pga, sizeof(*pga) * page_count);
2171 /* this should happen rarely and is pretty bad, it makes the
2172 * pending list not follow the dirty order */
/* Error path: fail every extent still on the caller's list. */
2173 while (!cfs_list_empty(ext_list)) {
2174 ext = cfs_list_entry(ext_list->next, struct osc_extent,
2176 cfs_list_del_init(&ext->oe_link);
2177 osc_extent_finish(env, ext, 0, rc);
2179 if (clerq && !IS_ERR(clerq))
2180 cl_req_completion(env, clerq, rc);
/*
 * Attach @einfo->ei_cbdata to @lock's l_ast_data if it is unset, after
 * asserting the lock's callbacks/type match the enqueue info.  The check
 * and conditional store are done under both the resource lock and
 * osc_ast_guard; returns whether l_ast_data now equals the data.
 */
2185 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2186 struct ldlm_enqueue_info *einfo)
2188 void *data = einfo->ei_cbdata;
2191 LASSERT(lock != NULL);
2192 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2193 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2194 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2195 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2197 lock_res_and_lock(lock);
2198 cfs_spin_lock(&osc_ast_guard);
/* Only claim the slot if nobody set it, then verify ownership. */
2200 if (lock->l_ast_data == NULL)
2201 lock->l_ast_data = data;
2202 if (lock->l_ast_data == data)
2205 cfs_spin_unlock(&osc_ast_guard);
2206 unlock_res_and_lock(lock);
/*
 * Handle-based wrapper around osc_set_lock_data_with_check(): resolves
 * @lockh to a lock, sets its ast data, and drops the lock reference.
 * A stale handle (lock already gone — e.g. after eviction) is logged.
 */
2211 static int osc_set_data_with_check(struct lustre_handle *lockh,
2212 struct ldlm_enqueue_info *einfo)
2214 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2218 set = osc_set_lock_data_with_check(lock, einfo);
2219 LDLM_LOCK_PUT(lock);
2221 CERROR("lockh %p, data %p - client evicted?\n",
2222 lockh, einfo->ei_cbdata);
/*
 * Apply the iterator @replace with @data to every DLM lock on the
 * resource derived from @lsm (used to change the ast data on all of an
 * object's cached locks).
 */
2226 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2227 ldlm_iterator_t replace, void *data)
2229 struct ldlm_res_id res_id;
2230 struct obd_device *obd = class_exp2obd(exp);
2232 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
2233 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2237 /* find any ldlm lock of the inode in osc
/*
 * Iterate the object's DLM resource with @replace; translate the
 * iterator's result (LDLM_ITER_STOP means a matching lock was found,
 * LDLM_ITER_CONTINUE means none) into the function's return value.
 */
2241 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2242 ldlm_iterator_t replace, void *data)
2244 struct ldlm_res_id res_id;
2245 struct obd_device *obd = class_exp2obd(exp);
2248 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
2249 rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2250 if (rc == LDLM_ITER_STOP)
2252 if (rc == LDLM_ITER_CONTINUE)
/*
 * Finish an OSC enqueue: translate an intent-aborted reply into the
 * server's policy result, mark the LVB ready when the glimpse/enqueue
 * produced valid data, and invoke the caller's update callback with the
 * final result.
 */
2257 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2258 obd_enqueue_update_f upcall, void *cookie,
2259 int *flags, int agl, int rc)
2261 int intent = *flags & LDLM_FL_HAS_INTENT;
2265 /* The request was created before ldlm_cli_enqueue call. */
2266 if (rc == ELDLM_LOCK_ABORTED) {
2267 struct ldlm_reply *rep;
2268 rep = req_capsule_server_get(&req->rq_pill,
2271 LASSERT(rep != NULL);
/* Use the server's intent result as our return code if set. */
2272 if (rep->lock_policy_res1)
2273 rc = rep->lock_policy_res1;
2277 if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2279 *flags |= LDLM_FL_LVB_READY;
2280 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2281 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2284 /* Call the update callback. */
2285 rc = (*upcall)(cookie, rc);
/*
 * Reply interpreter for an asynchronous lock enqueue.  Copies the lock
 * handle/mode out of @aa before any upcall can free them, pins the lock
 * across osc_enqueue_fini() so a blocking AST ordering race is avoided,
 * completes the enqueue via ldlm_cli_enqueue_fini(), and finally drops
 * the extra references.
 */
2289 static int osc_enqueue_interpret(const struct lu_env *env,
2290 struct ptlrpc_request *req,
2291 struct osc_enqueue_args *aa, int rc)
2293 struct ldlm_lock *lock;
2294 struct lustre_handle handle;
2296 struct ost_lvb *lvb;
2298 int *flags = aa->oa_flags;
2300 /* Make a local copy of a lock handle and a mode, because aa->oa_*
2301 * might be freed anytime after lock upcall has been called. */
2302 lustre_handle_copy(&handle, aa->oa_lockh);
2303 mode = aa->oa_ei->ei_mode;
2305 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2307 lock = ldlm_handle2lock(&handle);
2309 /* Take an additional reference so that a blocking AST that
2310 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2311 * to arrive after an upcall has been executed by
2312 * osc_enqueue_fini(). */
2313 ldlm_lock_addref(&handle, mode);
2315 /* Let CP AST to grant the lock first. */
2316 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
/* AGL enqueues that were aborted still carry an LVB to consume. */
2318 if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2323 lvb_len = sizeof(*aa->oa_lvb);
2326 /* Complete obtaining the lock procedure. */
2327 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2328 mode, flags, lvb, lvb_len, &handle, rc);
2329 /* Complete osc stuff. */
2330 rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2331 flags, aa->oa_agl, rc);
2333 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2335 /* Release the lock for async request. */
2336 if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2338 * Releases a reference taken by ldlm_cli_enqueue(), if it is
2339 * not already released by
2340 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2342 ldlm_lock_decref(&handle, mode);
2344 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2345 aa->oa_lockh, req, aa);
/* Drop the extra reference taken above, then the handle2lock ref. */
2346 ldlm_lock_decref(&handle, mode);
2347 LDLM_LOCK_PUT(lock);
/*
 * Update the lov_oinfo from an enqueue result: on a granted lock
 * (ELDLM_OK) copy the LVB and extend KMS up to the end of the lock; on a
 * glimpse (intent aborted) just copy the LVB; on failure mark the lock
 * unmatchable so later ldlm_lock_match() calls skip it.
 */
2351 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2352 struct lov_oinfo *loi, int flags,
2353 struct ost_lvb *lvb, __u32 mode, int rc)
2355 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2357 if (rc == ELDLM_OK) {
2360 LASSERT(lock != NULL);
2361 loi->loi_lvb = *lvb;
2362 tmp = loi->loi_lvb.lvb_size;
2363 /* Extend KMS up to the end of this lock and no further
2364 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2365 if (tmp > lock->l_policy_data.l_extent.end)
2366 tmp = lock->l_policy_data.l_extent.end + 1;
2367 if (tmp >= loi->loi_kms) {
2368 LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2369 ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2370 loi_kms_set(loi, tmp);
2372 LDLM_DEBUG(lock, "lock acquired, setting rss="
2373 LPU64"; leaving kms="LPU64", end="LPU64,
2374 loi->loi_lvb.lvb_size, loi->loi_kms,
2375 lock->l_policy_data.l_extent.end);
/* Lock is usable now; allow it to satisfy future matches. */
2377 ldlm_lock_allow_match(lock);
2378 } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2379 LASSERT(lock != NULL);
2380 loi->loi_lvb = *lvb;
2381 ldlm_lock_allow_match(lock);
2382 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2383 " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
/* Enqueue failed: make sure this lock can never be matched. */
2389 ldlm_lock_fail_match(lock);
2391 LDLM_LOCK_PUT(lock);
2394 EXPORT_SYMBOL(osc_update_enqueue);
/* Sentinel request-set pointer (never dereferenced) — presumably passed
 * by callers to request ptlrpcd handling instead of a private request
 * set; NOTE(review): confirm against osc_enqueue_base() callers. */
2396 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2398 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2399 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2400 * other synchronous requests, however keeping some locks and trying to obtain
2401 * others may take a considerable amount of time in a case of ost failure; and
2402 * when other sync requests do not get released lock from a client, the client
2403 * is excluded from the cluster -- such scenarios make life difficult, so
2404 * release locks just after they are obtained. */
2405 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2406 int *flags, ldlm_policy_data_t *policy,
2407 struct ost_lvb *lvb, int kms_valid,
2408 obd_enqueue_update_f upcall, void *cookie,
2409 struct ldlm_enqueue_info *einfo,
2410 struct lustre_handle *lockh,
2411 struct ptlrpc_request_set *rqset, int async, int agl)
2413 struct obd_device *obd = exp->exp_obd;
2414 struct ptlrpc_request *req = NULL;
2415 int intent = *flags & LDLM_FL_HAS_INTENT;
2416 int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2421 /* Filesystem lock extents are extended to page boundaries so that
2422 * dealing with the page cache is a little smoother. */
2423 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2424 policy->l_extent.end |= ~CFS_PAGE_MASK;
2427 * kms is not valid when either object is completely fresh (so that no
2428 * locks are cached), or object was evicted. In the latter case cached
2429 * lock cannot be used, because it would prime inode state with
2430 * potentially stale LVB.
2435 /* Next, search for already existing extent locks that will cover us */
2436 /* If we're trying to read, we also search for an existing PW lock. The
2437 * VFS and page cache already protect us locally, so lots of readers/
2438 * writers can share a single PW lock.
2440 * There are problems with conversion deadlocks, so instead of
2441 * converting a read lock to a write lock, we'll just enqueue a new
2444 * At some point we should cancel the read lock instead of making them
2445 * send us a blocking callback, but there are problems with canceling
2446 * locks out from other users right now, too. */
2447 mode = einfo->ei_mode;
2448 if (einfo->ei_mode == LCK_PR)
2450 mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2451 einfo->ei_type, policy, mode, lockh, 0);
2453 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2455 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2456 /* For AGL, if enqueue RPC is sent but the lock is not
2457 * granted, then skip to process this strpe.
2458 * Return -ECANCELED to tell the caller. */
2459 ldlm_lock_decref(lockh, mode);
2460 LDLM_LOCK_PUT(matched);
2462 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2463 *flags |= LDLM_FL_LVB_READY;
2464 /* addref the lock only if not async requests and PW
2465 * lock is matched whereas we asked for PR. */
2466 if (!rqset && einfo->ei_mode != mode)
2467 ldlm_lock_addref(lockh, LCK_PR);
2469 /* I would like to be able to ASSERT here that
2470 * rss <= kms, but I can't, for reasons which
2471 * are explained in lov_enqueue() */
2474 /* We already have a lock, and it's referenced */
2475 (*upcall)(cookie, ELDLM_OK);
2477 if (einfo->ei_mode != mode)
2478 ldlm_lock_decref(lockh, LCK_PW);
2480 /* For async requests, decref the lock. */
2481 ldlm_lock_decref(lockh, einfo->ei_mode);
2482 LDLM_LOCK_PUT(matched);
2485 ldlm_lock_decref(lockh, mode);
2486 LDLM_LOCK_PUT(matched);
2492 CFS_LIST_HEAD(cancels);
2493 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2494 &RQF_LDLM_ENQUEUE_LVB);
2498 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2500 ptlrpc_request_free(req);
2504 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2506 ptlrpc_request_set_replen(req);
2509 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2510 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2512 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2513 sizeof(*lvb), lockh, async);
2516 struct osc_enqueue_args *aa;
2517 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2518 aa = ptlrpc_req_async_args(req);
2521 aa->oa_flags = flags;
2522 aa->oa_upcall = upcall;
2523 aa->oa_cookie = cookie;
2525 aa->oa_lockh = lockh;
2528 req->rq_interpret_reply =
2529 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2530 if (rqset == PTLRPCD_SET)
2531 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2533 ptlrpc_set_add_req(rqset, req);
2534 } else if (intent) {
2535 ptlrpc_req_finished(req);
2540 rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2542 ptlrpc_req_finished(req);
/*
 * obd_ops enqueue entry point: build the LDLM resource name from the
 * stripe object id/seq in @oinfo->oi_md and forward to osc_enqueue_base()
 * using stripe 0's LOV oinfo (LVB, kms validity).  Asynchronous iff a
 * request set was supplied.
 */
2547 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2548 struct ldlm_enqueue_info *einfo,
2549 struct ptlrpc_request_set *rqset)
2551 struct ldlm_res_id res_id;
2555 osc_build_res_name(oinfo->oi_md->lsm_object_id,
2556 oinfo->oi_md->lsm_object_seq, &res_id);
2558 rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2559 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2560 oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2561 oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2562 rqset, rqset != NULL, 0);
/*
 * Match (without enqueuing) an already-cached extent lock on @res_id.
 * The requested extent is widened to page boundaries first.  On a match
 * the lock's private data is validated via osc_set_data_with_check();
 * when a PW lock satisfies a PR request the reference is moved from PW
 * to PR (unless this is only a TEST_LOCK probe).
 *
 * NOTE(review): the OBD_FAIL branch body and the final RETURN are not
 * visible in this excerpt; "rc" is presumably preloaded with the mode(s)
 * to match before the ldlm_lock_match() call — confirm in full source.
 */
2566 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2567 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2568 int *flags, void *data, struct lustre_handle *lockh,
2571 struct obd_device *obd = exp->exp_obd;
2572 int lflags = *flags;
2576 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2579 /* Filesystem lock extents are extended to page boundaries so that
2580 * dealing with the page cache is a little smoother */
2581 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2582 policy->l_extent.end |= ~CFS_PAGE_MASK;
2584 /* Next, search for already existing extent locks that will cover us */
2585 /* If we're trying to read, we also search for an existing PW lock. The
2586 * VFS and page cache already protect us locally, so lots of readers/
2587 * writers can share a single PW lock. */
2591 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2592 res_id, type, policy, rc, lockh, unref);
2595 if (!osc_set_data_with_check(lockh, data)) {
2596 if (!(lflags & LDLM_FL_TEST_LOCK))
2597 ldlm_lock_decref(lockh, rc);
2601 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2602 ldlm_lock_addref(lockh, LCK_PR);
2603 ldlm_lock_decref(lockh, LCK_PW);
/*
 * Drop a reference on @lockh.  Group locks are cancelled immediately on
 * the last reference (they are never kept in the LRU); other modes are
 * simply decref'd and left cached.
 */
2610 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2614 if (unlikely(mode == LCK_GROUP))
2615 ldlm_lock_decref_and_cancel(lockh, mode);
2617 ldlm_lock_decref(lockh, mode);
/* obd_ops cancel entry point: thin wrapper around osc_cancel_base(). */
2622 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2623 __u32 mode, struct lustre_handle *lockh)
2626 RETURN(osc_cancel_base(lockh, mode));
/*
 * Cancel all unused locks for this object (or the whole namespace when
 * no LSM is given — the NULL check on lsm is not visible in this
 * excerpt; resp stays NULL in that case).
 */
2629 static int osc_cancel_unused(struct obd_export *exp,
2630 struct lov_stripe_md *lsm,
2631 ldlm_cancel_flags_t flags,
2634 struct obd_device *obd = class_exp2obd(exp);
2635 struct ldlm_res_id res_id, *resp = NULL;
2638 resp = osc_build_res_name(lsm->lsm_object_id,
2639 lsm->lsm_object_seq, &res_id);
2642 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
/*
 * Reply interpreter for the async OST_STATFS RPC sent by
 * osc_statfs_async().  Copies the server's obd_statfs into the caller's
 * buffer, maintains the object-creator (oscc) DEGRADED/RDONLY/NOSPC
 * flags under oscc_lock, and finally invokes the caller's oi_cb_up.
 */
2645 static int osc_statfs_interpret(const struct lu_env *env,
2646 struct ptlrpc_request *req,
2647 struct osc_async_args *aa, int rc)
2649 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
2650 struct obd_statfs *msfs;
2655 /* The request has in fact never been sent
2656 * due to issues at a higher level (LOV).
2657 * Exit immediately since the caller is
2658 * aware of the problem and takes care
2659 * of the clean up */
/* NODELAY statfs must not be retried on a flaky connection */
2662 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2663 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2669 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2671 GOTO(out, rc = -EPROTO);
2674 /* Reinitialize the RDONLY and DEGRADED flags at the client
2675 * on each statfs, so they don't stay set permanently. */
2676 cfs_spin_lock(&cli->cl_oscc.oscc_lock);
2678 if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
2679 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
2680 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
2681 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
2683 if (unlikely(msfs->os_state & OS_STATE_READONLY))
2684 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
2685 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
2686 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
2688 /* Add a bit of hysteresis so this flag isn't continually flapping,
2689 * and ensure that new files don't get extremely fragmented due to
2690 * only a small amount of available space in the filesystem.
2691 * We want to set the NOSPC flag when there is less than ~0.1% free
2692 * and clear it when there is at least ~0.2% free space, so:
2693 * avail < ~0.1% max max = avail + used
2694 * 1025 * avail < avail + used used = blocks - free
2695 * 1024 * avail < used
2696 * 1024 * avail < blocks - free
2697 * avail < ((blocks - free) >> 10)
2699 * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
2700 * lose that amount of space so in those cases we report no space left
2701 * if there is less than 1 GB left. */
2702 used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
2703 if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
2704 ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
2705 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
2706 else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
2707 (msfs->os_ffree > 64) &&
2708 (msfs->os_bavail > (used << 1)))) {
2709 cli->cl_oscc.oscc_flags &= ~(OSCC_FLAG_NOSPC |
2710 OSCC_FLAG_NOSPC_BLK);
2713 if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
2714 (msfs->os_bavail < used)))
2715 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC_BLK;
2717 cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
2719 *aa->aa_oi->oi_osfs = *msfs;
2721 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Issue a non-blocking OST_STATFS RPC on the caller's request set;
 * osc_statfs_interpret() delivers the result via oinfo->oi_cb_up.
 * @max_age is currently unused by the wire protocol (see comment below).
 */
2725 static int osc_statfs_async(struct obd_export *exp,
2726 struct obd_info *oinfo, __u64 max_age,
2727 struct ptlrpc_request_set *rqset)
2729 struct obd_device *obd = class_exp2obd(exp);
2730 struct ptlrpc_request *req;
2731 struct osc_async_args *aa;
2735 /* We could possibly pass max_age in the request (as an absolute
2736 * timestamp or a "seconds.usec ago") so the target can avoid doing
2737 * extra calls into the filesystem if that isn't necessary (e.g.
2738 * during mount that would help a bit). Having relative timestamps
2739 * is not so great if request processing is slow, while absolute
2740 * timestamps are not ideal because they need time synchronization. */
2741 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2745 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2747 ptlrpc_request_free(req);
2750 ptlrpc_request_set_replen(req);
2751 req->rq_request_portal = OST_CREATE_PORTAL;
2752 ptlrpc_at_set_req_timeout(req);
2754 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2755 /* procfs requests must not wait or be resent, to avoid deadlock */
2756 req->rq_no_resend = 1;
2757 req->rq_no_delay = 1;
2760 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2761 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2762 aa = ptlrpc_req_async_args(req);
2765 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous OST_STATFS: grab a reference on the import under cl_sem
 * (it may be torn down concurrently by disconnect, see Bug15684), send
 * the RPC with ptlrpc_queue_wait() and copy the reply into @osfs.
 */
2769 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2770 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2772 struct obd_device *obd = class_exp2obd(exp);
2773 struct obd_statfs *msfs;
2774 struct ptlrpc_request *req;
2775 struct obd_import *imp = NULL;
2779 /*Since the request might also come from lprocfs, so we need
2780 *sync this with client_disconnect_export Bug15684*/
2781 cfs_down_read(&obd->u.cli.cl_sem);
2782 if (obd->u.cli.cl_import)
2783 imp = class_import_get(obd->u.cli.cl_import);
2784 cfs_up_read(&obd->u.cli.cl_sem);
2788 /* We could possibly pass max_age in the request (as an absolute
2789 * timestamp or a "seconds.usec ago") so the target can avoid doing
2790 * extra calls into the filesystem if that isn't necessary (e.g.
2791 * during mount that would help a bit). Having relative timestamps
2792 * is not so great if request processing is slow, while absolute
2793 * timestamps are not ideal because they need time synchronization. */
2794 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
/* the import reference is only needed for the allocation above */
2796 class_import_put(imp);
2801 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2803 ptlrpc_request_free(req);
2806 ptlrpc_request_set_replen(req);
2807 req->rq_request_portal = OST_CREATE_PORTAL;
2808 ptlrpc_at_set_req_timeout(req);
2810 if (flags & OBD_STATFS_NODELAY) {
2811 /* procfs requests must not wait or be resent, to avoid deadlock */
2812 req->rq_no_resend = 1;
2813 req->rq_no_delay = 1;
2816 rc = ptlrpc_queue_wait(req);
2820 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2822 GOTO(out, rc = -EPROTO);
2829 ptlrpc_req_finished(req);
2833 /* Retrieve object striping information.
2835 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2836 * the maximum number of OST indices which will fit in the user buffer.
2837 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
 *
 * Copies the single-stripe layout of @lsm back to the user buffer
 * @lump, supporting both LOV_USER_MAGIC_V1 and _V3 layouts.  The
 * temporary kernel buffer is sized with lov_mds_md_size(), which is
 * valid because lov_user_md_vX and lov_mds_md_vX are asserted to have
 * identical sizes below.
2839 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2841 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2842 struct lov_user_md_v3 lum, *lumk;
2843 struct lov_user_ost_data_v1 *lmm_objects;
2844 int rc = 0, lum_size;
2850 /* we only need the header part from user space to get lmm_magic and
2851 * lmm_stripe_count, (the header part is common to v1 and v3) */
2852 lum_size = sizeof(struct lov_user_md_v1);
2853 if (cfs_copy_from_user(&lum, lump, lum_size))
2856 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2857 (lum.lmm_magic != LOV_USER_MAGIC_V3))
2860 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2861 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2862 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2863 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2865 /* we can use lov_mds_md_size() to compute lum_size
2866 * because lov_user_md_vX and lov_mds_md_vX have the same size */
2867 if (lum.lmm_stripe_count > 0) {
2868 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2869 OBD_ALLOC(lumk, lum_size);
2873 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2874 lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2876 lmm_objects = &(lumk->lmm_objects[0]);
2877 lmm_objects->l_object_id = lsm->lsm_object_id;
/* user asked for header only: no per-OST objects are returned */
2879 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2883 lumk->lmm_object_id = lsm->lsm_object_id;
2884 lumk->lmm_object_seq = lsm->lsm_object_seq;
2885 lumk->lmm_stripe_count = 1;
2887 if (cfs_copy_to_user(lump, lumk, lum_size))
2891 OBD_FREE(lumk, lum_size);
/*
 * ioctl dispatcher for the OSC device.  Takes a module reference for
 * the duration of the call so the OSC cannot be unloaded mid-ioctl.
 * Unrecognised commands return -ENOTTY.
 */
2897 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2898 void *karg, void *uarg)
2900 struct obd_device *obd = exp->exp_obd;
2901 struct obd_ioctl_data *data = karg;
2905 if (!cfs_try_module_get(THIS_MODULE)) {
2906 CERROR("Can't get module. Is it alive?");
2910 case OBD_IOC_LOV_GET_CONFIG: {
2912 struct lov_desc *desc;
2913 struct obd_uuid uuid;
2917 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2918 GOTO(out, err = -EINVAL);
2920 data = (struct obd_ioctl_data *)buf;
/* validate user-supplied buffer sizes before writing into them */
2922 if (sizeof(*desc) > data->ioc_inllen1) {
2923 obd_ioctl_freedata(buf, len);
2924 GOTO(out, err = -EINVAL);
2927 if (data->ioc_inllen2 < sizeof(uuid)) {
2928 obd_ioctl_freedata(buf, len);
2929 GOTO(out, err = -EINVAL);
/* an OSC presents itself as a degenerate one-target LOV */
2932 desc = (struct lov_desc *)data->ioc_inlbuf1;
2933 desc->ld_tgt_count = 1;
2934 desc->ld_active_tgt_count = 1;
2935 desc->ld_default_stripe_count = 1;
2936 desc->ld_default_stripe_size = 0;
2937 desc->ld_default_stripe_offset = 0;
2938 desc->ld_pattern = 0;
2939 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2941 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2943 err = cfs_copy_to_user((void *)uarg, buf, len);
2946 obd_ioctl_freedata(buf, len);
2949 case LL_IOC_LOV_SETSTRIPE:
2950 err = obd_alloc_memmd(exp, karg);
2954 case LL_IOC_LOV_GETSTRIPE:
2955 err = osc_getstripe(karg, uarg);
2957 case OBD_IOC_CLIENT_RECOVER:
2958 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2959 data->ioc_inlbuf1, 0);
2963 case IOC_OSC_SET_ACTIVE:
2964 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2967 case OBD_IOC_POLL_QUOTACHECK:
2968 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2970 case OBD_IOC_PING_TARGET:
2971 err = ptlrpc_obd_ping(obd);
2974 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2975 cmd, cfs_curproc_comm());
2976 GOTO(out, err = -ENOTTY);
2979 cfs_module_put(THIS_MODULE);
/*
 * obd_get_info handler.  Supported keys:
 *   KEY_LOCK_TO_STRIPE - maps a lock to its stripe index (OSC: always
 *                        stripe 0; the assignment is not visible here);
 *   KEY_LAST_ID        - fetch the last allocated object id from the
 *                        OST via an OST_GET_INFO RPC;
 *   KEY_FIEMAP         - forward a fiemap request/reply buffer to the
 *                        OST via OST_GET_INFO.
 */
2983 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2984 obd_count keylen, void *key, __u32 *vallen, void *val,
2985 struct lov_stripe_md *lsm)
2988 if (!vallen || !val)
2991 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2992 __u32 *stripe = val;
2993 *vallen = sizeof(*stripe);
2996 } else if (KEY_IS(KEY_LAST_ID)) {
2997 struct ptlrpc_request *req;
3002 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3003 &RQF_OST_GET_INFO_LAST_ID);
3007 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3008 RCL_CLIENT, keylen);
3009 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3011 ptlrpc_request_free(req);
3015 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3016 memcpy(tmp, key, keylen);
/* one-shot query: do not wait for recovery or resend it */
3018 req->rq_no_delay = req->rq_no_resend = 1;
3019 ptlrpc_request_set_replen(req);
3020 rc = ptlrpc_queue_wait(req);
3024 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3026 GOTO(out, rc = -EPROTO);
3028 *((obd_id *)val) = *reply;
3030 ptlrpc_req_finished(req);
3032 } else if (KEY_IS(KEY_FIEMAP)) {
3033 struct ptlrpc_request *req;
3034 struct ll_user_fiemap *reply;
3038 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3039 &RQF_OST_GET_INFO_FIEMAP);
/* the fiemap value buffer travels in both directions */
3043 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3044 RCL_CLIENT, keylen);
3045 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3046 RCL_CLIENT, *vallen);
3047 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3048 RCL_SERVER, *vallen);
3050 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3052 ptlrpc_request_free(req);
3056 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3057 memcpy(tmp, key, keylen);
3058 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3059 memcpy(tmp, val, *vallen);
3061 ptlrpc_request_set_replen(req);
3062 rc = ptlrpc_queue_wait(req);
3066 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3068 GOTO(out1, rc = -EPROTO);
3070 memcpy(val, reply, *vallen);
3072 ptlrpc_req_finished(req);
/*
 * Called when the MDS connects through this OSC: connect the MDS->OST
 * llog initiator and mark the import pingable with server-side timeout
 * semantics (this OSC now serves the MDS, not a regular client).
 */
3080 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
3082 struct llog_ctxt *ctxt;
3086 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3088 rc = llog_initiator_connect(ctxt);
3089 llog_ctxt_put(ctxt);
3091 /* XXX return an error? skip setting below flags? */
3094 cfs_spin_lock(&imp->imp_lock);
3095 imp->imp_server_timeout = 1;
3096 imp->imp_pingable = 1;
3097 cfs_spin_unlock(&imp->imp_lock);
3098 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
/* Reply interpreter for the KEY_MDS_CONN set_info RPC: on success,
 * finish the MDS connection setup on this import. */
3103 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3104 struct ptlrpc_request *req,
3111 RETURN(osc_setinfo_mds_connect_import(req->rq_import));
/*
 * obd_set_info_async handler.  Keys handled locally: KEY_NEXT_ID
 * (advance the object creator's next id), KEY_CHECKSUM,
 * KEY_SPTLRPC_CONF, KEY_FLUSH_CTX.  All other keys are packed into an
 * OST_SET_INFO RPC; KEY_MDS_CONN and KEY_GRANT_SHRINK additionally get
 * dedicated reply interpreters.
 */
3114 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3115 obd_count keylen, void *key, obd_count vallen,
3116 void *val, struct ptlrpc_request_set *set)
3118 struct ptlrpc_request *req;
3119 struct obd_device *obd = exp->exp_obd;
3120 struct obd_import *imp = class_exp2cliimp(exp);
3125 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3127 if (KEY_IS(KEY_NEXT_ID)) {
3129 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3131 if (vallen != sizeof(obd_id))
3136 if (vallen != sizeof(obd_id))
3139 /* avoid race between allocate new object and set next id
3140 * from ll_sync thread */
3141 cfs_spin_lock(&oscc->oscc_lock);
3142 new_val = *((obd_id*)val) + 1;
/* next_id only ever moves forward */
3143 if (new_val > oscc->oscc_next_id)
3144 oscc->oscc_next_id = new_val;
3145 cfs_spin_unlock(&oscc->oscc_lock);
3146 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3147 exp->exp_obd->obd_name,
3148 obd->u.cli.cl_oscc.oscc_next_id);
3153 if (KEY_IS(KEY_CHECKSUM)) {
3154 if (vallen != sizeof(int))
3156 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3160 if (KEY_IS(KEY_SPTLRPC_CONF)) {
3161 sptlrpc_conf_client_adapt(obd);
3165 if (KEY_IS(KEY_FLUSH_CTX)) {
3166 sptlrpc_import_flush_my_ctx(imp);
3170 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3173 /* We pass all other commands directly to OST. Since nobody calls osc
3174 methods directly and everybody is supposed to go through LOV, we
3175 assume lov checked invalid values for us.
3176 The only recognised values so far are evict_by_nid and mds_conn.
3177 Even if something bad goes through, we'd get a -EINVAL from OST
3180 if (KEY_IS(KEY_GRANT_SHRINK))
3181 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
3183 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
3188 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3189 RCL_CLIENT, keylen);
3190 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3191 RCL_CLIENT, vallen);
3192 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3194 ptlrpc_request_free(req);
3198 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3199 memcpy(tmp, key, keylen);
3200 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
3201 memcpy(tmp, val, vallen);
3203 if (KEY_IS(KEY_MDS_CONN)) {
3204 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3206 oscc->oscc_oa.o_seq = (*(__u32 *)val);
3207 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3208 LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
3209 req->rq_no_delay = req->rq_no_resend = 1;
3210 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3211 } else if (KEY_IS(KEY_GRANT_SHRINK)) {
3212 struct osc_grant_args *aa;
3215 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3216 aa = ptlrpc_req_async_args(req);
3219 ptlrpc_req_finished(req);
3222 *oa = ((struct ost_body *)val)->oa;
3224 req->rq_interpret_reply = osc_shrink_grant_interpret;
3227 ptlrpc_request_set_replen(req);
3228 if (!KEY_IS(KEY_GRANT_SHRINK)) {
3229 LASSERT(set != NULL);
3230 ptlrpc_set_add_req(set, req);
3231 ptlrpc_check_set(NULL, set);
/* grant-shrink requests go straight to ptlrpcd */
3233 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
/* llog ops for the size-replication context: cancel only. */
3239 static struct llog_operations osc_size_repl_logops = {
3240 lop_cancel: llog_obd_repl_cancel
/* filled in at module init from llog_lvfs_ops (see osc_init) */
3243 static struct llog_operations osc_mds_ost_orig_logops;
/*
 * Set up the two llog contexts used by an MDS-side OSC: the MDS->OST
 * originator context (catalog id from @catid) and the size-replication
 * context.  On failure of the second, the first context is looked up
 * again for teardown (the cleanup lines are not visible in this
 * excerpt).
 */
3245 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3246 struct obd_device *tgt, struct llog_catid *catid)
3251 rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
3252 &catid->lci_logid, &osc_mds_ost_orig_logops);
3254 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3258 rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
3259 NULL, &osc_size_repl_logops);
3261 struct llog_ctxt *ctxt =
3262 llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3265 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3270 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
3271 obd->obd_name, tgt->obd_name, catid, rc);
3272 CERROR("logid "LPX64":0x%x\n",
3273 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
/*
 * obd llog_init entry point: read the catalog id for slot *index from
 * CATLIST on @disk_obd, initialize the contexts via __osc_llog_init(),
 * then write the (possibly updated) catalog id back.  Serialized by
 * olg_cat_processing.
 */
3278 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3279 struct obd_device *disk_obd, int *index)
3281 struct llog_catid catid;
3282 static char name[32] = CATLIST;
3286 LASSERT(olg == &obd->obd_olg);
3288 cfs_mutex_lock(&olg->olg_cat_processing);
3289 rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
3291 CERROR("rc: %d\n", rc);
3295 CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
3296 obd->obd_name, *index, catid.lci_logid.lgl_oid,
3297 catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen);
3299 rc = __osc_llog_init(obd, olg, disk_obd, &catid);
3301 CERROR("rc: %d\n", rc);
3305 rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
3307 CERROR("rc: %d\n", rc);
3312 cfs_mutex_unlock(&olg->olg_cat_processing);
/* Tear down both llog contexts set up by __osc_llog_init(). */
3317 static int osc_llog_finish(struct obd_device *obd, int count)
3319 struct llog_ctxt *ctxt;
3320 int rc = 0, rc2 = 0;
3323 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3325 rc = llog_cleanup(ctxt);
3327 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3329 rc2 = llog_cleanup(ctxt);
/*
 * Reconnect callback: renegotiate the grant with the OST.  Under
 * cl_loi_list_lock, request the grant we currently account for
 * (avail + dirty), or two full RPCs worth if that is zero, and fold
 * any grant lost during the disconnect window into the accounting.
 */
3336 static int osc_reconnect(const struct lu_env *env,
3337 struct obd_export *exp, struct obd_device *obd,
3338 struct obd_uuid *cluuid,
3339 struct obd_connect_data *data,
3342 struct client_obd *cli = &obd->u.cli;
3344 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3347 client_obd_list_lock(&cli->cl_loi_list_lock);
3348 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3349 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3350 lost_grant = cli->cl_lost_grant;
3351 cli->cl_lost_grant = 0;
3352 client_obd_list_unlock(&cli->cl_loi_list_lock);
3354 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3355 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3356 data->ocd_version, data->ocd_grant, lost_grant);
/*
 * Disconnect from the OST: flush pending llog cancels on the last
 * connection, disconnect the export, then remove this client from the
 * grant-shrink list — strictly after the import is gone, see BUG18662
 * race description below.
 */
3362 static int osc_disconnect(struct obd_export *exp)
3364 struct obd_device *obd = class_exp2obd(exp);
3365 struct llog_ctxt *ctxt;
3368 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3370 if (obd->u.cli.cl_conn_count == 1) {
3371 /* Flush any remaining cancel messages out to the
3373 llog_sync(ctxt, exp, 0);
3375 llog_ctxt_put(ctxt);
3377 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3381 rc = client_disconnect_export(exp);
3383 * Initially we put del_shrink_grant before disconnect_export, but it
3384 * causes the following problem if setup (connect) and cleanup
3385 * (disconnect) are tangled together.
3386 * connect p1 disconnect p2
3387 * ptlrpc_connect_import
3388 * ............... class_manual_cleanup
3391 * ptlrpc_connect_interrupt
3393 * add this client to shrink list
3395 * Bang! pinger trigger the shrink.
3396 * So the osc should be disconnected from the shrink list, after we
3397 * are sure the import has been destroyed. BUG18662
3399 if (obd->u.cli.cl_import == NULL)
3400 osc_del_shrink_grant(&obd->u.cli);
/*
 * React to import state changes: reset grant accounting on disconnect,
 * flush cached pages and locks on invalidation, (re)initialize grant
 * and the request portal from the connect data on OCD, and forward
 * activate/deactivate events to the observer.  MDS-side OSCs
 * (imp_server_timeout) additionally maintain the object-creator flags.
 */
3404 static int osc_import_event(struct obd_device *obd,
3405 struct obd_import *imp,
3406 enum obd_import_event event)
3408 struct client_obd *cli;
3412 LASSERT(imp->imp_obd == obd);
3415 case IMP_EVENT_DISCON: {
3416 /* Only do this on the MDS OSC's */
3417 if (imp->imp_server_timeout) {
3418 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3420 cfs_spin_lock(&oscc->oscc_lock);
3421 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3422 cfs_spin_unlock(&oscc->oscc_lock);
/* any outstanding grant is meaningless across a disconnect */
3425 client_obd_list_lock(&cli->cl_loi_list_lock);
3426 cli->cl_avail_grant = 0;
3427 cli->cl_lost_grant = 0;
3428 client_obd_list_unlock(&cli->cl_loi_list_lock);
3431 case IMP_EVENT_INACTIVE: {
3432 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3435 case IMP_EVENT_INVALIDATE: {
3436 struct ldlm_namespace *ns = obd->obd_namespace;
3440 env = cl_env_get(&refcheck);
3444 /* all pages go to failing rpcs due to the invalid
3446 osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3448 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3449 cl_env_put(env, &refcheck);
3454 case IMP_EVENT_ACTIVE: {
3455 /* Only do this on the MDS OSC's */
3456 if (imp->imp_server_timeout) {
3457 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3459 cfs_spin_lock(&oscc->oscc_lock);
3460 oscc->oscc_flags &= ~(OSCC_FLAG_NOSPC |
3461 OSCC_FLAG_NOSPC_BLK);
3462 cfs_spin_unlock(&oscc->oscc_lock);
3464 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3467 case IMP_EVENT_OCD: {
3468 struct obd_connect_data *ocd = &imp->imp_connect_data;
3470 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3471 osc_init_grant(&obd->u.cli, ocd);
3474 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3475 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3477 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3480 case IMP_EVENT_DEACTIVATE: {
3481 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3484 case IMP_EVENT_ACTIVATE: {
3485 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3489 CERROR("Unknown import event %d\n", event);
3496 * Determine whether the lock can be canceled before replaying the lock
3497 * during recovery, see bug16774 for detailed information.
3499 * \retval zero the lock can't be canceled
3500 * \retval other ok to cancel
3502 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3504 check_res_locked(lock->l_resource);
3507 * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3509 * XXX as a future improvement, we can also cancel unused write lock
3510 * if it doesn't have dirty data and active mmaps.
/* "unused" == no cl_page pins the DLM lock (pageref of 0) */
3512 if (lock->l_resource->lr_type == LDLM_EXTENT &&
3513 (lock->l_granted_mode == LCK_PR ||
3514 lock->l_granted_mode == LCK_CR) &&
3515 (osc_dlm_lock_pageref(lock) == 0))
/* ptlrpcd work callback: flush this client's queued brw I/O. */
3521 static int brw_queue_work(const struct lu_env *env, void *data)
3523 struct client_obd *cli = data;
3525 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3527 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
/*
 * Device setup: take a ptlrpcd reference, run the generic client setup,
 * allocate the writeback ptlrpcd work item, register procfs entries,
 * preallocate a small request pool for brw_interpret (see comment
 * below), and register the recovery-time lock-cancel predicate.
 */
3531 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3533 struct client_obd *cli = &obd->u.cli;
3538 rc = ptlrpcd_addref();
3542 rc = client_obd_setup(obd, lcfg);
3545 handler = ptlrpcd_alloc_work(cli->cl_import,
3546 brw_queue_work, cli);
3547 if (!IS_ERR(handler))
3548 cli->cl_writeback_work = handler;
3550 rc = PTR_ERR(handler);
3554 struct lprocfs_static_vars lvars = { 0 };
3556 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3557 lprocfs_osc_init_vars(&lvars);
3558 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3559 lproc_osc_attach_seqstat(obd);
3560 sptlrpc_lprocfs_cliobd_attach(obd);
3561 ptlrpc_lprocfs_register_obd(obd);
3565 /* We need to allocate a few requests more, because
3566 brw_interpret tries to create new requests before freeing
3567 previous ones. Ideally we want to have 2x max_rpcs_in_flight
3568 reserved, but I afraid that might be too much wasted RAM
3569 in fact, so 2 is just my guess and still should work. */
3570 cli->cl_import->imp_rq_pool =
3571 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3573 ptlrpc_add_rqs_to_pool);
3575 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3577 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
/*
 * Staged pre-cleanup.  EARLY: deactivate the import and stop pinging.
 * EXPORTS: wait out zombie exports (echo-client case, see comment),
 * destroy the writeback work item, clean up the client import, procfs,
 * and llog contexts.
 */
3585 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3591 case OBD_CLEANUP_EARLY: {
3592 struct obd_import *imp;
3593 imp = obd->u.cli.cl_import;
3594 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3595 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3596 ptlrpc_deactivate_import(imp);
3597 cfs_spin_lock(&imp->imp_lock);
3598 imp->imp_pingable = 0;
3599 cfs_spin_unlock(&imp->imp_lock);
3602 case OBD_CLEANUP_EXPORTS: {
3603 struct client_obd *cli = &obd->u.cli;
3605 * for echo client, export may be on zombie list, wait for
3606 * zombie thread to cull it, because cli.cl_import will be
3607 * cleared in client_disconnect_export():
3608 * class_export_destroy() -> obd_cleanup() ->
3609 * echo_device_free() -> echo_client_cleanup() ->
3610 * obd_disconnect() -> osc_disconnect() ->
3611 * client_disconnect_export()
3613 obd_zombie_barrier();
3614 if (cli->cl_writeback_work) {
3615 ptlrpcd_destroy_work(cli->cl_writeback_work);
3616 cli->cl_writeback_work = NULL;
3618 obd_cleanup_client_import(obd);
3619 ptlrpc_lprocfs_unregister_obd(obd);
3620 lprocfs_obd_cleanup(obd);
3621 rc = obd_llog_finish(obd, 0);
3623 CERROR("failed to cleanup llogging subsystems\n");
/* Final device cleanup: drop the quota cache, then the generic client
 * teardown (the ptlrpcd_decref matching osc_setup is not visible in
 * this excerpt). */
3630 int osc_cleanup(struct obd_device *obd)
3636 /* free memory of osc quota cache */
3637 osc_quota_cleanup(obd);
3639 rc = client_obd_cleanup(obd);
/* Apply a config-log command to this OSC; currently dispatches
 * PARAM_OSC proc parameters (the case label is not visible here). */
3645 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3647 struct lprocfs_static_vars lvars = { 0 };
3650 lprocfs_osc_init_vars(&lvars);
3652 switch (lcfg->lcfg_command) {
3654 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
/* obd_ops wrapper around osc_process_config_base(). */
3664 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3666 return osc_process_config_base(obd, buf);
/* OSC method table registered with the OBD layer (class_register_type
 * in osc_init); generic client_* helpers handle connection plumbing. */
3669 struct obd_ops osc_obd_ops = {
3670 .o_owner = THIS_MODULE,
3671 .o_setup = osc_setup,
3672 .o_precleanup = osc_precleanup,
3673 .o_cleanup = osc_cleanup,
3674 .o_add_conn = client_import_add_conn,
3675 .o_del_conn = client_import_del_conn,
3676 .o_connect = client_connect_import,
3677 .o_reconnect = osc_reconnect,
3678 .o_disconnect = osc_disconnect,
3679 .o_statfs = osc_statfs,
3680 .o_statfs_async = osc_statfs_async,
3681 .o_packmd = osc_packmd,
3682 .o_unpackmd = osc_unpackmd,
3683 .o_precreate = osc_precreate,
3684 .o_create = osc_create,
3685 .o_create_async = osc_create_async,
3686 .o_destroy = osc_destroy,
3687 .o_getattr = osc_getattr,
3688 .o_getattr_async = osc_getattr_async,
3689 .o_setattr = osc_setattr,
3690 .o_setattr_async = osc_setattr_async,
3692 .o_punch = osc_punch,
3694 .o_enqueue = osc_enqueue,
3695 .o_change_cbdata = osc_change_cbdata,
3696 .o_find_cbdata = osc_find_cbdata,
3697 .o_cancel = osc_cancel,
3698 .o_cancel_unused = osc_cancel_unused,
3699 .o_iocontrol = osc_iocontrol,
3700 .o_get_info = osc_get_info,
3701 .o_set_info_async = osc_set_info_async,
3702 .o_import_event = osc_import_event,
3703 .o_llog_init = osc_llog_init,
3704 .o_llog_finish = osc_llog_finish,
3705 .o_process_config = osc_process_config,
3706 .o_quotactl = osc_quotactl,
3707 .o_quotacheck = osc_quotacheck,
3708 .o_quota_adjust_qunit = osc_quota_adjust_qunit,
3711 extern struct lu_kmem_descr osc_caches[];
3712 extern cfs_spinlock_t osc_ast_guard;
3713 extern cfs_lock_class_key_t osc_ast_guard_class;
/*
 * Module init: set up the cl_object kmem caches, register the OSC obd
 * type with its procfs variables, initialize the AST guard lock, and
 * clone llog_lvfs_ops into osc_mds_ost_orig_logops with OSC-specific
 * setup/cleanup/add/connect methods.
 */
3715 int __init osc_init(void)
3717 struct lprocfs_static_vars lvars = { 0 };
3721 /* print an address of _any_ initialized kernel symbol from this
3722 * module, to allow debugging with gdb that doesn't support data
3723 * symbols from modules.*/
3724 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3726 rc = lu_kmem_init(osc_caches);
3728 lprocfs_osc_init_vars(&lvars);
3731 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3732 LUSTRE_OSC_NAME, &osc_device_type);
/* undo the cache setup if type registration failed */
3734 lu_kmem_fini(osc_caches);
3738 cfs_spin_lock_init(&osc_ast_guard);
3739 cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3741 osc_mds_ost_orig_logops = llog_lvfs_ops;
3742 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3743 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3744 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3745 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
/* Module exit: unregister the OSC type and free the kmem caches. */
3751 static void /*__exit*/ osc_exit(void)
3754 class_unregister_type(LUSTRE_OSC_NAME);
3755 lu_kmem_fini(osc_caches);
3758 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3759 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3760 MODULE_LICENSE("GPL");
3762 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);