1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * Copyright (c) 2011 Whamcloud, Inc.
36 * This file is part of Lustre, http://www.lustre.org/
37 * Lustre is a trademark of Sun Microsystems, Inc.
41 # define EXPORT_SYMTAB
43 #define DEBUG_SUBSYSTEM S_OSC
45 #include <libcfs/libcfs.h>
48 # include <liblustre.h>
51 #include <lustre_dlm.h>
52 #include <lustre_net.h>
53 #include <lustre/lustre_user.h>
54 #include <obd_cksum.h>
62 #include <lustre_ha.h>
63 #include <lprocfs_status.h>
64 #include <lustre_log.h>
65 #include <lustre_debug.h>
66 #include <lustre_param.h>
67 #include "osc_internal.h"
/* Forward declarations for helpers defined later in this file. */
69 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
70 static int brw_interpret(const struct lu_env *env,
71 struct ptlrpc_request *req, void *data, int rc);
72 static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli,
74 int osc_cleanup(struct obd_device *obd);
76 /* Pack OSC object metadata for disk storage (LE byte order). */
/*
 * Follows the standard obd packmd convention: when lsm is NULL the
 * existing *lmmp is freed; when *lmmp is NULL a new lov_mds_md is
 * allocated; otherwise the id/seq are packed into the caller's buffer.
 * NOTE(review): branch conditions are not fully visible here — confirm
 * against the complete source.
 */
77 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
78 struct lov_stripe_md *lsm)
83 lmm_size = sizeof(**lmmp);
88 OBD_FREE(*lmmp, lmm_size);
94 OBD_ALLOC(*lmmp, lmm_size);
/* Object id must be set, and the sequence must belong to an MDT. */
100 LASSERT(lsm->lsm_object_id);
101 LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
102 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
103 (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
109 /* Unpack OSC object metadata from disk storage (LE byte order). */
/*
 * Counterpart of osc_packmd(): validates the on-disk lov_mds_md,
 * allocates a single-stripe lov_stripe_md (plus its lsm_oinfo[0]) when
 * needed, frees it when lmm is NULL, and byte-swaps id/seq to CPU order.
 * Also derives lsm_maxbytes from the server's ocd_maxbytes when the
 * import advertises OBD_CONNECT_MAXBYTES.
 */
110 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
111 struct lov_mds_md *lmm, int lmm_bytes)
114 struct obd_import *imp = class_exp2cliimp(exp);
/* Reject a truncated lov_mds_md. */
118 if (lmm_bytes < sizeof (*lmm)) {
119 CERROR("lov_mds_md too small: %d, need %d\n",
120 lmm_bytes, (int)sizeof(*lmm));
123 /* XXX LOV_MAGIC etc check? */
125 if (lmm->lmm_object_id == 0) {
126 CERROR("lov_mds_md: zero lmm_object_id\n");
/* OSC always handles exactly one stripe. */
131 lsm_size = lov_stripe_md_size(1);
/* lmm == NULL with an existing *lsmp means "free the md". */
135 if (*lsmp != NULL && lmm == NULL) {
136 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
137 OBD_FREE(*lsmp, lsm_size);
143 OBD_ALLOC(*lsmp, lsm_size);
146 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
147 if ((*lsmp)->lsm_oinfo[0] == NULL) {
/* Undo the outer allocation on partial failure. */
148 OBD_FREE(*lsmp, lsm_size);
151 loi_init((*lsmp)->lsm_oinfo[0]);
155 /* XXX zero *lsmp? */
156 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
157 (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
158 LASSERT((*lsmp)->lsm_object_id);
159 LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
/* Prefer the server-advertised object size limit when available. */
163 (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
164 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
166 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/*
 * Copy an obd_capa (if any) into the request capsule's RMF_CAPA1 field
 * and flag OBD_MD_FLOSSCAPA in the body so the server knows a capability
 * accompanies the request.
 */
171 static inline void osc_pack_capa(struct ptlrpc_request *req,
172 struct ost_body *body, void *capa)
174 struct obd_capa *oc = (struct obd_capa *)capa;
175 struct lustre_capa *c;
180 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
183 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
184 DEBUG_CAPA(D_SEC, c, "pack");
/*
 * Fill the OST request body: copy the caller's obdo into wire format
 * and pack the capability, if one was supplied in oinfo.
 */
187 static inline void osc_pack_req_body(struct ptlrpc_request *req,
188 struct obd_info *oinfo)
190 struct ost_body *body;
192 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
195 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
196 osc_pack_capa(req, body, oinfo->oi_capa);
/*
 * Shrink the capability field to zero when no capability is being sent;
 * otherwise the capsule already sized it as sizeof(struct obd_capa).
 */
199 static inline void osc_set_capa_size(struct ptlrpc_request *req,
200 const struct req_msg_field *field,
204 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
206 /* it is already calculated as sizeof struct obd_capa */
/*
 * Reply interpreter for async getattr: unpack the returned ost_body
 * into the caller's obdo and invoke the oi_cb_up completion callback.
 * On unpack failure the obdo's valid bits are cleared.
 */
210 static int osc_getattr_interpret(const struct lu_env *env,
211 struct ptlrpc_request *req,
212 struct osc_async_args *aa, int rc)
214 struct ost_body *body;
220 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
222 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
223 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
225 /* This should really be sent by the OST */
226 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
227 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
229 CDEBUG(D_INFO, "can't unpack ost_body\n");
/* Mark all attributes invalid on a bad reply. */
231 aa->aa_oi->oi_oa->o_valid = 0;
/* Propagate the final status to the upper layer's callback. */
234 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Queue an asynchronous OST_GETATTR request on the given request set.
 * The reply is handled by osc_getattr_interpret().
 */
238 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
239 struct ptlrpc_request_set *set)
241 struct ptlrpc_request *req;
242 struct osc_async_args *aa;
246 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
250 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
251 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
/* Packing failed: release the unsent request. */
253 ptlrpc_request_free(req);
257 osc_pack_req_body(req, oinfo);
259 ptlrpc_request_set_replen(req);
260 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
/* Stash the async args in the request's embedded scratch space. */
262 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
263 aa = ptlrpc_req_async_args(req);
266 ptlrpc_set_add_req(set, req);
/*
 * Synchronous OST_GETATTR: send the request, wait for the reply, and
 * copy the returned attributes into oinfo->oi_oa.
 */
270 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
272 struct ptlrpc_request *req;
273 struct ost_body *body;
277 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
281 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
282 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
284 ptlrpc_request_free(req);
288 osc_pack_req_body(req, oinfo);
290 ptlrpc_request_set_replen(req);
/* Block until the server replies (or the request fails). */
292 rc = ptlrpc_queue_wait(req);
296 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
298 GOTO(out, rc = -EPROTO);
300 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
301 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
303 /* This should really be sent by the OST */
304 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
305 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
309 ptlrpc_req_finished(req);
/*
 * Synchronous OST_SETATTR: push the attributes in oinfo->oi_oa to the
 * OST and read back the server's view of the obdo from the reply.
 */
313 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
314 struct obd_trans_info *oti)
316 struct ptlrpc_request *req;
317 struct ost_body *body;
/* The object group/sequence must already be known. */
321 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
323 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
327 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
328 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
330 ptlrpc_request_free(req);
334 osc_pack_req_body(req, oinfo);
336 ptlrpc_request_set_replen(req);
338 rc = ptlrpc_queue_wait(req);
342 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
344 GOTO(out, rc = -EPROTO);
346 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
350 ptlrpc_req_finished(req);
/*
 * Reply interpreter shared by setattr and punch: unpack the returned
 * obdo into sa->sa_oa, then hand the final status to sa_upcall with the
 * caller's sa_cookie.
 */
354 static int osc_setattr_interpret(const struct lu_env *env,
355 struct ptlrpc_request *req,
356 struct osc_setattr_args *sa, int rc)
358 struct ost_body *body;
364 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
366 GOTO(out, rc = -EPROTO);
368 lustre_get_wire_obdo(sa->sa_oa, &body->oa);
370 rc = sa->sa_upcall(sa->sa_cookie, rc);
/*
 * Asynchronous OST_SETATTR. With rqset == NULL the request is handed
 * to ptlrpcd fire-and-forget (used for MDS->OST setattr); with
 * rqset == PTLRPCD_SET it is queued on ptlrpcd with an interpreter;
 * otherwise it is added to the caller's request set. The upcall is
 * invoked from osc_setattr_interpret() when a reply is expected.
 */
374 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
375 struct obd_trans_info *oti,
376 obd_enqueue_update_f upcall, void *cookie,
377 struct ptlrpc_request_set *rqset)
379 struct ptlrpc_request *req;
380 struct osc_setattr_args *sa;
384 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
388 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
389 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
391 ptlrpc_request_free(req);
/* Carry the llog cookie along when the caller supplied one. */
395 if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
396 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
398 osc_pack_req_body(req, oinfo);
400 ptlrpc_request_set_replen(req);
402 /* do mds to ost setattr asynchronously */
404 /* Do not wait for response. */
405 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
407 req->rq_interpret_reply =
408 (ptlrpc_interpterer_t)osc_setattr_interpret;
410 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
411 sa = ptlrpc_req_async_args(req);
412 sa->sa_oa = oinfo->oi_oa;
413 sa->sa_upcall = upcall;
414 sa->sa_cookie = cookie;
416 if (rqset == PTLRPCD_SET)
417 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
419 ptlrpc_set_add_req(rqset, req);
/*
 * Thin wrapper: async setattr using oinfo's own oi_cb_up as the upcall
 * and oinfo itself as the callback cookie.
 */
425 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
426 struct obd_trans_info *oti,
427 struct ptlrpc_request_set *rqset)
429 return osc_setattr_async_base(exp, oinfo, oti,
430 oinfo->oi_cb_up, oinfo, rqset);
/*
 * Create an object on the OST (synchronous OST_CREATE). Allocates a
 * temporary single-stripe md, sends the request, and copies the newly
 * assigned object id/seq back into the obdo and lsm. DELORPHAN
 * requests are sent without resend/delay to avoid duplicate cleanup.
 * Transno and llog cookies are propagated into oti for recovery.
 */
433 int osc_real_create(struct obd_export *exp, struct obdo *oa,
434 struct lov_stripe_md **ea, struct obd_trans_info *oti)
436 struct ptlrpc_request *req;
437 struct ost_body *body;
438 struct lov_stripe_md *lsm;
447 rc = obd_alloc_memmd(exp, &lsm);
452 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
454 GOTO(out, rc = -ENOMEM);
456 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
458 ptlrpc_request_free(req);
462 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
464 lustre_set_wire_obdo(&body->oa, oa);
466 ptlrpc_request_set_replen(req);
468 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
469 oa->o_flags == OBD_FL_DELORPHAN) {
471 "delorphan from OST integration");
472 /* Don't resend the delorphan req */
473 req->rq_no_resend = req->rq_no_delay = 1;
476 rc = ptlrpc_queue_wait(req);
480 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
482 GOTO(out_req, rc = -EPROTO);
484 lustre_get_wire_obdo(oa, &body->oa);
486 /* This should really be sent by the OST */
487 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
488 oa->o_valid |= OBD_MD_FLBLKSZ;
490 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
491 * have valid lsm_oinfo data structs, so don't go touching that.
492 * This needs to be fixed in a big way.
494 lsm->lsm_object_id = oa->o_id;
495 lsm->lsm_object_seq = oa->o_seq;
/* Record the server transaction number for replay/recovery. */
499 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
501 if (oa->o_valid & OBD_MD_FLCOOKIE) {
502 if (!oti->oti_logcookies)
503 oti_alloc_cookies(oti, 1);
504 *oti->oti_logcookies = oa->o_lcookie;
508 CDEBUG(D_HA, "transno: "LPD64"\n",
509 lustre_msg_get_transno(req->rq_repmsg));
511 ptlrpc_req_finished(req);
/* Release the temporary md on the error path. */
514 obd_free_memmd(exp, &lsm);
/*
 * Asynchronous OST_PUNCH (truncate a byte range on the OST). The
 * extent is carried in oi_oa's size/blocks fields (see osc_punch()).
 * Completion is reported through osc_setattr_interpret() -> upcall.
 */
518 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
519 obd_enqueue_update_f upcall, void *cookie,
520 struct ptlrpc_request_set *rqset)
522 struct ptlrpc_request *req;
523 struct osc_setattr_args *sa;
524 struct ost_body *body;
528 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
532 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
533 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
535 ptlrpc_request_free(req);
538 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
539 ptlrpc_at_set_req_timeout(req);
541 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
543 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
544 osc_pack_capa(req, body, oinfo->oi_capa);
546 ptlrpc_request_set_replen(req);
549 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
550 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
551 sa = ptlrpc_req_async_args(req);
552 sa->sa_oa = oinfo->oi_oa;
553 sa->sa_upcall = upcall;
554 sa->sa_cookie = cookie;
/* Either hand off to ptlrpcd or queue on the caller's set. */
555 if (rqset == PTLRPCD_SET)
556 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
558 ptlrpc_set_add_req(rqset, req);
/*
 * Punch entry point: encode the lock extent into the obdo's
 * size (start) and blocks (end) fields — the wire protocol overloads
 * these for OST_PUNCH — then delegate to osc_punch_base().
 */
563 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
564 struct obd_trans_info *oti,
565 struct ptlrpc_request_set *rqset)
567 oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
568 oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
569 oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
570 return osc_punch_base(exp, oinfo,
571 oinfo->oi_cb_up, oinfo, rqset);
/*
 * Reply interpreter for OST_SYNC: copy the returned obdo back to the
 * caller and invoke the oi_cb_up completion callback.
 */
574 static int osc_sync_interpret(const struct lu_env *env,
575 struct ptlrpc_request *req,
578 struct osc_async_args *aa = arg;
579 struct ost_body *body;
585 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
587 CERROR ("can't unpack ost_body\n");
588 GOTO(out, rc = -EPROTO);
/* Note: raw struct copy, not lustre_get_wire_obdo() as elsewhere. */
591 *aa->aa_oi->oi_oa = body->oa;
593 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Queue an asynchronous OST_SYNC for the byte range [start, end].
 * The range is carried in the obdo's size/blocks fields (protocol
 * overload, same trick as punch).
 */
597 static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
598 obd_size start, obd_size end,
599 struct ptlrpc_request_set *set)
601 struct ptlrpc_request *req;
602 struct ost_body *body;
603 struct osc_async_args *aa;
608 CDEBUG(D_INFO, "oa NULL\n");
612 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
616 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
617 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
619 ptlrpc_request_free(req);
623 /* overload the size and blocks fields in the oa with start/end */
624 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
626 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
627 body->oa.o_size = start;
628 body->oa.o_blocks = end;
629 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
630 osc_pack_capa(req, body, oinfo->oi_capa);
632 ptlrpc_request_set_replen(req);
633 req->rq_interpret_reply = osc_sync_interpret;
635 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
636 aa = ptlrpc_req_async_args(req);
639 ptlrpc_set_add_req(set, req);
643 /* Find and cancel locally locks matched by @mode in the resource found by
644 * @objid. Found locks are added into @cancel list. Returns the amount of
645 * locks added to @cancels list. */
646 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
648 ldlm_mode_t mode, int lock_flags)
650 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
651 struct ldlm_res_id res_id;
652 struct ldlm_resource *res;
/* Build the resource name from the object id/seq and look it up. */
656 osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
657 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
/* Hold a debug ref across the local-cancel scan. */
661 LDLM_RESOURCE_ADDREF(res);
662 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
663 lock_flags, 0, NULL);
664 LDLM_RESOURCE_DELREF(res);
665 ldlm_resource_putref(res);
/*
 * Destroy-reply interpreter: a destroy RPC has completed, so drop the
 * in-flight counter and wake any thread throttled in osc_destroy().
 */
669 static int osc_destroy_interpret(const struct lu_env *env,
670 struct ptlrpc_request *req, void *data,
673 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
675 cfs_atomic_dec(&cli->cl_destroy_in_flight);
676 cfs_waitq_signal(&cli->cl_destroy_waitq);
/*
 * Try to reserve a destroy-RPC slot. Optimistically increments the
 * in-flight counter; if that stays within cl_max_rpcs_in_flight the
 * slot is taken. Otherwise the increment is rolled back, and — because
 * the two atomics are not covered by a lock — a waiter is re-signalled
 * if the counter dropped in between, so no wakeup is lost.
 */
680 static int osc_can_send_destroy(struct client_obd *cli)
682 if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
683 cli->cl_max_rpcs_in_flight) {
684 /* The destroy request can be sent */
687 if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
688 cli->cl_max_rpcs_in_flight) {
690 * The counter has been modified between the two atomic
693 cfs_waitq_signal(&cli->cl_destroy_waitq);
698 /* Destroy requests can be async always on the client, and we don't even really
699 * care about the return code since the client cannot do anything at all about
701 * When the MDS is unlinking a filename, it saves the file objects into a
702 * recovery llog, and these object records are cancelled when the OST reports
703 * they were destroyed and sync'd to disk (i.e. transaction committed).
704 * If the client dies, or the OST is down when the object should be destroyed,
705 * the records are not cancelled, and when the OST reconnects to the MDS next,
706 * it will retrieve the llog unlink logs and then sends the log cancellation
707 * cookies to the MDS after committing destroy transactions. */
708 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
709 struct lov_stripe_md *ea, struct obd_trans_info *oti,
710 struct obd_export *md_export, void *capa)
712 struct client_obd *cli = &exp->exp_obd->u.cli;
713 struct ptlrpc_request *req;
714 struct ost_body *body;
715 CFS_LIST_HEAD(cancels);
720 CDEBUG(D_INFO, "oa NULL\n");
/* Cancel our local PW locks on the object first; piggyback the
 * cancels on the destroy via early lock cancellation (ELC). */
724 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
725 LDLM_FL_DISCARD_DATA);
727 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
729 ldlm_lock_list_put(&cancels, l_bl_ast, count);
733 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
734 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
737 ptlrpc_request_free(req);
741 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
742 ptlrpc_at_set_req_timeout(req);
744 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
745 oa->o_lcookie = *oti->oti_logcookies;
746 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
748 lustre_set_wire_obdo(&body->oa, oa);
750 osc_pack_capa(req, body, (struct obd_capa *)capa);
751 ptlrpc_request_set_replen(req);
753 /* don't throttle destroy RPCs for the MDT */
754 if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
755 req->rq_interpret_reply = osc_destroy_interpret;
756 if (!osc_can_send_destroy(cli)) {
757 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
761 * Wait until the number of on-going destroy RPCs drops
762 * under max_rpc_in_flight
764 l_wait_event_exclusive(cli->cl_destroy_waitq,
765 osc_can_send_destroy(cli), &lwi);
769 /* Do not wait for response */
770 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
/*
 * Report this client's cache state (dirty pages, desired undirty
 * headroom, available grant, lost grant) in the obdo so the server can
 * adjust its grant accounting. Sanity-checks the dirty counters under
 * cl_loi_list_lock and zeroes cl_lost_grant after reporting it.
 */
774 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
777 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
779 LASSERT(!(oa->o_valid & bits));
782 client_obd_list_lock(&cli->cl_loi_list_lock);
783 oa->o_dirty = cli->cl_dirty;
784 if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
785 CERROR("dirty %lu - %lu > dirty_max %lu\n",
786 cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
788 } else if (cfs_atomic_read(&obd_dirty_pages) -
789 cfs_atomic_read(&obd_dirty_transit_pages) >
790 obd_max_dirty_pages + 1){
791 /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
792 * not covered by a lock thus they may safely race and trip
793 * this CERROR() unless we add in a small fudge factor (+1). */
794 CERROR("dirty %d - %d > system dirty_max %d\n",
795 cfs_atomic_read(&obd_dirty_pages),
796 cfs_atomic_read(&obd_dirty_transit_pages),
797 obd_max_dirty_pages);
799 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
800 CERROR("dirty %lu - dirty_max %lu too big???\n",
801 cli->cl_dirty, cli->cl_dirty_max);
/* Ask for enough undirty headroom to keep a full pipeline of RPCs
 * (max pages per RPC * (max RPCs in flight + 1)) busy. */
804 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
805 (cli->cl_max_rpcs_in_flight + 1);
806 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
808 oa->o_grant = cli->cl_avail_grant;
809 oa->o_dropped = cli->cl_lost_grant;
/* Lost grant is reported exactly once. */
810 cli->cl_lost_grant = 0;
811 client_obd_list_unlock(&cli->cl_loi_list_lock);
812 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
813 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/*
 * Push the next grant-shrink deadline forward by the configured
 * interval from now.
 */
817 static void osc_update_next_shrink(struct client_obd *cli)
819 cli->cl_next_shrink_grant =
820 cfs_time_shift(cli->cl_grant_shrink_interval);
821 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
822 cli->cl_next_shrink_grant);
825 /* caller must hold loi_list_lock */
/*
 * Account one page of pending write against the client's grant:
 * bump dirty counters, consume one page of cl_avail_grant, and tag the
 * page OBD_BRW_FROM_GRANT so osc_release_write_grant() can undo this.
 */
826 static void osc_consume_write_grant(struct client_obd *cli,
827 struct brw_page *pga)
829 LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
830 LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
831 cfs_atomic_inc(&obd_dirty_pages);
832 cli->cl_dirty += CFS_PAGE_SIZE;
833 cli->cl_avail_grant -= CFS_PAGE_SIZE;
834 pga->flag |= OBD_BRW_FROM_GRANT;
835 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
836 CFS_PAGE_SIZE, pga, pga->pg);
837 LASSERT(cli->cl_avail_grant >= 0);
838 osc_update_next_shrink(cli);
841 /* the companion to osc_consume_write_grant, called when a brw has completed.
842 * must be called with the loi lock held. */
843 static void osc_release_write_grant(struct client_obd *cli,
844 struct brw_page *pga, int sent)
/* Server block size; fall back to 4096 when statfs hasn't reported one. */
846 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
849 LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
/* Pages never charged to grant have nothing to release. */
850 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
855 pga->flag &= ~OBD_BRW_FROM_GRANT;
856 cfs_atomic_dec(&obd_dirty_pages);
857 cli->cl_dirty -= CFS_PAGE_SIZE;
858 if (pga->flag & OBD_BRW_NOCACHE) {
859 pga->flag &= ~OBD_BRW_NOCACHE;
860 cfs_atomic_dec(&obd_dirty_transit_pages);
861 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
/* Grant consumed for a page that was never sent is "lost" and must be
 * reported back to the server via o_dropped. */
864 cli->cl_lost_grant += CFS_PAGE_SIZE;
865 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
866 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
867 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
868 /* For short writes we shouldn't count parts of pages that
869 * span a whole block on the OST side, or our accounting goes
870 * wrong. Should match the code in filter_grant_check. */
871 int offset = pga->off & ~CFS_PAGE_MASK;
872 int count = pga->count + (offset & (blocksize - 1));
873 int end = (offset + pga->count) & (blocksize - 1);
875 count += blocksize - end;
877 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
878 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
879 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
880 cli->cl_avail_grant, cli->cl_dirty);
/* Total BRW RPCs (reads + writes) currently in flight for this client. */
886 static unsigned long rpcs_in_flight(struct client_obd *cli)
888 return cli->cl_r_in_flight + cli->cl_w_in_flight;
891 /* caller must hold loi_list_lock */
/*
 * Walk the cache-waiter list and wake waiters that can now make
 * progress: either grant a page of write cache, or — when no grant can
 * arrive (no writes in flight, avail < one page) — wake the waiter
 * with -EDQUOT so it falls back to sync I/O.
 */
892 void osc_wake_cache_waiters(struct client_obd *cli)
895 struct osc_cache_waiter *ocw;
898 cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
899 /* if we can't dirty more, we must wait until some is written */
900 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
901 (cfs_atomic_read(&obd_dirty_pages) + 1 >
902 obd_max_dirty_pages)) {
903 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
904 "osc max %ld, sys max %d\n", cli->cl_dirty,
905 cli->cl_dirty_max, obd_max_dirty_pages);
909 /* if still dirty cache but no grant wait for pending RPCs that
910 * may yet return us some grant before doing sync writes */
911 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
912 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
913 cli->cl_w_in_flight);
917 ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
918 cfs_list_del_init(&ocw->ocw_entry);
919 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
920 /* no more RPCs in flight to return grant, do sync IO */
921 ocw->ocw_rc = -EDQUOT;
922 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
924 osc_consume_write_grant(cli,
925 &ocw->ocw_oap->oap_brw_page);
928 cfs_waitq_signal(&ocw->ocw_waitq);
/* Add grant returned by the server to cl_avail_grant, under the list lock. */
934 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
936 client_obd_list_lock(&cli->cl_loi_list_lock);
937 cli->cl_avail_grant += grant;
938 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Absorb any extra grant the server piggybacked on a reply body. */
941 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
943 if (body->oa.o_valid & OBD_MD_FLGRANT) {
944 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
945 __osc_update_grant(cli, body->oa.o_grant);
/* Forward declaration — used below by osc_shrink_grant_to_target(). */
949 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
950 void *key, obd_count vallen, void *val,
951 struct ptlrpc_request_set *set);
/*
 * Reply interpreter for a grant-shrink set_info RPC. On failure the
 * grant we offered back is restored to cl_avail_grant; on success any
 * server-granted amount in the reply body is absorbed.
 */
953 static int osc_shrink_grant_interpret(const struct lu_env *env,
954 struct ptlrpc_request *req,
957 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
958 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
959 struct ost_body *body;
/* RPC failed: take back the grant we tried to return. */
962 __osc_update_grant(cli, oa->o_grant);
966 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
968 osc_update_grant(cli, body);
/*
 * Give back a quarter of the locally available grant: move it from
 * cl_avail_grant into oa->o_grant and tag the obdo with
 * OBD_FL_SHRINK_GRANT so the server treats it as returned grant.
 */
974 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
976 client_obd_list_lock(&cli->cl_loi_list_lock);
977 oa->o_grant = cli->cl_avail_grant / 4;
978 cli->cl_avail_grant -= oa->o_grant;
979 client_obd_list_unlock(&cli->cl_loi_list_lock);
980 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
981 oa->o_valid |= OBD_MD_FLFLAGS;
984 oa->o_flags |= OBD_FL_SHRINK_GRANT;
985 osc_update_next_shrink(cli);
988 /* Shrink the current grant, either from some large amount to enough for a
989 * full set of in-flight RPCs, or if we have already shrunk to that limit
990 * then to enough for a single RPC. This avoids keeping more grant than
991 * needed, and avoids shrinking the grant piecemeal. */
992 static int osc_shrink_grant(struct client_obd *cli)
994 long target = (cli->cl_max_rpcs_in_flight + 1) *
995 cli->cl_max_pages_per_rpc;
997 client_obd_list_lock(&cli->cl_loi_list_lock);
/* Already at or below the pipeline target: shrink to a single RPC. */
998 if (cli->cl_avail_grant <= target)
999 target = cli->cl_max_pages_per_rpc;
1000 client_obd_list_unlock(&cli->cl_loi_list_lock);
1002 return osc_shrink_grant_to_target(cli, target);
/*
 * Return grant above @target to the server via a KEY_GRANT_SHRINK
 * set_info RPC. Clamps @target to at least one RPC's worth of pages and
 * bails out if we don't actually hold more than that. On send failure
 * the interpreter restores the grant locally.
 */
1005 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
1008 struct ost_body *body;
1011 client_obd_list_lock(&cli->cl_loi_list_lock);
1012 /* Don't shrink if we are already above or below the desired limit
1013 * We don't want to shrink below a single RPC, as that will negatively
1014 * impact block allocation and long-term performance. */
1015 if (target < cli->cl_max_pages_per_rpc)
1016 target = cli->cl_max_pages_per_rpc;
1018 if (target >= cli->cl_avail_grant) {
1019 client_obd_list_unlock(&cli->cl_loi_list_lock);
1022 client_obd_list_unlock(&cli->cl_loi_list_lock);
1024 OBD_ALLOC_PTR(body);
1028 osc_announce_cached(cli, &body->oa, 0);
1030 client_obd_list_lock(&cli->cl_loi_list_lock);
/* Offer everything above target back to the server. */
1031 body->oa.o_grant = cli->cl_avail_grant - target;
1032 cli->cl_avail_grant = target;
1033 client_obd_list_unlock(&cli->cl_loi_list_lock);
1034 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
1035 body->oa.o_valid |= OBD_MD_FLFLAGS;
1036 body->oa.o_flags = 0;
1038 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
1039 osc_update_next_shrink(cli);
1041 rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
1042 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
1043 sizeof(*body), body, NULL);
/* Send failed: reclaim the grant we tried to return. */
1045 __osc_update_grant(cli, body->oa.o_grant);
1050 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
/*
 * Decide whether it is time to shrink this client's grant: the server
 * must support OBD_CONNECT_GRANT_SHRINK, the shrink deadline must have
 * (almost) arrived, the import must be FULL, and we must hold more
 * grant than GRANT_SHRINK_LIMIT. Otherwise just reschedule.
 */
1051 static int osc_should_shrink_grant(struct client_obd *client)
1053 cfs_time_t time = cfs_time_current();
1054 cfs_time_t next_shrink = client->cl_next_shrink_grant;
1056 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
1057 OBD_CONNECT_GRANT_SHRINK) == 0)
/* Allow firing up to 5 ticks early to absorb timer jitter. */
1060 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1061 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1062 client->cl_avail_grant > GRANT_SHRINK_LIMIT)
1065 osc_update_next_shrink(client);
/*
 * Periodic timeout callback: walk every client registered on the
 * grant-shrink list and shrink those that are due.
 */
1070 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1072 struct client_obd *client;
1074 cfs_list_for_each_entry(client, &item->ti_obd_list,
1075 cl_grant_shrink_list) {
1076 if (osc_should_shrink_grant(client))
1077 osc_shrink_grant(client);
/*
 * Register this client with the periodic grant-shrink timeout handler
 * and schedule its first shrink deadline.
 */
1082 static int osc_add_shrink_grant(struct client_obd *client)
1086 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1088 osc_grant_shrink_grant_cb, NULL,
1089 &client->cl_grant_shrink_list);
1091 CERROR("add grant client %s error %d\n",
1092 client->cl_import->imp_obd->obd_name, rc);
1095 CDEBUG(D_CACHE, "add grant client %s \n",
1096 client->cl_import->imp_obd->obd_name);
1097 osc_update_next_shrink(client);
/* Unregister this client from the periodic grant-shrink handler. */
1101 static int osc_del_shrink_grant(struct client_obd *client)
1103 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/*
 * Initialize cl_avail_grant from the connect reply. Non-evicted
 * clients subtract cl_dirty because ocd_grant covers dirty pages too;
 * evicted clients take ocd_grant verbatim (dirty is draining to 0).
 * Also enrolls in grant shrinking when the server supports it.
 */
1107 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1110 * ocd_grant is the total grant amount we're expect to hold: if we've
1111 * been evicted, it's the new avail_grant amount, cl_dirty will drop
1112 * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1114 * race is tolerable here: if we're evicted, but imp_state already
1115 * left EVICTED state, then cl_dirty must be 0 already.
1117 client_obd_list_lock(&cli->cl_loi_list_lock);
1118 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1119 cli->cl_avail_grant = ocd->ocd_grant;
1121 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1123 if (cli->cl_avail_grant < 0) {
1124 CWARN("%s: available grant < 0, the OSS is probably not running"
1125 " with patch from bug20278 (%ld) \n",
1126 cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
1127 /* workaround for 1.6 servers which do not have
1128 * the patch from bug20278 */
1129 cli->cl_avail_grant = ocd->ocd_grant;
1132 client_obd_list_unlock(&cli->cl_loi_list_lock);
1134 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1135 cli->cl_import->imp_obd->obd_name,
1136 cli->cl_avail_grant, cli->cl_lost_grant);
1138 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1139 cfs_list_empty(&cli->cl_grant_shrink_list))
1140 osc_add_shrink_grant(cli);
1143 /* We assume that the reason this OSC got a short read is because it read
1144 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1145 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1146 * this stripe never got written at or beyond this stripe offset yet. */
/*
 * Zero-fill the tail of a short read: skip the bytes that were read,
 * zero the remainder of the page where the read ended, then zero every
 * following page in the brw_page array.
 */
1147 static void handle_short_read(int nob_read, obd_count page_count,
1148 struct brw_page **pga)
1153 /* skip bytes read OK */
1154 while (nob_read > 0) {
1155 LASSERT (page_count > 0);
1157 if (pga[i]->count > nob_read) {
1158 /* EOF inside this page */
1159 ptr = cfs_kmap(pga[i]->pg) +
1160 (pga[i]->off & ~CFS_PAGE_MASK);
1161 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1162 cfs_kunmap(pga[i]->pg);
1168 nob_read -= pga[i]->count;
1173 /* zero remaining pages */
1174 while (page_count-- > 0) {
1175 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1176 memset(ptr, 0, pga[i]->count);
1177 cfs_kunmap(pga[i]->pg);
/*
 * Validate the per-niobuf return codes of a BRW_WRITE reply: fail if
 * the RC vector is missing/short, if any niobuf reported an error or a
 * non-zero status, or if the bulk transferred a different byte count
 * than was requested.
 */
1182 static int check_write_rcs(struct ptlrpc_request *req,
1183 int requested_nob, int niocount,
1184 obd_count page_count, struct brw_page **pga)
1189 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1190 sizeof(*remote_rcs) *
1192 if (remote_rcs == NULL) {
1193 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1197 /* return error if any niobuf was in error */
1198 for (i = 0; i < niocount; i++) {
1199 if ((int)remote_rcs[i] < 0)
1200 return(remote_rcs[i]);
/* A positive per-niobuf rc is a protocol violation. */
1202 if (remote_rcs[i] != 0) {
1203 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1204 i, remote_rcs[i], req);
1209 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1210 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1211 req->rq_bulk->bd_nob_transferred, requested_nob);
/*
 * Two brw_pages can be merged into one niobuf when they are
 * byte-contiguous and their flags differ only in bits known to be safe
 * to combine; unknown flag differences are logged and block the merge.
 */
1218 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1220 if (p1->flag != p2->flag) {
1221 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1222 OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1224 /* warn if we try to combine flags that we don't know to be
1225 * safe to combine */
1226 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1227 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1228 "report this at http://bugs.whamcloud.com/\n",
1229 p1->flag, p2->flag);
/* Mergeable only when p2 starts exactly where p1 ends. */
1234 return (p1->off + p1->count == p2->off);
/*
 * Compute the bulk checksum over up to @nob bytes spread across
 * @pg_count brw_pages, kmapping each page in turn. Contains two fault
 * injection points: OBD_FAIL_OSC_CHECKSUM_RECEIVE corrupts read data
 * before checksumming, OBD_FAIL_OSC_CHECKSUM_SEND falsifies only the
 * checksum on writes so the data remains correct for a redo.
 */
1237 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1238 struct brw_page **pga, int opc,
1239 cksum_type_t cksum_type)
1244 LASSERT (pg_count > 0);
1245 cksum = init_checksum(cksum_type);
1246 while (nob > 0 && pg_count > 0) {
1247 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1248 int off = pga[i]->off & ~CFS_PAGE_MASK;
1249 int count = pga[i]->count > nob ? nob : pga[i]->count;
1251 /* corrupt the data before we compute the checksum, to
1252 * simulate an OST->client data error */
1253 if (i == 0 && opc == OST_READ &&
1254 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1255 memcpy(ptr + off, "bad1", min(4, nob));
1256 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1257 cfs_kunmap(pga[i]->pg);
1258 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1261 nob -= pga[i]->count;
1265 /* For sending we only compute the wrong checksum instead
1266 * of corrupting the data so it is still correct on a redo */
1267 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/* Build (but do not send) a BRW read/write RPC for @page_count pages:
 * allocates the request (from the import's pool for writes), sizes the
 * capsule for the merged niobuf count, attaches a bulk descriptor, packs
 * obdo/ioobj/niobufs, optionally computes the write checksum, and stores
 * resend bookkeeping in the request's async args.  On success *reqp is
 * set; on failure the request is freed.
 * NOTE(review): many interleaved source lines are missing from this
 * extract (error checks, else-arms, closing braces, RETURNs), so the
 * inline comments below describe only what the visible lines show. */
1273 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1274 struct lov_stripe_md *lsm, obd_count page_count,
1275 struct brw_page **pga,
1276 struct ptlrpc_request **reqp,
1277 struct obd_capa *ocapa, int reserve,
1280 struct ptlrpc_request *req;
1281 struct ptlrpc_bulk_desc *desc;
1282 struct ost_body *body;
1283 struct obd_ioobj *ioobj;
1284 struct niobuf_remote *niobuf;
1285 int niocount, i, requested_nob, opc, rc;
1286 struct osc_brw_async_args *aa;
1287 struct req_capsule *pill;
1288 struct brw_page *pg_prev;
/* fault injection: simulate recoverable / fatal prep failures */
1291 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1292 RETURN(-ENOMEM); /* Recoverable */
1293 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1294 RETURN(-EINVAL); /* Fatal */
/* writes draw from the import's preallocated rq pool so cached dirty
 * data can still be flushed under memory pressure */
1296 if ((cmd & OBD_BRW_WRITE) != 0) {
1298 req = ptlrpc_request_alloc_pool(cli->cl_import,
1299 cli->cl_import->imp_rq_pool,
1300 &RQF_OST_BRW_WRITE);
1303 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* count niobufs: adjacent mergeable pages share one niobuf */
1308 for (niocount = i = 1; i < page_count; i++) {
1309 if (!can_merge_pages(pga[i - 1], pga[i]))
1313 pill = &req->rq_pill;
1314 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1316 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1317 niocount * sizeof(*niobuf));
1318 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1320 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1322 ptlrpc_request_free(req);
1325 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1326 ptlrpc_at_set_req_timeout(req);
/* bulk direction: server pulls for writes, pushes for reads */
1328 if (opc == OST_WRITE)
1329 desc = ptlrpc_prep_bulk_imp(req, page_count,
1330 BULK_GET_SOURCE, OST_BULK_PORTAL);
1332 desc = ptlrpc_prep_bulk_imp(req, page_count,
1333 BULK_PUT_SINK, OST_BULK_PORTAL);
1336 GOTO(out, rc = -ENOMEM);
1337 /* NB request now owns desc and will free it when it gets freed */
1339 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1340 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1341 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1342 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1344 lustre_set_wire_obdo(&body->oa, oa);
1346 obdo_to_ioobj(oa, ioobj);
1347 ioobj->ioo_bufcnt = niocount;
1348 osc_pack_capa(req, body, ocapa);
1349 LASSERT (page_count > 0);
/* fill bulk descriptor and niobufs; pga must already be sorted by
 * offset (enforced by the LASSERTFs below) */
1351 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1352 struct brw_page *pg = pga[i];
1353 int poff = pg->off & ~CFS_PAGE_MASK;
1355 LASSERT(pg->count > 0);
1356 /* make sure there is no gap in the middle of page array */
1357 LASSERTF(page_count == 1 ||
1358 (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
1359 ergo(i > 0 && i < page_count - 1,
1360 poff == 0 && pg->count == CFS_PAGE_SIZE) &&
1361 ergo(i == page_count - 1, poff == 0)),
1362 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1363 i, page_count, pg, pg->off, pg->count);
1365 LASSERTF(i == 0 || pg->off > pg_prev->off,
1366 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1367 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1369 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1370 pg_prev->pg, page_private(pg_prev->pg),
1371 pg_prev->pg->index, pg_prev->off);
1373 LASSERTF(i == 0 || pg->off > pg_prev->off,
1374 "i %d p_c %u\n", i, page_count);
/* all pages in one brw must agree on SRVLOCK */
1376 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1377 (pg->flag & OBD_BRW_SRVLOCK));
1379 ptlrpc_prep_bulk_page(desc, pg->pg, poff, pg->count);
1380 requested_nob += pg->count;
/* extend the previous niobuf when contiguous, else start a new one */
1382 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1384 niobuf->len += pg->count;
1386 niobuf->offset = pg->off;
1387 niobuf->len = pg->count;
1388 niobuf->flags = pg->flag;
/* sanity: we consumed exactly the niocount niobufs we sized for */
1393 LASSERTF((void *)(niobuf - niocount) ==
1394 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1395 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1396 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1398 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1400 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1401 body->oa.o_valid |= OBD_MD_FLFLAGS;
1402 body->oa.o_flags = 0;
/* presumably set only on the resend path — context lines missing */
1404 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1407 if (osc_should_shrink_grant(cli))
1408 osc_shrink_grant_local(cli, &body->oa);
1410 /* size[REQ_REC_OFF] still sizeof (*body) */
1411 if (opc == OST_WRITE) {
1412 if (unlikely(cli->cl_checksum) &&
1413 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1414 /* store cl_cksum_type in a local variable since
1415 * it can be changed via lprocfs */
1416 cksum_type_t cksum_type = cli->cl_cksum_type;
1418 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1419 oa->o_flags &= OBD_FL_LOCAL_MASK;
1420 body->oa.o_flags = 0;
1422 body->oa.o_flags |= cksum_type_pack(cksum_type);
1423 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1424 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1428 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1430 /* save this in 'oa', too, for later checking */
1431 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1432 oa->o_flags |= cksum_type_pack(cksum_type);
1434 /* clear out the checksum flag, in case this is a
1435 * resend but cl_checksum is no longer set. b=11238 */
1436 oa->o_valid &= ~OBD_MD_FLCKSUM;
1438 oa->o_cksum = body->oa.o_cksum;
1439 /* 1 RC per niobuf */
1440 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1441 sizeof(__u32) * niocount);
/* read path: only request that the server checksum its reply */
1443 if (unlikely(cli->cl_checksum) &&
1444 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1445 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1446 body->oa.o_flags = 0;
1447 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1448 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1451 ptlrpc_request_set_replen(req);
1453 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1454 aa = ptlrpc_req_async_args(req);
/* remember what we asked for so fini/redo can validate the reply */
1456 aa->aa_requested_nob = requested_nob;
1457 aa->aa_nio_count = niocount;
1458 aa->aa_page_count = page_count;
1462 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1463 if (ocapa && reserve)
1464 aa->aa_ocapa = capa_get(ocapa);
/* error path: drop our request reference */
1470 ptlrpc_req_finished(req);
/* Verify a write checksum the server sent back.  If it disagrees with
 * the client's, recompute locally (with the server's checksum type) to
 * classify where the corruption happened, and log a console error with
 * the object/extent identity.  Return value lines are not visible in
 * this extract — presumably 0 on match / nonzero on mismatch; confirm
 * against the caller in osc_brw_fini_request. */
1474 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1475 __u32 client_cksum, __u32 server_cksum, int nob,
1476 obd_count page_count, struct brw_page **pga,
1477 cksum_type_t client_cksum_type)
1481 cksum_type_t cksum_type;
1483 if (server_cksum == client_cksum) {
1484 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* recompute with the type the server actually used */
1488 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1490 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
/* classify the mismatch for the log message */
1493 if (cksum_type != client_cksum_type)
1494 msg = "the server did not use the checksum type specified in "
1495 "the original request - likely a protocol problem";
1496 else if (new_cksum == server_cksum)
1497 msg = "changed on the client after we checksummed it - "
1498 "likely false positive due to mmap IO (bug 11742)";
1499 else if (new_cksum == client_cksum)
1500 msg = "changed in transit before arrival at OST";
1502 msg = "changed in transit AND doesn't match the original - "
1503 "likely false positive due to mmap IO (bug 11742)";
1505 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1506 " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1507 msg, libcfs_nid2str(peer->nid),
1508 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1509 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1510 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1512 oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1514 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1515 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1516 "client csum now %x\n", client_cksum, client_cksum_type,
1517 server_cksum, cksum_type, new_cksum);
/* Post-process a completed BRW RPC.
 * @rc enters as the number of bytes transferred (or a negative errno);
 * validates the reply body, updates quota/grant state, verifies write
 * RCs and checksums, and for reads checks transfer length, handles
 * short reads, and re-verifies the read checksum if the server sent one.
 * NOTE(review): several error-path lines are missing from this extract;
 * comments describe only the visible flow. */
1522 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1524 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1525 const lnet_process_id_t *peer =
1526 &req->rq_import->imp_connection->c_peer;
1527 struct client_obd *cli = aa->aa_cli;
1528 struct ost_body *body;
1529 __u32 client_cksum = 0;
/* -EDQUOT still carries a usable reply body; other errors do not */
1532 if (rc < 0 && rc != -EDQUOT) {
1533 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1537 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1538 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1540 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1544 /* set/clear over quota flag for a uid/gid */
1545 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1546 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1547 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1549 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1550 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1552 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1555 osc_update_grant(cli, body);
1560 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1561 client_cksum = aa->aa_oa->o_cksum; /* save for later */
/* --- write completion: rc should be 0, bytes already accounted --- */
1563 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1565 CERROR("Unexpected +ve rc %d\n", rc);
1568 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1570 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1573 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1574 check_write_checksum(&body->oa, peer, client_cksum,
1575 body->oa.o_cksum, aa->aa_requested_nob,
1576 aa->aa_page_count, aa->aa_ppga,
1577 cksum_type_unpack(aa->aa_oa->o_flags)))
1580 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1581 aa->aa_page_count, aa->aa_ppga);
1585 /* The rest of this function executes only for OST_READs */
1587 /* if unwrap_bulk failed, return -EAGAIN to retry */
1588 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1590 GOTO(out, rc = -EAGAIN);
/* server may legitimately return fewer bytes (short read), never more */
1592 if (rc > aa->aa_requested_nob) {
1593 CERROR("Unexpected rc %d (%d requested)\n", rc,
1594 aa->aa_requested_nob);
1598 if (rc != req->rq_bulk->bd_nob_transferred) {
1599 CERROR ("Unexpected rc %d (%d transferred)\n",
1600 rc, req->rq_bulk->bd_nob_transferred);
1604 if (rc < aa->aa_requested_nob)
1605 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
/* re-verify the data against the server-supplied read checksum */
1607 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1608 static int cksum_counter;
1609 __u32 server_cksum = body->oa.o_cksum;
1612 cksum_type_t cksum_type;
1614 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1615 body->oa.o_flags : 0);
1616 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1617 aa->aa_ppga, OST_READ,
/* identify whether the data came via an LNet router */
1620 if (peer->nid == req->rq_bulk->bd_sender) {
1624 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1627 if (server_cksum == ~0 && rc > 0) {
1628 CERROR("Protocol error: server %s set the 'checksum' "
1629 "bit, but didn't send a checksum. Not fatal, "
1630 "but please notify on http://bugs.whamcloud.com/\n",
1631 libcfs_nid2str(peer->nid));
1632 } else if (server_cksum != client_cksum) {
1633 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1634 "%s%s%s inode "DFID" object "
1635 LPU64"/"LPU64" extent "
1636 "["LPU64"-"LPU64"]\n",
1637 req->rq_import->imp_obd->obd_name,
1638 libcfs_nid2str(peer->nid),
1640 body->oa.o_valid & OBD_MD_FLFID ?
1641 body->oa.o_parent_seq : (__u64)0,
1642 body->oa.o_valid & OBD_MD_FLFID ?
1643 body->oa.o_parent_oid : 0,
1644 body->oa.o_valid & OBD_MD_FLFID ?
1645 body->oa.o_parent_ver : 0,
1647 body->oa.o_valid & OBD_MD_FLGROUP ?
1648 body->oa.o_seq : (__u64)0,
1649 aa->aa_ppga[0]->off,
1650 aa->aa_ppga[aa->aa_page_count-1]->off +
1651 aa->aa_ppga[aa->aa_page_count-1]->count -
1653 CERROR("client %x, server %x, cksum_type %x\n",
1654 client_cksum, server_cksum, cksum_type);
1656 aa->aa_oa->o_cksum = client_cksum;
1660 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1663 } else if (unlikely(client_cksum)) {
/* we asked for a checksum but the server did not provide one;
 * warn at power-of-two intervals to avoid log spam */
1664 static int cksum_missed;
1667 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1668 CERROR("Checksum %u requested from %s but not sent\n",
1669 cksum_missed, libcfs_nid2str(peer->nid));
/* propagate server-updated attributes back into the client obdo */
1675 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
/* Synchronous BRW: build the request, queue it and wait, then finish it.
 * Recoverable errors (including bulk timeouts with rq_resend set) are
 * retried with a backoff that grows with the resend count, bounded by
 * client_should_resend().  The surrounding retry loop's braces are not
 * visible in this extract. */
1680 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1681 struct lov_stripe_md *lsm,
1682 obd_count page_count, struct brw_page **pga,
1683 struct obd_capa *ocapa)
1685 struct ptlrpc_request *req;
1689 struct l_wait_info lwi;
1693 cfs_waitq_init(&waitq);
1696 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1697 page_count, pga, &req, ocapa, 0, resends);
1701 rc = ptlrpc_queue_wait(req);
/* a bulk timeout marked for resend is retried, not failed */
1703 if (rc == -ETIMEDOUT && req->rq_resend) {
1704 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1705 ptlrpc_req_finished(req);
1709 rc = osc_brw_fini_request(req, rc);
1711 ptlrpc_req_finished(req);
1712 if (osc_recoverable_error(rc)) {
1714 if (!client_should_resend(resends, &exp->exp_obd->u.cli)) {
1715 CERROR("too many resend retries, returning error\n");
/* sleep 'resends' seconds before retrying (interruptible) */
1719 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1720 l_wait_event(waitq, 0, &lwi);
/* Re-issue a failed async BRW: build a fresh request with the same pages
 * and hand the oaps/pga/capa ownership from the old request to the new
 * one, then add it to the original request set.  Must not race with an
 * interrupted oap — checked under cl_loi_list_lock. */
1728 int osc_brw_redo_request(struct ptlrpc_request *request,
1729 struct osc_brw_async_args *aa)
1731 struct ptlrpc_request *new_req;
1732 struct ptlrpc_request_set *set = request->rq_set;
1733 struct osc_brw_async_args *new_aa;
1734 struct osc_async_page *oap;
1738 if (!client_should_resend(aa->aa_resends, aa->aa_cli)) {
1739 CERROR("too many resent retries, returning error\n");
1743 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1745 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1746 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1747 aa->aa_cli, aa->aa_oa,
1748 NULL /* lsm unused by osc currently */,
1749 aa->aa_page_count, aa->aa_ppga,
1750 &new_req, aa->aa_ocapa, 0, 1);
1754 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
/* abort the redo if any oap was interrupted while we prepared */
1756 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1757 if (oap->oap_request != NULL) {
1758 LASSERTF(request == oap->oap_request,
1759 "request %p != oap_request %p\n",
1760 request, oap->oap_request);
1761 if (oap->oap_interrupted) {
1762 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1763 ptlrpc_req_finished(new_req);
1768 /* New request takes over pga and oaps from old request.
1769 * Note that copying a list_head doesn't work, need to move it... */
1771 new_req->rq_interpret_reply = request->rq_interpret_reply;
1772 new_req->rq_async_args = request->rq_async_args;
/* delay the resend proportionally to how often we have retried */
1773 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1775 new_aa = ptlrpc_req_async_args(new_req);
1777 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1778 cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1779 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
/* repoint each oap's request reference at the new request */
1781 cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1782 if (oap->oap_request) {
1783 ptlrpc_req_finished(oap->oap_request);
1784 oap->oap_request = ptlrpc_request_addref(new_req);
1788 new_aa->aa_ocapa = aa->aa_ocapa;
1789 aa->aa_ocapa = NULL;
1791 /* use ptlrpc_set_add_req is safe because interpret functions work
1792 * in check_set context. only one way exist with access to request
1793 * from different thread got -EINTR - this way protected with
1794 * cl_loi_list_lock */
1795 ptlrpc_set_add_req(set, new_req);
1797 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1799 DEBUG_REQ(D_INFO, new_req, "new request");
1804 * ugh, we want disk allocation on the target to happen in offset order. we'll
1805 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1806 * fine for our small page arrays and doesn't require allocation. it's an
1807 * insertion sort that swaps elements that are strides apart, shrinking the
1808 * stride down until it's '1' and the array is sorted.
/* In-place shellsort of @array by brw_page->off, ascending.
 * NOTE(review): the do-loop body and tail assignments are partially
 * missing from this extract. */
1810 static void sort_brw_pages(struct brw_page **array, int num)
1813 struct brw_page *tmp;
/* grow the stride through the 3x+1 sequence, then shrink it */
1817 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1822 for (i = stride ; i < num ; i++) {
/* shift stride-spaced predecessors up while they exceed tmp */
1825 while (j >= stride && array[j - stride]->off > tmp->off) {
1826 array[j] = array[j - stride];
1831 } while (stride > 1);
/* Return how many leading pages of @pg form a single unfragmented run,
 * i.e. a run the network can move in one RDMA: the run may only end on
 * a page boundary and continue from a page-aligned offset.  The count
 * accumulation lines are not visible in this extract. */
1834 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1840 LASSERT (pages > 0);
1841 offset = pg[i]->off & ~CFS_PAGE_MASK;
1845 if (pages == 0) /* that's all */
1848 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1849 return count; /* doesn't end on page boundary */
1852 offset = pg[i]->off & ~CFS_PAGE_MASK;
1853 if (offset != 0) /* doesn't start on page boundary */
/* Allocate an array of @count pointers into the contiguous brw_page
 * array @pga, so callers can sort/partition pages without copying them.
 * Caller frees with osc_release_ppga().  The allocation-failure branch
 * and the loop body are not visible in this extract. */
1860 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1862 struct brw_page **ppga;
1865 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1869 for (i = 0; i < count; i++)
/* Free a pointer array built by osc_build_ppga(); @count must match the
 * count it was allocated with (OBD_FREE needs the original size). */
1874 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1876 LASSERT(ppga != NULL);
1877 OBD_FREE(ppga, sizeof(*ppga) * count);
/* obd_brw entry point for the OSC: splits a (possibly large) page array
 * into max_pages_per_rpc-sized, unfragmented chunks, sorted by offset,
 * and issues a synchronous BRW per chunk via osc_brw_internal().  Saves
 * and restores *oinfo->oi_oa across chunks because each brw clobbers it. */
1880 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1881 obd_count page_count, struct brw_page *pga,
1882 struct obd_trans_info *oti)
1884 struct obdo *saved_oa = NULL;
1885 struct brw_page **ppga, **orig;
1886 struct obd_import *imp = class_exp2cliimp(exp);
1887 struct client_obd *cli;
1888 int rc, page_count_orig;
1891 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1892 cli = &imp->imp_obd->u.cli;
1894 if (cmd & OBD_BRW_CHECK) {
1895 /* The caller just wants to know if there's a chance that this
1896 * I/O can succeed */
1898 if (imp->imp_invalid)
1903 /* test_brw with a failed create can trip this, maybe others. */
1904 LASSERT(cli->cl_max_pages_per_rpc);
1908 orig = ppga = osc_build_ppga(pga, page_count);
/* remember original count: ppga is advanced as chunks complete */
1911 page_count_orig = page_count;
1913 sort_brw_pages(ppga, page_count);
1914 while (page_count) {
1915 obd_count pages_per_brw;
1917 if (page_count > cli->cl_max_pages_per_rpc)
1918 pages_per_brw = cli->cl_max_pages_per_rpc;
1920 pages_per_brw = page_count;
/* further shrink the chunk so it transfers in one RDMA */
1922 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1924 if (saved_oa != NULL) {
1925 /* restore previously saved oa */
1926 *oinfo->oi_oa = *saved_oa;
1927 } else if (page_count > pages_per_brw) {
1928 /* save a copy of oa (brw will clobber it) */
1929 OBDO_ALLOC(saved_oa);
1930 if (saved_oa == NULL)
1931 GOTO(out, rc = -ENOMEM);
1932 *saved_oa = *oinfo->oi_oa;
1935 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1936 pages_per_brw, ppga, oinfo->oi_capa);
1941 page_count -= pages_per_brw;
1942 ppga += pages_per_brw;
/* free using the original base pointer and count, not the advanced ones */
1946 osc_release_ppga(orig, page_count_orig);
1948 if (saved_oa != NULL)
1949 OBDO_FREE(saved_oa);
1954 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1955 * the dirty accounting. Writeback completes or truncate happens before
1956 * writing starts. Must be called with the loi lock held. */
1957 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
/* release the write grant held by this page; @sent (signature line not
 * fully visible) tells the grant code whether the page reached the OST */
1960 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1964 /* This maintains the lists of pending pages to read/write for a given object
1965 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1966 * to quickly find objects that are ready to send an RPC. */
/* Return whether this lop has enough / urgent-enough pages to justify
 * building an RPC now.  The non-zero return statements are not visible
 * in this extract. */
1967 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1972 if (lop->lop_num_pending == 0)
1975 /* if we have an invalid import we want to drain the queued pages
1976 * by forcing them through rpcs that immediately fail and complete
1977 * the pages. recovery relies on this to empty the queued pages
1978 * before canceling the locks and evicting down the llite pages */
1979 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1982 /* stream rpcs in queue order as long as as there is an urgent page
1983 * queued. this is our cheap solution for good batching in the case
1984 * where writepage marks some random page in the middle of the file
1985 * as urgent because of, say, memory pressure */
1986 if (!cfs_list_empty(&lop->lop_urgent)) {
1987 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1991 if (cmd & OBD_BRW_WRITE) {
1992 /* trigger a write rpc stream as long as there are dirtiers
1993 * waiting for space. as they're waiting, they're not going to
1994 * create more pages to coalesce with what's waiting.. */
1995 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
1996 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
/* enough pages accumulated to fill a full-size RPC */
2000 if (lop->lop_num_pending >= cli->cl_max_pages_per_rpc)
/* Return whether the head of the urgent list is a high-priority (ASYNC_HP)
 * page, i.e. this lop should be serviced ahead of ordinary ready objects. */
2006 static int lop_makes_hprpc(struct loi_oap_pages *lop)
2008 struct osc_async_page *oap;
2011 if (cfs_list_empty(&lop->lop_urgent))
/* ASYNC_HP pages are queued at the head (see osc_oap_to_pending),
 * so checking only the first entry suffices */
2014 oap = cfs_list_entry(lop->lop_urgent.next,
2015 struct osc_async_page, oap_urgent_item);
2017 if (oap->oap_async_flags & ASYNC_HP) {
2018 CDEBUG(D_CACHE, "hp request forcing RPC\n");
/* Idempotently add @item to @list or remove it, so that its membership
 * matches the boolean condition (third parameter; its declaration line
 * is not visible in this extract). */
2025 static void on_list(cfs_list_t *item, cfs_list_t *list,
2028 if (cfs_list_empty(item) && should_be_on)
2029 cfs_list_add_tail(item, list);
2030 else if (!cfs_list_empty(item) && !should_be_on)
2031 cfs_list_del_init(item);
2034 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2035 * can find pages to build into rpcs quickly */
2036 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
/* an object is on at most one of hp_ready/ready: HP work wins */
2038 if (lop_makes_hprpc(&loi->loi_write_lop) ||
2039 lop_makes_hprpc(&loi->loi_read_lop)) {
2041 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2042 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2044 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
2045 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2046 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
2047 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
/* write/read lists just track "has any pending pages" */
2050 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2051 loi->loi_write_lop.lop_num_pending);
2053 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2054 loi->loi_read_lop.lop_num_pending);
/* Adjust this lop's pending-page count by @delta (+/-) and mirror the
 * change into the client-wide pending read or write page counter.
 * The else-arm line for the read counter is not visible in this extract. */
2057 static void lop_update_pending(struct client_obd *cli,
2058 struct loi_oap_pages *lop, int cmd, int delta)
2060 lop->lop_num_pending += delta;
2061 if (cmd & OBD_BRW_WRITE)
2062 cli->cl_pending_w_pages += delta;
2064 cli->cl_pending_r_pages += delta;
2068 * this is called when a sync waiter receives an interruption. Its job is to
2069 * get the caller woken as soon as possible. If its page hasn't been put in an
2070 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
2071 * desiring interruption which will forcefully complete the rpc once the rpc
2074 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2076 struct loi_oap_pages *lop;
2077 struct lov_oinfo *loi;
2081 LASSERT(!oap->oap_interrupted);
2082 oap->oap_interrupted = 1;
2084 /* ok, it's been put in an rpc. only one oap gets a request reference */
2085 if (oap->oap_request != NULL) {
/* nudge the rpc to complete early and drop our reference */
2086 ptlrpc_mark_interrupted(oap->oap_request);
2087 ptlrpcd_wake(oap->oap_request);
2088 ptlrpc_req_finished(oap->oap_request);
2089 oap->oap_request = NULL;
2093 * page completion may be called only if ->cpo_prep() method was
2094 * executed by osc_io_submit(), that also adds page the to pending list
2096 if (!cfs_list_empty(&oap->oap_pending_item)) {
2097 cfs_list_del_init(&oap->oap_pending_item);
2098 cfs_list_del_init(&oap->oap_urgent_item);
/* undo the pending accounting and fail the page back to the caller */
2101 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2102 &loi->loi_write_lop : &loi->loi_read_lop;
2103 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2104 loi_list_maint(oap->oap_cli, oap->oap_loi);
2105 rc = oap->oap_caller_ops->ap_completion(env,
2106 oap->oap_caller_data,
2107 oap->oap_cmd, NULL, -EINTR);
2113 /* this is trying to propogate async writeback errors back up to the
2114 * application. As an async write fails we record the error code for later if
2115 * the app does an fsync. As long as errors persist we force future rpcs to be
2116 * sync so that the app can get a sync error and break the cycle of queueing
2117 * pages for which writeback will fail. */
2118 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
/* on failure (branch condition lines not visible here): force future
 * rpcs sync until an xid at or past the next one completes cleanly */
2125 ar->ar_force_sync = 1;
2126 ar->ar_min_xid = ptlrpc_sample_next_xid();
/* a clean completion past ar_min_xid clears the force-sync state */
2131 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2132 ar->ar_force_sync = 0;
/* Queue @oap on its object's read or write lop: HP pages go to the head
 * of the urgent list (so lop_makes_hprpc sees them first), urgent pages
 * to its tail, and every page joins the pending list and accounting. */
2135 void osc_oap_to_pending(struct osc_async_page *oap)
2137 struct loi_oap_pages *lop;
2139 if (oap->oap_cmd & OBD_BRW_WRITE)
2140 lop = &oap->oap_loi->loi_write_lop;
2142 lop = &oap->oap_loi->loi_read_lop;
2144 if (oap->oap_async_flags & ASYNC_HP)
2145 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2146 else if (oap->oap_async_flags & ASYNC_URGENT)
2147 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2148 cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2149 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2152 /* this must be called holding the loi list lock to give coverage to exit_cache,
2153 * async_flag maintenance, and oap_request */
/* Complete one async page: drop its request ref, reset its flags, feed
 * write results into the async-rc error tracking, mirror server-updated
 * lvb attributes, invoke the caller's completion callback, and release
 * the page's cache grant. */
2154 static void osc_ap_completion(const struct lu_env *env,
2155 struct client_obd *cli, struct obdo *oa,
2156 struct osc_async_page *oap, int sent, int rc)
2161 if (oap->oap_request != NULL) {
/* capture the xid before releasing so osc_process_ar can use it */
2162 xid = ptlrpc_req_xid(oap->oap_request);
2163 ptlrpc_req_finished(oap->oap_request);
2164 oap->oap_request = NULL;
2167 cfs_spin_lock(&oap->oap_lock);
2168 oap->oap_async_flags = 0;
2169 cfs_spin_unlock(&oap->oap_lock);
2170 oap->oap_interrupted = 0;
2172 if (oap->oap_cmd & OBD_BRW_WRITE) {
2173 osc_process_ar(&cli->cl_ar, xid, rc);
2174 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
/* keep the cached lvb in sync with the attributes the OST returned */
2177 if (rc == 0 && oa != NULL) {
2178 if (oa->o_valid & OBD_MD_FLBLOCKS)
2179 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2180 if (oa->o_valid & OBD_MD_FLMTIME)
2181 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2182 if (oa->o_valid & OBD_MD_FLATIME)
2183 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2184 if (oa->o_valid & OBD_MD_FLCTIME)
2185 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2188 rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2189 oap->oap_cmd, oa, rc);
2191 /* cl_page_completion() drops PG_locked. so, a new I/O on the page could
2192 * start, but OSC calls it under lock and thus we can add oap back to
2195 /* upper layer wants to leave the page on pending queue */
2196 osc_oap_to_pending(oap);
2198 osc_exit_cache(cli, oap, sent);
/* Writeback work callback: kick the RPC engine for this client_obd
 * under the loi list lock. */
2202 static int brw_queue_work(const struct lu_env *env, void *data)
2204 struct client_obd *cli = data;
2206 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2208 client_obd_list_lock(&cli->cl_loi_list_lock);
2209 osc_check_rpcs0(env, cli, 1);
2210 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Interpret callback for async BRW RPCs: finish the request, redo it on
 * recoverable errors, then (under the loi list lock) drop the in-flight
 * count, complete or release every page the RPC carried, wake cache
 * waiters and try to launch more RPCs.
 * NOTE(review): several lines (cli assignment, redo success path,
 * RETURN) are missing from this extract. */
2214 static int brw_interpret(const struct lu_env *env,
2215 struct ptlrpc_request *req, void *data, int rc)
2217 struct osc_brw_async_args *aa = data;
2218 struct client_obd *cli;
2222 rc = osc_brw_fini_request(req, rc);
2223 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2224 if (osc_recoverable_error(rc)) {
2225 rc = osc_brw_redo_request(req, aa);
2231 capa_put(aa->aa_ocapa);
2232 aa->aa_ocapa = NULL;
2236 client_obd_list_lock(&cli->cl_loi_list_lock);
2238 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2239 * is called so we know whether to go to sync BRWs or wait for more
2240 * RPCs to complete */
2241 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2242 cli->cl_w_in_flight--;
2244 cli->cl_r_in_flight--;
/* an empty oap list means the pages came via async_internal and only
 * carry grant, not per-page completion callbacks */
2246 async = cfs_list_empty(&aa->aa_oaps);
2247 if (!async) { /* from osc_send_oap_rpc() */
2248 struct osc_async_page *oap, *tmp;
2249 /* the caller may re-use the oap after the completion call so
2250 * we need to clean it up a little */
2251 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
2253 cfs_list_del_init(&oap->oap_rpc_item);
2254 osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2256 OBDO_FREE(aa->aa_oa);
2257 } else { /* from async_internal() */
2259 for (i = 0; i < aa->aa_page_count; i++)
2260 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2262 osc_wake_cache_waiters(cli);
2263 osc_check_rpcs0(env, cli, 1);
2264 client_obd_list_unlock(&cli->cl_loi_list_lock);
2267 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2268 req->rq_bulk->bd_nob_transferred);
2269 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2270 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
/* Build an async BRW request from the oaps on @rpc_list: allocates the
 * pga and obdo, creates a cl_req covering all pages, fills attributes,
 * sorts the pages and calls osc_brw_prep_request(), then transfers the
 * oap list into the request's async args.  Returns the request or an
 * ERR_PTR; on error every queued oap is completed with the error.
 * NOTE(review): allocation/branch lines are missing from this extract;
 * comments describe only the visible flow. */
2275 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2276 struct client_obd *cli,
2277 cfs_list_t *rpc_list,
2278 int page_count, int cmd)
2280 struct ptlrpc_request *req;
2281 struct brw_page **pga = NULL;
2282 struct osc_brw_async_args *aa;
2283 struct obdo *oa = NULL;
2284 const struct obd_async_page_ops *ops = NULL;
2285 struct osc_async_page *oap;
2286 struct osc_async_page *tmp;
2287 struct cl_req *clerq = NULL;
2288 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2289 struct ldlm_lock *lock = NULL;
2290 struct cl_req_attr crattr;
2291 int i, rc, mpflag = 0;
2294 LASSERT(!cfs_list_empty(rpc_list));
/* under memory pressure, mark this thread so allocations may dip
 * into reserves while we flush */
2296 if (cmd & OBD_BRW_MEMALLOC)
2297 mpflag = cfs_memory_pressure_get_and_set();
2299 memset(&crattr, 0, sizeof crattr);
2300 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2302 GOTO(out, req = ERR_PTR(-ENOMEM));
2306 GOTO(out, req = ERR_PTR(-ENOMEM));
/* collect each oap's brw_page into pga and add it to the cl_req */
2309 cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2310 struct cl_page *page = osc_oap2cl_page(oap);
2312 ops = oap->oap_caller_ops;
2314 clerq = cl_req_alloc(env, page, crt,
2315 1 /* only 1-object rpcs for
2318 GOTO(out, req = (void *)clerq);
2319 lock = oap->oap_ldlm_lock;
2321 pga[i] = &oap->oap_brw_page;
2322 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2323 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2324 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2326 cl_req_page_add(env, clerq, page);
2329 /* always get the data for the obdo for the rpc */
2330 LASSERT(ops != NULL);
2332 crattr.cra_capa = NULL;
2333 cl_req_attr_set(env, clerq, &crattr, ~0ULL);
/* presumably only when a lock was found above — guard lines missing */
2335 oa->o_handle = lock->l_remote_handle;
2336 oa->o_valid |= OBD_MD_FLHANDLE;
2339 rc = cl_req_prep(env, clerq);
2341 CERROR("cl_req_prep failed: %d\n", rc);
2342 GOTO(out, req = ERR_PTR(rc));
2345 sort_brw_pages(pga, page_count);
2346 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2347 pga, &req, crattr.cra_capa, 1, 0);
2349 CERROR("prep_req failed: %d\n", rc);
2350 GOTO(out, req = ERR_PTR(rc));
2353 if (cmd & OBD_BRW_MEMALLOC)
2354 req->rq_memalloc = 1;
2356 /* Need to update the timestamps after the request is built in case
2357 * we race with setattr (locally or in queue at OST). If OST gets
2358 * later setattr before earlier BRW (as determined by the request xid),
2359 * the OST will not use BRW timestamps. Sadly, there is no obvious
2360 * way to do this in a single call. bug 10150 */
2361 cl_req_attr_set(env, clerq, &crattr,
2362 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
/* move the oap list into the request; brw_interpret completes them */
2364 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2365 aa = ptlrpc_req_async_args(req);
2366 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2367 cfs_list_splice(rpc_list, &aa->aa_oaps);
2368 CFS_INIT_LIST_HEAD(rpc_list);
2369 aa->aa_clerq = clerq;
2371 if (cmd & OBD_BRW_MEMALLOC)
2372 cfs_memory_pressure_restore(mpflag);
2374 capa_put(crattr.cra_capa);
/* error path: unwind allocations and fail every queued page */
2379 OBD_FREE(pga, sizeof(*pga) * page_count);
2380 /* this should happen rarely and is pretty bad, it makes the
2381 * pending list not follow the dirty order */
2382 client_obd_list_lock(&cli->cl_loi_list_lock);
2383 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2384 cfs_list_del_init(&oap->oap_rpc_item);
2386 /* queued sync pages can be torn down while the pages
2387 * were between the pending list and the rpc */
2388 if (oap->oap_interrupted) {
2389 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2390 osc_ap_completion(env, cli, NULL, oap, 0,
2394 osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2396 if (clerq && !IS_ERR(clerq))
2397 cl_req_completion(env, clerq, PTR_ERR(req));
2403 * prepare pages for ASYNC io and put pages in send queue.
2405 * \param cmd OBD_BRW_* macroses
2406 * \param lop pending pages
2408 * \return zero if no page added to send queue.
2409 * \return 1 if pages successfully added to send queue.
2410 * \return negative on errors.
/*
 * Collect pages queued on \a lop, build one bulk BRW RPC via
 * osc_build_req() and hand it to ptlrpcd with placement policy \a pol.
 *
 * NOTE(review): this extract elides a number of source lines (opening
 * braces, GOTO/break statements, continuation lines); comments below
 * describe only what the visible statements establish — confirm
 * control flow against the full source.
 *
 * \param cmd  OBD_BRW_READ or OBD_BRW_WRITE (plus optional flags)
 * \param lop  pending-page bookkeeping for the chosen direction
 * \param pol  ptlrpcd placement policy used when queueing the request
 */
osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
                 struct lov_oinfo *loi, int cmd,
                 struct loi_oap_pages *lop, pdl_policy_t pol)
        struct ptlrpc_request *req;
        obd_count page_count = 0;
        struct osc_async_page *oap = NULL, *tmp;
        struct osc_brw_async_args *aa;
        const struct obd_async_page_ops *ops;
        CFS_LIST_HEAD(rpc_list);
        int srvlock = 0, mem_tight = 0;
        struct cl_object *clob = NULL;
        obd_off starting_offset = OBD_OBJECT_EOF;
        unsigned int ending_offset;
        int starting_page_off = 0;

        /* ASYNC_HP pages first. At present, when the lock the pages is
         * to be canceled, the pages covered by the lock will be sent out
         * with ASYNC_HP. We have to send out them as soon as possible. */
        cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
                if (oap->oap_async_flags & ASYNC_HP)
                        cfs_list_move(&oap->oap_pending_item, &lop->lop_pending);
                if (++page_count >= cli->cl_max_pages_per_rpc)

        /* first we find the pages we're allowed to work with */
        cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
                ops = oap->oap_caller_ops;

                LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
                         "magic 0x%x\n", oap, oap->oap_magic);

                /* pin object in memory, so that completion call-backs
                 * can be safely called under client_obd_list lock. */
                clob = osc_oap2cl_page(oap)->cp_obj;
                cl_object_get(clob);

                /* one RPC may not mix SRVLOCK and non-SRVLOCK pages */
                if (page_count != 0 &&
                    srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
                        CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
                               " oap %p, page %p, srvlock %u\n",
                               oap, oap->oap_brw_page.pg, (unsigned)!srvlock);

                /* If there is a gap at the start of this page, it can't merge
                 * with any previous page, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (oap->oap_obj_off < starting_offset) {
                        if (starting_page_off != 0)
                        starting_page_off = oap->oap_page_off;
                        starting_offset = oap->oap_obj_off + starting_page_off;
                } else if (oap->oap_page_off != 0)

                /* in llite being 'ready' equates to the page being locked
                 * until completion unlocks it. commit_write submits a page
                 * as not ready because its unlock will happen unconditionally
                 * as the call returns. if we race with commit_write giving
                 * us that page we don't want to create a hole in the page
                 * stream, so we stop and leave the rpc to be fired by
                 * another dirtier or kupdated interval (the not ready page
                 * will still be on the dirty list). we could call in
                 * at the end of ll_file_write to process the queue again. */
                if (!(oap->oap_async_flags & ASYNC_READY)) {
                        int rc = ops->ap_make_ready(env, oap->oap_caller_data,
                                CDEBUG(D_INODE, "oap %p page %p returned %d "
                                       "instead of ready\n", oap,
                                /* llite is telling us that the page is still
                                 * in commit_write and that we should try
                                 * and put it in an rpc again later. we
                                 * break out of the loop so we don't create
                                 * a hole in the sequence of pages in
                                 * the rpc */
                                /* the io isn't needed.. tell the checks
                                 * below to complete the rpc with EINTR */
                                cfs_spin_lock(&oap->oap_lock);
                                oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                                cfs_spin_unlock(&oap->oap_lock);
                                oap->oap_count = -EINTR;
                                cfs_spin_lock(&oap->oap_lock);
                                oap->oap_async_flags |= ASYNC_READY;
                                cfs_spin_unlock(&oap->oap_lock);
                                LASSERTF(0, "oap %p page %p returned %d "
                                         "from make_ready\n", oap,

                /* take the page out of our book-keeping */
                cfs_list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, cmd, -1);
                cfs_list_del_init(&oap->oap_urgent_item);

                /* ask the caller for the size of the io as the rpc leaves. */
                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
                        ops->ap_refresh_count(env, oap->oap_caller_data,
                LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
                if (oap->oap_count <= 0) {
                        CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
                        osc_ap_completion(env, cli, NULL,
                                          oap, 0, oap->oap_count);

                /* now put the page back in our accounting */
                cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
                if (page_count++ == 0)
                        srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);

                if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)

                /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
                 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
                 * have the same alignment as the initial writes that allocated
                 * extents on the server. */
                ending_offset = oap->oap_obj_off + oap->oap_page_off +
                if (!(ending_offset & (PTLRPC_MAX_BRW_SIZE - 1)))

                if (page_count >= cli->cl_max_pages_per_rpc)

                /* If there is a gap at the end of this page, it can't merge
                 * with any subsequent pages, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)

        osc_wake_cache_waiters(cli);

        loi_list_maint(cli, loi);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        /* drop the pin taken on the last walked cl_object above */
        cl_object_put(env, clob);

        if (page_count == 0) {
                client_obd_list_lock(&cli->cl_loi_list_lock);

        req = osc_build_req(env, cli, &rpc_list, page_count,
                            mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
                /* build failed: pages were consumed/requeued by the
                 * error path; propagate the error */
                LASSERT(cfs_list_empty(&rpc_list));
                loi_list_maint(cli, loi);
                RETURN(PTR_ERR(req));

        aa = ptlrpc_req_async_args(req);

        starting_offset &= PTLRPC_MAX_BRW_SIZE - 1;
        if (cmd == OBD_BRW_READ) {
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                 cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);

        client_obd_list_lock(&cli->cl_loi_list_lock);

        if (cmd == OBD_BRW_READ)
                cli->cl_r_in_flight++;
                cli->cl_w_in_flight++;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                /* only one oap gets a request reference */
                if (oap->oap_interrupted && !req->rq_intr) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                        ptlrpc_mark_interrupted(req);
        tmp->oap_request = ptlrpc_request_addref(req);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
                  page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);

        req->rq_interpret_reply = brw_interpret;

        /* XXX: Maybe the caller can check the RPC bulk descriptor to see which
         * CPU/NUMA node the majority of pages were allocated on, and try
         * to assign the async RPC to the CPU core (PDL_POLICY_PREFERRED)
         * to reduce cross-CPU memory traffic.
         *
         * But on the other hand, we expect that multiple ptlrpcd threads
         * and the initial write sponsor can run in parallel, especially
         * when data checksum is enabled, which is CPU-bound operation and
         * single ptlrpcd thread cannot process in time. So more ptlrpcd
         * threads sharing BRW load (with PDL_POLICY_ROUND) seems better.
         */
        ptlrpcd_add_req(req, pol, -1);
/* Debug helper: dump an loi's ready/pending/urgent state in one CDEBUG
 * line.  NOTE(review): the final continuation line(s) of this macro are
 * elided in this extract. */
#define LOI_DEBUG(LOI, STR, args...)                                   \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,         \
               !cfs_list_empty(&(LOI)->loi_ready_item) ||              \
               !cfs_list_empty(&(LOI)->loi_hp_ready_item),             \
               (LOI)->loi_write_lop.lop_num_pending,                   \
               !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent),      \
               (LOI)->loi_read_lop.lop_num_pending,                    \
               !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent),       \
/* This is called by osc_check_rpcs() to find which objects have pages that
 * we could be sending. These lists are maintained by lop_makes_rpc().
 * Selection order: HP-ready objects, then ready, then (if waiters exist)
 * objects with queued writes, then everything when the import is invalid. */
struct lov_oinfo *osc_next_loi(struct client_obd *cli)
        /* First return objects that have blocked locks so that they
         * will be flushed quickly and other clients can get the lock,
         * then objects which have pages ready to be stuffed into RPCs */
        if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
                RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
                                      struct lov_oinfo, loi_hp_ready_item));
        if (!cfs_list_empty(&cli->cl_loi_ready_list))
                RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
                                      struct lov_oinfo, loi_ready_item));

        /* then if we have cache waiters, return all objects with queued
         * writes. This is especially important when many small files
         * have filled up the cache and not been fired into rpcs because
         * they don't pass the nr_pending/object threshold */
        if (!cfs_list_empty(&cli->cl_cache_waiters) &&
            !cfs_list_empty(&cli->cl_loi_write_list))
                RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
                                      struct lov_oinfo, loi_write_item));

        /* then return all queued objects when we have an invalid import
         * so that they get flushed */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
                if (!cfs_list_empty(&cli->cl_loi_write_list))
                        RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
                if (!cfs_list_empty(&cli->cl_loi_read_list))
                        RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
                                              struct lov_oinfo, loi_read_item));
/* True when @cli is already at its RPC-in-flight limit.  One extra slot
 * is allowed (limit + hprpc) when the first urgent page on either the
 * write or read list is high-priority (ASYNC_HP). */
static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
        struct osc_async_page *oap;

        if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
                oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
                                     struct osc_async_page, oap_urgent_item);
                hprpc = !!(oap->oap_async_flags & ASYNC_HP);

        if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
                oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
                                     struct osc_async_page, oap_urgent_item);
                hprpc = !!(oap->oap_async_flags & ASYNC_HP);

        return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
/* called with the loi list lock held.
 * Walk the objects returned by osc_next_loi() and fire read/write BRW
 * RPCs for each until the in-flight limit is reached.
 * NOTE(review): several elided lines (GOTOs/continues/braces) are
 * missing from this extract. */
static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli, int ptlrpc)
        struct lov_oinfo *loi;
        int rc = 0, race_counter = 0;

        /* keep the RPC on the calling ptlrpcd thread when invoked from
         * one; otherwise spread over ptlrpcd threads round-robin */
        pol = ptlrpc ? PDL_POLICY_SAME : PDL_POLICY_ROUND;

        while ((loi = osc_next_loi(cli)) != NULL) {
                LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));

                if (osc_max_rpc_in_flight(cli, loi))

                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object. The makes_rpc checks here
                 * would be redundant if we were getting read/write work items
                 * instead of objects. we don't want send_oap_rpc to drain a
                 * partial read pending queue when we're given this object to
                 * do io on writes while there are cache waiters */
                if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
                        rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
                                              &loi->loi_write_lop, pol);
                                CERROR("Write request failed with %d\n", rc);

                        /* osc_send_oap_rpc failed, mostly because of
                         * It can't break here, because if:
                         *  - a page was submitted by osc_io_submit, so
                         *  - no request in flight
                         *  - no subsequent request
                         * The system will be in live-lock state,
                         * because there is no chance to call
                         * osc_io_unplug() and osc_check_rpcs() any
                         * more. pdflush can't help in this case,
                         * because it might be blocked at grabbing
                         * the page lock as we mentioned.
                         *
                         * Anyway, continue to drain pages. */

                if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
                        rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
                                              &loi->loi_read_lop, pol);
                                CERROR("Read request failed with %d\n", rc);

                /* attempt some inter-object balancing by issuing rpcs
                 * for each object in turn */
                if (!cfs_list_empty(&loi->loi_hp_ready_item))
                        cfs_list_del_init(&loi->loi_hp_ready_item);
                if (!cfs_list_empty(&loi->loi_ready_item))
                        cfs_list_del_init(&loi->loi_ready_item);
                if (!cfs_list_empty(&loi->loi_write_item))
                        cfs_list_del_init(&loi->loi_write_item);
                if (!cfs_list_empty(&loi->loi_read_item))
                        cfs_list_del_init(&loi->loi_read_item);

                loi_list_maint(cli, loi);

                /* send_oap_rpc fails with 0 when make_ready tells it to
                 * back off. llite's make_ready does this when it tries
                 * to lock a page queued for write that is already locked.
                 * we want to try sending rpcs from many objects, but we
                 * don't want to spin failing with 0. */
                if (race_counter == 10)
/* Public wrapper: flush pending pages.  The 0 means "not called from a
 * ptlrpcd thread", so osc_check_rpcs0() uses round-robin placement. */
void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
        osc_check_rpcs0(env, cli, 0);
2814 /* we're trying to queue a page in the osc so we're subject to the
2815 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2816 * If the osc's queued pages are already at that limit, then we want to sleep
2817 * until there is space in the osc's queue for us. We also may be waiting for
2818 * write credits from the OST if there are RPCs in flight that may return some
2819 * before we fall back to sync writes.
 * We need this to know our allocation was granted in the presence of signals */
/* l_wait_event() predicate for osc_enter_cache(): true once the waiter
 * has been taken off cl_cache_waiters (i.e. granted), or there are no
 * RPCs in flight that could ever return grant, so waiting is futile. */
static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
        client_obd_list_lock(&cli->cl_loi_list_lock);
        rc = cfs_list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
/*
 * Non-blocking version of osc_enter_cache() that consumes grant only when it
 * is available.
 * NOTE(review): the guard/brace lines around the statements below are
 * elided in this extract; the transient branch presumably marks pages
 * that bypass the server cache - confirm against full source.
 */
int osc_enter_cache_try(const struct lu_env *env,
                        struct client_obd *cli, struct lov_oinfo *loi,
                        struct osc_async_page *oap, int transient)
        /* need at least one page worth of available grant */
        has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
                osc_consume_write_grant(cli, &oap->oap_brw_page);
                        cli->cl_dirty_transit += CFS_PAGE_SIZE;
                        cfs_atomic_inc(&obd_dirty_transit_pages);
                        oap->oap_brw_flags |= OBD_BRW_NOCACHE;
/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
 * grant or cache space.
 * Returns an error to force the caller to fall back to synchronous IO
 * when caching is not possible. */
static int osc_enter_cache(const struct lu_env *env,
                           struct client_obd *cli, struct lov_oinfo *loi,
                           struct osc_async_page *oap)
        struct osc_cache_waiter ocw;
        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);

        CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
               "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
               cli->cl_dirty_max, obd_max_dirty_pages,
               cli->cl_lost_grant, cli->cl_avail_grant);

        /* force the caller to try sync io. this can jump the list
         * of queued writes and create a discontiguous rpc stream */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
            cli->cl_dirty_max < CFS_PAGE_SIZE ||
            cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync)

        /* Hopefully normal case - cache space and write credits available */
        if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
            cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
            osc_enter_cache_try(env, cli, loi, oap, 0))

        /* It is safe to block as a cache waiter as long as there is grant
         * space available or the hope of additional grant being returned
         * when an in flight write completes. Using the write back cache
         * if possible is preferable to sending the data synchronously
         * because write pages can then be merged in to large requests.
         * The addition of this cache waiter will cause pending write
         * pages to be sent immediately. */
        if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
                cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
                cfs_waitq_init(&ocw.ocw_waitq);

                /* kick pending writes out so they return grant, then
                 * drop the list lock across the sleep */
                loi_list_maint(cli, loi);
                osc_check_rpcs(env, cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_CACHE, "sleeping for cache space\n");
                l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);

                client_obd_list_lock(&cli->cl_loi_list_lock);
                if (!cfs_list_empty(&ocw.ocw_entry)) {
                        /* still queued: woken without being granted -
                         * remove ourselves from the waiter list */
                        cfs_list_del(&ocw.ocw_entry);
/* Initialize the osc_async_page cookie for @page at object offset
 * @offset.
 * NOTE(review): the guard lines before the size-only return are elided
 * in this extract; the bare "return cfs_size_round(...)" is presumably
 * the size-query path taken when no result buffer/page is supplied -
 * confirm against full source. */
int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
                        struct lov_oinfo *loi, cfs_page_t *page,
                        obd_off offset, const struct obd_async_page_ops *ops,
                        void *data, void **res, int nocache,
                        struct lustre_handle *lockh)
        struct osc_async_page *oap;

                return cfs_size_round(sizeof(*oap));

        oap->oap_magic = OAP_MAGIC;
        oap->oap_cli = &exp->exp_obd->u.cli;

        oap->oap_caller_ops = ops;
        oap->oap_caller_data = data;

        oap->oap_page = page;
        oap->oap_obj_off = offset;
        /* local root with CAP_SYS_RESOURCE may write without quota checks */
        if (!client_is_remote(exp) &&
            cfs_capable(CFS_CAP_SYS_RESOURCE))
                oap->oap_brw_flags = OBD_BRW_NOQUOTA;

        /* offsets must be page-aligned */
        LASSERT(!(offset & ~CFS_PAGE_MASK));

        CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
        CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
        CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
        CFS_INIT_LIST_HEAD(&oap->oap_page_list);

        cfs_spin_lock_init(&oap->oap_lock);
        CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
/* Queue one prepared async page for IO: validate it, check quota for
 * writes, reserve cache/grant, put it on the pending list and kick the
 * writeback worker if an RPC can be formed.
 * NOTE(review): error-return and brace lines are elided in this extract. */
int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp,
                       struct lov_stripe_md *lsm, struct lov_oinfo *loi,
                       struct osc_async_page *oap, int cmd, int off,
                       int count, obd_flag brw_flags, enum async_flags async_flags)
        struct client_obd *cli = &exp->exp_obd->u.cli;

        if (oap->oap_magic != OAP_MAGIC)

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)

        /* page may not already be queued anywhere */
        if (!cfs_list_empty(&oap->oap_pending_item) ||
            !cfs_list_empty(&oap->oap_urgent_item) ||
            !cfs_list_empty(&oap->oap_rpc_item))

        /* check if the file's owner/group is over quota */
        if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
                struct cl_object *obj;
                struct cl_attr attr; /* XXX put attr into thread info */
                unsigned int qid[MAXQUOTAS];

                obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);

                cl_object_attr_lock(obj);
                rc = cl_object_attr_get(env, obj, &attr);
                cl_object_attr_unlock(obj);

                qid[USRQUOTA] = attr.cat_uid;
                qid[GRPQUOTA] = attr.cat_gid;
                    osc_quota_chkdq(cli, qid) == NO_QUOTA)

                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        LASSERT(off + count <= CFS_PAGE_SIZE);

        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        /* Give a hint to OST that requests are coming from kswapd - bug19529 */
        if (cfs_memory_pressure_get())
                oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
        cfs_spin_lock(&oap->oap_lock);
        oap->oap_async_flags = async_flags;
        cfs_spin_unlock(&oap->oap_lock);

        if (cmd & OBD_BRW_WRITE) {
                rc = osc_enter_cache(env, cli, loi, oap);
                        client_obd_list_unlock(&cli->cl_loi_list_lock);

        LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,

        osc_oap_to_pending(oap);
        loi_list_maint(cli, loi);
        if (!osc_max_rpc_in_flight(cli, loi) &&
            lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
                LASSERT(cli->cl_writeback_work != NULL);
                rc = ptlrpcd_queue_work(cli->cl_writeback_work);

                CDEBUG(D_CACHE, "Queued writeback work for client obd %p/%d.\n",
        client_obd_list_unlock(&cli->cl_loi_list_lock);
/* True iff @flag is being newly set: clear in @was and set in @now.
 * aka (~was & now & flag), but this is more clear :)
 * Arguments are parenthesized so callers may pass compound expressions
 * (e.g. a bitwise-OR of several flags) without `&`/`|` operator
 * precedence silently changing the result. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
/* Turn on additional @async_flags for a queued page; moving it onto the
 * appropriate urgent list (head if ASYNC_HP, tail otherwise) when
 * ASYNC_URGENT is newly set. */
int osc_set_async_flags_base(struct client_obd *cli,
                             struct lov_oinfo *loi, struct osc_async_page *oap,
                             obd_flag async_flags)
        struct loi_oap_pages *lop;

        /* only pages already on the pending list may change flags */
        LASSERT(!cfs_list_empty(&oap->oap_pending_item));

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
                lop = &loi->loi_read_lop;

        /* nothing new being set: no-op */
        if ((oap->oap_async_flags & async_flags) == async_flags)

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                flags |= ASYNC_READY;

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
            cfs_list_empty(&oap->oap_rpc_item)) {
                if (oap->oap_async_flags & ASYNC_HP)
                        cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
                        cfs_list_add_tail(&oap->oap_urgent_item,
                flags |= ASYNC_URGENT;
                loi_list_maint(cli, loi);

        cfs_spin_lock(&oap->oap_lock);
        oap->oap_async_flags |= flags;
        cfs_spin_unlock(&oap->oap_lock);

        LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
                  oap->oap_async_flags);
/* Remove a page from all osc bookkeeping (cache accounting, urgent and
 * pending lists).  Fails with -EBUSY while the page sits on an rpc list. */
int osc_teardown_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
                            struct lov_oinfo *loi, struct osc_async_page *oap)
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;

        if (oap->oap_magic != OAP_MAGIC)

                loi = lsm->lsm_oinfo[0];

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
                lop = &loi->loi_read_lop;

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* page already owned by an in-flight RPC - cannot tear down now */
        if (!cfs_list_empty(&oap->oap_rpc_item))
                GOTO(out, rc = -EBUSY);

        osc_exit_cache(cli, oap, 0);
        osc_wake_cache_waiters(cli);

        if (!cfs_list_empty(&oap->oap_urgent_item)) {
                cfs_list_del_init(&oap->oap_urgent_item);
                cfs_spin_lock(&oap->oap_lock);
                oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
                cfs_spin_unlock(&oap->oap_lock);
        if (!cfs_list_empty(&oap->oap_pending_item)) {
                cfs_list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, oap->oap_cmd, -1);
        loi_list_maint(cli, loi);
        LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);

        client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Attach @einfo->ei_cbdata to @lock's l_ast_data if it is unset, after
 * sanity-checking that the lock's callbacks/type match @einfo.
 * NOTE(review): the "set" result assignment lines are elided in this
 * extract; visibly, success requires l_ast_data to end up == data. */
static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
                                        struct ldlm_enqueue_info *einfo)
        void *data = einfo->ei_cbdata;

        LASSERT(lock != NULL);
        LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
        LASSERT(lock->l_resource->lr_type == einfo->ei_type);
        LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
        LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);

        /* l_ast_data is protected by both the resource lock and the
         * global osc_ast_guard spinlock */
        lock_res_and_lock(lock);
        cfs_spin_lock(&osc_ast_guard);

        if (lock->l_ast_data == NULL)
                lock->l_ast_data = data;
        if (lock->l_ast_data == data)

        cfs_spin_unlock(&osc_ast_guard);
        unlock_res_and_lock(lock);
/* Handle-based wrapper around osc_set_lock_data_with_check(); logs when
 * the handle no longer resolves to a lock (likely eviction). */
static int osc_set_data_with_check(struct lustre_handle *lockh,
                                   struct ldlm_enqueue_info *einfo)
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

                set = osc_set_lock_data_with_check(lock, einfo);
                LDLM_LOCK_PUT(lock);
                CERROR("lockh %p, data %p - client evicted?\n",
                       lockh, einfo->ei_cbdata);
/* Apply @replace to every ldlm lock on this object's resource, passing
 * @data (used e.g. to swap lock callback data on inode change). */
static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
                             ldlm_iterator_t replace, void *data)
        struct ldlm_res_id res_id;
        struct obd_device *obd = class_exp2obd(exp);

        osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
        ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3179 /* find any ldlm lock of the inode in osc
3183 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3184 ldlm_iterator_t replace, void *data)
3186 struct ldlm_res_id res_id;
3187 struct obd_device *obd = class_exp2obd(exp);
3190 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3191 rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3192 if (rc == LDLM_ITER_STOP)
3194 if (rc == LDLM_ITER_CONTINUE)
/* Finish an enqueue: translate an intent ABORT into the server's real
 * status, mark the LVB ready on success, then invoke the caller's
 * upcall with the final rc. */
static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
                            obd_enqueue_update_f upcall, void *cookie,
                            int *flags, int agl, int rc)
        int intent = *flags & LDLM_FL_HAS_INTENT;

        /* The request was created before ldlm_cli_enqueue call. */
        if (rc == ELDLM_LOCK_ABORTED) {
                struct ldlm_reply *rep;
                rep = req_capsule_server_get(&req->rq_pill,

                LASSERT(rep != NULL);
                /* server's true result is carried in lock_policy_res1 */
                if (rep->lock_policy_res1)
                        rc = rep->lock_policy_res1;

        if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
                *flags |= LDLM_FL_LVB_READY;
                CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
                       lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);

        /* Call the update callback. */
        rc = (*upcall)(cookie, rc);
/* Interpret callback for an async ldlm enqueue: completes the enqueue,
 * runs the caller's upcall, then drops the references taken to keep the
 * lock alive across the upcall. */
static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
        struct ldlm_lock *lock;
        struct lustre_handle handle;
        struct ost_lvb *lvb;
        int *flags = aa->oa_flags;

        /* Make a local copy of a lock handle and a mode, because aa->oa_*
         * might be freed anytime after lock upcall has been called. */
        lustre_handle_copy(&handle, aa->oa_lockh);
        mode = aa->oa_ei->ei_mode;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * still resolve here */
        lock = ldlm_handle2lock(&handle);

        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
         * osc_enqueue_fini(). */
        ldlm_lock_addref(&handle, mode);

        /* Let CP AST to grant the lock first. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

        if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
                lvb_len = sizeof(*aa->oa_lvb);

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   mode, flags, lvb, lvb_len, &handle, rc);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
                              flags, aa->oa_agl, rc);

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

        /* Release the lock for async request. */
        if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
                /*
                 * Releases a reference taken by ldlm_cli_enqueue(), if it is
                 * not already released by
                 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
                 */
                ldlm_lock_decref(&handle, mode);
        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_lockh, req, aa);
        /* drop the extra reference taken at the top of this function */
        ldlm_lock_decref(&handle, mode);
        LDLM_LOCK_PUT(lock);
/* Update per-stripe state (lvb, kms) after an enqueue completes:
 * on ELDLM_OK extend kms to the end of the granted extent; on an
 * intent-glimpse ABORT just refresh the lvb; otherwise fail the match. */
void osc_update_enqueue(struct lustre_handle *lov_lockhp,
                        struct lov_oinfo *loi, int flags,
                        struct ost_lvb *lvb, __u32 mode, int rc)
        struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);

        if (rc == ELDLM_OK) {

                LASSERT(lock != NULL);
                loi->loi_lvb = *lvb;
                tmp = loi->loi_lvb.lvb_size;
                /* Extend KMS up to the end of this lock and no further
                 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
                if (tmp > lock->l_policy_data.l_extent.end)
                        tmp = lock->l_policy_data.l_extent.end + 1;
                if (tmp >= loi->loi_kms) {
                        LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
                                   ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
                        loi_kms_set(loi, tmp);
                        LDLM_DEBUG(lock, "lock acquired, setting rss="
                                   LPU64"; leaving kms="LPU64", end="LPU64,
                                   loi->loi_lvb.lvb_size, loi->loi_kms,
                                   lock->l_policy_data.l_extent.end);
                ldlm_lock_allow_match(lock);
        } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
                LASSERT(lock != NULL);
                loi->loi_lvb = *lvb;
                ldlm_lock_allow_match(lock);
                CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
                       " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
                        ldlm_lock_fail_match(lock, rc);
                LDLM_LOCK_PUT(lock);
EXPORT_SYMBOL(osc_update_enqueue);
/* Sentinel request-set pointer: callers pass PTLRPCD_SET to ask that an
 * async request be queued on the ptlrpcd daemons instead of a real set;
 * it is only ever compared against, never dereferenced. */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is excluded from the cluster -- such scenarios make the life difficult, so
 * release locks just after they are obtained. */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     int *flags, ldlm_policy_data_t *policy,
                     struct ost_lvb *lvb, int kms_valid,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ldlm_enqueue_info *einfo,
                     struct lustre_handle *lockh,
                     struct ptlrpc_request_set *rqset, int async, int agl)
        struct obd_device *obd = exp->exp_obd;
        struct ptlrpc_request *req = NULL;
        int intent = *flags & LDLM_FL_HAS_INTENT;
        /* AGL matches may use locks whose LVB is not yet ready */
        int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother. */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;

        /*
         * kms is not valid when either object is completely fresh (so that no
         * locks are cached), or object was evicted. In the latter case cached
         * lock cannot be used, because it would prime inode state with
         * potentially stale LVB.
         */

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock. The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
        mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
                               einfo->ei_type, policy, mode, lockh, 0);
                struct ldlm_lock *matched = ldlm_handle2lock(lockh);

                if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
                        /* For AGL, if enqueue RPC is sent but the lock is not
                         * granted, then skip to process this stripe.
                         * Return -ECANCELED to tell the caller. */
                        ldlm_lock_decref(lockh, mode);
                        LDLM_LOCK_PUT(matched);
                } else if (osc_set_lock_data_with_check(matched, einfo)) {
                        *flags |= LDLM_FL_LVB_READY;
                        /* addref the lock only if not async requests and PW
                         * lock is matched whereas we asked for PR. */
                        if (!rqset && einfo->ei_mode != mode)
                                ldlm_lock_addref(lockh, LCK_PR);
                                /* I would like to be able to ASSERT here that
                                 * rss <= kms, but I can't, for reasons which
                                 * are explained in lov_enqueue() */

                        /* We already have a lock, and it's referenced */
                        (*upcall)(cookie, ELDLM_OK);

                                if (einfo->ei_mode != mode)
                                        ldlm_lock_decref(lockh, LCK_PW);
                                /* For async requests, decref the lock. */
                                ldlm_lock_decref(lockh, einfo->ei_mode);
                        LDLM_LOCK_PUT(matched);
                        ldlm_lock_decref(lockh, mode);
                        LDLM_LOCK_PUT(matched);

                CFS_LIST_HEAD(cancels);
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_LDLM_ENQUEUE_LVB);

                rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
                        ptlrpc_request_free(req);

                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                ptlrpc_request_set_replen(req);

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        *flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
                              sizeof(*lvb), lockh, async);
                        struct osc_enqueue_args *aa;
                        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = ptlrpc_req_async_args(req);
                        aa->oa_flags = flags;
                        aa->oa_upcall = upcall;
                        aa->oa_cookie = cookie;
                        aa->oa_lockh = lockh;

                        req->rq_interpret_reply =
                                (ptlrpc_interpterer_t)osc_enqueue_interpret;
                        if (rqset == PTLRPCD_SET)
                                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                                ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);

        rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
                ptlrpc_req_finished(req);
/* obd_ops enqueue entry point: build the resource name from the stripe
 * metadata and delegate to osc_enqueue_base(); async iff @rqset given. */
static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                       struct ldlm_enqueue_info *einfo,
                       struct ptlrpc_request_set *rqset)
        struct ldlm_res_id res_id;

        osc_build_res_name(oinfo->oi_md->lsm_object_id,
                           oinfo->oi_md->lsm_object_seq, &res_id);

        rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
                              &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
                              oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
                              oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
                              rqset, rqset != NULL, 0);
/* Match an already-granted extent lock covering [start, end] on @res_id.
 * Returns via ldlm_lock_match(); on success the matched mode is held
 * referenced in @lockh unless LDLM_FL_TEST_LOCK was set in *flags. */
3508 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3509 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3510 int *flags, void *data, struct lustre_handle *lockh,
3513 struct obd_device *obd = exp->exp_obd;
3514 int lflags = *flags;
/* Fault-injection hook for testing lock-match failures. */
3518 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3521 /* Filesystem lock extents are extended to page boundaries so that
3522 * dealing with the page cache is a little smoother */
3523 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3524 policy->l_extent.end |= ~CFS_PAGE_MASK;
3526 /* Next, search for already existing extent locks that will cover us */
3527 /* If we're trying to read, we also search for an existing PW lock. The
3528 * VFS and page cache already protect us locally, so lots of readers/
3529 * writers can share a single PW lock. */
3533 rc = ldlm_lock_match(obd->obd_namespace, lflags,
3534 res_id, type, policy, rc, lockh, unref);
/* Attach caller's @data to the matched lock; on conflict drop our ref
 * (unless this was only a test match, which took no reference). */
3537 if (!osc_set_data_with_check(lockh, data)) {
3538 if (!(lflags & LDLM_FL_TEST_LOCK))
3539 ldlm_lock_decref(lockh, rc);
/* Matched a PW lock while PR was requested: convert the reference so
 * the caller ends up holding the mode it asked for.
 * NOTE(review): the elided lines here presumably adjust rc/mode —
 * confirm against the full source. */
3543 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3544 ldlm_lock_addref(lockh, LCK_PR);
3545 ldlm_lock_decref(lockh, LCK_PW);
/* Drop one reference on @lockh in @mode.  GROUP locks are cancelled
 * immediately along with the decref (they are not kept cached); all
 * other modes just release the reference. */
3552 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3556 if (unlikely(mode == LCK_GROUP))
3557 ldlm_lock_decref_and_cancel(lockh, mode);
3559 ldlm_lock_decref(lockh, mode);
/* obd_ops ->o_cancel handler: thin wrapper around osc_cancel_base();
 * the stripe metadata argument is unused here. */
3564 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3565 __u32 mode, struct lustre_handle *lockh)
3568 RETURN(osc_cancel_base(lockh, mode));
/* Cancel unused locks in this device's namespace.  When @lsm is given,
 * restrict the cancel to that object's resource; otherwise @resp stays
 * NULL and the whole namespace is walked. */
3571 static int osc_cancel_unused(struct obd_export *exp,
3572 struct lov_stripe_md *lsm,
3573 ldlm_cancel_flags_t flags,
3576 struct obd_device *obd = class_exp2obd(exp);
3577 struct ldlm_res_id res_id, *resp = NULL;
3580 resp = osc_build_res_name(lsm->lsm_object_id,
3581 lsm->lsm_object_seq, &res_id);
3584 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
/* Reply interpreter for async OST_STATFS.
 * Copies the server's obd_statfs into the caller's buffer, updates the
 * object-creator (oscc) DEGRADED/RDONLY/NOSPC flags from the reported
 * filesystem state, then invokes the caller's completion callback. */
3587 static int osc_statfs_interpret(const struct lu_env *env,
3588 struct ptlrpc_request *req,
3589 struct osc_async_args *aa, int rc)
3591 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3592 struct obd_statfs *msfs;
3597 /* The request has in fact never been sent
3598 * due to issues at a higher level (LOV).
3599 * Exit immediately since the caller is
3600 * aware of the problem and takes care
3601 * of the clean up */
/* With OBD_STATFS_NODELAY the caller opted out of waiting for a
 * connection, so connection errors are not fatal here. */
3604 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3605 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3611 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3613 GOTO(out, rc = -EPROTO);
3616 /* Reinitialize the RDONLY and DEGRADED flags at the client
3617 * on each statfs, so they don't stay set permanently. */
3618 cfs_spin_lock(&cli->cl_oscc.oscc_lock);
3620 if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3621 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3622 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3623 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3625 if (unlikely(msfs->os_state & OS_STATE_READONLY))
3626 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3627 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3628 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3630 /* Add a bit of hysteresis so this flag isn't continually flapping,
3631 * and ensure that new files don't get extremely fragmented due to
3632 * only a small amount of available space in the filesystem.
3633 * We want to set the NOSPC flag when there is less than ~0.1% free
3634 * and clear it when there is at least ~0.2% free space, so:
3635 * avail < ~0.1% max max = avail + used
3636 * 1025 * avail < avail + used used = blocks - free
3637 * 1024 * avail < used
3638 * 1024 * avail < blocks - free
3639 * avail < ((blocks - free) >> 10)
3641 * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
3642 * lose that amount of space so in those cases we report no space left
3643 * if there is less than 1 GB left. */
3644 used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
3645 if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3646 ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3647 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3648 else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3649 (msfs->os_ffree > 64) &&
3650 (msfs->os_bavail > (used << 1)))) {
3651 cli->cl_oscc.oscc_flags &= ~(OSCC_FLAG_NOSPC |
3652 OSCC_FLAG_NOSPC_BLK);
/* Still low on blocks specifically: remember it was the block count
 * (not inodes) that triggered NOSPC. */
3655 if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3656 (msfs->os_bavail < used)))
3657 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC_BLK;
3659 cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
3661 *aa->aa_oi->oi_osfs = *msfs;
3663 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Issue an asynchronous OST_STATFS request; the reply is handled by
 * osc_statfs_interpret() which fires oinfo->oi_cb_up.  @max_age is
 * currently unused on the wire (see comment below). */
3667 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3668 __u64 max_age, struct ptlrpc_request_set *rqset)
3670 struct ptlrpc_request *req;
3671 struct osc_async_args *aa;
3675 /* We could possibly pass max_age in the request (as an absolute
3676 * timestamp or a "seconds.usec ago") so the target can avoid doing
3677 * extra calls into the filesystem if that isn't necessary (e.g.
3678 * during mount that would help a bit). Having relative timestamps
3679 * is not so great if request processing is slow, while absolute
3680 * timestamps are not ideal because they need time synchronization. */
3681 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3685 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3687 ptlrpc_request_free(req);
3690 ptlrpc_request_set_replen(req);
/* statfs goes to the create portal, and gets adaptive timeouts. */
3691 req->rq_request_portal = OST_CREATE_PORTAL;
3692 ptlrpc_at_set_req_timeout(req);
3694 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3695 /* procfs requests must not wait on a stalled server, to avoid deadlock */
3696 req->rq_no_resend = 1;
3697 req->rq_no_delay = 1;
3700 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3701 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3702 aa = ptlrpc_req_async_args(req);
3705 ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_STATFS: send the request, wait for the reply and
 * copy the server's obd_statfs into @osfs (copy is in elided lines).
 * @max_age is currently unused on the wire (see comment below). */
3709 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3710 __u64 max_age, __u32 flags)
3712 struct obd_statfs *msfs;
3713 struct ptlrpc_request *req;
3714 struct obd_import *imp = NULL;
3718 /* The request may also come from lprocfs, so serialize with
3719 * client_disconnect_export() (Bug15684). */
3720 cfs_down_read(&obd->u.cli.cl_sem);
3721 if (obd->u.cli.cl_import)
3722 imp = class_import_get(obd->u.cli.cl_import);
3723 cfs_up_read(&obd->u.cli.cl_sem);
3727 /* We could possibly pass max_age in the request (as an absolute
3728 * timestamp or a "seconds.usec ago") so the target can avoid doing
3729 * extra calls into the filesystem if that isn't necessary (e.g.
3730 * during mount that would help a bit). Having relative timestamps
3731 * is not so great if request processing is slow, while absolute
3732 * timestamps are not ideal because they need time synchronization. */
3733 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
/* The extra import reference taken above is only needed across the
 * allocation; release it now. */
3735 class_import_put(imp);
3740 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3742 ptlrpc_request_free(req);
3745 ptlrpc_request_set_replen(req);
3746 req->rq_request_portal = OST_CREATE_PORTAL;
3747 ptlrpc_at_set_req_timeout(req);
3749 if (flags & OBD_STATFS_NODELAY) {
3750 /* procfs requests must not wait on a stalled server, to avoid deadlock */
3751 req->rq_no_resend = 1;
3752 req->rq_no_delay = 1;
3755 rc = ptlrpc_queue_wait(req);
3759 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3761 GOTO(out, rc = -EPROTO);
3768 ptlrpc_req_finished(req);
3772 /* Retrieve object striping information.
3774 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3775 * the maximum number of OST indices which will fit in the user buffer.
3776 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3778 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3780 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3781 struct lov_user_md_v3 lum, *lumk;
3782 struct lov_user_ost_data_v1 *lmm_objects;
3783 int rc = 0, lum_size;
3789 /* we only need the header part from user space to get lmm_magic and
3790 * lmm_stripe_count, (the header part is common to v1 and v3) */
3791 lum_size = sizeof(struct lov_user_md_v1);
3792 if (cfs_copy_from_user(&lum, lump, lum_size))
/* Only V1 and V3 user metadata layouts are understood here. */
3795 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3796 (lum.lmm_magic != LOV_USER_MAGIC_V3))
3799 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3800 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3801 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3802 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3804 /* we can use lov_mds_md_size() to compute lum_size
3805 * because lov_user_md_vX and lov_mds_md_vX have the same size */
3806 if (lum.lmm_stripe_count > 0) {
3807 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3808 OBD_ALLOC(lumk, lum_size);
/* The objects array starts at a different offset in v1 vs v3. */
3812 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3813 lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3815 lmm_objects = &(lumk->lmm_objects[0]);
3816 lmm_objects->l_object_id = lsm->lsm_object_id;
/* Caller passed stripe_count == 0: return only the header. */
3818 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3822 lumk->lmm_object_id = lsm->lsm_object_id;
3823 lumk->lmm_object_seq = lsm->lsm_object_seq;
3824 lumk->lmm_stripe_count = 1;
3826 if (cfs_copy_to_user(lump, lumk, lum_size))
3830 OBD_FREE(lumk, lum_size);
/* obd_ops ->o_iocontrol handler: dispatch userland/kernel ioctls on an
 * OSC device.  Takes a module reference for the duration of the call so
 * the module cannot be unloaded mid-ioctl; all exits go through "out"
 * (elided) which drops it via cfs_module_put(). */
3836 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3837 void *karg, void *uarg)
3839 struct obd_device *obd = exp->exp_obd;
3840 struct obd_ioctl_data *data = karg;
3844 if (!cfs_try_module_get(THIS_MODULE)) {
3845 CERROR("Can't get module. Is it alive?");
3849 case OBD_IOC_LOV_GET_CONFIG: {
3851 struct lov_desc *desc;
3852 struct obd_uuid uuid;
/* Copy the ioctl payload in from user space; freed below. */
3856 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3857 GOTO(out, err = -EINVAL);
3859 data = (struct obd_ioctl_data *)buf;
/* Validate that the user buffers are large enough for a lov_desc
 * and a uuid before writing into them. */
3861 if (sizeof(*desc) > data->ioc_inllen1) {
3862 obd_ioctl_freedata(buf, len);
3863 GOTO(out, err = -EINVAL);
3866 if (data->ioc_inllen2 < sizeof(uuid)) {
3867 obd_ioctl_freedata(buf, len);
3868 GOTO(out, err = -EINVAL);
/* An OSC looks like a degenerate LOV with a single target. */
3871 desc = (struct lov_desc *)data->ioc_inlbuf1;
3872 desc->ld_tgt_count = 1;
3873 desc->ld_active_tgt_count = 1;
3874 desc->ld_default_stripe_count = 1;
3875 desc->ld_default_stripe_size = 0;
3876 desc->ld_default_stripe_offset = 0;
3877 desc->ld_pattern = 0;
3878 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3880 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3882 err = cfs_copy_to_user((void *)uarg, buf, len);
3885 obd_ioctl_freedata(buf, len);
3888 case LL_IOC_LOV_SETSTRIPE:
3889 err = obd_alloc_memmd(exp, karg);
3893 case LL_IOC_LOV_GETSTRIPE:
3894 err = osc_getstripe(karg, uarg);
3896 case OBD_IOC_CLIENT_RECOVER:
3897 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3898 data->ioc_inlbuf1, 0);
3902 case IOC_OSC_SET_ACTIVE:
3903 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3906 case OBD_IOC_POLL_QUOTACHECK:
3907 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
3909 case OBD_IOC_PING_TARGET:
3910 err = ptlrpc_obd_ping(obd);
3913 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3914 cmd, cfs_curproc_comm());
3915 GOTO(out, err = -ENOTTY);
3918 cfs_module_put(THIS_MODULE);
/* obd_ops ->o_get_info handler.  Supported keys:
 *  KEY_LOCK_TO_STRIPE - returns the (single) stripe number locally;
 *  KEY_LAST_ID        - synchronous OST_GET_INFO RPC returning the last
 *                       allocated object id;
 *  KEY_FIEMAP         - synchronous OST_GET_INFO RPC carrying a fiemap
 *                       request/reply in @val (in-out, *vallen bytes). */
3922 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3923 void *key, __u32 *vallen, void *val,
3924 struct lov_stripe_md *lsm)
3927 if (!vallen || !val)
3930 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3931 __u32 *stripe = val;
3932 *vallen = sizeof(*stripe);
3935 } else if (KEY_IS(KEY_LAST_ID)) {
3936 struct ptlrpc_request *req;
3941 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3942 &RQF_OST_GET_INFO_LAST_ID);
3946 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3947 RCL_CLIENT, keylen);
3948 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3950 ptlrpc_request_free(req);
3954 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3955 memcpy(tmp, key, keylen);
/* Don't block/resend: callers of LAST_ID want a prompt answer. */
3957 req->rq_no_delay = req->rq_no_resend = 1;
3958 ptlrpc_request_set_replen(req);
3959 rc = ptlrpc_queue_wait(req);
3963 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3965 GOTO(out, rc = -EPROTO);
3967 *((obd_id *)val) = *reply;
3969 ptlrpc_req_finished(req);
3971 } else if (KEY_IS(KEY_FIEMAP)) {
3972 struct ptlrpc_request *req;
3973 struct ll_user_fiemap *reply;
3977 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3978 &RQF_OST_GET_INFO_FIEMAP);
/* The fiemap value buffer travels in both directions, hence the
 * client AND server sizes are set to *vallen. */
3982 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3983 RCL_CLIENT, keylen);
3984 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3985 RCL_CLIENT, *vallen);
3986 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3987 RCL_SERVER, *vallen);
3989 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3991 ptlrpc_request_free(req);
3995 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3996 memcpy(tmp, key, keylen);
3997 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3998 memcpy(tmp, val, *vallen);
4000 ptlrpc_request_set_replen(req);
4001 rc = ptlrpc_queue_wait(req);
4005 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
4007 GOTO(out1, rc = -EPROTO);
4009 memcpy(val, reply, *vallen);
4011 ptlrpc_req_finished(req);
/* Called when the MDS tells this OSC it is an MDS->OST connection:
 * connect the llog initiator for the orig context, then mark the import
 * as server-timeout and pingable so the pinger keeps it alive. */
4019 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
4021 struct llog_ctxt *ctxt;
4025 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
4027 rc = llog_initiator_connect(ctxt);
4028 llog_ctxt_put(ctxt);
4030 /* XXX return an error? skip setting below flags? */
4033 cfs_spin_lock(&imp->imp_lock);
4034 imp->imp_server_timeout = 1;
4035 imp->imp_pingable = 1;
4036 cfs_spin_unlock(&imp->imp_lock);
4037 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
/* Reply interpreter for the KEY_MDS_CONN set_info RPC: on completion,
 * finish the MDS-connection setup on this request's import. */
4042 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
4043 struct ptlrpc_request *req,
4050 RETURN(osc_setinfo_mds_connect_import(req->rq_import));
/* obd_ops ->o_set_info_async handler.
 * Keys handled locally (no RPC): KEY_NEXT_ID, KEY_CHECKSUM,
 * KEY_SPTLRPC_CONF, KEY_FLUSH_CTX.  All other keys are forwarded to the
 * OST as OST_SET_INFO; KEY_GRANT_SHRINK uses its own request format and
 * is queued on the ptlrpcd set instead of the caller's @set. */
4053 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
4054 void *key, obd_count vallen, void *val,
4055 struct ptlrpc_request_set *set)
4057 struct ptlrpc_request *req;
4058 struct obd_device *obd = exp->exp_obd;
4059 struct obd_import *imp = class_exp2cliimp(exp);
4064 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
4066 if (KEY_IS(KEY_NEXT_ID)) {
4068 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4070 if (vallen != sizeof(obd_id))
4075 if (vallen != sizeof(obd_id))
4078 /* avoid race between allocate new object and set next id
4079 * from ll_sync thread */
4080 cfs_spin_lock(&oscc->oscc_lock);
/* Only move oscc_next_id forward, never backwards. */
4081 new_val = *((obd_id*)val) + 1;
4082 if (new_val > oscc->oscc_next_id)
4083 oscc->oscc_next_id = new_val;
4084 cfs_spin_unlock(&oscc->oscc_lock);
4085 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
4086 exp->exp_obd->obd_name,
4087 obd->u.cli.cl_oscc.oscc_next_id);
4092 if (KEY_IS(KEY_CHECKSUM)) {
4093 if (vallen != sizeof(int))
4095 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
4099 if (KEY_IS(KEY_SPTLRPC_CONF)) {
4100 sptlrpc_conf_client_adapt(obd);
4104 if (KEY_IS(KEY_FLUSH_CTX)) {
4105 sptlrpc_import_flush_my_ctx(imp);
/* All remaining keys need a request set to ride on, except
 * GRANT_SHRINK which goes through ptlrpcd. */
4109 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
4112 /* We pass all other commands directly to OST. Since nobody calls osc
4113 methods directly and everybody is supposed to go through LOV, we
4114 assume lov checked invalid values for us.
4115 The only recognised values so far are evict_by_nid and mds_conn.
4116 Even if something bad goes through, we'd get a -EINVAL from OST
4119 if (KEY_IS(KEY_GRANT_SHRINK))
4120 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
4122 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
4127 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
4128 RCL_CLIENT, keylen);
4129 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
4130 RCL_CLIENT, vallen);
4131 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
4133 ptlrpc_request_free(req);
4137 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
4138 memcpy(tmp, key, keylen);
4139 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
4140 memcpy(tmp, val, vallen);
4142 if (KEY_IS(KEY_MDS_CONN)) {
4143 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
/* Record which MDT sequence this connection serves. */
4145 oscc->oscc_oa.o_seq = (*(__u32 *)val);
4146 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
4147 LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
4148 req->rq_no_delay = req->rq_no_resend = 1;
4149 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4150 } else if (KEY_IS(KEY_GRANT_SHRINK)) {
4151 struct osc_grant_args *aa;
4154 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4155 aa = ptlrpc_req_async_args(req);
4158 ptlrpc_req_finished(req);
/* Stash a copy of the ost_body's obdo for the interpreter. */
4161 *oa = ((struct ost_body *)val)->oa;
4163 req->rq_interpret_reply = osc_shrink_grant_interpret;
4166 ptlrpc_request_set_replen(req);
4167 if (!KEY_IS(KEY_GRANT_SHRINK)) {
4168 LASSERT(set != NULL);
4169 ptlrpc_set_add_req(set, req);
4170 ptlrpc_check_set(NULL, set);
4172 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
/* llog operations for the size-replication context: only cancel is
 * needed on the client side. */
4178 static struct llog_operations osc_size_repl_logops = {
4179 lop_cancel: llog_obd_repl_cancel
/* Filled in at module init (osc_init) from llog_lvfs_ops plus the
 * origin setup/cleanup/add/connect handlers. */
4182 static struct llog_operations osc_mds_ost_orig_logops;
/* Set up the two llog contexts this OSC uses: the MDS->OST origin
 * context (catalog id from @catid) and the size-replication context.
 * On failure of the second, the first context's reference is fetched
 * for cleanup (elided) and diagnostics are printed. */
4184 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4185 struct obd_device *tgt, struct llog_catid *catid)
4190 rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
4191 &catid->lci_logid, &osc_mds_ost_orig_logops);
4193 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4197 rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
4198 NULL, &osc_size_repl_logops);
4200 struct llog_ctxt *ctxt =
4201 llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4204 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4209 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
4210 obd->obd_name, tgt->obd_name, catid, rc);
4211 CERROR("logid "LPX64":0x%x\n",
4212 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
/* obd_ops ->o_llog_init handler: read the catalog id for this OSC's
 * slot (*index) from the CATLIST file on @disk_obd, initialize the llog
 * contexts via __osc_llog_init(), then write the (possibly updated)
 * catalog id back.  Serialized by olg_cat_processing. */
4217 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4218 struct obd_device *disk_obd, int *index)
4220 struct llog_catid catid;
4221 static char name[32] = CATLIST;
4225 LASSERT(olg == &obd->obd_olg);
4227 cfs_mutex_down(&olg->olg_cat_processing);
4228 rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
4230 CERROR("rc: %d\n", rc);
4234 CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
4235 obd->obd_name, *index, catid.lci_logid.lgl_oid,
4236 catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen);
4238 rc = __osc_llog_init(obd, olg, disk_obd, &catid);
4240 CERROR("rc: %d\n", rc);
4244 rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
4246 CERROR("rc: %d\n", rc);
4251 cfs_mutex_up(&olg->olg_cat_processing);
/* obd_ops ->o_llog_finish handler: tear down both llog contexts set up
 * in __osc_llog_init().  Both cleanups are attempted; rc/rc2 hold the
 * individual results. */
4256 static int osc_llog_finish(struct obd_device *obd, int count)
4258 struct llog_ctxt *ctxt;
4259 int rc = 0, rc2 = 0;
4262 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4264 rc = llog_cleanup(ctxt);
4266 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4268 rc2 = llog_cleanup(ctxt);
/* obd_ops ->o_reconnect handler: on reconnect, tell the server how much
 * grant this client wants.  We request our current (avail + dirty)
 * grant, or 2 full RPCs worth if we have none, and reset the lost-grant
 * counter under the loi list lock. */
4275 static int osc_reconnect(const struct lu_env *env,
4276 struct obd_export *exp, struct obd_device *obd,
4277 struct obd_uuid *cluuid,
4278 struct obd_connect_data *data,
4281 struct client_obd *cli = &obd->u.cli;
4283 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4286 client_obd_list_lock(&cli->cl_loi_list_lock);
4287 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
4288 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4289 lost_grant = cli->cl_lost_grant;
4290 cli->cl_lost_grant = 0;
4291 client_obd_list_unlock(&cli->cl_loi_list_lock);
4293 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4294 "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
4295 cli->cl_avail_grant, cli->cl_dirty, lost_grant);
4296 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4297 " ocd_grant: %d\n", data->ocd_connect_flags,
4298 data->ocd_version, data->ocd_grant);
/* obd_ops ->o_disconnect handler: on the last connection, flush any
 * pending size-repl llog cancels to the OST, then disconnect the export
 * and finally remove this client from the grant-shrink list (ordering
 * rationale in the comment below). */
4304 static int osc_disconnect(struct obd_export *exp)
4306 struct obd_device *obd = class_exp2obd(exp);
4307 struct llog_ctxt *ctxt;
4310 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4312 if (obd->u.cli.cl_conn_count == 1) {
4313 /* Flush any remaining cancel messages out to the
4315 llog_sync(ctxt, exp);
4317 llog_ctxt_put(ctxt);
4319 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4323 rc = client_disconnect_export(exp);
4325 * Initially we put del_shrink_grant before disconnect_export, but it
4326 * causes the following problem if setup (connect) and cleanup
4327 * (disconnect) are tangled together.
4328 * connect p1 disconnect p2
4329 * ptlrpc_connect_import
4330 * ............... class_manual_cleanup
4333 * ptlrpc_connect_interrupt
4335 * add this client to shrink list
4337 * Bang! pinger trigger the shrink.
4338 * So the osc should be disconnected from the shrink list, after we
4339 * are sure the import has been destroyed. BUG18662
4341 if (obd->u.cli.cl_import == NULL)
4342 osc_del_shrink_grant(&obd->u.cli);
/* obd_ops ->o_import_event handler: react to import state changes
 * (disconnect, inactive, invalidate, active, OCD negotiation,
 * activate/deactivate), updating grant state, object-creator flags and
 * notifying the observer obd as appropriate. */
4346 static int osc_import_event(struct obd_device *obd,
4347 struct obd_import *imp,
4348 enum obd_import_event event)
4350 struct client_obd *cli;
4354 LASSERT(imp->imp_obd == obd);
4357 case IMP_EVENT_DISCON: {
4358 /* Only do this on the MDS OSC's */
4359 if (imp->imp_server_timeout) {
4360 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4362 cfs_spin_lock(&oscc->oscc_lock);
4363 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4364 cfs_spin_unlock(&oscc->oscc_lock);
/* Grant is invalid across a disconnect; renegotiated on reconnect. */
4367 client_obd_list_lock(&cli->cl_loi_list_lock);
4368 cli->cl_avail_grant = 0;
4369 cli->cl_lost_grant = 0;
4370 client_obd_list_unlock(&cli->cl_loi_list_lock);
4373 case IMP_EVENT_INACTIVE: {
4374 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4377 case IMP_EVENT_INVALIDATE: {
4378 struct ldlm_namespace *ns = obd->obd_namespace;
4382 env = cl_env_get(&refcheck);
4386 client_obd_list_lock(&cli->cl_loi_list_lock);
4387 /* all pages go to failing rpcs due to the invalid
4389 osc_check_rpcs(env, cli);
4390 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Drop all local locks; the import is invalid anyway. */
4392 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4393 cl_env_put(env, &refcheck);
4398 case IMP_EVENT_ACTIVE: {
4399 /* Only do this on the MDS OSC's */
4400 if (imp->imp_server_timeout) {
4401 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4403 cfs_spin_lock(&oscc->oscc_lock);
4404 oscc->oscc_flags &= ~(OSCC_FLAG_NOSPC |
4405 OSCC_FLAG_NOSPC_BLK);
4406 cfs_spin_unlock(&oscc->oscc_lock);
4408 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4411 case IMP_EVENT_OCD: {
4412 struct obd_connect_data *ocd = &imp->imp_connect_data;
4414 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4415 osc_init_grant(&obd->u.cli, ocd);
/* Server supports a dedicated request portal for OST requests. */
4418 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4419 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
4421 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4424 case IMP_EVENT_DEACTIVATE: {
4425 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
4428 case IMP_EVENT_ACTIVATE: {
4429 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
4433 CERROR("Unknown import event %d\n", event);
4440 * Determine whether the lock can be canceled before replaying the lock
4441 * during recovery, see bug16774 for detailed information.
4443 * \retval zero the lock can't be canceled
4444 * \retval other ok to cancel
4446 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
4448 check_res_locked(lock->l_resource);
4451 * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
4453 * XXX as a future improvement, we can also cancel unused write lock
4454 * if it doesn't have dirty data and active mmaps.
4456 if (lock->l_resource->lr_type == LDLM_EXTENT &&
4457 (lock->l_granted_mode == LCK_PR ||
4458 lock->l_granted_mode == LCK_CR) &&
4459 (osc_dlm_lock_pageref(lock) == 0))
/* obd_ops ->o_setup handler: initialize an OSC device.
 * Takes a ptlrpcd reference, sets up the generic client obd, allocates
 * the writeback work item, registers lprocfs entries, pre-allocates a
 * small request pool for brw_interpret, and registers the
 * cancel-for-recovery callback on the namespace. */
4465 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4467 struct client_obd *cli = &obd->u.cli;
4472 rc = ptlrpcd_addref();
4476 rc = client_obd_setup(obd, lcfg);
4479 handler = ptlrpcd_alloc_work(cli->cl_import,
4480 brw_queue_work, cli);
4481 if (!IS_ERR(handler))
4482 cli->cl_writeback_work = handler;
4484 rc = PTR_ERR(handler);
4488 struct lprocfs_static_vars lvars = { 0 };
4490 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4491 lprocfs_osc_init_vars(&lvars);
4492 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4493 lproc_osc_attach_seqstat(obd);
4494 sptlrpc_lprocfs_cliobd_attach(obd);
4495 ptlrpc_lprocfs_register_obd(obd);
4499 /* We need to allocate a few requests more, because
4500 brw_interpret tries to create new requests before freeing
4501 previous ones. Ideally we want to have 2x max_rpcs_in_flight
4502 reserved, but I afraid that might be too much wasted RAM
4503 in fact, so 2 is just my guess and still should work. */
4504 cli->cl_import->imp_rq_pool =
4505 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4507 ptlrpc_add_rqs_to_pool);
4509 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4510 cfs_sema_init(&cli->cl_grant_sem, 1);
4512 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
/* obd_ops ->o_precleanup handler.
 * OBD_CLEANUP_EARLY: deactivate the import and stop pinging it.
 * OBD_CLEANUP_EXPORTS: wait for zombie exports, destroy the writeback
 * work item, clean up the import, lprocfs entries and llog contexts. */
4520 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4526 case OBD_CLEANUP_EARLY: {
4527 struct obd_import *imp;
4528 imp = obd->u.cli.cl_import;
4529 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4530 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4531 ptlrpc_deactivate_import(imp);
4532 cfs_spin_lock(&imp->imp_lock);
4533 imp->imp_pingable = 0;
4534 cfs_spin_unlock(&imp->imp_lock);
4537 case OBD_CLEANUP_EXPORTS: {
4538 struct client_obd *cli = &obd->u.cli;
4540 * for echo client, export may be on zombie list, wait for
4541 * zombie thread to cull it, because cli.cl_import will be
4542 * cleared in client_disconnect_export():
4543 * class_export_destroy() -> obd_cleanup() ->
4544 * echo_device_free() -> echo_client_cleanup() ->
4545 * obd_disconnect() -> osc_disconnect() ->
4546 * client_disconnect_export()
4548 obd_zombie_barrier();
4549 if (cli->cl_writeback_work) {
4550 ptlrpcd_destroy_work(cli->cl_writeback_work);
4551 cli->cl_writeback_work = NULL;
4553 obd_cleanup_client_import(obd);
4554 ptlrpc_lprocfs_unregister_obd(obd);
4555 lprocfs_obd_cleanup(obd);
4556 rc = obd_llog_finish(obd, 0);
4558 CERROR("failed to cleanup llogging subsystems\n");
/* obd_ops ->o_cleanup handler: free the quota cache and run the generic
 * client obd cleanup. */
4565 int osc_cleanup(struct obd_device *obd)
4571 /* free memory of osc quota cache */
4572 osc_quota_cleanup(obd);
4574 rc = client_obd_cleanup(obd);
/* Process a configuration command for an OSC device; currently routes
 * proc-param commands to class_process_proc_param() under the PARAM_OSC
 * namespace. */
4580 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4582 struct lprocfs_static_vars lvars = { 0 };
4585 lprocfs_osc_init_vars(&lvars);
4587 switch (lcfg->lcfg_command) {
4589 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
/* obd_ops ->o_process_config handler: thin wrapper around
 * osc_process_config_base(). */
4599 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4601 return osc_process_config_base(obd, buf);
/* Method table wiring the generic obd interface to this OSC's
 * implementations (connection management, attributes, I/O, locking,
 * config, llog and quota entry points). */
4604 struct obd_ops osc_obd_ops = {
4605 .o_owner = THIS_MODULE,
4606 .o_setup = osc_setup,
4607 .o_precleanup = osc_precleanup,
4608 .o_cleanup = osc_cleanup,
4609 .o_add_conn = client_import_add_conn,
4610 .o_del_conn = client_import_del_conn,
4611 .o_connect = client_connect_import,
4612 .o_reconnect = osc_reconnect,
4613 .o_disconnect = osc_disconnect,
4614 .o_statfs = osc_statfs,
4615 .o_statfs_async = osc_statfs_async,
4616 .o_packmd = osc_packmd,
4617 .o_unpackmd = osc_unpackmd,
4618 .o_precreate = osc_precreate,
4619 .o_create = osc_create,
4620 .o_create_async = osc_create_async,
4621 .o_destroy = osc_destroy,
4622 .o_getattr = osc_getattr,
4623 .o_getattr_async = osc_getattr_async,
4624 .o_setattr = osc_setattr,
4625 .o_setattr_async = osc_setattr_async,
4627 .o_punch = osc_punch,
4629 .o_enqueue = osc_enqueue,
4630 .o_change_cbdata = osc_change_cbdata,
4631 .o_find_cbdata = osc_find_cbdata,
4632 .o_cancel = osc_cancel,
4633 .o_cancel_unused = osc_cancel_unused,
4634 .o_iocontrol = osc_iocontrol,
4635 .o_get_info = osc_get_info,
4636 .o_set_info_async = osc_set_info_async,
4637 .o_import_event = osc_import_event,
4638 .o_llog_init = osc_llog_init,
4639 .o_llog_finish = osc_llog_finish,
4640 .o_process_config = osc_process_config,
4641 .o_quotactl = osc_quotactl,
4642 .o_quotacheck = osc_quotacheck,
4643 .o_quota_adjust_qunit = osc_quota_adjust_qunit,
4646 extern struct lu_kmem_descr osc_caches[];
4647 extern cfs_spinlock_t osc_ast_guard;
4648 extern cfs_lock_class_key_t osc_ast_guard_class;
/* Module init: create the cl_object caches, register the OSC obd type
 * with its lprocfs variables and device type, initialize the shared AST
 * guard spinlock, and build osc_mds_ost_orig_logops from llog_lvfs_ops
 * plus the origin-side handlers.  On type-registration failure the
 * caches are torn down again. */
4650 int __init osc_init(void)
4652 struct lprocfs_static_vars lvars = { 0 };
4656 /* print an address of _any_ initialized kernel symbol from this
4657 * module, to allow debugging with gdb that doesn't support data
4658 * symbols from modules.*/
4659 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
4661 rc = lu_kmem_init(osc_caches);
4663 lprocfs_osc_init_vars(&lvars);
4666 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4667 LUSTRE_OSC_NAME, &osc_device_type);
4669 lu_kmem_fini(osc_caches);
4673 cfs_spin_lock_init(&osc_ast_guard);
4674 cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4676 osc_mds_ost_orig_logops = llog_lvfs_ops;
4677 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4678 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4679 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4680 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
/* Module exit: tear down the device type, unregister the obd type and
 * free the cl_object caches — the reverse of osc_init(). */
4686 static void /*__exit*/ osc_exit(void)
4688 lu_device_type_fini(&osc_device_type);
4691 class_unregister_type(LUSTRE_OSC_NAME);
4692 lu_kmem_fini(osc_caches);
4695 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4696 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4697 MODULE_LICENSE("GPL");
4699 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);