1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * Copyright (c) 2011 Whamcloud, Inc.
36 * This file is part of Lustre, http://www.lustre.org/
37 * Lustre is a trademark of Sun Microsystems, Inc.
41 # define EXPORT_SYMTAB
43 #define DEBUG_SUBSYSTEM S_OSC
45 #include <libcfs/libcfs.h>
48 # include <liblustre.h>
51 #include <lustre_dlm.h>
52 #include <lustre_net.h>
53 #include <lustre/lustre_user.h>
54 #include <obd_cksum.h>
62 #include <lustre_ha.h>
63 #include <lprocfs_status.h>
64 #include <lustre_log.h>
65 #include <lustre_debug.h>
66 #include <lustre_param.h>
67 #include "osc_internal.h"
/* Forward declarations for helpers defined later in this file. */
69 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
70 static int brw_interpret(const struct lu_env *env,
71 struct ptlrpc_request *req, void *data, int rc);
/* NOTE(review): this prototype appears truncated in this copy (argument
 * list cut off) — confirm against the full source. */
72 static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli,
74 int osc_cleanup(struct obd_device *obd);
76 /* Pack OSC object metadata for disk storage (LE byte order). */
/* Serializes the in-memory lov_stripe_md into a wire/disk lov_mds_md at
 * *lmmp; allocates *lmmp if needed, frees it on the (elided) free path.
 * NOTE(review): several lines (declarations, NULL checks, returns) are
 * missing from this copy — the visible logic is documented as-is. */
77 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
78 struct lov_stripe_md *lsm)
83 lmm_size = sizeof(**lmmp);
88 OBD_FREE(*lmmp, lmm_size);
94 OBD_ALLOC(*lmmp, lmm_size);
/* An OSC object must always have a valid id, and its sequence must be
 * an MDT sequence (single-stripe objects only at this layer). */
100 LASSERT(lsm->lsm_object_id);
101 LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
/* On-disk/wire format is little-endian. */
102 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
103 (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
109 /* Unpack OSC object metadata from disk storage (LE byte order). */
/* Converts a wire/disk lov_mds_md into an in-memory lov_stripe_md at
 * *lsmp, allocating the lsm and its single lsm_oinfo[0] as needed, or
 * freeing them when lmm == NULL. Also derives lsm_maxbytes from the
 * import connect data when the server supports OBD_CONNECT_MAXBYTES.
 * NOTE(review): validation/return lines are elided in this copy. */
110 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
111 struct lov_mds_md *lmm, int lmm_bytes)
114 struct obd_import *imp = class_exp2cliimp(exp);
/* Reject a buffer too small to hold even one lov_mds_md. */
118 if (lmm_bytes < sizeof (*lmm)) {
119 CERROR("lov_mds_md too small: %d, need %d\n",
120 lmm_bytes, (int)sizeof(*lmm));
123 /* XXX LOV_MAGIC etc check? */
125 if (lmm->lmm_object_id == 0) {
126 CERROR("lov_mds_md: zero lmm_object_id\n");
/* OSC objects are single-stripe. */
131 lsm_size = lov_stripe_md_size(1);
/* lmm == NULL with an existing *lsmp means "free the md". */
135 if (*lsmp != NULL && lmm == NULL) {
136 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
137 OBD_FREE(*lsmp, lsm_size);
143 OBD_ALLOC(*lsmp, lsm_size);
146 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
147 if ((*lsmp)->lsm_oinfo[0] == NULL) {
148 OBD_FREE(*lsmp, lsm_size);
151 loi_init((*lsmp)->lsm_oinfo[0]);
155 /* XXX zero *lsmp? */
156 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
157 (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
158 LASSERT((*lsmp)->lsm_object_id);
159 LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
/* Prefer the server-advertised max object size when available;
 * otherwise fall back to the compile-time stripe limit. */
163 (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
164 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
166 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/* Copy an OSS capability (if any) into the request capsule and mark its
 * presence in body->oa.o_valid. NOTE(review): the NULL-capa early return
 * and the capa_cpy() call appear elided in this copy. */
171 static inline void osc_pack_capa(struct ptlrpc_request *req,
172 struct ost_body *body, void *capa)
174 struct obd_capa *oc = (struct obd_capa *)capa;
175 struct lustre_capa *c;
180 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
183 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
184 DEBUG_CAPA(D_SEC, c, "pack");
/* Fill the OST_BODY field of a request from oinfo: copy the obdo onto
 * the wire and attach the capability. */
187 static inline void osc_pack_req_body(struct ptlrpc_request *req,
188 struct obd_info *oinfo)
190 struct ost_body *body;
192 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
195 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
196 osc_pack_capa(req, body, oinfo->oi_capa);
/* Size the capability field of the request capsule: zero when no capa is
 * being sent. NOTE(review): the third parameter and the non-NULL branch
 * are elided in this copy — presumably the capa pointer; confirm. */
199 static inline void osc_set_capa_size(struct ptlrpc_request *req,
200 const struct req_msg_field *field,
204 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
206 /* it is already calculated as sizeof struct obd_capa */
/* Reply interpreter for async getattr: unpack the returned obdo into the
 * caller's oinfo and invoke the completion callback oi_cb_up.
 * NOTE(review): the rc checks/branch structure are elided in this copy. */
210 static int osc_getattr_interpret(const struct lu_env *env,
211 struct ptlrpc_request *req,
212 struct osc_async_args *aa, int rc)
214 struct ost_body *body;
220 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
222 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
223 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
225 /* This should really be sent by the OST */
226 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
227 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
/* Unpack failure: invalidate the obdo so callers don't trust it. */
229 CDEBUG(D_INFO, "can't unpack ost_body\n");
231 aa->aa_oi->oi_oa->o_valid = 0;
234 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an OST_GETATTR request on @set; completion is delivered through
 * osc_getattr_interpret -> oinfo->oi_cb_up. Returns 0 or -errno from
 * request allocation/packing (error returns elided in this copy). */
238 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
239 struct ptlrpc_request_set *set)
241 struct ptlrpc_request *req;
242 struct osc_async_args *aa;
246 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
250 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
251 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
253 ptlrpc_request_free(req);
257 osc_pack_req_body(req, oinfo);
259 ptlrpc_request_set_replen(req);
260 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
/* Stash the obd_info in the request's async-args area for the
 * interpreter; the CLASSERT guarantees it fits. */
262 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
263 aa = ptlrpc_req_async_args(req);
266 ptlrpc_set_add_req(set, req);
/* Synchronous OST_GETATTR: send the request, wait for the reply, and
 * copy the returned attributes into oinfo->oi_oa. */
270 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
272 struct ptlrpc_request *req;
273 struct ost_body *body;
277 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
281 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
282 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
284 ptlrpc_request_free(req);
288 osc_pack_req_body(req, oinfo);
290 ptlrpc_request_set_replen(req);
292 rc = ptlrpc_queue_wait(req);
/* Reply without a body is a protocol violation. */
296 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
298 GOTO(out, rc = -EPROTO);
300 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
301 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
303 /* This should really be sent by the OST */
304 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
305 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
309 ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR: push oinfo->oi_oa attributes to the OST and
 * read back the server's view of the obdo on success. */
313 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
314 struct obd_trans_info *oti)
316 struct ptlrpc_request *req;
317 struct ost_body *body;
/* The group must be known: it selects the object namespace. */
321 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
323 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
327 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
328 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
330 ptlrpc_request_free(req);
334 osc_pack_req_body(req, oinfo);
336 ptlrpc_request_set_replen(req);
338 rc = ptlrpc_queue_wait(req);
342 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
344 GOTO(out, rc = -EPROTO);
346 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
350 ptlrpc_req_finished(req);
/* Reply interpreter shared by async setattr and punch: unpack the
 * returned obdo into sa->sa_oa, then fire the caller's upcall with the
 * final rc. (rc checks elided in this copy.) */
354 static int osc_setattr_interpret(const struct lu_env *env,
355 struct ptlrpc_request *req,
356 struct osc_setattr_args *sa, int rc)
358 struct ost_body *body;
364 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
366 GOTO(out, rc = -EPROTO);
368 lustre_get_wire_obdo(sa->sa_oa, &body->oa);
370 rc = sa->sa_upcall(sa->sa_cookie, rc);
/* Asynchronous OST_SETATTR. @upcall/@cookie are invoked from the reply
 * interpreter. @rqset selects the delivery path: the special PTLRPCD_SET
 * sends via ptlrpcd, otherwise the request joins @rqset.
 * NOTE(review): the rqset==NULL "fire and forget" branch structure is
 * partially elided in this copy — confirm ordering against full source. */
374 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
375 struct obd_trans_info *oti,
376 obd_enqueue_update_f upcall, void *cookie,
377 struct ptlrpc_request_set *rqset)
379 struct ptlrpc_request *req;
380 struct osc_setattr_args *sa;
384 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
388 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
389 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
391 ptlrpc_request_free(req);
/* Carry the llog cookie so the server can cancel the unlink record. */
395 if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
396 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
398 osc_pack_req_body(req, oinfo);
400 ptlrpc_request_set_replen(req);
402 /* do mds to ost setattr asynchronously */
404 /* Do not wait for response. */
405 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
407 req->rq_interpret_reply =
408 (ptlrpc_interpterer_t)osc_setattr_interpret;
410 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
411 sa = ptlrpc_req_async_args(req);
412 sa->sa_oa = oinfo->oi_oa;
413 sa->sa_upcall = upcall;
414 sa->sa_cookie = cookie;
416 if (rqset == PTLRPCD_SET)
417 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
419 ptlrpc_set_add_req(rqset, req);
/* Thin wrapper: async setattr with oinfo's own oi_cb_up as the upcall
 * and oinfo itself as the cookie. */
425 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
426 struct obd_trans_info *oti,
427 struct ptlrpc_request_set *rqset)
429 return osc_setattr_async_base(exp, oinfo, oti,
430 oinfo->oi_cb_up, oinfo, rqset);
/* Create an object on the OST (synchronous OST_CREATE). Allocates a
 * temporary single-stripe lsm, fills it with the returned object id/seq,
 * and records the transno/llog cookie into @oti for recovery.
 * NOTE(review): error-path lines (rc checks, *ea handling) are elided in
 * this copy; visible flow is documented as-is. */
433 int osc_real_create(struct obd_export *exp, struct obdo *oa,
434 struct lov_stripe_md **ea, struct obd_trans_info *oti)
436 struct ptlrpc_request *req;
437 struct ost_body *body;
438 struct lov_stripe_md *lsm;
447 rc = obd_alloc_memmd(exp, &lsm);
452 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
454 GOTO(out, rc = -ENOMEM);
456 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
458 ptlrpc_request_free(req);
462 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
464 lustre_set_wire_obdo(&body->oa, oa);
466 ptlrpc_request_set_replen(req);
/* Orphan cleanup (DELORPHAN) must not be resent/delayed: a replay
 * could destroy objects created after the original request. */
468 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
469 oa->o_flags == OBD_FL_DELORPHAN) {
471 "delorphan from OST integration");
472 /* Don't resend the delorphan req */
473 req->rq_no_resend = req->rq_no_delay = 1;
476 rc = ptlrpc_queue_wait(req);
480 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
482 GOTO(out_req, rc = -EPROTO);
484 lustre_get_wire_obdo(oa, &body->oa);
486 /* This should really be sent by the OST */
487 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
488 oa->o_valid |= OBD_MD_FLBLKSZ;
490 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
491 * have valid lsm_oinfo data structs, so don't go touching that.
492 * This needs to be fixed in a big way.
494 lsm->lsm_object_id = oa->o_id;
495 lsm->lsm_object_seq = oa->o_seq;
499 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
/* Save the unlink llog cookie for later cancellation. */
501 if (oa->o_valid & OBD_MD_FLCOOKIE) {
502 if (!oti->oti_logcookies)
503 oti_alloc_cookies(oti, 1);
504 *oti->oti_logcookies = oa->o_lcookie;
508 CDEBUG(D_HA, "transno: "LPD64"\n",
509 lustre_msg_get_transno(req->rq_repmsg));
511 ptlrpc_req_finished(req);
514 obd_free_memmd(exp, &lsm);
/* Asynchronous OST_PUNCH (truncate a byte range on the OST). The start
 * and end offsets are carried in oi_oa->o_size / o_blocks (set by the
 * caller, see osc_punch). Completion goes through osc_setattr_interpret
 * to @upcall/@cookie. */
518 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
519 obd_enqueue_update_f upcall, void *cookie,
520 struct ptlrpc_request_set *rqset)
522 struct ptlrpc_request *req;
523 struct osc_setattr_args *sa;
524 struct ost_body *body;
528 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
532 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
533 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
535 ptlrpc_request_free(req);
/* Punch is an I/O operation: use the I/O portal and adaptive timeout. */
538 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
539 ptlrpc_at_set_req_timeout(req);
541 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
543 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
544 osc_pack_capa(req, body, oinfo->oi_capa);
546 ptlrpc_request_set_replen(req);
549 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
550 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
551 sa = ptlrpc_req_async_args(req);
552 sa->sa_oa = oinfo->oi_oa;
553 sa->sa_upcall = upcall;
554 sa->sa_cookie = cookie;
/* PTLRPCD_SET means "send via ptlrpcd"; otherwise join the caller's set. */
555 if (rqset == PTLRPCD_SET)
556 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
558 ptlrpc_set_add_req(rqset, req);
/* OBD punch entry point: encode the extent [start, end] into o_size and
 * o_blocks (the wire convention for punch), then delegate. */
563 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
564 struct obd_trans_info *oti,
565 struct ptlrpc_request_set *rqset)
567 oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
568 oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
569 oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
570 return osc_punch_base(exp, oinfo,
571 oinfo->oi_cb_up, oinfo, rqset);
/* Reply interpreter for OST_SYNC: copy the returned obdo back into the
 * caller's oinfo and invoke its completion callback. Note the raw struct
 * copy here (no wire-obdo conversion, unlike the getattr path). */
574 static int osc_sync_interpret(const struct lu_env *env,
575 struct ptlrpc_request *req,
578 struct osc_async_args *aa = arg;
579 struct ost_body *body;
585 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
587 CERROR ("can't unpack ost_body\n");
588 GOTO(out, rc = -EPROTO);
591 *aa->aa_oi->oi_oa = body->oa;
593 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an OST_SYNC for the byte range [start, end] on @set. The range
 * is smuggled in the obdo's o_size/o_blocks fields (see comment below). */
597 static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
598 obd_size start, obd_size end,
599 struct ptlrpc_request_set *set)
601 struct ptlrpc_request *req;
602 struct ost_body *body;
603 struct osc_async_args *aa;
/* Guard: nothing to sync without an obdo (early-return elided here). */
608 CDEBUG(D_INFO, "oa NULL\n");
612 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
616 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
617 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
619 ptlrpc_request_free(req);
623 /* overload the size and blocks fields in the oa with start/end */
624 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
626 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
627 body->oa.o_size = start;
628 body->oa.o_blocks = end;
629 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
630 osc_pack_capa(req, body, oinfo->oi_capa);
632 ptlrpc_request_set_replen(req);
633 req->rq_interpret_reply = osc_sync_interpret;
635 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
636 aa = ptlrpc_req_async_args(req);
639 ptlrpc_set_add_req(set, req);
643 /* Find and cancel locally locks matched by @mode in the resource found by
644 * @objid. Found locks are added into @cancel list. Returns the amount of
645 * locks added to @cancels list. */
/* NOTE(review): the @cancels list parameter and the "resource not found
 * -> return 0" path are elided in this copy. */
646 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
648 ldlm_mode_t mode, int lock_flags)
650 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
651 struct ldlm_res_id res_id;
652 struct ldlm_resource *res;
/* Resource name is derived from the object id + sequence. */
656 osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
657 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
661 LDLM_RESOURCE_ADDREF(res);
662 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
663 lock_flags, 0, NULL);
664 LDLM_RESOURCE_DELREF(res);
665 ldlm_resource_putref(res);
/* Destroy-reply interpreter: release the in-flight destroy slot and wake
 * any sender throttled in osc_can_send_destroy(). */
669 static int osc_destroy_interpret(const struct lu_env *env,
670 struct ptlrpc_request *req, void *data,
673 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
675 cfs_atomic_dec(&cli->cl_destroy_in_flight);
676 cfs_waitq_signal(&cli->cl_destroy_waitq);
/* Try to reserve an in-flight destroy slot (bounded by
 * cl_max_rpcs_in_flight). Optimistically increments the counter and
 * backs off on failure; the second check after the decrement closes a
 * race with a concurrent completion (see comment below).
 * NOTE(review): the return statements are elided in this copy. */
680 static int osc_can_send_destroy(struct client_obd *cli)
682 if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
683 cli->cl_max_rpcs_in_flight) {
684 /* The destroy request can be sent */
687 if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
688 cli->cl_max_rpcs_in_flight) {
690 * The counter has been modified between the two atomic
693 cfs_waitq_signal(&cli->cl_destroy_waitq);
698 /* Destroy requests can be async always on the client, and we don't even really
699 * care about the return code since the client cannot do anything at all about
701 * When the MDS is unlinking a filename, it saves the file objects into a
702 * recovery llog, and these object records are cancelled when the OST reports
703 * they were destroyed and sync'd to disk (i.e. transaction committed).
704 * If the client dies, or the OST is down when the object should be destroyed,
705 * the records are not cancelled, and when the OST reconnects to the MDS next,
706 * it will retrieve the llog unlink logs and then sends the log cancellation
707 * cookies to the MDS after committing destroy transactions. */
/* Sends OST_DESTROY via ptlrpcd without waiting for the reply. Locally
 * held PW locks on the object are cancelled first (with DISCARD_DATA)
 * and piggy-backed on the request as early lock cancels. Non-MDS
 * senders are throttled to cl_max_rpcs_in_flight concurrent destroys. */
708 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
709 struct lov_stripe_md *ea, struct obd_trans_info *oti,
710 struct obd_export *md_export, void *capa)
712 struct client_obd *cli = &exp->exp_obd->u.cli;
713 struct ptlrpc_request *req;
714 struct ost_body *body;
715 CFS_LIST_HEAD(cancels);
720 CDEBUG(D_INFO, "oa NULL\n");
724 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
725 LDLM_FL_DISCARD_DATA);
727 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* Allocation failed: drop the collected cancel locks. */
729 ldlm_lock_list_put(&cancels, l_bl_ast, count);
733 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
734 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
737 ptlrpc_request_free(req);
741 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
742 ptlrpc_at_set_req_timeout(req);
/* Carry the MDS unlink llog cookie so the OST can report back. */
744 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
745 oa->o_lcookie = *oti->oti_logcookies;
746 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
748 lustre_set_wire_obdo(&body->oa, oa);
750 osc_pack_capa(req, body, (struct obd_capa *)capa);
751 ptlrpc_request_set_replen(req);
753 /* don't throttle destroy RPCs for the MDT */
754 if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
755 req->rq_interpret_reply = osc_destroy_interpret;
756 if (!osc_can_send_destroy(cli)) {
757 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
761 * Wait until the number of on-going destroy RPCs drops
762 * under max_rpc_in_flight
764 l_wait_event_exclusive(cli->cl_destroy_waitq,
765 osc_can_send_destroy(cli), &lwi);
769 /* Do not wait for response */
770 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
/* Fill the grant/dirty accounting fields of @oa (o_dirty, o_undirty,
 * o_grant, o_dropped) from the client_obd state, under the loi list
 * lock, so they can be announced to the OST on the next RPC. The
 * CERROR branches flag accounting inconsistencies (errors elided here
 * presumably zero o_undirty — confirm against full source). */
774 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
777 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
779 LASSERT(!(oa->o_valid & bits));
782 client_obd_list_lock(&cli->cl_loi_list_lock);
783 oa->o_dirty = cli->cl_dirty;
784 if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
785 CERROR("dirty %lu - %lu > dirty_max %lu\n",
786 cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
788 } else if (cfs_atomic_read(&obd_dirty_pages) -
789 cfs_atomic_read(&obd_dirty_transit_pages) >
790 obd_max_dirty_pages + 1){
791 /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
792 * not covered by a lock thus they may safely race and trip
793 * this CERROR() unless we add in a small fudge factor (+1). */
794 CERROR("dirty %d - %d > system dirty_max %d\n",
795 cfs_atomic_read(&obd_dirty_pages),
796 cfs_atomic_read(&obd_dirty_transit_pages),
797 obd_max_dirty_pages);
799 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
800 CERROR("dirty %lu - dirty_max %lu too big???\n",
801 cli->cl_dirty, cli->cl_dirty_max);
/* Normal case: advertise how much more we could dirty, bounded by
 * what a full pipeline of max-sized RPCs could carry. */
804 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
805 (cli->cl_max_rpcs_in_flight + 1);
806 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
808 oa->o_grant = cli->cl_avail_grant;
809 oa->o_dropped = cli->cl_lost_grant;
810 cli->cl_lost_grant = 0;
811 client_obd_list_unlock(&cli->cl_loi_list_lock);
812 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
813 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Push the next grant-shrink deadline one full interval into the future;
 * called whenever grant is consumed or shrunk. */
817 static void osc_update_next_shrink(struct client_obd *cli)
819 cli->cl_next_shrink_grant =
820 cfs_time_shift(cli->cl_grant_shrink_interval);
821 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
822 cli->cl_next_shrink_grant);
825 /* caller must hold loi_list_lock */
/* Account one page of write cache against the available grant: bumps
 * global and per-client dirty counters, debits CFS_PAGE_SIZE of grant,
 * and tags the page as grant-backed (OBD_BRW_FROM_GRANT). */
826 static void osc_consume_write_grant(struct client_obd *cli,
827 struct brw_page *pga)
829 LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
830 LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
831 cfs_atomic_inc(&obd_dirty_pages);
832 cli->cl_dirty += CFS_PAGE_SIZE;
833 cli->cl_avail_grant -= CFS_PAGE_SIZE;
834 pga->flag |= OBD_BRW_FROM_GRANT;
835 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
836 CFS_PAGE_SIZE, pga, pga->pg);
/* Caller guarantees enough grant was available before consuming. */
837 LASSERT(cli->cl_avail_grant >= 0);
838 osc_update_next_shrink(cli);
841 /* the companion to osc_consume_write_grant, called when a brw has completed.
842 * must be called with the loi lock held. */
/* Undoes the dirty/grant accounting for one page. @sent distinguishes
 * pages actually written from pages dropped: unsent (or short) writes
 * move their grant into cl_lost_grant so the OST can be told via
 * o_dropped. NOTE(review): some branch lines are elided in this copy. */
843 static void osc_release_write_grant(struct client_obd *cli,
844 struct brw_page *pga, int sent)
/* OST filesystem block size; default 4096 if not yet known. */
846 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
849 LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
850 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
855 pga->flag &= ~OBD_BRW_FROM_GRANT;
856 cfs_atomic_dec(&obd_dirty_pages);
857 cli->cl_dirty -= CFS_PAGE_SIZE;
/* Transit (nocache) pages carry extra accounting to unwind. */
858 if (pga->flag & OBD_BRW_NOCACHE) {
859 pga->flag &= ~OBD_BRW_NOCACHE;
860 cfs_atomic_dec(&obd_dirty_transit_pages);
861 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
864 cli->cl_lost_grant += CFS_PAGE_SIZE;
865 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
866 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
867 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
868 /* For short writes we shouldn't count parts of pages that
869 * span a whole block on the OST side, or our accounting goes
870 * wrong. Should match the code in filter_grant_check. */
871 int offset = pga->off & ~CFS_PAGE_MASK;
872 int count = pga->count + (offset & (blocksize - 1));
873 int end = (offset + pga->count) & (blocksize - 1);
875 count += blocksize - end;
877 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
878 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
879 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
880 cli->cl_avail_grant, cli->cl_dirty);
/* Total BRW RPCs (reads + writes) currently in flight for this client. */
886 static unsigned long rpcs_in_flight(struct client_obd *cli)
888 return cli->cl_r_in_flight + cli->cl_w_in_flight;
891 /* caller must hold loi_list_lock */
/* Walk the cache-waiter list and wake waiters that can now proceed:
 * either grant became available (consume it for them) or no write RPCs
 * remain to return grant, in which case the waiter is told to fall back
 * to sync I/O (-EDQUOT). Stops (elided breaks) when dirty limits block
 * further progress. */
892 void osc_wake_cache_waiters(struct client_obd *cli)
895 struct osc_cache_waiter *ocw;
898 cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
899 /* if we can't dirty more, we must wait until some is written */
900 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
901 (cfs_atomic_read(&obd_dirty_pages) + 1 >
902 obd_max_dirty_pages)) {
903 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
904 "osc max %ld, sys max %d\n", cli->cl_dirty,
905 cli->cl_dirty_max, obd_max_dirty_pages);
909 /* if still dirty cache but no grant wait for pending RPCs that
910 * may yet return us some grant before doing sync writes */
911 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
912 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
913 cli->cl_w_in_flight);
917 ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
918 cfs_list_del_init(&ocw->ocw_entry);
919 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
920 /* no more RPCs in flight to return grant, do sync IO */
921 ocw->ocw_rc = -EDQUOT;
922 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
924 osc_consume_write_grant(cli,
925 &ocw->ocw_oap->oap_brw_page);
928 cfs_waitq_signal(&ocw->ocw_waitq);
/* Add @grant bytes back to the available grant, under the loi lock. */
934 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
936 client_obd_list_lock(&cli->cl_loi_list_lock);
937 cli->cl_avail_grant += grant;
938 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Apply grant returned by the server in an RPC reply, if it sent any. */
941 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
943 if (body->oa.o_valid & OBD_MD_FLGRANT) {
944 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
945 __osc_update_grant(cli, body->oa.o_grant);
/* Forward declaration: used by the grant-shrink path below. */
949 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
950 void *key, obd_count vallen, void *val,
951 struct ptlrpc_request_set *set);
/* Reply interpreter for a grant-shrink set_info RPC. On failure (branch
 * elided) the tentatively-shrunk grant in oa->o_grant is restored; on
 * success any grant in the reply body is applied. */
953 static int osc_shrink_grant_interpret(const struct lu_env *env,
954 struct ptlrpc_request *req,
957 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
958 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
959 struct ost_body *body;
962 __osc_update_grant(cli, oa->o_grant);
966 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
968 osc_update_grant(cli, body);
/* Give back a quarter of the locally available grant: debit it under the
 * lock and record it in oa->o_grant with OBD_FL_SHRINK_GRANT set so the
 * next RPC carries it to the server. */
974 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
976 client_obd_list_lock(&cli->cl_loi_list_lock);
977 oa->o_grant = cli->cl_avail_grant / 4;
978 cli->cl_avail_grant -= oa->o_grant;
979 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Make sure o_flags is valid before OR-ing in the shrink flag. */
980 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
981 oa->o_valid |= OBD_MD_FLFLAGS;
984 oa->o_flags |= OBD_FL_SHRINK_GRANT;
985 osc_update_next_shrink(cli);
988 /* Shrink the current grant, either from some large amount to enough for a
989 * full set of in-flight RPCs, or if we have already shrunk to that limit
990 * then to enough for a single RPC. This avoids keeping more grant than
991 * needed, and avoids shrinking the grant piecemeal. */
992 static int osc_shrink_grant(struct client_obd *cli)
/* NOTE(review): target appears to be in pages here but is compared
 * against cl_avail_grant (bytes) — verify units in the full source. */
994 long target = (cli->cl_max_rpcs_in_flight + 1) *
995 cli->cl_max_pages_per_rpc;
997 client_obd_list_lock(&cli->cl_loi_list_lock);
998 if (cli->cl_avail_grant <= target)
999 target = cli->cl_max_pages_per_rpc;
1000 client_obd_list_unlock(&cli->cl_loi_list_lock);
1002 return osc_shrink_grant_to_target(cli, target);
/* Shrink the available grant down to @target (clamped to at least one
 * RPC's worth) and notify the server through a KEY_GRANT_SHRINK
 * set_info RPC. On send failure the grant is restored by
 * __osc_update_grant. Returns 0 or -errno (some returns elided). */
1005 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
1008 struct ost_body *body;
1011 client_obd_list_lock(&cli->cl_loi_list_lock);
1012 /* Don't shrink if we are already above or below the desired limit
1013 * We don't want to shrink below a single RPC, as that will negatively
1014 * impact block allocation and long-term performance. */
1015 if (target < cli->cl_max_pages_per_rpc)
1016 target = cli->cl_max_pages_per_rpc;
1018 if (target >= cli->cl_avail_grant) {
1019 client_obd_list_unlock(&cli->cl_loi_list_lock);
1022 client_obd_list_unlock(&cli->cl_loi_list_lock);
1024 OBD_ALLOC_PTR(body);
/* Snapshot current dirty/grant accounting into the body first. */
1028 osc_announce_cached(cli, &body->oa, 0);
1030 client_obd_list_lock(&cli->cl_loi_list_lock);
1031 body->oa.o_grant = cli->cl_avail_grant - target;
1032 cli->cl_avail_grant = target;
1033 client_obd_list_unlock(&cli->cl_loi_list_lock);
1034 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
1035 body->oa.o_valid |= OBD_MD_FLFLAGS;
1036 body->oa.o_flags = 0;
1038 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
1039 osc_update_next_shrink(cli);
1041 rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
1042 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
1043 sizeof(*body), body, NULL);
/* Send failed: give the shrunk grant back to the local pool. */
1045 __osc_update_grant(cli, body->oa.o_grant);
1050 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
/* Decide whether it is time to shrink this client's grant: the server
 * must support grant shrink, the shrink deadline must be (nearly) due,
 * the import FULL, and the held grant above GRANT_SHRINK_LIMIT.
 * (Return statements elided in this copy.) */
1051 static int osc_should_shrink_grant(struct client_obd *client)
1053 cfs_time_t time = cfs_time_current();
1054 cfs_time_t next_shrink = client->cl_next_shrink_grant;
1056 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
1057 OBD_CONNECT_GRANT_SHRINK) == 0)
/* Allow firing up to 5 ticks early to avoid just missing the slot. */
1060 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1061 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1062 client->cl_avail_grant > GRANT_SHRINK_LIMIT)
1065 osc_update_next_shrink(client);
/* Periodic timeout callback: walk every client on this timer's list and
 * shrink the grant of those that are due. */
1070 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1072 struct client_obd *client;
1074 cfs_list_for_each_entry(client, &item->ti_obd_list,
1075 cl_grant_shrink_list) {
1076 if (osc_should_shrink_grant(client))
1077 osc_shrink_grant(client);
/* Register this client with the periodic grant-shrink timeout so
 * osc_grant_shrink_grant_cb gets invoked for it. */
1082 static int osc_add_shrink_grant(struct client_obd *client)
1086 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1088 osc_grant_shrink_grant_cb, NULL,
1089 &client->cl_grant_shrink_list);
1091 CERROR("add grant client %s error %d\n",
1092 client->cl_import->imp_obd->obd_name, rc);
1095 CDEBUG(D_CACHE, "add grant client %s \n",
1096 client->cl_import->imp_obd->obd_name);
1097 osc_update_next_shrink(client);
/* Unregister this client from the grant-shrink timeout list.
 * NOTE(review): the second argument line is elided in this copy. */
1101 static int osc_del_shrink_grant(struct client_obd *client)
1103 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/* Initialize cl_avail_grant from the server's connect reply (@ocd),
 * compensating for dirty pages already held, with a workaround for old
 * (pre-bug20278) servers that could report too little grant. Also hooks
 * the client into the periodic grant-shrink machinery if supported. */
1107 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1110 * ocd_grant is the total grant amount we're expect to hold: if we've
1111 * been evicted, it's the new avail_grant amount, cl_dirty will drop
1112 * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1114 * race is tolerable here: if we're evicted, but imp_state already
1115 * left EVICTED state, then cl_dirty must be 0 already.
1117 client_obd_list_lock(&cli->cl_loi_list_lock);
1118 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1119 cli->cl_avail_grant = ocd->ocd_grant;
1121 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1123 if (cli->cl_avail_grant < 0) {
1124 CWARN("%s: available grant < 0, the OSS is probably not running"
1125 " with patch from bug20278 (%ld) \n",
1126 cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
1127 /* workaround for 1.6 servers which do not have
1128 * the patch from bug20278 */
1129 cli->cl_avail_grant = ocd->ocd_grant;
1132 client_obd_list_unlock(&cli->cl_loi_list_lock);
1134 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1135 cli->cl_import->imp_obd->obd_name,
1136 cli->cl_avail_grant, cli->cl_lost_grant);
/* Join the shrink timer only once (list_empty guards re-connects). */
1138 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1139 cfs_list_empty(&cli->cl_grant_shrink_list))
1140 osc_add_shrink_grant(cli);
1143 /* We assume that the reason this OSC got a short read is because it read
1144 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1145 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1146 * this stripe never got written at or beyond this stripe offset yet. */
/* Zero-fill the unread tail: skip fully-read pages, clear the remainder
 * of the partially-read page, then clear all following pages.
 * NOTE(review): loop index increments/decrements are elided here. */
1147 static void handle_short_read(int nob_read, obd_count page_count,
1148 struct brw_page **pga)
1153 /* skip bytes read OK */
1154 while (nob_read > 0) {
1155 LASSERT (page_count > 0);
1157 if (pga[i]->count > nob_read) {
1158 /* EOF inside this page */
1159 ptr = cfs_kmap(pga[i]->pg) +
1160 (pga[i]->off & ~CFS_PAGE_MASK);
1161 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1162 cfs_kunmap(pga[i]->pg);
1168 nob_read -= pga[i]->count;
1173 /* zero remaining pages */
1174 while (page_count-- > 0) {
1175 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1176 memset(ptr, 0, pga[i]->count);
1177 cfs_kunmap(pga[i]->pg);
/* Validate a BRW_WRITE reply: fetch the per-niobuf rc vector, fail on
 * any negative or unexpectedly nonzero entry, and verify the bulk layer
 * actually moved the number of bytes we asked for. Returns 0 on success
 * or a negative errno (some return lines elided in this copy). */
1182 static int check_write_rcs(struct ptlrpc_request *req,
1183 int requested_nob, int niocount,
1184 obd_count page_count, struct brw_page **pga)
1189 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1190 sizeof(*remote_rcs) *
1192 if (remote_rcs == NULL) {
1193 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1197 /* return error if any niobuf was in error */
1198 for (i = 0; i < niocount; i++) {
1199 if ((int)remote_rcs[i] < 0)
1200 return(remote_rcs[i]);
1202 if (remote_rcs[i] != 0) {
1203 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1204 i, remote_rcs[i], req);
1209 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1210 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1211 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Two brw_pages can share one niobuf iff they are file-contiguous and
 * their flags differ only in bits known to be safe to combine. Unknown
 * differing flag bits are merged anyway but loudly warned about. */
1218 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1220 if (p1->flag != p2->flag) {
1221 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1222 OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1224 /* warn if we try to combine flags that we don't know to be
1225 * safe to combine */
1226 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1227 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1228 "report this at http://bugs.whamcloud.com/\n",
1229 p1->flag, p2->flag);
1234 return (p1->off + p1->count == p2->off);
/* Compute the bulk-data checksum over @nob bytes spread across @pga
 * pages using @cksum_type. Includes fault-injection hooks that corrupt
 * read data (OBD_FAIL_OSC_CHECKSUM_RECEIVE) or the computed write
 * checksum (OBD_FAIL_OSC_CHECKSUM_SEND) for testing.
 * NOTE(review): the loop's index increment is elided in this copy. */
1237 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1238 struct brw_page **pga, int opc,
1239 cksum_type_t cksum_type)
1244 LASSERT (pg_count > 0);
1245 cksum = init_checksum(cksum_type);
1246 while (nob > 0 && pg_count > 0) {
1247 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1248 int off = pga[i]->off & ~CFS_PAGE_MASK;
/* Only the bytes actually covered by @nob count on the last page. */
1249 int count = pga[i]->count > nob ? nob : pga[i]->count;
1251 /* corrupt the data before we compute the checksum, to
1252 * simulate an OST->client data error */
1253 if (i == 0 && opc == OST_READ &&
1254 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1255 memcpy(ptr + off, "bad1", min(4, nob));
1256 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1257 cfs_kunmap(pga[i]->pg);
1258 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1261 nob -= pga[i]->count;
1265 /* For sending we only compute the wrong checksum instead
1266 * of corrupting the data so it is still correct on a redo */
1267 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1270 return fini_checksum(cksum, cksum_type);
/*
 * Build a bulk read/write (BRW) ptlrpc request for 'page_count' pages.
 *
 * Steps visible here:
 *  - fault-injection early exits (OBD_FAIL_OSC_BRW_PREP_REQ[2]);
 *  - allocate the request (writes come from the import's request pool so
 *    cached dirty data can always be flushed under memory pressure);
 *  - count mergeable page runs into 'niocount' niobufs and size the
 *    capsule fields accordingly;
 *  - register every page with the bulk descriptor and fill/merge the
 *    niobuf array, asserting the page array is sorted, gap-free in the
 *    middle, and uniform in OBD_BRW_SRVLOCK;
 *  - announce grant/cached state, optionally shrink grant;
 *  - on WRITE with client checksums enabled (and no sptlrpc bulk
 *    integrity), compute and store the bulk checksum in both the wire
 *    obdo and caller's 'oa' for later verification; reserve one RC per
 *    niobuf in the reply.  On READ only the checksum flags are set; the
 *    server computes the value.
 *  - stash per-request async args (aa) including a capa reference when
 *    'reserve' is set; *reqp returns the prepared request.
 *
 * Returns 0 on success or a negative errno; on error paths the request
 * is freed (see ptlrpc_request_free/ptlrpc_req_finished below).
 * NOTE(review): several original lines are elided in this extract
 * (e.g. 'opc' assignment, early RETURNs) — verify against full source.
 */
1273 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1274 struct lov_stripe_md *lsm, obd_count page_count,
1275 struct brw_page **pga,
1276 struct ptlrpc_request **reqp,
1277 struct obd_capa *ocapa, int reserve,
1280 struct ptlrpc_request *req;
1281 struct ptlrpc_bulk_desc *desc;
1282 struct ost_body *body;
1283 struct obd_ioobj *ioobj;
1284 struct niobuf_remote *niobuf;
1285 int niocount, i, requested_nob, opc, rc;
1286 struct osc_brw_async_args *aa;
1287 struct req_capsule *pill;
1288 struct brw_page *pg_prev;
1291 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1292 RETURN(-ENOMEM); /* Recoverable */
1293 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1294 RETURN(-EINVAL); /* Fatal */
1296 if ((cmd & OBD_BRW_WRITE) != 0) {
/* writes draw from the pre-allocated pool so flushes cannot fail OOM */
1298 req = ptlrpc_request_alloc_pool(cli->cl_import,
1299 cli->cl_import->imp_rq_pool,
1300 &RQF_OST_BRW_WRITE);
1303 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* count niobufs: adjacent mergeable pages share one remote niobuf */
1308 for (niocount = i = 1; i < page_count; i++) {
1309 if (!can_merge_pages(pga[i - 1], pga[i]))
1313 pill = &req->rq_pill;
1314 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1316 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1317 niocount * sizeof(*niobuf));
1318 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1320 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1322 ptlrpc_request_free(req);
1325 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1326 ptlrpc_at_set_req_timeout(req);
/* bulk direction: server pulls for WRITE, pushes for READ */
1328 if (opc == OST_WRITE)
1329 desc = ptlrpc_prep_bulk_imp(req, page_count,
1330 BULK_GET_SOURCE, OST_BULK_PORTAL);
1332 desc = ptlrpc_prep_bulk_imp(req, page_count,
1333 BULK_PUT_SINK, OST_BULK_PORTAL);
1336 GOTO(out, rc = -ENOMEM);
1337 /* NB request now owns desc and will free it when it gets freed */
1339 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1340 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1341 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1342 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1344 lustre_set_wire_obdo(&body->oa, oa);
1346 obdo_to_ioobj(oa, ioobj);
1347 ioobj->ioo_bufcnt = niocount;
1348 osc_pack_capa(req, body, ocapa);
1349 LASSERT (page_count > 0);
1351 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1352 struct brw_page *pg = pga[i];
1353 int poff = pg->off & ~CFS_PAGE_MASK;
1355 LASSERT(pg->count > 0);
1356 /* make sure there is no gap in the middle of page array */
1357 LASSERTF(page_count == 1 ||
1358 (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
1359 ergo(i > 0 && i < page_count - 1,
1360 poff == 0 && pg->count == CFS_PAGE_SIZE) &&
1361 ergo(i == page_count - 1, poff == 0)),
1362 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1363 i, page_count, pg, pg->off, pg->count);
/* pages must arrive sorted by object offset (see sort_brw_pages) */
1365 LASSERTF(i == 0 || pg->off > pg_prev->off,
1366 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1367 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1369 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1370 pg_prev->pg, page_private(pg_prev->pg),
1371 pg_prev->pg->index, pg_prev->off);
1373 LASSERTF(i == 0 || pg->off > pg_prev->off,
1374 "i %d p_c %u\n", i, page_count);
/* SRVLOCK must be uniform across the whole RPC */
1376 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1377 (pg->flag & OBD_BRW_SRVLOCK));
1379 ptlrpc_prep_bulk_page(desc, pg->pg, poff, pg->count);
1380 requested_nob += pg->count;
/* extend previous niobuf when contiguous, else start a new one */
1382 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1384 niobuf->len += pg->count;
1386 niobuf->offset = pg->off;
1387 niobuf->len = pg->count;
1388 niobuf->flags = pg->flag;
/* sanity: we filled exactly 'niocount' niobufs */
1393 LASSERTF((void *)(niobuf - niocount) ==
1394 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1395 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1396 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1398 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1400 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1401 body->oa.o_valid |= OBD_MD_FLFLAGS;
1402 body->oa.o_flags = 0;
1404 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1407 if (osc_should_shrink_grant(cli))
1408 osc_shrink_grant_local(cli, &body->oa);
1410 /* size[REQ_REC_OFF] still sizeof (*body) */
1411 if (opc == OST_WRITE) {
1412 if (unlikely(cli->cl_checksum) &&
1413 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1414 /* store cl_cksum_type in a local variable since
1415 * it can be changed via lprocfs */
1416 cksum_type_t cksum_type = cli->cl_cksum_type;
1418 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1419 oa->o_flags &= OBD_FL_LOCAL_MASK;
1420 body->oa.o_flags = 0;
1422 body->oa.o_flags |= cksum_type_pack(cksum_type);
1423 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1424 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1428 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1430 /* save this in 'oa', too, for later checking */
1431 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1432 oa->o_flags |= cksum_type_pack(cksum_type);
1434 /* clear out the checksum flag, in case this is a
1435 * resend but cl_checksum is no longer set. b=11238 */
1436 oa->o_valid &= ~OBD_MD_FLCKSUM;
1438 oa->o_cksum = body->oa.o_cksum;
1439 /* 1 RC per niobuf */
1440 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1441 sizeof(__u32) * niocount);
1443 if (unlikely(cli->cl_checksum) &&
1444 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1445 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1446 body->oa.o_flags = 0;
1447 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1448 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1451 ptlrpc_request_set_replen(req);
1453 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1454 aa = ptlrpc_req_async_args(req);
1456 aa->aa_requested_nob = requested_nob;
1457 aa->aa_nio_count = niocount;
1458 aa->aa_page_count = page_count;
1462 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1463 if (ocapa && reserve)
1464 aa->aa_ocapa = capa_get(ocapa);
1470 ptlrpc_req_finished(req);
/*
 * Diagnose a write-checksum mismatch reported by the server.
 * Returns immediately (match confirmed) when server and client agree.
 * Otherwise the bulk checksum is recomputed locally with the server's
 * checksum type, and the three-way comparison (original client, server,
 * recomputed) selects one of four diagnostic messages: wrong server
 * checksum type, client-side change after checksumming (likely mmap IO,
 * bug 11742), corruption in transit, or both.  A console error with
 * FID/object/extent details and a CERROR with all three checksums are
 * emitted.
 * NOTE(review): the final return value on mismatch is in elided lines —
 * confirm against the full source.
 */
1474 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1475 __u32 client_cksum, __u32 server_cksum, int nob,
1476 obd_count page_count, struct brw_page **pga,
1477 cksum_type_t client_cksum_type)
1481 cksum_type_t cksum_type;
1483 if (server_cksum == client_cksum) {
1484 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* recompute with the type the server actually used */
1488 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1490 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1493 if (cksum_type != client_cksum_type)
1494 msg = "the server did not use the checksum type specified in "
1495 "the original request - likely a protocol problem";
1496 else if (new_cksum == server_cksum)
1497 msg = "changed on the client after we checksummed it - "
1498 "likely false positive due to mmap IO (bug 11742)";
1499 else if (new_cksum == client_cksum)
1500 msg = "changed in transit before arrival at OST";
1502 msg = "changed in transit AND doesn't match the original - "
1503 "likely false positive due to mmap IO (bug 11742)";
1505 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1506 " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1507 msg, libcfs_nid2str(peer->nid),
1508 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1509 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1510 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1512 oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1514 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1515 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1516 "client csum now %x\n", client_cksum, client_cksum_type,
1517 server_cksum, cksum_type, new_cksum);
1521 /* Note rc enters this function as number of bytes transferred */
/*
 * Post-process a completed BRW request.
 *
 * Common path: unpack the reply body, update per-uid/gid quota flags on
 * WRITE replies carrying OBD_MD_FL*QUOTA, and update the grant state.
 *
 * WRITE path: verify the full requested byte count moved, unwrap the
 * sptlrpc bulk, verify the write checksum against the value saved in
 * aa->aa_oa (see osc_brw_prep_request), then validate the per-niobuf RC
 * vector via check_write_rcs().
 *
 * READ path: unwrap the bulk (-EAGAIN on failure triggers a retry),
 * sanity-check the transferred byte count, handle short reads, and if
 * the server sent a checksum, recompute it locally and report any
 * mismatch (including whether a router forwarded the bulk).  Finally the
 * reply obdo is copied back into aa->aa_oa.
 */
1522 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1524 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1525 const lnet_process_id_t *peer =
1526 &req->rq_import->imp_connection->c_peer;
1527 struct client_obd *cli = aa->aa_cli;
1528 struct ost_body *body;
1529 __u32 client_cksum = 0;
/* -EDQUOT still carries a valid reply that must be processed */
1532 if (rc < 0 && rc != -EDQUOT) {
1533 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1537 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1538 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1540 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1544 /* set/clear over quota flag for a uid/gid */
1545 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1546 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1547 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1549 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1550 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1552 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1555 osc_update_grant(cli, body);
/* remember the checksum we sent, for the write verification below */
1560 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1561 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1563 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1565 CERROR("Unexpected +ve rc %d\n", rc);
1568 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1570 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1573 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1574 check_write_checksum(&body->oa, peer, client_cksum,
1575 body->oa.o_cksum, aa->aa_requested_nob,
1576 aa->aa_page_count, aa->aa_ppga,
1577 cksum_type_unpack(aa->aa_oa->o_flags)))
1580 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1581 aa->aa_page_count, aa->aa_ppga);
1585 /* The rest of this function executes only for OST_READs */
1587 /* if unwrap_bulk failed, return -EAGAIN to retry */
1588 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1590 GOTO(out, rc = -EAGAIN);
1592 if (rc > aa->aa_requested_nob) {
1593 CERROR("Unexpected rc %d (%d requested)\n", rc,
1594 aa->aa_requested_nob);
1598 if (rc != req->rq_bulk->bd_nob_transferred) {
1599 CERROR ("Unexpected rc %d (%d transferred)\n",
1600 rc, req->rq_bulk->bd_nob_transferred);
/* zero-fill the tail of a short read */
1604 if (rc < aa->aa_requested_nob)
1605 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1607 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1608 static int cksum_counter;
1609 __u32 server_cksum = body->oa.o_cksum;
1612 cksum_type_t cksum_type;
1614 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1615 body->oa.o_flags : 0);
1616 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1617 aa->aa_ppga, OST_READ,
/* identify whether the bulk came direct or via an LNET router */
1620 if (peer->nid == req->rq_bulk->bd_sender) {
1624 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1627 if (server_cksum == ~0 && rc > 0) {
1628 CERROR("Protocol error: server %s set the 'checksum' "
1629 "bit, but didn't send a checksum. Not fatal, "
1630 "but please notify on http://bugs.whamcloud.com/\n",
1631 libcfs_nid2str(peer->nid));
1632 } else if (server_cksum != client_cksum) {
1633 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1634 "%s%s%s inode "DFID" object "
1635 LPU64"/"LPU64" extent "
1636 "["LPU64"-"LPU64"]\n",
1637 req->rq_import->imp_obd->obd_name,
1638 libcfs_nid2str(peer->nid),
1640 body->oa.o_valid & OBD_MD_FLFID ?
1641 body->oa.o_parent_seq : (__u64)0,
1642 body->oa.o_valid & OBD_MD_FLFID ?
1643 body->oa.o_parent_oid : 0,
1644 body->oa.o_valid & OBD_MD_FLFID ?
1645 body->oa.o_parent_ver : 0,
1647 body->oa.o_valid & OBD_MD_FLGROUP ?
1648 body->oa.o_seq : (__u64)0,
1649 aa->aa_ppga[0]->off,
1650 aa->aa_ppga[aa->aa_page_count-1]->off +
1651 aa->aa_ppga[aa->aa_page_count-1]->count -
1653 CERROR("client %x, server %x, cksum_type %x\n",
1654 client_cksum, server_cksum, cksum_type);
1656 aa->aa_oa->o_cksum = client_cksum;
1660 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1663 } else if (unlikely(client_cksum)) {
1664 static int cksum_missed;
/* log with exponential backoff: only when cksum_missed is a power of 2 */
1667 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1668 CERROR("Checksum %u requested from %s but not sent\n",
1669 cksum_missed, libcfs_nid2str(peer->nid));
/* propagate reply attributes back to the caller's obdo */
1675 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
/*
 * Synchronous BRW: build the request, queue it, wait for completion and
 * post-process the reply.  On bulk timeout with rq_resend set, or on any
 * recoverable error, the request is rebuilt and retried (this block sits
 * inside a retry loop whose 'restart'/'resends' control flow is partly
 * elided here).  Retries are capped by client_should_resend() and spaced
 * by an interruptible wait that grows with the resend count.
 */
1680 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1681 struct lov_stripe_md *lsm,
1682 obd_count page_count, struct brw_page **pga,
1683 struct obd_capa *ocapa)
1685 struct ptlrpc_request *req;
1689 struct l_wait_info lwi;
1693 cfs_waitq_init(&waitq);
1696 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1697 page_count, pga, &req, ocapa, 0, resends);
1701 rc = ptlrpc_queue_wait(req);
1703 if (rc == -ETIMEDOUT && req->rq_resend) {
1704 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1705 ptlrpc_req_finished(req);
1709 rc = osc_brw_fini_request(req, rc);
1711 ptlrpc_req_finished(req);
1712 if (osc_recoverable_error(rc)) {
1714 if (!client_should_resend(resends, &exp->exp_obd->u.cli)) {
1715 CERROR("too many resend retries, returning error\n");
/* back off: sleep 'resends' seconds (interruptible) before retrying */
1719 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1720 l_wait_event(waitq, 0, &lwi);
/*
 * Rebuild and requeue a BRW request that failed with a recoverable
 * error.  A new request is prepared from the async args of the failed
 * one; the new request takes over the pga and the oap list (lists are
 * spliced, not copied, since copying a list_head is invalid).  Each
 * oap's request reference is switched to the new request, the capa
 * reference is transferred, and the new request is added to the original
 * request set.  Bails out early if the resend cap is reached or if any
 * oap was interrupted meanwhile.  All oap/list manipulation happens
 * under cl_loi_list_lock (see comment near ptlrpc_set_add_req).
 */
1728 int osc_brw_redo_request(struct ptlrpc_request *request,
1729 struct osc_brw_async_args *aa)
1731 struct ptlrpc_request *new_req;
1732 struct ptlrpc_request_set *set = request->rq_set;
1733 struct osc_brw_async_args *new_aa;
1734 struct osc_async_page *oap;
1738 if (!client_should_resend(aa->aa_resends, aa->aa_cli)) {
1739 CERROR("too many resent retries, returning error\n");
1743 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1745 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1746 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1747 aa->aa_cli, aa->aa_oa,
1748 NULL /* lsm unused by osc currently */,
1749 aa->aa_page_count, aa->aa_ppga,
1750 &new_req, aa->aa_ocapa, 0, 1);
1754 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1756 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1757 if (oap->oap_request != NULL) {
1758 LASSERTF(request == oap->oap_request,
1759 "request %p != oap_request %p\n",
1760 request, oap->oap_request);
1761 if (oap->oap_interrupted) {
/* caller gave up on this I/O: drop the new request and abort redo */
1762 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1763 ptlrpc_req_finished(new_req);
1768 /* New request takes over pga and oaps from old request.
1769 * Note that copying a list_head doesn't work, need to move it... */
1771 new_req->rq_interpret_reply = request->rq_interpret_reply;
1772 new_req->rq_async_args = request->rq_async_args;
/* delay dispatch proportionally to the number of resends so far */
1773 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1775 new_aa = ptlrpc_req_async_args(new_req);
1777 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1778 cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1779 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1781 cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1782 if (oap->oap_request) {
/* swap each oap's reference from the old request to the new one */
1783 ptlrpc_req_finished(oap->oap_request);
1784 oap->oap_request = ptlrpc_request_addref(new_req);
1788 new_aa->aa_ocapa = aa->aa_ocapa;
1789 aa->aa_ocapa = NULL;
1791 /* use ptlrpc_set_add_req is safe because interpret functions work
1792 * in check_set context. only one way exist with access to request
1793 * from different thread got -EINTR - this way protected with
1794 * cl_loi_list_lock */
1795 ptlrpc_set_add_req(set, new_req);
1797 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1799 DEBUG_REQ(D_INFO, new_req, "new request");
1804 * ugh, we want disk allocation on the target to happen in offset order. we'll
1805 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1806 * fine for our small page arrays and doesn't require allocation. it's an
1807 * insertion sort that swaps elements that are strides apart, shrinking the
1808 * stride down until it's '1' and the array is sorted.
/* Sort 'array' of 'num' brw_pages in ascending page offset order. */
1810 static void sort_brw_pages(struct brw_page **array, int num)
1813 struct brw_page *tmp;
/* grow the stride using the 3h+1 sequence, then shrink it back down */
1817 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1822 for (i = stride ; i < num ; i++) {
/* insertion sort within the current stride */
1825 while (j >= stride && array[j - stride]->off > tmp->off) {
1826 array[j] = array[j - stride];
1831 } while (stride > 1);
/*
 * Return how many leading pages of 'pg' form an unfragmented run that
 * the network can move in a single RDMA: the run ends at the first page
 * that does not end on a page boundary, or before a page that does not
 * start on one.  (Loop counters 'i'/'count' live in elided lines.)
 */
1834 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1840 LASSERT (pages > 0);
1841 offset = pg[i]->off & ~CFS_PAGE_MASK;
1845 if (pages == 0) /* that's all */
1848 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1849 return count; /* doesn't end on page boundary */
1852 offset = pg[i]->off & ~CFS_PAGE_MASK;
1853 if (offset != 0) /* doesn't start on page boundary */
/*
 * Allocate an array of 'count' pointers into the caller's contiguous
 * brw_page array 'pga', so the pages can be sorted and windowed without
 * moving the brw_page structs themselves.  Freed by osc_release_ppga().
 */
1860 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1862 struct brw_page **ppga;
1865 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1869 for (i = 0; i < count; i++)
/* Free a pointer array built by osc_build_ppga(); 'count' must match. */
1874 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1876 LASSERT(ppga != NULL);
1877 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * Top-level synchronous BRW entry point.  Builds a sortable pointer
 * array over the caller's pages, sorts it by offset, then issues one or
 * more osc_brw_internal() RPCs, each capped at cl_max_pages_per_rpc and
 * shrunk to the largest unfragmented run.  Because the OST reply
 * clobbers the obdo, a copy of *oinfo->oi_oa is saved before the first
 * multi-RPC round and restored before each subsequent one.
 * OBD_BRW_CHECK short-circuits: the caller only wants to know whether
 * the import is usable.
 */
1880 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1881 obd_count page_count, struct brw_page *pga,
1882 struct obd_trans_info *oti)
1884 struct obdo *saved_oa = NULL;
1885 struct brw_page **ppga, **orig;
1886 struct obd_import *imp = class_exp2cliimp(exp);
1887 struct client_obd *cli;
1888 int rc, page_count_orig;
1891 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1892 cli = &imp->imp_obd->u.cli;
1894 if (cmd & OBD_BRW_CHECK) {
1895 /* The caller just wants to know if there's a chance that this
1896 * I/O can succeed */
1898 if (imp->imp_invalid)
1903 /* test_brw with a failed create can trip this, maybe others. */
1904 LASSERT(cli->cl_max_pages_per_rpc);
1908 orig = ppga = osc_build_ppga(pga, page_count);
/* remember the original count: 'page_count'/'ppga' shrink per round */
1911 page_count_orig = page_count;
1913 sort_brw_pages(ppga, page_count);
1914 while (page_count) {
1915 obd_count pages_per_brw;
1917 if (page_count > cli->cl_max_pages_per_rpc)
1918 pages_per_brw = cli->cl_max_pages_per_rpc;
1920 pages_per_brw = page_count;
1922 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1924 if (saved_oa != NULL) {
1925 /* restore previously saved oa */
1926 *oinfo->oi_oa = *saved_oa;
1927 } else if (page_count > pages_per_brw) {
1928 /* save a copy of oa (brw will clobber it) */
1929 OBDO_ALLOC(saved_oa);
1930 if (saved_oa == NULL)
1931 GOTO(out, rc = -ENOMEM);
1932 *saved_oa = *oinfo->oi_oa;
1935 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1936 pages_per_brw, ppga, oinfo->oi_capa);
/* advance the window past the pages just transferred */
1941 page_count -= pages_per_brw;
1942 ppga += pages_per_brw;
1946 osc_release_ppga(orig, page_count_orig);
1948 if (saved_oa != NULL)
1949 OBDO_FREE(saved_oa);
1954 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1955 * the dirty accounting. Writeback completes or truncate happens before
1956 * writing starts. Must be called with the loi lock held. */
1957 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
/* hand the page's write grant back to the client_obd accounting */
1960 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1964 /* This maintains the lists of pending pages to read/write for a given object
1965 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1966 * to quickly find objects that are ready to send an RPC. */
/*
 * Predicate: should this loi_oap_pages list trigger an RPC now?
 * True when: the import is invalid (drain queued pages via immediately
 * failing RPCs so recovery can empty the queues), an urgent page is
 * queued, a writer is waiting for cache space (writes only), or a full
 * RPC's worth of pages is pending.  False when nothing is pending.
 */
1967 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1972 if (lop->lop_num_pending == 0)
1975 /* if we have an invalid import we want to drain the queued pages
1976 * by forcing them through rpcs that immediately fail and complete
1977 * the pages. recovery relies on this to empty the queued pages
1978 * before canceling the locks and evicting down the llite pages */
1979 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1982 /* stream rpcs in queue order as long as as there is an urgent page
1983 * queued. this is our cheap solution for good batching in the case
1984 * where writepage marks some random page in the middle of the file
1985 * as urgent because of, say, memory pressure */
1986 if (!cfs_list_empty(&lop->lop_urgent)) {
1987 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1991 if (cmd & OBD_BRW_WRITE) {
1992 /* trigger a write rpc stream as long as there are dirtiers
1993 * waiting for space. as they're waiting, they're not going to
1994 * create more pages to coalesce with what's waiting.. */
1995 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
1996 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
/* enough pages pending to fill a whole RPC */
2000 if (lop->lop_num_pending >= cli->cl_max_pages_per_rpc)
/*
 * Predicate: does this list warrant a high-priority RPC?  True only when
 * the head of the urgent list carries ASYNC_HP (lock-cancel driven I/O).
 */
2006 static int lop_makes_hprpc(struct loi_oap_pages *lop)
2008 struct osc_async_page *oap;
2011 if (cfs_list_empty(&lop->lop_urgent))
/* only the head matters: HP pages are added at the front of lop_urgent */
2014 oap = cfs_list_entry(lop->lop_urgent.next,
2015 struct osc_async_page, oap_urgent_item);
2017 if (oap->oap_async_flags & ASYNC_HP) {
2018 CDEBUG(D_CACHE, "hp request forcing RPC\n");
/*
 * Idempotently add 'item' to 'list' when should_be_on is true, or remove
 * it when false; no-op when membership already matches.
 */
2025 static void on_list(cfs_list_t *item, cfs_list_t *list,
2028 if (cfs_list_empty(item) && should_be_on)
2029 cfs_list_add_tail(item, list);
2030 else if (!cfs_list_empty(item) && !should_be_on)
2031 cfs_list_del_init(item);
2034 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2035 * can find pages to build into rpcs quickly */
/*
 * Recompute which of the client's ready/hp-ready/write/read lists this
 * loi belongs on.  An loi with high-priority pages goes on the hp-ready
 * list (and off the plain ready list); otherwise ready-list membership
 * follows lop_makes_rpc() for either direction.  Write/read list
 * membership simply tracks whether any pages are pending.
 */
2036 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
2038 if (lop_makes_hprpc(&loi->loi_write_lop) ||
2039 lop_makes_hprpc(&loi->loi_read_lop)) {
/* HP work present: hp-ready list supersedes the plain ready list */
2041 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2042 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2044 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
2045 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2046 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
2047 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
2050 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2051 loi->loi_write_lop.lop_num_pending);
2053 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2054 loi->loi_read_lop.lop_num_pending);
/*
 * Adjust the pending-page count on 'lop' by 'delta' (may be negative)
 * and mirror the change into the client-wide pending read/write totals.
 */
2057 static void lop_update_pending(struct client_obd *cli,
2058 struct loi_oap_pages *lop, int cmd, int delta)
2060 lop->lop_num_pending += delta;
2061 if (cmd & OBD_BRW_WRITE)
2062 cli->cl_pending_w_pages += delta;
2064 cli->cl_pending_r_pages += delta;
2068 * this is called when a sync waiter receives an interruption. Its job is to
2069 * get the caller woken as soon as possible. If its page hasn't been put in an
2070 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
2071 * desiring interruption which will forcefully complete the rpc once the rpc
/* Returns rc from the completion callback when the page was dequeued. */
2074 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2076 struct loi_oap_pages *lop;
2077 struct lov_oinfo *loi;
2081 LASSERT(!oap->oap_interrupted);
2082 oap->oap_interrupted = 1;
2084 /* ok, it's been put in an rpc. only one oap gets a request reference */
2085 if (oap->oap_request != NULL) {
/* flag the in-flight RPC as interrupted and kick ptlrpcd to act on it */
2086 ptlrpc_mark_interrupted(oap->oap_request);
2087 ptlrpcd_wake(oap->oap_request);
2088 ptlrpc_req_finished(oap->oap_request);
2089 oap->oap_request = NULL;
2093 * page completion may be called only if ->cpo_prep() method was
2094 * executed by osc_io_submit(), that also adds page the to pending list
2096 if (!cfs_list_empty(&oap->oap_pending_item)) {
2097 cfs_list_del_init(&oap->oap_pending_item);
2098 cfs_list_del_init(&oap->oap_urgent_item);
2101 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2102 &loi->loi_write_lop : &loi->loi_read_lop;
2103 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2104 loi_list_maint(oap->oap_cli, oap->oap_loi);
/* complete the page back to the caller with -EINTR */
2105 rc = oap->oap_caller_ops->ap_completion(env,
2106 oap->oap_caller_data,
2107 oap->oap_cmd, NULL, -EINTR);
2113 /* this is trying to propagate async writeback errors back up to the
2114 * application. As an async write fails we record the error code for later if
2115 * the app does an fsync. As long as errors persist we force future rpcs to be
2116 * sync so that the app can get a sync error and break the cycle of queueing
2117 * pages for which writeback will fail. */
/*
 * Record write completion state in 'ar'.  On error (elided branch):
 * latch force_sync and remember the next xid so we know which later
 * writes post-date the failure.  On success of a request at or past
 * ar_min_xid, clear force_sync again.
 */
2118 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2125 ar->ar_force_sync = 1;
2126 ar->ar_min_xid = ptlrpc_sample_next_xid();
2131 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2132 ar->ar_force_sync = 0;
/*
 * Queue an async page onto its object's pending list (read or write
 * side per oap_cmd).  HP pages go to the FRONT of the urgent list so
 * lop_makes_hprpc() sees them first; merely-urgent pages go to its tail.
 * Updates pending-page accounting via lop_update_pending().
 */
2135 void osc_oap_to_pending(struct osc_async_page *oap)
2137 struct loi_oap_pages *lop;
2139 if (oap->oap_cmd & OBD_BRW_WRITE)
2140 lop = &oap->oap_loi->loi_write_lop;
2142 lop = &oap->oap_loi->loi_read_lop;
2144 if (oap->oap_async_flags & ASYNC_HP)
2145 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2146 else if (oap->oap_async_flags & ASYNC_URGENT)
2147 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2148 cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2149 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2152 /* this must be called holding the loi list lock to give coverage to exit_cache,
2153 * async_flag maintenance, and oap_request */
/*
 * Finish one async page after its RPC completes (or fails/was never
 * sent).  Drops the oap's request reference, clears async flags and the
 * interrupted marker, feeds the result into the force-sync error
 * tracking for writes, copies size/time attributes from the reply obdo
 * into the loi's lvb, then invokes the caller's ap_completion callback.
 * If the callback asks to requeue, the page goes back on the pending
 * list; otherwise its cache grant is released via osc_exit_cache().
 */
2154 static void osc_ap_completion(const struct lu_env *env,
2155 struct client_obd *cli, struct obdo *oa,
2156 struct osc_async_page *oap, int sent, int rc)
2161 if (oap->oap_request != NULL) {
/* capture the xid before releasing our reference on the request */
2162 xid = ptlrpc_req_xid(oap->oap_request);
2163 ptlrpc_req_finished(oap->oap_request);
2164 oap->oap_request = NULL;
2167 cfs_spin_lock(&oap->oap_lock);
2168 oap->oap_async_flags = 0;
2169 cfs_spin_unlock(&oap->oap_lock);
2170 oap->oap_interrupted = 0;
2172 if (oap->oap_cmd & OBD_BRW_WRITE) {
/* track errors both client-wide and per-object for fsync reporting */
2173 osc_process_ar(&cli->cl_ar, xid, rc);
2174 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2177 if (rc == 0 && oa != NULL) {
2178 if (oa->o_valid & OBD_MD_FLBLOCKS)
2179 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2180 if (oa->o_valid & OBD_MD_FLMTIME)
2181 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2182 if (oa->o_valid & OBD_MD_FLATIME)
2183 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2184 if (oa->o_valid & OBD_MD_FLCTIME)
2185 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2188 rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2189 oap->oap_cmd, oa, rc);
2191 /* cl_page_completion() drops PG_locked. so, a new I/O on the page could
2192 * start, but OSC calls it under lock and thus we can add oap back to
2195 /* upper layer wants to leave the page on pending queue */
2196 osc_oap_to_pending(oap);
2198 osc_exit_cache(cli, oap, sent);
/*
 * Deferred-work callback: run the RPC engine (osc_check_rpcs0) for the
 * given client_obd under its loi list lock.  'data' is the client_obd.
 */
2202 static int brw_queue_work(const struct lu_env *env, void *data)
2204 struct client_obd *cli = data;
2206 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2208 client_obd_list_lock(&cli->cl_loi_list_lock);
2209 osc_check_rpcs0(env, cli, 1);
2210 client_obd_list_unlock(&cli->cl_loi_list_lock);
/*
 * Reply interpreter for async BRW RPCs.  Post-processes the reply via
 * osc_brw_fini_request(); recoverable errors are retried through
 * osc_brw_redo_request() (the early-return on successful redo is in
 * elided lines).  Otherwise: release the capa, decrement the in-flight
 * read/write counter under the loi list lock, complete each attached
 * oap (RPC-engine path) or just release write grants (async_internal
 * path), wake cache waiters, kick the RPC engine again, then finish the
 * cl_req and free the pga.
 */
2214 static int brw_interpret(const struct lu_env *env,
2215 struct ptlrpc_request *req, void *data, int rc)
2217 struct osc_brw_async_args *aa = data;
2218 struct client_obd *cli;
2222 rc = osc_brw_fini_request(req, rc);
2223 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2224 if (osc_recoverable_error(rc)) {
2225 rc = osc_brw_redo_request(req, aa);
2231 capa_put(aa->aa_ocapa);
2232 aa->aa_ocapa = NULL;
2236 client_obd_list_lock(&cli->cl_loi_list_lock);
2238 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2239 * is called so we know whether to go to sync BRWs or wait for more
2240 * RPCs to complete */
2241 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2242 cli->cl_w_in_flight--;
2244 cli->cl_r_in_flight--;
/* empty oap list means this request came from async_internal() */
2246 async = cfs_list_empty(&aa->aa_oaps);
2247 if (!async) { /* from osc_send_oap_rpc() */
2248 struct osc_async_page *oap, *tmp;
2249 /* the caller may re-use the oap after the completion call so
2250 * we need to clean it up a little */
2251 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
2253 cfs_list_del_init(&oap->oap_rpc_item);
2254 osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2256 OBDO_FREE(aa->aa_oa);
2257 } else { /* from async_internal() */
2259 for (i = 0; i < aa->aa_page_count; i++)
2260 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2262 osc_wake_cache_waiters(cli);
2263 osc_check_rpcs0(env, cli, 1);
2264 client_obd_list_unlock(&cli->cl_loi_list_lock);
2267 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2268 req->rq_bulk->bd_nob_transferred);
2269 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2270 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
/*
 * Assemble a BRW ptlrpc request from a list of queued async pages.
 *
 * Walks 'rpc_list' building the pga array and a cl_req (allocated from
 * the first page; these are single-object RPCs), propagates a DLM lock
 * handle into the obdo when one is attached, sorts the pages, and calls
 * osc_brw_prep_request().  Timestamps are set AFTER the request is
 * built so the request xid orders correctly against racing setattr
 * (bug 10150).  The async args take over the oap list by splicing.
 * OBD_BRW_MEMALLOC requests toggle memory-pressure state around the
 * build and mark rq_memalloc.
 *
 * Returns the request, or an ERR_PTR; on failure every oap on rpc_list
 * is completed with the error (interrupted oaps get -EINTR — see the
 * elided argument on that call) and the cl_req is completed too.
 */
2275 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2276 struct client_obd *cli,
2277 cfs_list_t *rpc_list,
2278 int page_count, int cmd)
2280 struct ptlrpc_request *req;
2281 struct brw_page **pga = NULL;
2282 struct osc_brw_async_args *aa;
2283 struct obdo *oa = NULL;
2284 const struct obd_async_page_ops *ops = NULL;
2285 struct osc_async_page *oap;
2286 struct osc_async_page *tmp;
2287 struct cl_req *clerq = NULL;
2288 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2289 struct ldlm_lock *lock = NULL;
2290 struct cl_req_attr crattr;
2291 int i, rc, mpflag = 0;
2294 LASSERT(!cfs_list_empty(rpc_list));
2296 if (cmd & OBD_BRW_MEMALLOC)
2297 mpflag = cfs_memory_pressure_get_and_set();
2299 memset(&crattr, 0, sizeof crattr);
2300 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2302 GOTO(out, req = ERR_PTR(-ENOMEM));
2306 GOTO(out, req = ERR_PTR(-ENOMEM));
2309 cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2310 struct cl_page *page = osc_oap2cl_page(oap);
2312 ops = oap->oap_caller_ops;
/* first page allocates the cl_req covering the whole RPC */
2314 clerq = cl_req_alloc(env, page, crt,
2315 1 /* only 1-object rpcs for
2318 GOTO(out, req = (void *)clerq);
2319 lock = oap->oap_ldlm_lock;
2321 pga[i] = &oap->oap_brw_page;
2322 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2323 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2324 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2326 cl_req_page_add(env, clerq, page);
2329 /* always get the data for the obdo for the rpc */
2330 LASSERT(ops != NULL);
2332 crattr.cra_capa = NULL;
2333 cl_req_attr_set(env, clerq, &crattr, ~0ULL);
/* carry the DLM lock handle so the OST can match the client lock */
2335 oa->o_handle = lock->l_remote_handle;
2336 oa->o_valid |= OBD_MD_FLHANDLE;
2339 rc = cl_req_prep(env, clerq);
2341 CERROR("cl_req_prep failed: %d\n", rc);
2342 GOTO(out, req = ERR_PTR(rc));
2345 sort_brw_pages(pga, page_count);
2346 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2347 pga, &req, crattr.cra_capa, 1, 0);
2349 CERROR("prep_req failed: %d\n", rc);
2350 GOTO(out, req = ERR_PTR(rc));
2353 if (cmd & OBD_BRW_MEMALLOC)
2354 req->rq_memalloc = 1;
2356 /* Need to update the timestamps after the request is built in case
2357 * we race with setattr (locally or in queue at OST). If OST gets
2358 * later setattr before earlier BRW (as determined by the request xid),
2359 * the OST will not use BRW timestamps. Sadly, there is no obvious
2360 * way to do this in a single call. bug 10150 */
2361 cl_req_attr_set(env, clerq, &crattr,
2362 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2364 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2365 aa = ptlrpc_req_async_args(req);
2366 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
/* request takes over the oap list; splice, don't copy list_heads */
2367 cfs_list_splice(rpc_list, &aa->aa_oaps);
2368 CFS_INIT_LIST_HEAD(rpc_list);
2369 aa->aa_clerq = clerq;
2371 if (cmd & OBD_BRW_MEMALLOC)
2372 cfs_memory_pressure_restore(mpflag);
2374 capa_put(crattr.cra_capa);
2379 OBD_FREE(pga, sizeof(*pga) * page_count);
2380 /* this should happen rarely and is pretty bad, it makes the
2381 * pending list not follow the dirty order */
2382 client_obd_list_lock(&cli->cl_loi_list_lock);
2383 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2384 cfs_list_del_init(&oap->oap_rpc_item);
2386 /* queued sync pages can be torn down while the pages
2387 * were between the pending list and the rpc */
2388 if (oap->oap_interrupted) {
2389 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2390 osc_ap_completion(env, cli, NULL, oap, 0,
2394 osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2396 if (clerq && !IS_ERR(clerq))
2397 cl_req_completion(env, clerq, PTR_ERR(req));
2403 * prepare pages for ASYNC io and put pages in send queue.
2405 * \param cmd OBD_BRW_* macros
2406 * \param lop pending pages
2408 * \return zero if no page added to send queue.
2409 * \return 1 if pages successfully added to send queue.
2410 * \return negative on errors.
2413 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2414 struct lov_oinfo *loi, int cmd,
2415 struct loi_oap_pages *lop, pdl_policy_t pol)
2417 struct ptlrpc_request *req;
2418 obd_count page_count = 0;
2419 struct osc_async_page *oap = NULL, *tmp;
2420 struct osc_brw_async_args *aa;
2421 const struct obd_async_page_ops *ops;
2422 CFS_LIST_HEAD(rpc_list);
2423 int srvlock = 0, mem_tight = 0;
2424 struct cl_object *clob = NULL;
2425 obd_off starting_offset = OBD_OBJECT_EOF;
2426 unsigned int ending_offset;
2427 int starting_page_off = 0;
2430 /* ASYNC_HP pages first. At present, when the lock the pages is
2431 * to be canceled, the pages covered by the lock will be sent out
2432 * with ASYNC_HP. We have to send out them as soon as possible. */
2433 cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2434 if (oap->oap_async_flags & ASYNC_HP)
2435 cfs_list_move(&oap->oap_pending_item, &rpc_list);
2436 else if (!(oap->oap_brw_flags & OBD_BRW_SYNC))
2437 /* only do this for writeback pages. */
2438 cfs_list_move_tail(&oap->oap_pending_item, &rpc_list);
2439 if (++page_count >= cli->cl_max_pages_per_rpc)
2442 cfs_list_splice_init(&rpc_list, &lop->lop_pending);
2445 /* first we find the pages we're allowed to work with */
2446 cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2448 ops = oap->oap_caller_ops;
2450 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2451 "magic 0x%x\n", oap, oap->oap_magic);
2454 /* pin object in memory, so that completion call-backs
2455 * can be safely called under client_obd_list lock. */
2456 clob = osc_oap2cl_page(oap)->cp_obj;
2457 cl_object_get(clob);
2460 if (page_count != 0 &&
2461 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2462 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2463 " oap %p, page %p, srvlock %u\n",
2464 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2468 /* If there is a gap at the start of this page, it can't merge
2469 * with any previous page, so we'll hand the network a
2470 * "fragmented" page array that it can't transfer in 1 RDMA */
2471 if (oap->oap_obj_off < starting_offset) {
2472 if (starting_page_off != 0)
2475 starting_page_off = oap->oap_page_off;
2476 starting_offset = oap->oap_obj_off + starting_page_off;
2477 } else if (oap->oap_page_off != 0)
2480 /* in llite being 'ready' equates to the page being locked
2481 * until completion unlocks it. commit_write submits a page
2482 * as not ready because its unlock will happen unconditionally
2483 * as the call returns. if we race with commit_write giving
2484 * us that page we don't want to create a hole in the page
2485 * stream, so we stop and leave the rpc to be fired by
2486 * another dirtier or kupdated interval (the not ready page
2487 * will still be on the dirty list). we could call in
2488 * at the end of ll_file_write to process the queue again. */
2489 if (!(oap->oap_async_flags & ASYNC_READY)) {
2490 int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2493 CDEBUG(D_INODE, "oap %p page %p returned %d "
2494 "instead of ready\n", oap,
2498 /* llite is telling us that the page is still
2499 * in commit_write and that we should try
2500 * and put it in an rpc again later. we
2501 * break out of the loop so we don't create
2502 * a hole in the sequence of pages in the rpc
2507 /* the io isn't needed.. tell the checks
2508 * below to complete the rpc with EINTR */
2509 cfs_spin_lock(&oap->oap_lock);
2510 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2511 cfs_spin_unlock(&oap->oap_lock);
2512 oap->oap_count = -EINTR;
2515 cfs_spin_lock(&oap->oap_lock);
2516 oap->oap_async_flags |= ASYNC_READY;
2517 cfs_spin_unlock(&oap->oap_lock);
2520 LASSERTF(0, "oap %p page %p returned %d "
2521 "from make_ready\n", oap,
2529 /* take the page out of our book-keeping */
2530 cfs_list_del_init(&oap->oap_pending_item);
2531 lop_update_pending(cli, lop, cmd, -1);
2532 cfs_list_del_init(&oap->oap_urgent_item);
2534 /* ask the caller for the size of the io as the rpc leaves. */
2535 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2537 ops->ap_refresh_count(env, oap->oap_caller_data,
2539 LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2541 if (oap->oap_count <= 0) {
2542 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2544 osc_ap_completion(env, cli, NULL,
2545 oap, 0, oap->oap_count);
2549 /* now put the page back in our accounting */
2550 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2551 if (page_count++ == 0)
2552 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2554 if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
2557 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2558 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2559 * have the same alignment as the initial writes that allocated
2560 * extents on the server. */
2561 ending_offset = oap->oap_obj_off + oap->oap_page_off +
2563 if (!(ending_offset & (PTLRPC_MAX_BRW_SIZE - 1)))
2566 if (page_count >= cli->cl_max_pages_per_rpc)
2569 /* If there is a gap at the end of this page, it can't merge
2570 * with any subsequent pages, so we'll hand the network a
2571 * "fragmented" page array that it can't transfer in 1 RDMA */
2572 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2576 osc_wake_cache_waiters(cli);
2578 loi_list_maint(cli, loi);
2580 client_obd_list_unlock(&cli->cl_loi_list_lock);
2583 cl_object_put(env, clob);
2585 if (page_count == 0) {
2586 client_obd_list_lock(&cli->cl_loi_list_lock);
2590 req = osc_build_req(env, cli, &rpc_list, page_count,
2591 mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
2593 LASSERT(cfs_list_empty(&rpc_list));
2594 loi_list_maint(cli, loi);
2595 RETURN(PTR_ERR(req));
2598 aa = ptlrpc_req_async_args(req);
2600 starting_offset &= PTLRPC_MAX_BRW_SIZE - 1;
2601 if (cmd == OBD_BRW_READ) {
2602 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2603 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2604 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2605 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2607 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2608 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2609 cli->cl_w_in_flight);
2610 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2611 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2614 client_obd_list_lock(&cli->cl_loi_list_lock);
2616 if (cmd == OBD_BRW_READ)
2617 cli->cl_r_in_flight++;
2619 cli->cl_w_in_flight++;
2621 /* queued sync pages can be torn down while the pages
2622 * were between the pending list and the rpc */
2624 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2625 /* only one oap gets a request reference */
2628 if (oap->oap_interrupted && !req->rq_intr) {
2629 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2631 ptlrpc_mark_interrupted(req);
2635 tmp->oap_request = ptlrpc_request_addref(req);
2637 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2638 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2640 req->rq_interpret_reply = brw_interpret;
2642 /* XXX: Maybe the caller can check the RPC bulk descriptor to see which
2643 * CPU/NUMA node the majority of pages were allocated on, and try
2644 * to assign the async RPC to the CPU core (PDL_POLICY_PREFERRED)
2645 * to reduce cross-CPU memory traffic.
2647 * But on the other hand, we expect that multiple ptlrpcd threads
2648 * and the initial write sponsor can run in parallel, especially
2649 * when data checksum is enabled, which is CPU-bound operation and
2650 * single ptlrpcd thread cannot process in time. So more ptlrpcd
2651 * threads sharing BRW load (with PDL_POLICY_ROUND) seems better.
2653 ptlrpcd_add_req(req, pol, -1);
/* Debug helper: dump an loi's readiness and per-direction pending/urgent
 * state.  The final argument line of the macro is elided from this listing. */
2657 #define LOI_DEBUG(LOI, STR, args...) \
2658 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2659 !cfs_list_empty(&(LOI)->loi_ready_item) || \
2660 !cfs_list_empty(&(LOI)->loi_hp_ready_item), \
2661 (LOI)->loi_write_lop.lop_num_pending, \
2662 !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2663 (LOI)->loi_read_lop.lop_num_pending, \
2664 !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent), \
/* Pick the next object to build an RPC for, in strict priority order:
 * 1) objects with blocked (high-priority) locks, 2) objects with RPC-ready
 * pages, 3) any object with queued writes when cache waiters exist,
 * 4) everything queued when the import is invalid (flush on eviction).
 * Returns NULL when nothing is eligible (final lines elided here). */
2667 /* This is called by osc_check_rpcs() to find which objects have pages that
2668 * we could be sending. These lists are maintained by lop_makes_rpc(). */
2669 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2673 /* First return objects that have blocked locks so that they
2674 * will be flushed quickly and other clients can get the lock,
2675 * then objects which have pages ready to be stuffed into RPCs */
2676 if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
2677 RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
2678 struct lov_oinfo, loi_hp_ready_item));
2679 if (!cfs_list_empty(&cli->cl_loi_ready_list))
2680 RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
2681 struct lov_oinfo, loi_ready_item));
2683 /* then if we have cache waiters, return all objects with queued
2684 * writes. This is especially important when many small files
2685 * have filled up the cache and not been fired into rpcs because
2686 * they don't pass the nr_pending/object threshhold */
2687 if (!cfs_list_empty(&cli->cl_cache_waiters) &&
2688 !cfs_list_empty(&cli->cl_loi_write_list))
2689 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2690 struct lov_oinfo, loi_write_item));
2692 /* then return all queued objects when we have an invalid import
2693 * so that they get flushed */
2694 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2695 if (!cfs_list_empty(&cli->cl_loi_write_list))
2696 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2699 if (!cfs_list_empty(&cli->cl_loi_read_list))
2700 RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
2701 struct lov_oinfo, loi_read_item));
/* Return non-zero when the client is at its RPC-in-flight limit for this
 * object.  If the object's first urgent page (write or read) is ASYNC_HP,
 * one extra RPC slot is allowed so lock-cancel flushes are never starved. */
2706 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2708 struct osc_async_page *oap;
2711 if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
2712 oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
2713 struct osc_async_page, oap_urgent_item);
2714 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
/* Only check reads if the write side did not already grant the HP slot. */
2717 if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
2718 oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
2719 struct osc_async_page, oap_urgent_item);
2720 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2723 return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
/* Main RPC dispatch loop: repeatedly pick the next eligible object from
 * osc_next_loi() and try to fire write then read RPCs for it, balancing
 * between directions and between objects.  @ptlrpc selects the ptlrpcd
 * add policy (SAME when called from a ptlrpcd thread, ROUND otherwise).
 * Loop-exit lines are elided from this listing. */
2726 /* called with the loi list lock held */
2727 static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli, int ptlrpc)
2729 struct lov_oinfo *loi;
2730 int rc = 0, race_counter = 0;
2734 pol = ptlrpc ? PDL_POLICY_SAME : PDL_POLICY_ROUND;
2736 while ((loi = osc_next_loi(cli)) != NULL) {
2737 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2739 if (osc_max_rpc_in_flight(cli, loi))
2742 /* attempt some read/write balancing by alternating between
2743 * reads and writes in an object. The makes_rpc checks here
2744 * would be redundant if we were getting read/write work items
2745 * instead of objects. we don't want send_oap_rpc to drain a
2746 * partial read pending queue when we're given this object to
2747 * do io on writes while there are cache waiters */
2748 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2749 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2750 &loi->loi_write_lop, pol);
2752 CERROR("Write request failed with %d\n", rc);
2754 /* osc_send_oap_rpc failed, mostly because of
2757 * It can't break here, because if:
2758 * - a page was submitted by osc_io_submit, so
2760 * - no request in flight
2761 * - no subsequent request
2762 * The system will be in live-lock state,
2763 * because there is no chance to call
2764 * osc_io_unplug() and osc_check_rpcs() any
2765 * more. pdflush can't help in this case,
2766 * because it might be blocked at grabbing
2767 * the page lock as we mentioned.
2769 * Anyway, continue to drain pages. */
2778 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2779 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2780 &loi->loi_read_lop, pol);
2782 CERROR("Read request failed with %d\n", rc);
2790 /* attempt some inter-object balancing by issuing rpcs
2791 * for each object in turn */
2792 if (!cfs_list_empty(&loi->loi_hp_ready_item))
2793 cfs_list_del_init(&loi->loi_hp_ready_item);
2794 if (!cfs_list_empty(&loi->loi_ready_item))
2795 cfs_list_del_init(&loi->loi_ready_item);
2796 if (!cfs_list_empty(&loi->loi_write_item))
2797 cfs_list_del_init(&loi->loi_write_item);
2798 if (!cfs_list_empty(&loi->loi_read_item))
2799 cfs_list_del_init(&loi->loi_read_item);
2801 loi_list_maint(cli, loi);
2803 /* send_oap_rpc fails with 0 when make_ready tells it to
2804 * back off. llite's make_ready does this when it tries
2805 * to lock a page queued for write that is already locked.
2806 * we want to try sending rpcs from many objects, but we
2807 * don't want to spin failing with 0. */
2808 if (race_counter == 10)
/* Public wrapper: dispatch RPCs with the non-ptlrpcd (ROUND) policy. */
2813 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2815 osc_check_rpcs0(env, cli, 0);
2818 /* we're trying to queue a page in the osc so we're subject to the
2819 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2820 * If the osc's queued pages are already at that limit, then we want to sleep
2821 * until there is space in the osc's queue for us. We also may be waiting for
2822 * write credits from the OST if there are RPCs in flight that may return some
2823 * before we fall back to sync writes.
2825 * We need this know our allocation was granted in the presence of signals */
/* Wake condition for osc_enter_cache(): the waiter was removed from the
 * cache-waiter list (grant arrived) or there are no RPCs in flight that
 * could ever return grant, so waiting further is pointless. */
2826 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2830 client_obd_list_lock(&cli->cl_loi_list_lock);
2831 rc = cfs_list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2832 client_obd_list_unlock(&cli->cl_loi_list_lock);
2837 * Non-blocking version of osc_enter_cache() that consumes grant only when it
/* Consumes one page of write grant if available; on success with @transient
 * set, also accounts the page as "in transit" and marks it NOCACHE.
 * Conditional/return lines are elided from this listing. */
2840 int osc_enter_cache_try(const struct lu_env *env,
2841 struct client_obd *cli, struct lov_oinfo *loi,
2842 struct osc_async_page *oap, int transient)
2846 has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2848 osc_consume_write_grant(cli, &oap->oap_brw_page);
2850 cli->cl_dirty_transit += CFS_PAGE_SIZE;
2851 cfs_atomic_inc(&obd_dirty_transit_pages);
2852 oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2858 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2859 * grant or cache space. */
/* Admit a page into the writeback cache: fast path when dirty/grant limits
 * allow, otherwise register as a cache waiter, kick the RPC engine, and
 * sleep until grant returns.  Falls back to sync I/O when forced or when
 * the per-client dirty limit is below one page. */
2860 static int osc_enter_cache(const struct lu_env *env,
2861 struct client_obd *cli, struct lov_oinfo *loi,
2862 struct osc_async_page *oap)
2864 struct osc_cache_waiter ocw;
2865 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2869 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2870 "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
2871 cli->cl_dirty_max, obd_max_dirty_pages,
2872 cli->cl_lost_grant, cli->cl_avail_grant);
2874 /* force the caller to try sync io. this can jump the list
2875 * of queued writes and create a discontiguous rpc stream */
2876 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
2877 cli->cl_dirty_max < CFS_PAGE_SIZE ||
2878 cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync)
2881 /* Hopefully normal case - cache space and write credits available */
2882 if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2883 cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2884 osc_enter_cache_try(env, cli, loi, oap, 0))
2887 /* It is safe to block as a cache waiter as long as there is grant
2888 * space available or the hope of additional grant being returned
2889 * when an in flight write completes. Using the write back cache
2890 * if possible is preferable to sending the data synchronously
2891 * because write pages can then be merged in to large requests.
2892 * The addition of this cache waiter will causing pending write
2893 * pages to be sent immediately. */
2894 if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
2895 cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2896 cfs_waitq_init(&ocw.ocw_waitq);
2900 loi_list_maint(cli, loi);
2901 osc_check_rpcs(env, cli);
/* Drop the list lock across the sleep; ocw_granted re-takes it to test. */
2902 client_obd_list_unlock(&cli->cl_loi_list_lock);
2904 CDEBUG(D_CACHE, "sleeping for cache space\n");
2905 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2907 client_obd_list_lock(&cli->cl_loi_list_lock);
/* Still on the waiter list => woken without grant (signal / no RPCs left). */
2908 if (!cfs_list_empty(&ocw.ocw_entry)) {
2909 cfs_list_del(&ocw.ocw_entry);
/* Initialize the osc_async_page bookkeeping for one VM page.  When called
 * with no result slot (lines elided), returns the rounded size the caller
 * must reserve for the oap.  Links the oap to its caller's ops/data and
 * records the page's byte offset within the object. */
2919 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2920 struct lov_oinfo *loi, cfs_page_t *page,
2921 obd_off offset, const struct obd_async_page_ops *ops,
2922 void *data, void **res, int nocache,
2923 struct lustre_handle *lockh)
2925 struct osc_async_page *oap;
2930 return cfs_size_round(sizeof(*oap));
2933 oap->oap_magic = OAP_MAGIC;
2934 oap->oap_cli = &exp->exp_obd->u.cli;
2937 oap->oap_caller_ops = ops;
2938 oap->oap_caller_data = data;
2940 oap->oap_page = page;
2941 oap->oap_obj_off = offset;
/* Local root-equivalent users bypass quota enforcement on this page. */
2942 if (!client_is_remote(exp) &&
2943 cfs_capable(CFS_CAP_SYS_RESOURCE))
2944 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2946 LASSERT(!(offset & ~CFS_PAGE_MASK));
2948 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2949 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2950 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2951 CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2953 cfs_spin_lock_init(&oap->oap_lock);
2954 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
/* Queue one prepared oap for async I/O: validate it, check quota for
 * writes, reserve cache/grant for writes via osc_enter_cache(), place it
 * on the pending lists, and kick the writeback worker if an RPC can be
 * formed.  Several error-return lines are elided from this listing. */
2958 int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp,
2959 struct lov_stripe_md *lsm, struct lov_oinfo *loi,
2960 struct osc_async_page *oap, int cmd, int off,
2961 int count, obd_flag brw_flags, enum async_flags async_flags)
2963 struct client_obd *cli = &exp->exp_obd->u.cli;
2967 if (oap->oap_magic != OAP_MAGIC)
2970 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
/* An oap already on any list is busy and cannot be re-queued. */
2973 if (!cfs_list_empty(&oap->oap_pending_item) ||
2974 !cfs_list_empty(&oap->oap_urgent_item) ||
2975 !cfs_list_empty(&oap->oap_rpc_item))
2978 /* check if the file's owner/group is over quota */
2979 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2980 struct cl_object *obj;
2981 struct cl_attr attr; /* XXX put attr into thread info */
2982 unsigned int qid[MAXQUOTAS];
2984 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
2986 cl_object_attr_lock(obj);
2987 rc = cl_object_attr_get(env, obj, &attr);
2988 cl_object_attr_unlock(obj);
2990 qid[USRQUOTA] = attr.cat_uid;
2991 qid[GRPQUOTA] = attr.cat_gid;
2993 osc_quota_chkdq(cli, qid) == NO_QUOTA)
3000 loi = lsm->lsm_oinfo[0];
3002 client_obd_list_lock(&cli->cl_loi_list_lock);
3004 LASSERT(off + count <= CFS_PAGE_SIZE);
3006 oap->oap_page_off = off;
3007 oap->oap_count = count;
3008 oap->oap_brw_flags = brw_flags;
3009 /* Give a hint to OST that requests are coming from kswapd - bug19529 */
3010 if (cfs_memory_pressure_get())
3011 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
3012 cfs_spin_lock(&oap->oap_lock);
3013 oap->oap_async_flags = async_flags;
3014 cfs_spin_unlock(&oap->oap_lock);
3016 if (cmd & OBD_BRW_WRITE) {
3017 rc = osc_enter_cache(env, cli, loi, oap);
3019 client_obd_list_unlock(&cli->cl_loi_list_lock);
3024 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
3027 osc_oap_to_pending(oap);
3028 loi_list_maint(cli, loi);
/* Defer RPC construction to the ptlrpcd writeback worker when possible. */
3029 if (!osc_max_rpc_in_flight(cli, loi) &&
3030 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
3031 LASSERT(cli->cl_writeback_work != NULL);
3032 rc = ptlrpcd_queue_work(cli->cl_writeback_work);
3034 CDEBUG(D_CACHE, "Queued writeback work for client obd %p/%d.\n",
3037 client_obd_list_unlock(&cli->cl_loi_list_lock);
3042 /* aka (~was & now & flag), but this is more clear :) */
3043 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
/* Apply newly-set async flags to an oap already on a pending list.  Newly
 * urgent pages are moved onto the appropriate lop_urgent list (HP pages at
 * the head) and the flag word is updated under oap_lock. */
3045 int osc_set_async_flags_base(struct client_obd *cli,
3046 struct lov_oinfo *loi, struct osc_async_page *oap,
3047 obd_flag async_flags)
3049 struct loi_oap_pages *lop;
3053 LASSERT(!cfs_list_empty(&oap->oap_pending_item));
3055 if (oap->oap_cmd & OBD_BRW_WRITE) {
3056 lop = &loi->loi_write_lop;
3058 lop = &loi->loi_read_lop;
/* Nothing to do if every requested flag is already set. */
3061 if ((oap->oap_async_flags & async_flags) == async_flags)
3064 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3065 flags |= ASYNC_READY;
3067 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3068 cfs_list_empty(&oap->oap_rpc_item)) {
3069 if (oap->oap_async_flags & ASYNC_HP)
3070 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
3072 cfs_list_add_tail(&oap->oap_urgent_item,
3074 flags |= ASYNC_URGENT;
3075 loi_list_maint(cli, loi);
3077 cfs_spin_lock(&oap->oap_lock);
3078 oap->oap_async_flags |= flags;
3079 cfs_spin_unlock(&oap->oap_lock);
3081 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3082 oap->oap_async_flags);
/* Remove an oap from all OSC bookkeeping: release its cache/grant
 * accounting, wake waiters, and unlink it from the urgent and pending
 * lists.  Fails with -EBUSY if the oap is already part of an RPC. */
3086 int osc_teardown_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
3087 struct lov_oinfo *loi, struct osc_async_page *oap)
3089 struct client_obd *cli = &exp->exp_obd->u.cli;
3090 struct loi_oap_pages *lop;
3094 if (oap->oap_magic != OAP_MAGIC)
3098 loi = lsm->lsm_oinfo[0];
3100 if (oap->oap_cmd & OBD_BRW_WRITE) {
3101 lop = &loi->loi_write_lop;
3103 lop = &loi->loi_read_lop;
3106 client_obd_list_lock(&cli->cl_loi_list_lock);
3108 if (!cfs_list_empty(&oap->oap_rpc_item))
3109 GOTO(out, rc = -EBUSY);
3111 osc_exit_cache(cli, oap, 0);
3112 osc_wake_cache_waiters(cli);
3114 if (!cfs_list_empty(&oap->oap_urgent_item)) {
3115 cfs_list_del_init(&oap->oap_urgent_item);
3116 cfs_spin_lock(&oap->oap_lock);
3117 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3118 cfs_spin_unlock(&oap->oap_lock);
3120 if (!cfs_list_empty(&oap->oap_pending_item)) {
3121 cfs_list_del_init(&oap->oap_pending_item);
3122 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3124 loi_list_maint(cli, loi);
3125 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3127 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Attach @einfo->ei_cbdata to a DLM lock, but only after asserting the
 * lock's callbacks/type match what the enqueuer expects.  Sets l_ast_data
 * if it was unset; the success-flag lines are elided from this listing. */
3131 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
3132 struct ldlm_enqueue_info *einfo)
3134 void *data = einfo->ei_cbdata;
3137 LASSERT(lock != NULL);
3138 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3139 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3140 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3141 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
/* osc_ast_guard serializes l_ast_data access across the OSC. */
3143 lock_res_and_lock(lock);
3144 cfs_spin_lock(&osc_ast_guard);
3146 if (lock->l_ast_data == NULL)
3147 lock->l_ast_data = data;
3148 if (lock->l_ast_data == data)
3151 cfs_spin_unlock(&osc_ast_guard);
3152 unlock_res_and_lock(lock);
/* Handle-based wrapper for osc_set_lock_data_with_check(); a NULL lock
 * from the handle normally means the client was evicted. */
3157 static int osc_set_data_with_check(struct lustre_handle *lockh,
3158 struct ldlm_enqueue_info *einfo)
3160 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3164 set = osc_set_lock_data_with_check(lock, einfo);
3165 LDLM_LOCK_PUT(lock);
3167 CERROR("lockh %p, data %p - client evicted?\n",
3168 lockh, einfo->ei_cbdata);
/* Iterate all DLM locks on this stripe's resource, applying @replace to
 * each (typically to swap or clear the callback data). */
3172 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3173 ldlm_iterator_t replace, void *data)
3175 struct ldlm_res_id res_id;
3176 struct obd_device *obd = class_exp2obd(exp);
3178 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3179 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3183 /* find any ldlm lock of the inode in osc
/* Like osc_change_cbdata() but reports whether the iterator stopped on a
 * matching lock; the return-value lines are elided from this listing. */
3187 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3188 ldlm_iterator_t replace, void *data)
3190 struct ldlm_res_id res_id;
3191 struct obd_device *obd = class_exp2obd(exp);
3194 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3195 rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3196 if (rc == LDLM_ITER_STOP)
3198 if (rc == LDLM_ITER_CONTINUE)
/* Post-enqueue processing shared by sync and async paths: translate an
 * intent-aborted reply into the server's policy result, mark the LVB ready
 * when appropriate (not for AGL), then invoke the caller's upcall. */
3203 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3204 obd_enqueue_update_f upcall, void *cookie,
3205 int *flags, int agl, int rc)
3207 int intent = *flags & LDLM_FL_HAS_INTENT;
3211 /* The request was created before ldlm_cli_enqueue call. */
3212 if (rc == ELDLM_LOCK_ABORTED) {
3213 struct ldlm_reply *rep;
3214 rep = req_capsule_server_get(&req->rq_pill,
3217 LASSERT(rep != NULL);
3218 if (rep->lock_policy_res1)
3219 rc = rep->lock_policy_res1;
3223 if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
3225 *flags |= LDLM_FL_LVB_READY;
3226 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3227 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3230 /* Call the update callback. */
3231 rc = (*upcall)(cookie, rc);
/* Reply interpreter for async lock enqueues.  Copies the handle/mode out
 * of @aa before any upcall can free them, completes the enqueue via
 * ldlm_cli_enqueue_fini() and osc_enqueue_fini(), then drops the extra
 * references taken here and by ldlm_cli_enqueue(). */
3235 static int osc_enqueue_interpret(const struct lu_env *env,
3236 struct ptlrpc_request *req,
3237 struct osc_enqueue_args *aa, int rc)
3239 struct ldlm_lock *lock;
3240 struct lustre_handle handle;
3242 struct ost_lvb *lvb;
3244 int *flags = aa->oa_flags;
3246 /* Make a local copy of a lock handle and a mode, because aa->oa_*
3247 * might be freed anytime after lock upcall has been called. */
3248 lustre_handle_copy(&handle, aa->oa_lockh);
3249 mode = aa->oa_ei->ei_mode;
3251 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3253 lock = ldlm_handle2lock(&handle);
3255 /* Take an additional reference so that a blocking AST that
3256 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3257 * to arrive after an upcall has been executed by
3258 * osc_enqueue_fini(). */
3259 ldlm_lock_addref(&handle, mode);
3261 /* Let CP AST to grant the lock first. */
3262 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
/* AGL glimpse aborts carry no usable LVB; otherwise expect a full LVB. */
3264 if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
3269 lvb_len = sizeof(*aa->oa_lvb);
3272 /* Complete obtaining the lock procedure. */
3273 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3274 mode, flags, lvb, lvb_len, &handle, rc);
3275 /* Complete osc stuff. */
3276 rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
3277 flags, aa->oa_agl, rc);
3279 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3281 /* Release the lock for async request. */
3282 if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3284 * Releases a reference taken by ldlm_cli_enqueue(), if it is
3285 * not already released by
3286 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3288 ldlm_lock_decref(&handle, mode);
3290 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3291 aa->oa_lockh, req, aa);
3292 ldlm_lock_decref(&handle, mode);
3293 LDLM_LOCK_PUT(lock);
/* Update per-stripe state after an enqueue completes: on success, cache
 * the returned LVB and extend the known-minimum-size (KMS) up to the end
 * of the granted extent; on an intent glimpse abort, just cache the LVB.
 * Failure paths (fail_match) are partly elided from this listing. */
3297 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3298 struct lov_oinfo *loi, int flags,
3299 struct ost_lvb *lvb, __u32 mode, int rc)
3301 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3303 if (rc == ELDLM_OK) {
3306 LASSERT(lock != NULL);
3307 loi->loi_lvb = *lvb;
3308 tmp = loi->loi_lvb.lvb_size;
3309 /* Extend KMS up to the end of this lock and no further
3310 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3311 if (tmp > lock->l_policy_data.l_extent.end)
3312 tmp = lock->l_policy_data.l_extent.end + 1;
3313 if (tmp >= loi->loi_kms) {
3314 LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3315 ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3316 loi_kms_set(loi, tmp);
3318 LDLM_DEBUG(lock, "lock acquired, setting rss="
3319 LPU64"; leaving kms="LPU64", end="LPU64,
3320 loi->loi_lvb.lvb_size, loi->loi_kms,
3321 lock->l_policy_data.l_extent.end);
3323 ldlm_lock_allow_match(lock);
3324 } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3325 LASSERT(lock != NULL);
3326 loi->loi_lvb = *lvb;
3327 ldlm_lock_allow_match(lock);
3328 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3329 " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3335 ldlm_lock_fail_match(lock, rc);
3337 LDLM_LOCK_PUT(lock);
3340 EXPORT_SYMBOL(osc_update_enqueue);
/* Sentinel request-set value: callers pass PTLRPCD_SET to mean "run the
 * async enqueue on a ptlrpcd thread instead of a caller-owned set". */
3342 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3344 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3345 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3346 * other synchronous requests, however keeping some locks and trying to obtain
3347 * others may take a considerable amount of time in a case of ost failure; and
3348 * when other sync requests do not get released lock from a client, the client
3349 * is excluded from the cluster -- such scenarious make the life difficult, so
3350 * release locks just after they are obtained. */
/* Core extent-lock enqueue: first try to match an existing local lock
 * (reads may reuse a PW lock); otherwise send an enqueue RPC, either
 * synchronously or via @rqset/ptlrpcd with osc_enqueue_interpret as the
 * reply handler.  @agl marks asynchronous-glimpse-lock requests, which
 * must not block on ungranted matches.  Tail lines are elided here. */
3351 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3352 int *flags, ldlm_policy_data_t *policy,
3353 struct ost_lvb *lvb, int kms_valid,
3354 obd_enqueue_update_f upcall, void *cookie,
3355 struct ldlm_enqueue_info *einfo,
3356 struct lustre_handle *lockh,
3357 struct ptlrpc_request_set *rqset, int async, int agl)
3359 struct obd_device *obd = exp->exp_obd;
3360 struct ptlrpc_request *req = NULL;
3361 int intent = *flags & LDLM_FL_HAS_INTENT;
3362 int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
3367 /* Filesystem lock extents are extended to page boundaries so that
3368 * dealing with the page cache is a little smoother. */
3369 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3370 policy->l_extent.end |= ~CFS_PAGE_MASK;
3373 * kms is not valid when either object is completely fresh (so that no
3374 * locks are cached), or object was evicted. In the latter case cached
3375 * lock cannot be used, because it would prime inode state with
3376 * potentially stale LVB.
3381 /* Next, search for already existing extent locks that will cover us */
3382 /* If we're trying to read, we also search for an existing PW lock. The
3383 * VFS and page cache already protect us locally, so lots of readers/
3384 * writers can share a single PW lock.
3386 * There are problems with conversion deadlocks, so instead of
3387 * converting a read lock to a write lock, we'll just enqueue a new
3390 * At some point we should cancel the read lock instead of making them
3391 * send us a blocking callback, but there are problems with canceling
3392 * locks out from other users right now, too. */
3393 mode = einfo->ei_mode;
3394 if (einfo->ei_mode == LCK_PR)
3396 mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
3397 einfo->ei_type, policy, mode, lockh, 0);
3399 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3401 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
3402 /* For AGL, if enqueue RPC is sent but the lock is not
3403 * granted, then skip to process this strpe.
3404 * Return -ECANCELED to tell the caller. */
3405 ldlm_lock_decref(lockh, mode);
3406 LDLM_LOCK_PUT(matched);
3408 } else if (osc_set_lock_data_with_check(matched, einfo)) {
3409 *flags |= LDLM_FL_LVB_READY;
3410 /* addref the lock only if not async requests and PW
3411 * lock is matched whereas we asked for PR. */
3412 if (!rqset && einfo->ei_mode != mode)
3413 ldlm_lock_addref(lockh, LCK_PR);
3415 /* I would like to be able to ASSERT here that
3416 * rss <= kms, but I can't, for reasons which
3417 * are explained in lov_enqueue() */
3420 /* We already have a lock, and it's referenced */
3421 (*upcall)(cookie, ELDLM_OK);
3423 if (einfo->ei_mode != mode)
3424 ldlm_lock_decref(lockh, LCK_PW);
3426 /* For async requests, decref the lock. */
3427 ldlm_lock_decref(lockh, einfo->ei_mode);
3428 LDLM_LOCK_PUT(matched);
3431 ldlm_lock_decref(lockh, mode);
3432 LDLM_LOCK_PUT(matched);
/* No usable local match: build the enqueue RPC (intent path uses the
 * LVB-carrying request format). */
3438 CFS_LIST_HEAD(cancels);
3439 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3440 &RQF_LDLM_ENQUEUE_LVB);
3444 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3446 ptlrpc_request_free(req);
3450 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3452 ptlrpc_request_set_replen(req);
3455 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3456 *flags &= ~LDLM_FL_BLOCK_GRANTED;
3458 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3459 sizeof(*lvb), lockh, async);
3462 struct osc_enqueue_args *aa;
3463 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3464 aa = ptlrpc_req_async_args(req);
3467 aa->oa_flags = flags;
3468 aa->oa_upcall = upcall;
3469 aa->oa_cookie = cookie;
3471 aa->oa_lockh = lockh;
3474 req->rq_interpret_reply =
3475 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3476 if (rqset == PTLRPCD_SET)
3477 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3479 ptlrpc_set_add_req(rqset, req);
3480 } else if (intent) {
3481 ptlrpc_req_finished(req);
3486 rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
3488 ptlrpc_req_finished(req);
/* obd_ops enqueue entry point: build the resource name from the stripe md
 * and delegate to osc_enqueue_base() (async iff a request set is given). */
3493 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3494 struct ldlm_enqueue_info *einfo,
3495 struct ptlrpc_request_set *rqset)
3497 struct ldlm_res_id res_id;
3501 osc_build_res_name(oinfo->oi_md->lsm_object_id,
3502 oinfo->oi_md->lsm_object_seq, &res_id);
3504 rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3505 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3506 oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3507 oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3508 rqset, rqset != NULL, 0);
/* Look for an already-granted DLM lock that covers the given extent.
 * On a hit, optionally attaches @data to the lock
 * (osc_set_data_with_check) and, when the matched mode differs from the
 * requested one, converts the reference from PW to PR so the caller's
 * later decref of the requested mode balances.
 * LDLM_FL_TEST_LOCK in *flags means "probe only": no reference is kept.
 * NOTE(review): the last parameter, the match-mode setup feeding the
 * first use of `rc`, and the returns are missing from this excerpt. */
3512 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3513 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3514 int *flags, void *data, struct lustre_handle *lockh,
3517 struct obd_device *obd = exp->exp_obd;
3518 int lflags = *flags;
/* Fault-injection hook: pretend no lock matches. */
3522 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3525 /* Filesystem lock extents are extended to page boundaries so that
3526 * dealing with the page cache is a little smoother */
3527 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3528 policy->l_extent.end |= ~CFS_PAGE_MASK;
3530 /* Next, search for already existing extent locks that will cover us */
3531 /* If we're trying to read, we also search for an existing PW lock. The
3532 * VFS and page cache already protect us locally, so lots of readers/
3533 * writers can share a single PW lock. */
3537 rc = ldlm_lock_match(obd->obd_namespace, lflags,
3538 res_id, type, policy, rc, lockh, unref);
3541 if (!osc_set_data_with_check(lockh, data)) {
3542 if (!(lflags & LDLM_FL_TEST_LOCK))
3543 ldlm_lock_decref(lockh, rc)
3547 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
/* Matched a PW lock while PR was requested: swap the reference so the
 * caller can decref the mode it asked for. */
3548 ldlm_lock_addref(lockh, LCK_PR);
3549 ldlm_lock_decref(lockh, LCK_PW);
/* Release one reference on a lock handle.  GROUP locks are also
 * cancelled on the server (decref_and_cancel); all other modes just
 * drop the local reference and let normal LRU cancellation apply. */
3556 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3560 if (unlikely(mode == LCK_GROUP))
3561 ldlm_lock_decref_and_cancel(lockh, mode);
3563 ldlm_lock_decref(lockh, mode);
/* obd_ops .o_cancel entry point: thin wrapper around osc_cancel_base().
 * The export and stripe metadata are unused here. */
3568 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3569 __u32 mode, struct lustre_handle *lockh)
3572 RETURN(osc_cancel_base(lockh, mode));
/* Cancel all unused locks in this namespace, optionally restricted to
 * the resource of one object (when lsm is supplied the resource name is
 * built from it; otherwise resp stays NULL = whole namespace).
 * NOTE(review): the lsm NULL-check branch is missing from this excerpt. */
3575 static int osc_cancel_unused(struct obd_export *exp,
3576 struct lov_stripe_md *lsm,
3577 ldlm_cancel_flags_t flags,
3580 struct obd_device *obd = class_exp2obd(exp);
3581 struct ldlm_res_id res_id, *resp = NULL;
3584 resp = osc_build_res_name(lsm->lsm_object_id,
3585 lsm->lsm_object_seq, &res_id);
3588 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
/* Reply interpreter for async OST_STATFS requests.
 * Copies the server's obd_statfs into the caller's buffer, and updates
 * the object-creator (oscc) DEGRADED / RDONLY / NOSPC flags from the
 * reported filesystem state, then invokes the caller's oi_cb_up.
 * NOTE(review): the early-exit paths (rq_sent check, GOTO targets) are
 * missing from this excerpt. */
3591 static int osc_statfs_interpret(const struct lu_env *env,
3592 struct ptlrpc_request *req,
3593 struct osc_async_args *aa, int rc)
3595 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3596 struct obd_statfs *msfs;
3601 /* The request has in fact never been sent
3602 * due to issues at a higher level (LOV).
3603 * Exit immediately since the caller is
3604 * aware of the problem and takes care
3605 * of the clean up */
/* NODELAY statfs callers tolerate transient connection errors. */
3608 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3609 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3615 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3617 GOTO(out, rc = -EPROTO);
3620 /* Reinitialize the RDONLY and DEGRADED flags at the client
3621 * on each statfs, so they don't stay set permanently. */
3622 cfs_spin_lock(&cli->cl_oscc.oscc_lock);
3624 if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3625 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3626 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3627 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3629 if (unlikely(msfs->os_state & OS_STATE_READONLY))
3630 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3631 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3632 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3634 /* Add a bit of hysteresis so this flag isn't continually flapping,
3635 * and ensure that new files don't get extremely fragmented due to
3636 * only a small amount of available space in the filesystem.
3637 * We want to set the NOSPC flag when there is less than ~0.1% free
3638 * and clear it when there is at least ~0.2% free space, so:
3639 * avail < ~0.1% max max = avail + used
3640 * 1025 * avail < avail + used used = blocks - free
3641 * 1024 * avail < used
3642 * 1024 * avail < blocks - free
3643 * avail < ((blocks - free) >> 10)
3645 * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
3646 * lose that amount of space so in those cases we report no space left
3647 * if there is less than 1 GB left. */
3648 used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
/* Set NOSPC at < ~0.1% free (or < 32 inodes); clear it again only once
 * we are back above ~0.2% free and 64 inodes (hysteresis, see above). */
3649 if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3650 ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3651 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3652 else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3653 (msfs->os_ffree > 64) &&
3654 (msfs->os_bavail > (used << 1)))) {
3655 cli->cl_oscc.oscc_flags &= ~(OSCC_FLAG_NOSPC |
3656 OSCC_FLAG_NOSPC_BLK);
3659 if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3660 (msfs->os_bavail < used)))
3661 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC_BLK;
3663 cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
/* Hand the statfs result back to the waiting caller. */
3665 *aa->aa_oi->oi_osfs = *msfs;
3667 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* obd_ops .o_statfs_async: pack an OST_STATFS request, arm
 * osc_statfs_interpret() as the reply handler, and queue it on the
 * caller's request set.  max_age is currently unused in the wire
 * request (see comment below).
 * NOTE(review): the aa->aa_oi assignment between 3706 and 3709 is
 * missing from this excerpt. */
3671 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3672 __u64 max_age, struct ptlrpc_request_set *rqset)
3674 struct ptlrpc_request *req;
3675 struct osc_async_args *aa;
3679 /* We could possibly pass max_age in the request (as an absolute
3680 * timestamp or a "seconds.usec ago") so the target can avoid doing
3681 * extra calls into the filesystem if that isn't necessary (e.g.
3682 * during mount that would help a bit). Having relative timestamps
3683 * is not so great if request processing is slow, while absolute
3684 * timestamps are not ideal because they need time synchronization. */
3685 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3689 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3691 ptlrpc_request_free(req);
3694 ptlrpc_request_set_replen(req);
/* statfs is served from the create portal on the OST. */
3695 req->rq_request_portal = OST_CREATE_PORTAL;
3696 ptlrpc_at_set_req_timeout(req);
3698 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3699 /* procfs requests should not block waiting, to avoid deadlock */
3700 req->rq_no_resend = 1;
3701 req->rq_no_delay = 1;
3704 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3705 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3706 aa = ptlrpc_req_async_args(req);
3709 ptlrpc_set_add_req(rqset, req);
/* obd_ops .o_statfs: synchronous OST_STATFS.  Takes its own reference
 * on the import (racing against disconnect, see bug 15684), sends the
 * request with ptlrpc_queue_wait(), and copies the reply into *osfs.
 * NOTE(review): the *osfs = *msfs copy and the import put between 3765
 * and 3772 are missing from this excerpt. */
3713 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3714 __u64 max_age, __u32 flags)
3716 struct obd_statfs *msfs;
3717 struct ptlrpc_request *req;
3718 struct obd_import *imp = NULL;
3722 /* Since the request might also come from lprocfs, we need to
3723 * sync this with client_disconnect_export() (bug 15684) */
3724 cfs_down_read(&obd->u.cli.cl_sem);
3725 if (obd->u.cli.cl_import)
3726 imp = class_import_get(obd->u.cli.cl_import);
3727 cfs_up_read(&obd->u.cli.cl_sem);
3731 /* We could possibly pass max_age in the request (as an absolute
3732 * timestamp or a "seconds.usec ago") so the target can avoid doing
3733 * extra calls into the filesystem if that isn't necessary (e.g.
3734 * during mount that would help a bit). Having relative timestamps
3735 * is not so great if request processing is slow, while absolute
3736 * timestamps are not ideal because they need time synchronization. */
3737 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
/* Request holds its own import ref now; drop ours. */
3739 class_import_put(imp);
3744 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3746 ptlrpc_request_free(req);
3749 ptlrpc_request_set_replen(req);
3750 req->rq_request_portal = OST_CREATE_PORTAL;
3751 ptlrpc_at_set_req_timeout(req);
3753 if (flags & OBD_STATFS_NODELAY) {
3754 /* procfs requests should not block waiting, to avoid deadlock */
3755 req->rq_no_resend = 1;
3756 req->rq_no_delay = 1;
3759 rc = ptlrpc_queue_wait(req);
3763 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3765 GOTO(out, rc = -EPROTO);
3772 ptlrpc_req_finished(req);
3776 /* Retrieve object striping information.
3778 * @lump is a pointer to an in-core user-space struct with lmm_ost_count
3779 * indicating the maximum number of OST indices which will fit in the
3780 * user buffer.  lmm_magic must be LOV_MAGIC (we only use 1 slot here).
 *
 * Copies the (single-stripe) layout of @lsm back to user space in
 * either V1 or V3 lov_user_md format, matching the magic the caller
 * passed in.  NOTE(review): the copy_from_user/alloc failure paths and
 * final RETURN are missing from this excerpt. */
3782 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3784 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3785 struct lov_user_md_v3 lum, *lumk;
3786 struct lov_user_ost_data_v1 *lmm_objects;
3787 int rc = 0, lum_size;
3793 /* we only need the header part from user space to get lmm_magic and
3794 * lmm_stripe_count, (the header part is common to v1 and v3) */
3795 lum_size = sizeof(struct lov_user_md_v1);
3796 if (cfs_copy_from_user(&lum, lump, lum_size))
3799 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3800 (lum.lmm_magic != LOV_USER_MAGIC_V3))
3803 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3804 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3805 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3806 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3808 /* we can use lov_mds_md_size() to compute lum_size
3809 * because lov_user_md_vX and lov_mds_md_vX have the same size */
3810 if (lum.lmm_stripe_count > 0) {
3811 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3812 OBD_ALLOC(lumk, lum_size);
/* The objects array sits at a different offset in V1 vs V3. */
3816 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3817 lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3819 lmm_objects = &(lumk->lmm_objects[0]);
3820 lmm_objects->l_object_id = lsm->lsm_object_id;
3822 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3826 lumk->lmm_object_id = lsm->lsm_object_id;
3827 lumk->lmm_object_seq = lsm->lsm_object_seq;
3828 lumk->lmm_stripe_count = 1;
3830 if (cfs_copy_to_user(lump, lumk, lum_size))
3834 OBD_FREE(lumk, lum_size);
/* obd_ops .o_iocontrol: ioctl dispatcher for the OSC device.
 * Pins the module for the duration of the call.  Supported commands:
 * LOV_GET_CONFIG (fake a 1-target lov_desc), LOV_SET/GETSTRIPE,
 * CLIENT_RECOVER, SET_ACTIVE, POLL_QUOTACHECK, PING_TARGET.
 * NOTE(review): the switch(cmd) line and several break statements are
 * missing from this excerpt. */
3840 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3841 void *karg, void *uarg)
3843 struct obd_device *obd = exp->exp_obd;
3844 struct obd_ioctl_data *data = karg;
3848 if (!cfs_try_module_get(THIS_MODULE)) {
3849 CERROR("Can't get module. Is it alive?");
3853 case OBD_IOC_LOV_GET_CONFIG: {
3855 struct lov_desc *desc;
3856 struct obd_uuid uuid;
3860 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3861 GOTO(out, err = -EINVAL);
3863 data = (struct obd_ioctl_data *)buf;
/* Validate the user-supplied buffer sizes before writing into them. */
3865 if (sizeof(*desc) > data->ioc_inllen1) {
3866 obd_ioctl_freedata(buf, len);
3867 GOTO(out, err = -EINVAL);
3870 if (data->ioc_inllen2 < sizeof(uuid)) {
3871 obd_ioctl_freedata(buf, len);
3872 GOTO(out, err = -EINVAL);
/* The OSC presents itself as a single-target LOV to this ioctl. */
3875 desc = (struct lov_desc *)data->ioc_inlbuf1;
3876 desc->ld_tgt_count = 1;
3877 desc->ld_active_tgt_count = 1;
3878 desc->ld_default_stripe_count = 1;
3879 desc->ld_default_stripe_size = 0;
3880 desc->ld_default_stripe_offset = 0;
3881 desc->ld_pattern = 0;
3882 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3884 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3886 err = cfs_copy_to_user((void *)uarg, buf, len);
3889 obd_ioctl_freedata(buf, len);
3892 case LL_IOC_LOV_SETSTRIPE:
3893 err = obd_alloc_memmd(exp, karg);
3897 case LL_IOC_LOV_GETSTRIPE:
3898 err = osc_getstripe(karg, uarg);
3900 case OBD_IOC_CLIENT_RECOVER:
3901 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3902 data->ioc_inlbuf1, 0);
3906 case IOC_OSC_SET_ACTIVE:
3907 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3910 case OBD_IOC_POLL_QUOTACHECK:
3911 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
3913 case OBD_IOC_PING_TARGET:
3914 err = ptlrpc_obd_ping(obd);
3917 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3918 cmd, cfs_curproc_comm());
3919 GOTO(out, err = -ENOTTY);
3922 cfs_module_put(THIS_MODULE);
/* obd_ops .o_get_info: handle LOCK_TO_STRIPE (trivially stripe 0 on an
 * OSC), LAST_ID (synchronous OST_GET_INFO RPC returning an obd_id), and
 * FIEMAP (synchronous OST_GET_INFO RPC with the fiemap struct passed in
 * both directions through @val).
 * NOTE(review): several declarations, NULL checks and GOTO labels are
 * missing from this excerpt. */
3926 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3927 void *key, __u32 *vallen, void *val,
3928 struct lov_stripe_md *lsm)
3931 if (!vallen || !val)
3934 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3935 __u32 *stripe = val;
3936 *vallen = sizeof(*stripe);
3939 } else if (KEY_IS(KEY_LAST_ID)) {
3940 struct ptlrpc_request *req;
3945 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3946 &RQF_OST_GET_INFO_LAST_ID);
3950 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3951 RCL_CLIENT, keylen);
3952 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3954 ptlrpc_request_free(req);
3958 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3959 memcpy(tmp, key, keylen);
/* LAST_ID is used during recovery; fail fast rather than block. */
3961 req->rq_no_delay = req->rq_no_resend = 1;
3962 ptlrpc_request_set_replen(req);
3963 rc = ptlrpc_queue_wait(req);
3967 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3969 GOTO(out, rc = -EPROTO);
3971 *((obd_id *)val) = *reply;
3973 ptlrpc_req_finished(req);
3975 } else if (KEY_IS(KEY_FIEMAP)) {
3976 struct ptlrpc_request *req;
3977 struct ll_user_fiemap *reply;
3981 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3982 &RQF_OST_GET_INFO_FIEMAP);
/* The fiemap value buffer travels in the request AND the reply. */
3986 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3987 RCL_CLIENT, keylen);
3988 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3989 RCL_CLIENT, *vallen);
3990 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3991 RCL_SERVER, *vallen);
3993 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3995 ptlrpc_request_free(req);
3999 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
4000 memcpy(tmp, key, keylen);
4001 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
4002 memcpy(tmp, val, *vallen);
4004 ptlrpc_request_set_replen(req);
4005 rc = ptlrpc_queue_wait(req);
4009 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
4011 GOTO(out1, rc = -EPROTO);
4013 memcpy(val, reply, *vallen);
4015 ptlrpc_req_finished(req);
/* Called when the MDS connects through this OSC (KEY_MDS_CONN): connect
 * the originator llog context and mark the import as server-style
 * (server_timeout + pingable) so the pinger keeps the OST alive. */
4023 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
4025 struct llog_ctxt *ctxt;
4029 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
4031 rc = llog_initiator_connect(ctxt);
4032 llog_ctxt_put(ctxt);
4034 /* XXX return an error? skip setting below flags? */
4037 cfs_spin_lock(&imp->imp_lock);
4038 imp->imp_server_timeout = 1;
4039 imp->imp_pingable = 1;
4040 cfs_spin_unlock(&imp->imp_lock);
4041 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
/* Reply interpreter for the KEY_MDS_CONN set_info RPC: on completion,
 * finish the MDS-connect bookkeeping on this import.
 * NOTE(review): args/rc parameters and the rc error check are missing
 * from this excerpt. */
4046 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
4047 struct ptlrpc_request *req,
4054 RETURN(osc_setinfo_mds_connect_import(req->rq_import));
/* obd_ops .o_set_info_async: handle a set of well-known keys.
 * NEXT_ID, CHECKSUM, SPTLRPC_CONF and FLUSH_CTX are handled locally;
 * everything else is forwarded to the OST as an OST_SET_INFO RPC
 * (GRANT_SHRINK gets its own request format and is sent via ptlrpcd,
 * all other keys require a caller-supplied request set).
 * NOTE(review): early RETURNs after each local key and some declarations
 * are missing from this excerpt. */
4057 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
4058 void *key, obd_count vallen, void *val,
4059 struct ptlrpc_request_set *set)
4061 struct ptlrpc_request *req;
4062 struct obd_device *obd = exp->exp_obd;
4063 struct obd_import *imp = class_exp2cliimp(exp);
4068 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
4070 if (KEY_IS(KEY_NEXT_ID)) {
4072 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4074 if (vallen != sizeof(obd_id))
4079 if (vallen != sizeof(obd_id))
4082 /* avoid race between allocate new object and set next id
4083 * from ll_sync thread */
4084 cfs_spin_lock(&oscc->oscc_lock);
/* next_id only moves forward: never shrink it below a value the
 * creator may already have handed out. */
4085 new_val = *((obd_id*)val) + 1;
4086 if (new_val > oscc->oscc_next_id)
4087 oscc->oscc_next_id = new_val;
4088 cfs_spin_unlock(&oscc->oscc_lock);
4089 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
4090 exp->exp_obd->obd_name,
4091 obd->u.cli.cl_oscc.oscc_next_id);
4096 if (KEY_IS(KEY_CHECKSUM)) {
4097 if (vallen != sizeof(int))
4099 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
4103 if (KEY_IS(KEY_SPTLRPC_CONF)) {
4104 sptlrpc_conf_client_adapt(obd);
4108 if (KEY_IS(KEY_FLUSH_CTX)) {
4109 sptlrpc_import_flush_my_ctx(imp);
4113 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
4116 /* We pass all other commands directly to OST. Since nobody calls osc
4117 methods directly and everybody is supposed to go through LOV, we
4118 assume lov checked invalid values for us.
4119 The only recognised values so far are evict_by_nid and mds_conn.
4120 Even if something bad goes through, we'd get a -EINVAL from OST
4123 if (KEY_IS(KEY_GRANT_SHRINK))
4124 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
4126 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
4131 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
4132 RCL_CLIENT, keylen);
4133 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
4134 RCL_CLIENT, vallen);
4135 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
4137 ptlrpc_request_free(req);
4141 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
4142 memcpy(tmp, key, keylen);
4143 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
4144 memcpy(tmp, val, vallen);
4146 if (KEY_IS(KEY_MDS_CONN)) {
4147 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4149 oscc->oscc_oa.o_seq = (*(__u32 *)val);
4150 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
4151 LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
4152 req->rq_no_delay = req->rq_no_resend = 1;
4153 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4154 } else if (KEY_IS(KEY_GRANT_SHRINK)) {
4155 struct osc_grant_args *aa;
4158 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4159 aa = ptlrpc_req_async_args(req);
4162 ptlrpc_req_finished(req);
4165 *oa = ((struct ost_body *)val)->oa;
4167 req->rq_interpret_reply = osc_shrink_grant_interpret;
4170 ptlrpc_request_set_replen(req);
4171 if (!KEY_IS(KEY_GRANT_SHRINK)) {
4172 LASSERT(set != NULL);
4173 ptlrpc_set_add_req(set, req);
4174 ptlrpc_check_set(NULL, set);
/* GRANT_SHRINK is fire-and-forget via the ptlrpcd daemons. */
4176 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
/* llog ops for the size-replication context: only cancel is needed on
 * the client side. */
4182 static struct llog_operations osc_size_repl_logops = {
4183 lop_cancel: llog_obd_repl_cancel
/* Filled in from llog_lvfs_ops at module init (see osc_init). */
4186 static struct llog_operations osc_mds_ost_orig_logops;
/* Set up the two llog contexts this OSC uses: MDS_OST_ORIG (originator,
 * catalog id from @catid) and SIZE_REPL.  On SIZE_REPL failure the
 * already-created ORIG context is looked up again for cleanup.
 * NOTE(review): the cleanup call on ctxt and the out/RETURN lines are
 * missing from this excerpt. */
4188 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4189 struct obd_device *tgt, struct llog_catid *catid)
4194 rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
4195 &catid->lci_logid, &osc_mds_ost_orig_logops);
4197 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4201 rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
4202 NULL, &osc_size_repl_logops);
4204 struct llog_ctxt *ctxt =
4205 llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4208 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4213 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
4214 obd->obd_name, tgt->obd_name, catid, rc);
4215 CERROR("logid "LPX64":0x%x\n",
4216 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
/* obd_ops .o_llog_init: read this OSC's catalog id from the CATLIST
 * file on @disk_obd, initialize the llog contexts, and write the
 * (possibly updated) catalog id back.  Serialized by the olg
 * cat_processing mutex.
 * NOTE(review): some GOTO/out lines are missing from this excerpt. */
4221 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4222 struct obd_device *disk_obd, int *index)
4224 struct llog_catid catid;
4225 static char name[32] = CATLIST;
4229 LASSERT(olg == &obd->obd_olg);
4231 cfs_mutex_down(&olg->olg_cat_processing);
4232 rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
4234 CERROR("rc: %d\n", rc);
4238 CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
4239 obd->obd_name, *index, catid.lci_logid.lgl_oid,
4240 catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen);
4242 rc = __osc_llog_init(obd, olg, disk_obd, &catid);
4244 CERROR("rc: %d\n", rc);
4248 rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
4250 CERROR("rc: %d\n", rc);
4255 cfs_mutex_up(&olg->olg_cat_processing);
/* obd_ops .o_llog_finish: tear down both llog contexts created by
 * __osc_llog_init().  Both cleanups are attempted; rc/rc2 hold their
 * results (the combining logic is outside this excerpt). */
4260 static int osc_llog_finish(struct obd_device *obd, int count)
4262 struct llog_ctxt *ctxt;
4263 int rc = 0, rc2 = 0;
4266 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4268 rc = llog_cleanup(ctxt);
4270 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4272 rc2 = llog_cleanup(ctxt);
/* obd_ops .o_reconnect: on reconnect, tell the server how much grant we
 * want back — what we currently hold plus dirty data, or two full RPCs'
 * worth if that is zero — and reset the lost-grant accounting. */
4279 static int osc_reconnect(const struct lu_env *env,
4280 struct obd_export *exp, struct obd_device *obd,
4281 struct obd_uuid *cluuid,
4282 struct obd_connect_data *data,
4285 struct client_obd *cli = &obd->u.cli;
4287 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4290 client_obd_list_lock(&cli->cl_loi_list_lock);
4291 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
4292 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4293 lost_grant = cli->cl_lost_grant;
4294 cli->cl_lost_grant = 0;
4295 client_obd_list_unlock(&cli->cl_loi_list_lock);
4297 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4298 "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
4299 cli->cl_avail_grant, cli->cl_dirty, lost_grant);
4300 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4301 " ocd_grant: %d\n", data->ocd_connect_flags,
4302 data->ocd_version, data->ocd_grant);
/* obd_ops .o_disconnect: on the last connection, flush pending size-
 * replication llog cancels to the OST, then run the generic client
 * disconnect.  The grant-shrink list removal must happen only after the
 * import is gone — see the bug 18662 race description below. */
4308 static int osc_disconnect(struct obd_export *exp)
4310 struct obd_device *obd = class_exp2obd(exp);
4311 struct llog_ctxt *ctxt;
4314 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4316 if (obd->u.cli.cl_conn_count == 1) {
4317 /* Flush any remaining cancel messages out to the
4319 llog_sync(ctxt, exp);
4321 llog_ctxt_put(ctxt);
4323 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4327 rc = client_disconnect_export(exp);
4329 * Initially we put del_shrink_grant before disconnect_export, but it
4330 * causes the following problem if setup (connect) and cleanup
4331 * (disconnect) are tangled together.
4332 * connect p1 disconnect p2
4333 * ptlrpc_connect_import
4334 * ............... class_manual_cleanup
4337 * ptlrpc_connect_interrupt
4339 * add this client to shrink list
4341 * Bang! pinger trigger the shrink.
4342 * So the osc should be disconnected from the shrink list, after we
4343 * are sure the import has been destroyed. BUG18662
4345 if (obd->u.cli.cl_import == NULL)
4346 osc_del_shrink_grant(&obd->u.cli);
/* obd_ops .o_import_event: react to import state changes.
 * DISCON:      mark the object creator recovering (MDS OSCs only) and
 *              zero the grant accounting.
 * INVALIDATE:  flush/fail queued pages and wipe local DLM locks.
 * ACTIVE:      clear the creator's NOSPC flags (MDS OSCs only).
 * OCD:         re-read grant and request-portal settings from the
 *              negotiated connect data.
 * Each case also notifies the observer where applicable.
 * NOTE(review): ENTRY/EXIT, some break lines and the cli assignment
 * before the switch are missing from this excerpt. */
4350 static int osc_import_event(struct obd_device *obd,
4351 struct obd_import *imp,
4352 enum obd_import_event event)
4354 struct client_obd *cli;
4358 LASSERT(imp->imp_obd == obd);
4361 case IMP_EVENT_DISCON: {
4362 /* Only do this on the MDS OSC's */
4363 if (imp->imp_server_timeout) {
4364 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4366 cfs_spin_lock(&oscc->oscc_lock);
4367 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4368 cfs_spin_unlock(&oscc->oscc_lock);
4371 client_obd_list_lock(&cli->cl_loi_list_lock);
4372 cli->cl_avail_grant = 0;
4373 cli->cl_lost_grant = 0;
4374 client_obd_list_unlock(&cli->cl_loi_list_lock);
4377 case IMP_EVENT_INACTIVE: {
4378 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4381 case IMP_EVENT_INVALIDATE: {
4382 struct ldlm_namespace *ns = obd->obd_namespace;
4386 env = cl_env_get(&refcheck);
4390 client_obd_list_lock(&cli->cl_loi_list_lock);
4391 /* all pages go to failing rpcs due to the invalid
4393 osc_check_rpcs(env, cli);
4394 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Drop every local lock; the import is invalid anyway. */
4396 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4397 cl_env_put(env, &refcheck);
4402 case IMP_EVENT_ACTIVE: {
4403 /* Only do this on the MDS OSC's */
4404 if (imp->imp_server_timeout) {
4405 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4407 cfs_spin_lock(&oscc->oscc_lock);
4408 oscc->oscc_flags &= ~(OSCC_FLAG_NOSPC |
4409 OSCC_FLAG_NOSPC_BLK);
4410 cfs_spin_unlock(&oscc->oscc_lock);
4412 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4415 case IMP_EVENT_OCD: {
4416 struct obd_connect_data *ocd = &imp->imp_connect_data;
4418 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4419 osc_init_grant(&obd->u.cli, ocd);
4422 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4423 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
4425 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4428 case IMP_EVENT_DEACTIVATE: {
4429 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
4432 case IMP_EVENT_ACTIVATE: {
4433 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
4437 CERROR("Unknown import event %d\n", event);
4444 * Determine whether the lock can be canceled before replaying the lock
4445 * during recovery, see bug16774 for detailed information.
4447 * \retval zero the lock can't be canceled
4448 * \retval other ok to cancel
4450 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
4452 check_res_locked(lock->l_resource);
4455 * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
4457 * XXX as a future improvement, we can also cancel unused write lock
4458 * if it doesn't have dirty data and active mmaps.
4460 if (lock->l_resource->lr_type == LDLM_EXTENT &&
4461 (lock->l_granted_mode == LCK_PR ||
4462 lock->l_granted_mode == LCK_CR) &&
4463 (osc_dlm_lock_pageref(lock) == 0))
/* obd_ops .o_setup: initialize the OSC device.
 * Takes a ptlrpcd reference, runs the generic client setup, creates the
 * writeback ptlrpcd work item, registers lprocfs entries, pre-allocates
 * a small request pool (see comment below), sets up grant shrinking,
 * and registers osc_cancel_for_recovery with the DLM namespace.
 * NOTE(review): error-path GOTOs and the final RETURN are missing from
 * this excerpt. */
4469 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4471 struct client_obd *cli = &obd->u.cli;
4476 rc = ptlrpcd_addref();
4480 rc = client_obd_setup(obd, lcfg);
4483 handler = ptlrpcd_alloc_work(cli->cl_import,
4484 brw_queue_work, cli);
4485 if (!IS_ERR(handler))
4486 cli->cl_writeback_work = handler;
4488 rc = PTR_ERR(handler);
4492 struct lprocfs_static_vars lvars = { 0 };
4494 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4495 lprocfs_osc_init_vars(&lvars);
4496 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4497 lproc_osc_attach_seqstat(obd);
4498 sptlrpc_lprocfs_cliobd_attach(obd);
4499 ptlrpc_lprocfs_register_obd(obd);
4503 /* We need to allocate a few requests more, because
4504 brw_interpret tries to create new requests before freeing
4505 previous ones. Ideally we want to have 2x max_rpcs_in_flight
4506 reserved, but I afraid that might be too much wasted RAM
4507 in fact, so 2 is just my guess and still should work. */
4508 cli->cl_import->imp_rq_pool =
4509 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4511 ptlrpc_add_rqs_to_pool);
4513 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4514 cfs_sema_init(&cli->cl_grant_sem, 1);
4516 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
/* obd_ops .o_precleanup: staged teardown.
 * EARLY:   deactivate the import and stop pinging it.
 * EXPORTS: wait for zombie exports, destroy the writeback work item,
 *          clean up the client import, lprocfs, and llog contexts.
 * NOTE(review): the switch(stage) line and the final RETURN are missing
 * from this excerpt. */
4524 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4530 case OBD_CLEANUP_EARLY: {
4531 struct obd_import *imp;
4532 imp = obd->u.cli.cl_import;
4533 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4534 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4535 ptlrpc_deactivate_import(imp);
4536 cfs_spin_lock(&imp->imp_lock);
4537 imp->imp_pingable = 0;
4538 cfs_spin_unlock(&imp->imp_lock);
4541 case OBD_CLEANUP_EXPORTS: {
4542 struct client_obd *cli = &obd->u.cli;
4544 * for echo client, export may be on zombie list, wait for
4545 * zombie thread to cull it, because cli.cl_import will be
4546 * cleared in client_disconnect_export():
4547 * class_export_destroy() -> obd_cleanup() ->
4548 * echo_device_free() -> echo_client_cleanup() ->
4549 * obd_disconnect() -> osc_disconnect() ->
4550 * client_disconnect_export()
4552 obd_zombie_barrier();
4553 if (cli->cl_writeback_work) {
4554 ptlrpcd_destroy_work(cli->cl_writeback_work);
4555 cli->cl_writeback_work = NULL;
4557 obd_cleanup_client_import(obd);
4558 ptlrpc_lprocfs_unregister_obd(obd);
4559 lprocfs_obd_cleanup(obd);
4560 rc = obd_llog_finish(obd, 0);
4562 CERROR("failed to cleanup llogging subsystems\n");
/* obd_ops .o_cleanup: final teardown — free the quota cache and run the
 * generic client cleanup (a ptlrpcd_decref follows in the original). */
4569 int osc_cleanup(struct obd_device *obd)
4575 /* free memory of osc quota cache */
4576 osc_quota_cleanup(obd);
4578 rc = client_obd_cleanup(obd);
/* Process an lcfg config record for this OSC; currently only the
 * (elided) LCFG_PARAM case is handled, via class_process_proc_param. */
4584 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4586 struct lprocfs_static_vars lvars = { 0 };
4589 lprocfs_osc_init_vars(&lvars);
4591 switch (lcfg->lcfg_command) {
4593 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
/* obd_ops .o_process_config: thin wrapper over osc_process_config_base. */
4603 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4605 return osc_process_config_base(obd, buf);
/* The OSC's obd_ops method table, registered via class_register_type()
 * in osc_init().  Connection handling uses the generic client_* helpers;
 * everything else maps to the osc_* implementations in this file. */
4608 struct obd_ops osc_obd_ops = {
4609 .o_owner = THIS_MODULE,
4610 .o_setup = osc_setup,
4611 .o_precleanup = osc_precleanup,
4612 .o_cleanup = osc_cleanup,
4613 .o_add_conn = client_import_add_conn,
4614 .o_del_conn = client_import_del_conn,
4615 .o_connect = client_connect_import,
4616 .o_reconnect = osc_reconnect,
4617 .o_disconnect = osc_disconnect,
4618 .o_statfs = osc_statfs,
4619 .o_statfs_async = osc_statfs_async,
4620 .o_packmd = osc_packmd,
4621 .o_unpackmd = osc_unpackmd,
4622 .o_precreate = osc_precreate,
4623 .o_create = osc_create,
4624 .o_create_async = osc_create_async,
4625 .o_destroy = osc_destroy,
4626 .o_getattr = osc_getattr,
4627 .o_getattr_async = osc_getattr_async,
4628 .o_setattr = osc_setattr,
4629 .o_setattr_async = osc_setattr_async,
4631 .o_punch = osc_punch,
4633 .o_enqueue = osc_enqueue,
4634 .o_change_cbdata = osc_change_cbdata,
4635 .o_find_cbdata = osc_find_cbdata,
4636 .o_cancel = osc_cancel,
4637 .o_cancel_unused = osc_cancel_unused,
4638 .o_iocontrol = osc_iocontrol,
4639 .o_get_info = osc_get_info,
4640 .o_set_info_async = osc_set_info_async,
4641 .o_import_event = osc_import_event,
4642 .o_llog_init = osc_llog_init,
4643 .o_llog_finish = osc_llog_finish,
4644 .o_process_config = osc_process_config,
4645 .o_quotactl = osc_quotactl,
4646 .o_quotacheck = osc_quotacheck,
4647 .o_quota_adjust_qunit = osc_quota_adjust_qunit,
4650 extern struct lu_kmem_descr osc_caches[];
4651 extern cfs_spinlock_t osc_ast_guard;
4652 extern cfs_lock_class_key_t osc_ast_guard_class;
/* Module init: create the cl_object caches, register the OSC obd type
 * with its lprocfs vars and device type, initialize the AST guard lock,
 * and build osc_mds_ost_orig_logops from llog_lvfs_ops with origin-side
 * setup/cleanup/add/connect methods. */
4654 int __init osc_init(void)
4656 struct lprocfs_static_vars lvars = { 0 };
4660 /* print an address of _any_ initialized kernel symbol from this
4661 * module, to allow debugging with gdb that doesn't support data
4662 * symbols from modules.*/
4663 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
4665 rc = lu_kmem_init(osc_caches);
4667 lprocfs_osc_init_vars(&lvars);
4670 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4671 LUSTRE_OSC_NAME, &osc_device_type);
/* Undo the cache init if type registration failed. */
4673 lu_kmem_fini(osc_caches);
4677 cfs_spin_lock_init(&osc_ast_guard);
4678 cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4680 osc_mds_ost_orig_logops = llog_lvfs_ops;
4681 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4682 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4683 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4684 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
/* Module exit: mirror osc_init — tear down the device type, unregister
 * the obd type, and free the cl_object caches. */
4690 static void /*__exit*/ osc_exit(void)
4692 lu_device_type_fini(&osc_device_type);
4695 class_unregister_type(LUSTRE_OSC_NAME);
4696 lu_kmem_fini(osc_caches);
4699 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4700 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4701 MODULE_LICENSE("GPL");
4703 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);