1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_OSC
42 #include <libcfs/libcfs.h>
45 # include <liblustre.h>
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include "osc_internal.h"
/* File-scope state: quota interface hook, resolved at setup time. */
66 static quota_interface_t *quota_interface = NULL;
67 extern quota_interface_t osc_quota_interface;
/* Forward declarations for helpers defined later in this file. */
69 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
70 static int brw_interpret(const struct lu_env *env,
71 struct ptlrpc_request *req, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
41 /* Pack OSC object metadata for disk storage (LE byte order).
 * If *lmmp already exists it may be freed; otherwise a new lov_mds_md is
 * allocated and, when lsm is supplied, filled from it.
 * NOTE(review): guard conditions between these statements are elided in
 * this extract — the free/alloc paths are mutually exclusive in the full
 * source; confirm against the complete file. */
75 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
76 struct lov_stripe_md *lsm)
81 lmm_size = sizeof(**lmmp);
86 OBD_FREE(*lmmp, lmm_size);
92 OBD_ALLOC(*lmmp, lmm_size);
/* An OSC stripe must carry a valid object id and an MDT-range sequence. */
98 LASSERT(lsm->lsm_object_id);
99 LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
/* On-disk format is little-endian regardless of host byte order. */
100 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
101 (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
107 /* Unpack OSC object metadata from disk storage (LE byte order).
 * Validates the incoming lov_mds_md (size, non-zero object id), then either
 * frees an existing *lsmp (when lmm == NULL) or allocates a single-stripe
 * lov_stripe_md plus its lsm_oinfo[0] and fills it from lmm. */
108 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
109 struct lov_mds_md *lmm, int lmm_bytes)
/* Reject buffers too small to contain a lov_mds_md at all. */
115 if (lmm_bytes < sizeof (*lmm)) {
116 CERROR("lov_mds_md too small: %d, need %d\n",
117 lmm_bytes, (int)sizeof(*lmm));
120 /* XXX LOV_MAGIC etc check? */
122 if (lmm->lmm_object_id == 0) {
123 CERROR("lov_mds_md: zero lmm_object_id\n");
/* OSC always represents exactly one stripe. */
128 lsm_size = lov_stripe_md_size(1);
/* lmm == NULL with an existing *lsmp means "free the in-memory copy". */
132 if (*lsmp != NULL && lmm == NULL) {
133 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
134 OBD_FREE(*lsmp, lsm_size);
140 OBD_ALLOC(*lsmp, lsm_size);
143 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
/* Unwind the outer allocation if the oinfo allocation fails. */
144 if ((*lsmp)->lsm_oinfo[0] == NULL) {
145 OBD_FREE(*lsmp, lsm_size);
148 loi_init((*lsmp)->lsm_oinfo[0]);
152 /* XXX zero *lsmp? */
/* Convert the on-disk little-endian identifiers to host order. */
153 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
154 (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
155 LASSERT((*lsmp)->lsm_object_id);
156 LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
159 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/* Copy a capability (if any) into the request's RMF_CAPA1 buffer and mark
 * the obdo as carrying an OSS capability.
 * NOTE(review): the NULL-capa early return is elided in this extract. */
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165 struct ost_body *body, void *capa)
167 struct obd_capa *oc = (struct obd_capa *)capa;
168 struct lustre_capa *c;
173 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
176 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177 DEBUG_CAPA(D_SEC, c, "pack");
/* Fill the request's ost_body from oinfo: wire-format obdo plus capability. */
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181 struct obd_info *oinfo)
183 struct ost_body *body;
185 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
188 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
189 osc_pack_capa(req, body, oinfo->oi_capa);
/* Size the given capability field of the request: zero when no capability
 * is supplied, otherwise the capsule default (sizeof struct obd_capa). */
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193 const struct req_msg_field *field,
197 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
199 /* it is already calculated as sizeof struct obd_capa */
/* Reply interpreter for async OST_GETATTR: unpack the reply obdo into
 * aa->aa_oi->oi_oa, supply a client-side block size, then hand the final
 * rc to the caller's oi_cb_up completion callback. */
203 static int osc_getattr_interpret(const struct lu_env *env,
204 struct ptlrpc_request *req,
205 struct osc_async_args *aa, int rc)
207 struct ost_body *body;
213 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
215 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
216 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
218 /* This should really be sent by the OST */
219 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
220 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
/* Unpack failure: invalidate the obdo so callers don't trust stale fields. */
222 CDEBUG(D_INFO, "can't unpack ost_body\n");
224 aa->aa_oi->oi_oa->o_valid = 0;
227 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an asynchronous OST_GETATTR on @set; completion is delivered
 * through osc_getattr_interpret -> oinfo->oi_cb_up. */
231 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
232 struct ptlrpc_request_set *set)
234 struct ptlrpc_request *req;
235 struct osc_async_args *aa;
239 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
/* Capability buffer must be sized before packing the request. */
243 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
244 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
246 ptlrpc_request_free(req);
250 osc_pack_req_body(req, oinfo);
252 ptlrpc_request_set_replen(req);
253 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
/* Async args live inside the request; assert they fit. */
255 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
256 aa = ptlrpc_req_async_args(req);
259 ptlrpc_set_add_req(set, req);
/* Synchronous OST_GETATTR: send, wait, and unpack the reply obdo into
 * oinfo->oi_oa. Returns 0 or a negative errno. */
263 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
265 struct ptlrpc_request *req;
266 struct ost_body *body;
270 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
274 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
275 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
277 ptlrpc_request_free(req);
281 osc_pack_req_body(req, oinfo);
283 ptlrpc_request_set_replen(req);
285 rc = ptlrpc_queue_wait(req);
289 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
/* Missing/short reply body is a protocol violation. */
291 GOTO(out, rc = -EPROTO);
293 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
294 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
296 /* This should really be sent by the OST */
297 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
298 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
302 ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR: push attributes from oinfo->oi_oa to the OST
 * and copy the server's view of the obdo back on success. */
306 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
307 struct obd_trans_info *oti)
309 struct ptlrpc_request *req;
310 struct ost_body *body;
/* Callers must have set the object group/seq before a setattr. */
314 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
316 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
320 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
321 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
323 ptlrpc_request_free(req);
327 osc_pack_req_body(req, oinfo);
329 ptlrpc_request_set_replen(req);
331 rc = ptlrpc_queue_wait(req);
335 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
337 GOTO(out, rc = -EPROTO);
339 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
343 ptlrpc_req_finished(req);
/* Reply interpreter shared by async setattr and punch: unpack the reply
 * obdo into sa->sa_oa, then invoke the stored upcall with the final rc. */
347 static int osc_setattr_interpret(const struct lu_env *env,
348 struct ptlrpc_request *req,
349 struct osc_setattr_args *sa, int rc)
351 struct ost_body *body;
357 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
359 GOTO(out, rc = -EPROTO);
361 lustre_get_wire_obdo(sa->sa_oa, &body->oa);
363 rc = sa->sa_upcall(sa->sa_cookie, rc);
/* Asynchronous OST_SETATTR. @upcall(@cookie, rc) fires on completion.
 * The request is either handed to the ptlrpcd daemons (rqset == NULL or
 * PTLRPCD_SET) or added to the caller-managed @rqset.
 * NOTE(review): the branch structure around the two ptlrpcd_add_req calls
 * is partly elided in this extract; only one path runs per request. */
367 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
368 struct obd_trans_info *oti,
369 obd_enqueue_update_f upcall, void *cookie,
370 struct ptlrpc_request_set *rqset)
372 struct ptlrpc_request *req;
373 struct osc_setattr_args *sa;
377 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
381 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
382 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
384 ptlrpc_request_free(req);
/* Carry the MDS llog cookie so the OST can cancel the unlink record. */
388 if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
389 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
391 osc_pack_req_body(req, oinfo);
393 ptlrpc_request_set_replen(req);
395 /* do mds to ost setattr asynchronously */
397 /* Do not wait for response. */
398 ptlrpcd_add_req(req, PSCOPE_OTHER);
400 req->rq_interpret_reply =
401 (ptlrpc_interpterer_t)osc_setattr_interpret;
403 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
404 sa = ptlrpc_req_async_args(req);
405 sa->sa_oa = oinfo->oi_oa;
406 sa->sa_upcall = upcall;
407 sa->sa_cookie = cookie;
409 if (rqset == PTLRPCD_SET)
410 ptlrpcd_add_req(req, PSCOPE_OTHER);
412 ptlrpc_set_add_req(rqset, req);
/* Thin OBD-method wrapper: async setattr with oinfo's own completion
 * callback (oi_cb_up) and oinfo itself as the cookie. */
418 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
419 struct obd_trans_info *oti,
420 struct ptlrpc_request_set *rqset)
422 return osc_setattr_async_base(exp, oinfo, oti,
423 oinfo->oi_cb_up, oinfo, rqset);
/* Synchronously create an object on the OST. Allocates a temporary
 * single-stripe lsm, sends OST_CREATE, and copies the server-assigned
 * identifiers back into @oa / *@ea. The transno and (optionally) the llog
 * cookie are recorded in @oti for MDS recovery. */
426 int osc_real_create(struct obd_export *exp, struct obdo *oa,
427 struct lov_stripe_md **ea, struct obd_trans_info *oti)
429 struct ptlrpc_request *req;
430 struct ost_body *body;
431 struct lov_stripe_md *lsm;
440 rc = obd_alloc_memmd(exp, &lsm);
445 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
447 GOTO(out, rc = -ENOMEM);
449 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
451 ptlrpc_request_free(req);
455 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
457 lustre_set_wire_obdo(&body->oa, oa);
459 ptlrpc_request_set_replen(req);
/* DELORPHAN (orphan cleanup) must not be replayed or delayed. */
461 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
462 oa->o_flags == OBD_FL_DELORPHAN) {
464 "delorphan from OST integration");
465 /* Don't resend the delorphan req */
466 req->rq_no_resend = req->rq_no_delay = 1;
469 rc = ptlrpc_queue_wait(req);
473 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
475 GOTO(out_req, rc = -EPROTO);
/* Server fills in the newly allocated object id/seq. */
477 lustre_get_wire_obdo(oa, &body->oa);
479 /* This should really be sent by the OST */
480 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
481 oa->o_valid |= OBD_MD_FLBLKSZ;
483 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
484 * have valid lsm_oinfo data structs, so don't go touching that.
485 * This needs to be fixed in a big way.
487 lsm->lsm_object_id = oa->o_id;
488 lsm->lsm_object_seq = oa->o_seq;
492 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
494 if (oa->o_valid & OBD_MD_FLCOOKIE) {
495 if (!oti->oti_logcookies)
496 oti_alloc_cookies(oti, 1);
497 *oti->oti_logcookies = oa->o_lcookie;
501 CDEBUG(D_HA, "transno: "LPD64"\n",
502 lustre_msg_get_transno(req->rq_repmsg));
504 ptlrpc_req_finished(req);
/* Error path: release the lsm we allocated above. */
507 obd_free_memmd(exp, &lsm);
/* Asynchronous OST_PUNCH (truncate). Extent boundaries travel in the
 * obdo's o_size/o_blocks fields (set by the caller, see osc_punch).
 * Completion is delivered via osc_setattr_interpret -> @upcall(@cookie). */
511 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
512 obd_enqueue_update_f upcall, void *cookie,
513 struct ptlrpc_request_set *rqset)
515 struct ptlrpc_request *req;
516 struct osc_setattr_args *sa;
517 struct ost_body *body;
521 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
525 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
526 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
528 ptlrpc_request_free(req);
531 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
532 ptlrpc_at_set_req_timeout(req);
534 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
536 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
537 osc_pack_capa(req, body, oinfo->oi_capa);
539 ptlrpc_request_set_replen(req);
542 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
543 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
544 sa = ptlrpc_req_async_args(req);
545 sa->sa_oa = oinfo->oi_oa;
546 sa->sa_upcall = upcall;
547 sa->sa_cookie = cookie;
/* PTLRPCD_SET hands the request to the ptlrpcd daemons instead. */
548 if (rqset == PTLRPCD_SET)
549 ptlrpcd_add_req(req, PSCOPE_OTHER);
551 ptlrpc_set_add_req(rqset, req);
/* OBD-method wrapper for punch: encode the extent [start, end] into the
 * obdo's size/blocks fields (overloaded for the wire) and delegate. */
556 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
557 struct obd_trans_info *oti,
558 struct ptlrpc_request_set *rqset)
560 oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
561 oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
562 oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
563 return osc_punch_base(exp, oinfo,
564 oinfo->oi_cb_up, oinfo, rqset);
/* Reply interpreter for async OST_SYNC: copy the reply obdo back and run
 * the caller's completion callback with the final rc. */
567 static int osc_sync_interpret(const struct lu_env *env,
568 struct ptlrpc_request *req,
571 struct osc_async_args *aa = arg;
572 struct ost_body *body;
578 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
580 CERROR ("can't unpack ost_body\n");
581 GOTO(out, rc = -EPROTO);
/* Struct copy: reply obdo overwrites the caller's obdo wholesale. */
584 *aa->aa_oi->oi_oa = body->oa;
586 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an asynchronous OST_SYNC for byte range [start, end] on @set.
 * The range is overloaded into the obdo's size/blocks fields. */
590 static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
591 obd_size start, obd_size end,
592 struct ptlrpc_request_set *set)
594 struct ptlrpc_request *req;
595 struct ost_body *body;
596 struct osc_async_args *aa;
/* A NULL obdo means there is nothing to identify the object with. */
601 CDEBUG(D_INFO, "oa NULL\n");
605 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
609 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
610 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
612 ptlrpc_request_free(req);
616 /* overload the size and blocks fields in the oa with start/end */
617 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
619 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
620 body->oa.o_size = start;
621 body->oa.o_blocks = end;
622 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
623 osc_pack_capa(req, body, oinfo->oi_capa);
625 ptlrpc_request_set_replen(req);
626 req->rq_interpret_reply = osc_sync_interpret;
628 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
629 aa = ptlrpc_req_async_args(req);
632 ptlrpc_set_add_req(set, req);
636 /* Find and cancel locally locks matched by @mode in the resource found by
637 * @objid. Found locks are added into @cancel list. Returns the amount of
638 * locks added to @cancels list. */
639 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
641 ldlm_mode_t mode, int lock_flags)
643 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
644 struct ldlm_res_id res_id;
645 struct ldlm_resource *res;
/* Resource name is derived from the object id + sequence. */
649 osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
650 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
/* Hold a debug ref across the scan; drop both refs when done. */
654 LDLM_RESOURCE_ADDREF(res);
655 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
656 lock_flags, 0, NULL);
657 LDLM_RESOURCE_DELREF(res);
658 ldlm_resource_putref(res);
/* Completion handler for throttled destroy RPCs: release one slot of the
 * in-flight counter and wake anyone blocked in osc_destroy. */
662 static int osc_destroy_interpret(const struct lu_env *env,
663 struct ptlrpc_request *req, void *data,
666 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
668 cfs_atomic_dec(&cli->cl_destroy_in_flight);
669 cfs_waitq_signal(&cli->cl_destroy_waitq);
/* Try to reserve a destroy-RPC slot. Optimistically increments the
 * in-flight counter; if that exceeds cl_max_rpcs_in_flight the slot is
 * released again. The second check after the decrement re-signals the
 * waitq to close the race between the two atomic operations. */
673 static int osc_can_send_destroy(struct client_obd *cli)
675 if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
676 cli->cl_max_rpcs_in_flight) {
677 /* The destroy request can be sent */
680 if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
681 cli->cl_max_rpcs_in_flight) {
683 * The counter has been modified between the two atomic
686 cfs_waitq_signal(&cli->cl_destroy_waitq);
691 /* Destroy requests can be async always on the client, and we don't even really
692 * care about the return code since the client cannot do anything at all about
694 * When the MDS is unlinking a filename, it saves the file objects into a
695 * recovery llog, and these object records are cancelled when the OST reports
696 * they were destroyed and sync'd to disk (i.e. transaction committed).
697 * If the client dies, or the OST is down when the object should be destroyed,
698 * the records are not cancelled, and when the OST reconnects to the MDS next,
699 * it will retrieve the llog unlink logs and then sends the log cancellation
700 * cookies to the MDS after committing destroy transactions. */
701 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
702 struct lov_stripe_md *ea, struct obd_trans_info *oti,
703 struct obd_export *md_export, void *capa)
705 struct client_obd *cli = &exp->exp_obd->u.cli;
706 struct ptlrpc_request *req;
707 struct ost_body *body;
708 CFS_LIST_HEAD(cancels);
713 CDEBUG(D_INFO, "oa NULL\n");
/* Cancel local PW locks first; their data is being discarded anyway. */
717 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
718 LDLM_FL_DISCARD_DATA);
720 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* Allocation failed: give back the lock refs taken for early cancel. */
722 ldlm_lock_list_put(&cancels, l_bl_ast, count);
726 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
727 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
730 ptlrpc_request_free(req);
734 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
735 ptlrpc_at_set_req_timeout(req);
/* Piggy-back the MDS llog cookie so the OST can cancel the record. */
737 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
738 oa->o_lcookie = *oti->oti_logcookies;
739 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
741 lustre_set_wire_obdo(&body->oa, oa);
743 osc_pack_capa(req, body, (struct obd_capa *)capa);
744 ptlrpc_request_set_replen(req);
746 /* don't throttle destroy RPCs for the MDT */
747 if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
748 req->rq_interpret_reply = osc_destroy_interpret;
749 if (!osc_can_send_destroy(cli)) {
750 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
754 * Wait until the number of on-going destroy RPCs drops
755 * under max_rpc_in_flight
757 l_wait_event_exclusive(cli->cl_destroy_waitq,
758 osc_can_send_destroy(cli), &lwi);
762 /* Do not wait for response */
763 ptlrpcd_add_req(req, PSCOPE_OTHER);
/* Report client cache state (dirty bytes, grant, lost grant) in @oa for
 * the server. Performs several sanity checks on the dirty accounting and
 * caps o_undirty at the maximum that could be in flight. All counters are
 * read under cl_loi_list_lock for a consistent snapshot. */
767 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
770 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
/* The caller must not have pre-set the fields we are about to fill. */
772 LASSERT(!(oa->o_valid & bits));
775 client_obd_list_lock(&cli->cl_loi_list_lock);
776 oa->o_dirty = cli->cl_dirty;
777 if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
778 CERROR("dirty %lu - %lu > dirty_max %lu\n",
779 cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
781 } else if (cfs_atomic_read(&obd_dirty_pages) -
782 cfs_atomic_read(&obd_dirty_transit_pages) >
783 obd_max_dirty_pages + 1){
784 /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
785 * not covered by a lock thus they may safely race and trip
786 * this CERROR() unless we add in a small fudge factor (+1). */
787 CERROR("dirty %d - %d > system dirty_max %d\n",
788 cfs_atomic_read(&obd_dirty_pages),
789 cfs_atomic_read(&obd_dirty_transit_pages),
790 obd_max_dirty_pages);
792 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
793 CERROR("dirty %lu - dirty_max %lu too big???\n",
794 cli->cl_dirty, cli->cl_dirty_max);
/* Normal path: advertise how much more we could dirty. */
797 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
798 (cli->cl_max_rpcs_in_flight + 1);
799 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
801 oa->o_grant = cli->cl_avail_grant;
802 oa->o_dropped = cli->cl_lost_grant;
/* Lost grant is reported once, then reset. */
803 cli->cl_lost_grant = 0;
804 client_obd_list_unlock(&cli->cl_loi_list_lock);
805 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
806 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Push the next grant-shrink deadline cl_grant_shrink_interval into the
 * future; called whenever grant activity shows the grant is in use. */
810 static void osc_update_next_shrink(struct client_obd *cli)
812 cli->cl_next_shrink_grant =
813 cfs_time_shift(cli->cl_grant_shrink_interval);
814 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
815 cli->cl_next_shrink_grant);
818 /* caller must hold loi_list_lock */
/* Account one page of dirty cache against the available grant and mark
 * the brw_page as covered by grant (OBD_BRW_FROM_GRANT). */
819 static void osc_consume_write_grant(struct client_obd *cli,
820 struct brw_page *pga)
822 LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
/* A page must not be charged against grant twice. */
823 LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
824 cfs_atomic_inc(&obd_dirty_pages);
825 cli->cl_dirty += CFS_PAGE_SIZE;
826 cli->cl_avail_grant -= CFS_PAGE_SIZE;
827 pga->flag |= OBD_BRW_FROM_GRANT;
828 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
829 CFS_PAGE_SIZE, pga, pga->pg);
/* Callers guarantee enough grant was available before charging. */
830 LASSERT(cli->cl_avail_grant >= 0);
831 osc_update_next_shrink(cli);
834 /* the companion to osc_consume_write_grant, called when a brw has completed.
835 * must be called with the loi lock held. */
836 static void osc_release_write_grant(struct client_obd *cli,
837 struct brw_page *pga, int sent)
/* OST block size; fall back to 4096 when statfs hasn't reported one. */
839 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
842 LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
/* Pages never charged against grant have nothing to release. */
843 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
848 pga->flag &= ~OBD_BRW_FROM_GRANT;
849 cfs_atomic_dec(&obd_dirty_pages);
850 cli->cl_dirty -= CFS_PAGE_SIZE;
/* Transit (nocache) pages keep a separate dirty count. */
851 if (pga->flag & OBD_BRW_NOCACHE) {
852 pga->flag &= ~OBD_BRW_NOCACHE;
853 cfs_atomic_dec(&obd_dirty_transit_pages);
854 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
857 cli->cl_lost_grant += CFS_PAGE_SIZE;
858 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
859 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
860 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
861 /* For short writes we shouldn't count parts of pages that
862 * span a whole block on the OST side, or our accounting goes
863 * wrong. Should match the code in filter_grant_check. */
864 int offset = pga->off & ~CFS_PAGE_MASK;
865 int count = pga->count + (offset & (blocksize - 1));
866 int end = (offset + pga->count) & (blocksize - 1);
/* Round the tail up to a full OST block. */
868 count += blocksize - end;
870 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
871 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
872 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
873 cli->cl_avail_grant, cli->cl_dirty);
/* Total BRW RPCs (reads + writes) currently in flight for this client. */
879 static unsigned long rpcs_in_flight(struct client_obd *cli)
881 return cli->cl_r_in_flight + cli->cl_w_in_flight;
884 /* caller must hold loi_list_lock */
/* Walk the cache-waiter list and wake each waiter that can now proceed:
 * either grant is available, or no write RPCs remain that could return
 * grant (in which case the waiter is told to fall back to sync IO via
 * ocw_rc = -EDQUOT). Stops early while dirty limits are still exceeded. */
885 void osc_wake_cache_waiters(struct client_obd *cli)
888 struct osc_cache_waiter *ocw;
891 cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
892 /* if we can't dirty more, we must wait until some is written */
893 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
894 (cfs_atomic_read(&obd_dirty_pages) + 1 >
895 obd_max_dirty_pages)) {
896 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
897 "osc max %ld, sys max %d\n", cli->cl_dirty,
898 cli->cl_dirty_max, obd_max_dirty_pages);
902 /* if still dirty cache but no grant wait for pending RPCs that
903 * may yet return us some grant before doing sync writes */
904 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
905 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
906 cli->cl_w_in_flight);
910 ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
911 cfs_list_del_init(&ocw->ocw_entry);
912 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
913 /* no more RPCs in flight to return grant, do sync IO */
914 ocw->ocw_rc = -EDQUOT;
915 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
/* Grant available: charge the waiter's page before waking it. */
917 osc_consume_write_grant(cli,
918 &ocw->ocw_oap->oap_brw_page);
921 cfs_waitq_signal(&ocw->ocw_waitq);
/* Add @grant to the client's available grant, under the loi list lock. */
927 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
929 client_obd_list_lock(&cli->cl_loi_list_lock);
930 cli->cl_avail_grant += grant;
931 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Absorb any extra grant the server returned in an RPC reply body. */
934 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
936 if (body->oa.o_valid & OBD_MD_FLGRANT) {
937 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
938 __osc_update_grant(cli, body->oa.o_grant);
/* Forward declaration; defined later in this file. */
942 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
943 void *key, obd_count vallen, void *val,
944 struct ptlrpc_request_set *set);
/* Completion handler for a grant-shrink set_info RPC. On failure the
 * grant we tried to give back is restored locally; on success any grant
 * the server returned in the reply body is absorbed. */
946 static int osc_shrink_grant_interpret(const struct lu_env *env,
947 struct ptlrpc_request *req,
950 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
951 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
952 struct ost_body *body;
/* Error path: re-credit the grant we had deducted before sending. */
955 __osc_update_grant(cli, oa->o_grant);
959 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
961 osc_update_grant(cli, body);
/* Give back a quarter of the available grant via the next regular RPC:
 * move it from cl_avail_grant into oa->o_grant and flag the obdo with
 * OBD_FL_SHRINK_GRANT so the server treats it as a return, not a report. */
967 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
969 client_obd_list_lock(&cli->cl_loi_list_lock);
970 oa->o_grant = cli->cl_avail_grant / 4;
971 cli->cl_avail_grant -= oa->o_grant;
972 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Make sure the flags field is valid before OR-ing in the shrink flag. */
973 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
974 oa->o_valid |= OBD_MD_FLFLAGS;
977 oa->o_flags |= OBD_FL_SHRINK_GRANT;
978 osc_update_next_shrink(cli);
981 /* Shrink the current grant, either from some large amount to enough for a
982 * full set of in-flight RPCs, or if we have already shrunk to that limit
983 * then to enough for a single RPC. This avoids keeping more grant than
984 * needed, and avoids shrinking the grant piecemeal. */
985 static int osc_shrink_grant(struct client_obd *cli)
/* Default target: one extra RPC's worth beyond max concurrent RPCs. */
987 long target = (cli->cl_max_rpcs_in_flight + 1) *
988 cli->cl_max_pages_per_rpc;
990 client_obd_list_lock(&cli->cl_loi_list_lock);
/* Already at/below the full-set target: shrink down to a single RPC. */
991 if (cli->cl_avail_grant <= target)
992 target = cli->cl_max_pages_per_rpc;
993 client_obd_list_unlock(&cli->cl_loi_list_lock);
995 return osc_shrink_grant_to_target(cli, target);
/* Return grant above @target to the server via a KEY_GRANT_SHRINK
 * set_info RPC. No-op when already at or below the (clamped) target. */
998 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
1001 struct ost_body *body;
1004 client_obd_list_lock(&cli->cl_loi_list_lock);
1005 /* Don't shrink if we are already above or below the desired limit
1006 * We don't want to shrink below a single RPC, as that will negatively
1007 * impact block allocation and long-term performance. */
1008 if (target < cli->cl_max_pages_per_rpc)
1009 target = cli->cl_max_pages_per_rpc;
1011 if (target >= cli->cl_avail_grant) {
1012 client_obd_list_unlock(&cli->cl_loi_list_lock);
1015 client_obd_list_unlock(&cli->cl_loi_list_lock);
1017 OBD_ALLOC_PTR(body);
/* Snapshot current cache state into the body before deducting grant. */
1021 osc_announce_cached(cli, &body->oa, 0);
1023 client_obd_list_lock(&cli->cl_loi_list_lock);
1024 body->oa.o_grant = cli->cl_avail_grant - target;
1025 cli->cl_avail_grant = target;
1026 client_obd_list_unlock(&cli->cl_loi_list_lock);
1027 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
1028 body->oa.o_valid |= OBD_MD_FLFLAGS;
1029 body->oa.o_flags = 0;
1031 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
1032 osc_update_next_shrink(cli);
1034 rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
1035 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
1036 sizeof(*body), body, NULL);
/* If the RPC could not be sent, restore the grant locally. */
1038 __osc_update_grant(cli, body->oa.o_grant);
1043 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
/* Decide whether this client should shrink its grant now: the server must
 * support GRANT_SHRINK, the shrink deadline must have passed (with a small
 * CFS_TICK slack), the import must be FULL, and the held grant must exceed
 * GRANT_SHRINK_LIMIT. Otherwise the deadline is simply pushed out. */
1044 static int osc_should_shrink_grant(struct client_obd *client)
1046 cfs_time_t time = cfs_time_current();
1047 cfs_time_t next_shrink = client->cl_next_shrink_grant;
1049 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
1050 OBD_CONNECT_GRANT_SHRINK) == 0)
1053 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1054 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1055 client->cl_avail_grant > GRANT_SHRINK_LIMIT)
1058 osc_update_next_shrink(client);
/* Periodic timeout callback: walk all clients registered on this timer
 * item and shrink grant for each one that is due. */
1063 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1065 struct client_obd *client;
1067 cfs_list_for_each_entry(client, &item->ti_obd_list,
1068 cl_grant_shrink_list) {
1069 if (osc_should_shrink_grant(client))
1070 osc_shrink_grant(client);
/* Register this client with the periodic grant-shrink timeout client so
 * osc_grant_shrink_grant_cb will consider it, and arm the first deadline. */
1075 static int osc_add_shrink_grant(struct client_obd *client)
1079 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1081 osc_grant_shrink_grant_cb, NULL,
1082 &client->cl_grant_shrink_list);
1084 CERROR("add grant client %s error %d\n",
1085 client->cl_import->imp_obd->obd_name, rc);
1088 CDEBUG(D_CACHE, "add grant client %s \n",
1089 client->cl_import->imp_obd->obd_name);
1090 osc_update_next_shrink(client);
/* Unregister this client from the periodic grant-shrink timer. */
1094 static int osc_del_shrink_grant(struct client_obd *client)
1096 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/* Initialize cl_avail_grant from the server's connect data, compensating
 * for dirty pages still outstanding, and register for periodic grant
 * shrinking when the server supports it. */
1100 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1103 * ocd_grant is the total grant amount we're expect to hold: if we've
1104 * been evicted, it's the new avail_grant amount, cl_dirty will drop
1105 * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1107 * race is tolerable here: if we're evicted, but imp_state already
1108 * left EVICTED state, then cl_dirty must be 0 already.
1110 client_obd_list_lock(&cli->cl_loi_list_lock);
1111 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1112 cli->cl_avail_grant = ocd->ocd_grant;
1114 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1116 if (cli->cl_avail_grant < 0) {
1117 CWARN("%s: available grant < 0, the OSS is probably not running"
1118 " with patch from bug20278 (%ld) \n",
1119 cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
1120 /* workaround for 1.6 servers which do not have
1121 * the patch from bug20278 */
1122 cli->cl_avail_grant = ocd->ocd_grant;
1125 client_obd_list_unlock(&cli->cl_loi_list_lock);
1127 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1128 cli->cl_import->imp_obd->obd_name,
1129 cli->cl_avail_grant, cli->cl_lost_grant);
/* Register for shrinking only once (list still empty). */
1131 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1132 cfs_list_empty(&cli->cl_grant_shrink_list))
1133 osc_add_shrink_grant(cli);
1136 /* We assume that the reason this OSC got a short read is because it read
1137 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1138 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1139 * this stripe never got written at or beyond this stripe offset yet. */
/* Zero-fill the tail of a short read: skip fully-read pages, zero the
 * remainder of the page containing EOF, then zero all later pages. */
1140 static void handle_short_read(int nob_read, obd_count page_count,
1141 struct brw_page **pga)
1146 /* skip bytes read OK */
1147 while (nob_read > 0) {
1148 LASSERT (page_count > 0);
1150 if (pga[i]->count > nob_read) {
1151 /* EOF inside this page */
1152 ptr = cfs_kmap(pga[i]->pg) +
1153 (pga[i]->off & ~CFS_PAGE_MASK);
1154 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1155 cfs_kunmap(pga[i]->pg);
1161 nob_read -= pga[i]->count;
1166 /* zero remaining pages */
1167 while (page_count-- > 0) {
1168 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1169 memset(ptr, 0, pga[i]->count);
1170 cfs_kunmap(pga[i]->pg);
/* Validate a BRW_WRITE reply: the per-niobuf RC vector must be present,
 * contain no errors or unexpected non-zero entries, and the bulk transfer
 * must have moved exactly the number of bytes requested. */
1175 static int check_write_rcs(struct ptlrpc_request *req,
1176 int requested_nob, int niocount,
1177 obd_count page_count, struct brw_page **pga)
1182 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1183 sizeof(*remote_rcs) *
1185 if (remote_rcs == NULL) {
1186 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1190 /* return error if any niobuf was in error */
1191 for (i = 0; i < niocount; i++) {
1192 if (remote_rcs[i] < 0)
1193 return(remote_rcs[i]);
/* Positive RCs are not expected for writes; treat as protocol error. */
1195 if (remote_rcs[i] != 0) {
1196 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1197 i, remote_rcs[i], req);
1202 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1203 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1204 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Two brw_pages can share one niobuf only when p2 starts exactly where
 * p1 ends. Differing flags outside the known-safe set are warned about. */
1211 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1213 if (p1->flag != p2->flag) {
1214 unsigned mask = ~(OBD_BRW_FROM_GRANT|
1215 OBD_BRW_NOCACHE|OBD_BRW_SYNC|OBD_BRW_ASYNC);
1217 /* warn if we try to combine flags that we don't know to be
1218 * safe to combine */
1219 if ((p1->flag & mask) != (p2->flag & mask))
1220 CERROR("is it ok to have flags 0x%x and 0x%x in the "
1221 "same brw?\n", p1->flag, p2->flag);
/* Mergeable iff byte-contiguous. */
1225 return (p1->off + p1->count == p2->off);
/* Compute the bulk checksum over @nob bytes spread across @pga pages with
 * the given algorithm. Contains fault-injection hooks that deliberately
 * corrupt data (reads) or the checksum (writes) for testing. */
1228 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1229 struct brw_page **pga, int opc,
1230 cksum_type_t cksum_type)
1235 LASSERT (pg_count > 0);
1236 cksum = init_checksum(cksum_type);
1237 while (nob > 0 && pg_count > 0) {
1238 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1239 int off = pga[i]->off & ~CFS_PAGE_MASK;
1240 int count = pga[i]->count > nob ? nob : pga[i]->count;
1242 /* corrupt the data before we compute the checksum, to
1243 * simulate an OST->client data error */
1244 if (i == 0 && opc == OST_READ &&
1245 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1246 memcpy(ptr + off, "bad1", min(4, nob));
1247 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1248 cfs_kunmap(pga[i]->pg);
1249 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1252 nob -= pga[i]->count;
1256 /* For sending we only compute the wrong checksum instead
1257 * of corrupting the data so it is still correct on a redo */
1258 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1264 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1265 struct lov_stripe_md *lsm, obd_count page_count,
1266 struct brw_page **pga,
1267 struct ptlrpc_request **reqp,
1268 struct obd_capa *ocapa, int reserve,
/* Build an OST_READ/OST_WRITE ptlrpc request for @page_count pages in
 * @pga (sorted, page-aligned as asserted below), attach the bulk
 * descriptor, pack the obdo/ioobj/niobufs and (for writes) the bulk
 * checksum.  On success *reqp holds the new request; on failure the
 * request is freed and a negative errno is returned. */
1271 struct ptlrpc_request *req;
1272 struct ptlrpc_bulk_desc *desc;
1273 struct ost_body *body;
1274 struct obd_ioobj *ioobj;
1275 struct niobuf_remote *niobuf;
1276 int niocount, i, requested_nob, opc, rc;
1277 struct osc_brw_async_args *aa;
1278 struct req_capsule *pill;
1279 struct brw_page *pg_prev;
1282 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1283 RETURN(-ENOMEM); /* Recoverable */
1284 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1285 RETURN(-EINVAL); /* Fatal */
1287 if ((cmd & OBD_BRW_WRITE) != 0) {
/* writes draw from the pre-allocated request pool so that dirty
 * pages can still be flushed under memory pressure */
1289 req = ptlrpc_request_alloc_pool(cli->cl_import,
1290 cli->cl_import->imp_rq_pool,
1291 &RQF_OST_BRW_WRITE);
1294 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* count distinct niobufs: adjacent mergeable pages share one */
1299 for (niocount = i = 1; i < page_count; i++) {
1300 if (!can_merge_pages(pga[i - 1], pga[i]))
1304 pill = &req->rq_pill;
1305 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1307 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1308 niocount * sizeof(*niobuf));
1309 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1311 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1313 ptlrpc_request_free(req);
1316 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1317 ptlrpc_at_set_req_timeout(req);
1319 if (opc == OST_WRITE)
1320 desc = ptlrpc_prep_bulk_imp(req, page_count,
1321 BULK_GET_SOURCE, OST_BULK_PORTAL);
1323 desc = ptlrpc_prep_bulk_imp(req, page_count,
1324 BULK_PUT_SINK, OST_BULK_PORTAL);
1327 GOTO(out, rc = -ENOMEM);
1328 /* NB request now owns desc and will free it when it gets freed */
1330 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1331 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1332 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1333 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1335 lustre_set_wire_obdo(&body->oa, oa);
1337 obdo_to_ioobj(oa, ioobj);
1338 ioobj->ioo_bufcnt = niocount;
1339 osc_pack_capa(req, body, ocapa);
1340 LASSERT (page_count > 0);
1342 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1343 struct brw_page *pg = pga[i];
/* each page must fit within one CFS page and pages must be in
 * strictly ascending file-offset order (sorted by caller) */
1345 LASSERT(pg->count > 0);
1346 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1347 "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1348 pg->off, pg->count);
1350 LASSERTF(i == 0 || pg->off > pg_prev->off,
1351 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1352 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1354 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1355 pg_prev->pg, page_private(pg_prev->pg),
1356 pg_prev->pg->index, pg_prev->off);
1358 LASSERTF(i == 0 || pg->off > pg_prev->off,
1359 "i %d p_c %u\n", i, page_count);
1361 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1362 (pg->flag & OBD_BRW_SRVLOCK));
1364 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1366 requested_nob += pg->count;
1368 if (i > 0 && can_merge_pages(pg_prev, pg)) {
/* contiguous with previous page: extend its niobuf */
1370 niobuf->len += pg->count;
1372 niobuf->offset = pg->off;
1373 niobuf->len = pg->count;
1374 niobuf->flags = pg->flag;
/* sanity: we must have filled exactly @niocount niobufs */
1379 LASSERTF((void *)(niobuf - niocount) ==
1380 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1381 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1382 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1384 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1386 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1387 body->oa.o_valid |= OBD_MD_FLFLAGS;
1388 body->oa.o_flags = 0;
1390 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1393 if (osc_should_shrink_grant(cli))
1394 osc_shrink_grant_local(cli, &body->oa);
1396 /* size[REQ_REC_OFF] still sizeof (*body) */
1397 if (opc == OST_WRITE) {
1398 if (unlikely(cli->cl_checksum) &&
1399 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1400 /* store cl_cksum_type in a local variable since
1401 * it can be changed via lprocfs */
1402 cksum_type_t cksum_type = cli->cl_cksum_type;
1404 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1405 oa->o_flags &= OBD_FL_LOCAL_MASK;
1406 body->oa.o_flags = 0;
1408 body->oa.o_flags |= cksum_type_pack(cksum_type);
1409 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1410 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1414 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1416 /* save this in 'oa', too, for later checking */
1417 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1418 oa->o_flags |= cksum_type_pack(cksum_type);
1420 /* clear out the checksum flag, in case this is a
1421 * resend but cl_checksum is no longer set. b=11238 */
1422 oa->o_valid &= ~OBD_MD_FLCKSUM;
1424 oa->o_cksum = body->oa.o_cksum;
1425 /* 1 RC per niobuf */
1426 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1427 sizeof(__u32) * niocount);
1429 if (unlikely(cli->cl_checksum) &&
1430 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1431 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1432 body->oa.o_flags = 0;
1433 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1434 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1437 ptlrpc_request_set_replen(req);
1439 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1440 aa = ptlrpc_req_async_args(req);
1442 aa->aa_requested_nob = requested_nob;
1443 aa->aa_nio_count = niocount;
1444 aa->aa_page_count = page_count;
1448 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1449 if (ocapa && reserve)
1450 aa->aa_ocapa = capa_get(ocapa);
/* error path: free the partially-built request */
1456 ptlrpc_req_finished(req);
1460 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1461 __u32 client_cksum, __u32 server_cksum, int nob,
1462 obd_count page_count, struct brw_page **pga,
1463 cksum_type_t client_cksum_type)
/* Called after an OST_WRITE reply carrying a server checksum.  Compares
 * the server's checksum with the one computed at send time and, on
 * mismatch, recomputes the checksum locally to classify the failure
 * (protocol problem, client-side change e.g. mmap, or on-the-wire
 * corruption) and logs a console error. */
1467 cksum_type_t cksum_type;
1469 if (server_cksum == client_cksum) {
1470 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1474 /* If this is mmaped file - it can be changed at any time */
1475 if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags & OBD_FL_MMAP)
1478 if (oa->o_valid & OBD_MD_FLFLAGS)
1479 cksum_type = cksum_type_unpack(oa->o_flags);
1481 cksum_type = OBD_CKSUM_CRC32;
1483 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
/* classify the mismatch for the console message below */
1486 if (cksum_type != client_cksum_type)
1487 msg = "the server did not use the checksum type specified in "
1488 "the original request - likely a protocol problem";
1489 else if (new_cksum == server_cksum)
1490 msg = "changed on the client after we checksummed it - "
1491 "likely false positive due to mmap IO (bug 11742)";
1492 else if (new_cksum == client_cksum)
1493 msg = "changed in transit before arrival at OST";
1495 msg = "changed in transit AND doesn't match the original - "
1496 "likely false positive due to mmap IO (bug 11742)";
1498 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1499 " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1500 msg, libcfs_nid2str(peer->nid),
1501 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1502 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1503 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1505 oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1507 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1508 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1509 "client csum now %x\n", client_cksum, client_cksum_type,
1510 server_cksum, cksum_type, new_cksum);
1514 /* Note rc enters this function as number of bytes transferred */
1515 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
/* Post-process a completed BRW request: update grants/quota flags,
 * verify write-side checksums via check_write_checksum(), and for reads
 * validate the transferred byte count, handle short reads and verify
 * the server-supplied read checksum. */
1517 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1518 const lnet_process_id_t *peer =
1519 &req->rq_import->imp_connection->c_peer;
1520 struct client_obd *cli = aa->aa_cli;
1521 struct ost_body *body;
1522 __u32 client_cksum = 0;
1525 if (rc < 0 && rc != -EDQUOT) {
1526 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1530 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1531 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1533 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1537 #ifdef HAVE_QUOTA_SUPPORT
1538 /* set/clear over quota flag for a uid/gid */
1539 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1540 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1541 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1543 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1544 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1546 lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
1551 osc_update_grant(cli, body);
1556 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1557 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1559 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
/* writes must report rc == 0 on success; positive rc is only
 * meaningful for reads (bytes transferred) */
1561 CERROR("Unexpected +ve rc %d\n", rc);
1564 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1566 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1569 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1570 check_write_checksum(&body->oa, peer, client_cksum,
1571 body->oa.o_cksum, aa->aa_requested_nob,
1572 aa->aa_page_count, aa->aa_ppga,
1573 cksum_type_unpack(aa->aa_oa->o_flags)))
1576 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1577 aa->aa_page_count, aa->aa_ppga);
1581 /* The rest of this function executes only for OST_READs */
1583 /* if unwrap_bulk failed, return -EAGAIN to retry */
1584 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1586 GOTO(out, rc = -EAGAIN);
1588 if (rc > aa->aa_requested_nob) {
1589 CERROR("Unexpected rc %d (%d requested)\n", rc,
1590 aa->aa_requested_nob);
1594 if (rc != req->rq_bulk->bd_nob_transferred) {
1595 CERROR ("Unexpected rc %d (%d transferred)\n",
1596 rc, req->rq_bulk->bd_nob_transferred);
1600 if (rc < aa->aa_requested_nob)
1601 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1603 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1604 static int cksum_counter;
1605 __u32 server_cksum = body->oa.o_cksum;
1608 cksum_type_t cksum_type;
1610 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1611 cksum_type = cksum_type_unpack(body->oa.o_flags);
1613 cksum_type = OBD_CKSUM_CRC32;
1614 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1615 aa->aa_ppga, OST_READ,
/* note whether the bulk came via a router (different sender) */
1618 if (peer->nid == req->rq_bulk->bd_sender) {
1622 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1625 if (server_cksum == ~0 && rc > 0) {
1626 CERROR("Protocol error: server %s set the 'checksum' "
1627 "bit, but didn't send a checksum. Not fatal, "
1628 "but please notify on http://bugzilla.lustre.org/\n",
1629 libcfs_nid2str(peer->nid));
1630 } else if (server_cksum != client_cksum) {
1631 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1632 "%s%s%s inode "DFID" object "
1633 LPU64"/"LPU64" extent "
1634 "["LPU64"-"LPU64"]\n",
1635 req->rq_import->imp_obd->obd_name,
1636 libcfs_nid2str(peer->nid),
1638 body->oa.o_valid & OBD_MD_FLFID ?
1639 body->oa.o_parent_seq : (__u64)0,
1640 body->oa.o_valid & OBD_MD_FLFID ?
1641 body->oa.o_parent_oid : 0,
1642 body->oa.o_valid & OBD_MD_FLFID ?
1643 body->oa.o_parent_ver : 0,
1645 body->oa.o_valid & OBD_MD_FLGROUP ?
1646 body->oa.o_seq : (__u64)0,
1647 aa->aa_ppga[0]->off,
1648 aa->aa_ppga[aa->aa_page_count-1]->off +
1649 aa->aa_ppga[aa->aa_page_count-1]->count -
1651 CERROR("client %x, server %x, cksum_type %x\n",
1652 client_cksum, server_cksum, cksum_type);
1654 aa->aa_oa->o_cksum = client_cksum;
1658 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1661 } else if (unlikely(client_cksum)) {
1662 static int cksum_missed;
/* power-of-two throttling of the "checksum missing" error */
1665 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1666 CERROR("Checksum %u requested from %s but not sent\n",
1667 cksum_missed, libcfs_nid2str(peer->nid));
/* copy the reply obdo back for the caller */
1673 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1678 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1679 struct lov_stripe_md *lsm,
1680 obd_count page_count, struct brw_page **pga,
1681 struct obd_capa *ocapa)
/* Synchronous BRW: build a request, queue it and wait for completion.
 * On a bulk timeout or other recoverable error the request is rebuilt
 * and resent (with a back-off wait), bounded by osc_should_resend(). */
1683 struct ptlrpc_request *req;
1687 struct l_wait_info lwi;
1691 cfs_waitq_init(&waitq);
/* restart: rebuilt request on each resend iteration */
1694 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1695 page_count, pga, &req, ocapa, 0, resends);
1699 rc = ptlrpc_queue_wait(req);
1701 if (rc == -ETIMEDOUT && req->rq_resend) {
1702 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1703 ptlrpc_req_finished(req);
1707 rc = osc_brw_fini_request(req, rc);
1709 ptlrpc_req_finished(req);
1710 if (osc_recoverable_error(rc)) {
1712 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1713 CERROR("too many resend retries, returning error\n");
/* back off for 'resends' seconds before retrying */
1717 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1718 l_wait_event(waitq, 0, &lwi);
1726 int osc_brw_redo_request(struct ptlrpc_request *request,
1727 struct osc_brw_async_args *aa)
/* Rebuild and requeue an async BRW request that failed with a
 * recoverable error.  The new request inherits the pga, oaps, async
 * args and capa of the old one; oap request references are switched to
 * the new request under cl_loi_list_lock. */
1729 struct ptlrpc_request *new_req;
1730 struct ptlrpc_request_set *set = request->rq_set;
1731 struct osc_brw_async_args *new_aa;
1732 struct osc_async_page *oap;
1736 if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1737 CERROR("too many resent retries, returning error\n");
1741 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1743 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1744 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1745 aa->aa_cli, aa->aa_oa,
1746 NULL /* lsm unused by osc currently */,
1747 aa->aa_page_count, aa->aa_ppga,
1748 &new_req, aa->aa_ocapa, 0, 1);
1752 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
/* abort the redo if any oap was already interrupted */
1754 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1755 if (oap->oap_request != NULL) {
1756 LASSERTF(request == oap->oap_request,
1757 "request %p != oap_request %p\n",
1758 request, oap->oap_request);
1759 if (oap->oap_interrupted) {
1760 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1761 ptlrpc_req_finished(new_req);
1766 /* New request takes over pga and oaps from old request.
1767 * Note that copying a list_head doesn't work, need to move it... */
1769 new_req->rq_interpret_reply = request->rq_interpret_reply;
1770 new_req->rq_async_args = request->rq_async_args;
1771 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1773 new_aa = ptlrpc_req_async_args(new_req);
1775 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1776 cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1777 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1779 cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1780 if (oap->oap_request) {
1781 ptlrpc_req_finished(oap->oap_request);
1782 oap->oap_request = ptlrpc_request_addref(new_req);
1786 new_aa->aa_ocapa = aa->aa_ocapa;
1787 aa->aa_ocapa = NULL;
1789 /* use ptlrpc_set_add_req is safe because interpret functions work
1790 * in check_set context. only one way exist with access to request
1791 * from different thread got -EINTR - this way protected with
1792 * cl_loi_list_lock */
1793 ptlrpc_set_add_req(set, new_req);
1795 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1797 DEBUG_REQ(D_INFO, new_req, "new request");
1802 * ugh, we want disk allocation on the target to happen in offset order. we'll
1803 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1804 * fine for our small page arrays and doesn't require allocation. its an
1805 * insertion sort that swaps elements that are strides apart, shrinking the
1806 * stride down until its '1' and the array is sorted.
1808 static void sort_brw_pages(struct brw_page **array, int num)
/* In-place shellsort of @array by brw_page file offset, ascending. */
1811 struct brw_page *tmp;
/* build the 3x+1 stride sequence, then shrink it back down */
1815 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1820 for (i = stride ; i < num ; i++) {
1823 while (j >= stride && array[j - stride]->off > tmp->off) {
1824 array[j] = array[j - stride];
1829 } while (stride > 1);
1832 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
/* Return the length of the longest prefix of @pg that forms an
 * unfragmented transfer: after the first page, any page that does not
 * start/end on a CFS page boundary ends the run. */
1838 LASSERT (pages > 0);
1839 offset = pg[i]->off & ~CFS_PAGE_MASK;
1843 if (pages == 0) /* that's all */
1846 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1847 return count; /* doesn't end on page boundary */
1850 offset = pg[i]->off & ~CFS_PAGE_MASK;
1851 if (offset != 0) /* doesn't start on page boundary */
1858 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
/* Allocate and return an array of @count pointers into the flat @pga
 * array (freed with osc_release_ppga()); NULL on allocation failure —
 * TODO confirm: the failure return is in elided lines. */
1860 struct brw_page **ppga;
1863 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1867 for (i = 0; i < count; i++)
1872 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
/* Free a pointer array built by osc_build_ppga(). */
1874 LASSERT(ppga != NULL);
1875 OBD_FREE(ppga, sizeof(*ppga) * count);
1878 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1879 obd_count page_count, struct brw_page *pga,
1880 struct obd_trans_info *oti)
/* Top-level synchronous BRW entry point: sorts the pages, then issues
 * one osc_brw_internal() call per chunk of at most cl_max_pages_per_rpc
 * unfragmented pages.  The oa is saved/restored across chunks because
 * each brw clobbers it. */
1882 struct obdo *saved_oa = NULL;
1883 struct brw_page **ppga, **orig;
1884 struct obd_import *imp = class_exp2cliimp(exp);
1885 struct client_obd *cli;
1886 int rc, page_count_orig;
1889 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1890 cli = &imp->imp_obd->u.cli;
1892 if (cmd & OBD_BRW_CHECK) {
1893 /* The caller just wants to know if there's a chance that this
1894 * I/O can succeed */
1896 if (imp->imp_invalid)
1901 /* test_brw with a failed create can trip this, maybe others. */
1902 LASSERT(cli->cl_max_pages_per_rpc);
1906 orig = ppga = osc_build_ppga(pga, page_count);
1909 page_count_orig = page_count;
1911 sort_brw_pages(ppga, page_count);
1912 while (page_count) {
1913 obd_count pages_per_brw;
1915 if (page_count > cli->cl_max_pages_per_rpc)
1916 pages_per_brw = cli->cl_max_pages_per_rpc;
1918 pages_per_brw = page_count;
1920 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1922 if (saved_oa != NULL) {
1923 /* restore previously saved oa */
1924 *oinfo->oi_oa = *saved_oa;
1925 } else if (page_count > pages_per_brw) {
1926 /* save a copy of oa (brw will clobber it) */
1927 OBDO_ALLOC(saved_oa);
1928 if (saved_oa == NULL)
1929 GOTO(out, rc = -ENOMEM);
1930 *saved_oa = *oinfo->oi_oa;
1933 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1934 pages_per_brw, ppga, oinfo->oi_capa);
1939 page_count -= pages_per_brw;
1940 ppga += pages_per_brw;
/* cleanup: free the pointer array and any saved oa */
1944 osc_release_ppga(orig, page_count_orig);
1946 if (saved_oa != NULL)
1947 OBDO_FREE(saved_oa);
1952 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1953 * the dirty accounting. Writeback completes or truncate happens before
1954 * writing starts. Must be called with the loi lock held. */
1955 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
/* Releases the write grant held for this page; @sent — presumably
 * whether the page was actually written — TODO confirm with
 * osc_release_write_grant(). */
1958 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1962 /* This maintains the lists of pending pages to read/write for a given object
1963 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1964 * to quickly find objects that are ready to send an RPC. */
1965 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
/* Decide whether @lop has enough/urgent-enough pending pages to justify
 * sending an RPC now.  Returns non-zero to trigger an RPC. */
1971 if (lop->lop_num_pending == 0)
1974 /* if we have an invalid import we want to drain the queued pages
1975 * by forcing them through rpcs that immediately fail and complete
1976 * the pages. recovery relies on this to empty the queued pages
1977 * before canceling the locks and evicting down the llite pages */
1978 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1981 /* stream rpcs in queue order as long as as there is an urgent page
1982 * queued. this is our cheap solution for good batching in the case
1983 * where writepage marks some random page in the middle of the file
1984 * as urgent because of, say, memory pressure */
1985 if (!cfs_list_empty(&lop->lop_urgent)) {
1986 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1989 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1990 optimal = cli->cl_max_pages_per_rpc;
1991 if (cmd & OBD_BRW_WRITE) {
1992 /* trigger a write rpc stream as long as there are dirtiers
1993 * waiting for space. as they're waiting, they're not going to
1994 * create more pages to coalesce with what's waiting.. */
1995 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
1996 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1999 /* +16 to avoid triggering rpcs that would want to include pages
2000 * that are being queued but which can't be made ready until
2001 * the queuer finishes with the page. this is a wart for
2002 * llite::commit_write() */
2005 if (lop->lop_num_pending >= optimal)
2011 static int lop_makes_hprpc(struct loi_oap_pages *lop)
/* Return non-zero when the first urgent page is high-priority (ASYNC_HP),
 * i.e. this lop should go on the high-priority ready list. */
2013 struct osc_async_page *oap;
2016 if (cfs_list_empty(&lop->lop_urgent))
2019 oap = cfs_list_entry(lop->lop_urgent.next,
2020 struct osc_async_page, oap_urgent_item);
2022 if (oap->oap_async_flags & ASYNC_HP) {
2023 CDEBUG(D_CACHE, "hp request forcing RPC\n");
2030 static void on_list(cfs_list_t *item, cfs_list_t *list,
/* Idempotently add @item to @list or remove it, so that its membership
 * matches the boolean should_be_on flag. */
2033 if (cfs_list_empty(item) && should_be_on)
2034 cfs_list_add_tail(item, list);
2035 else if (!cfs_list_empty(item) && !should_be_on)
2036 cfs_list_del_init(item);
2039 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2040 * can find pages to build into rpcs quickly */
2041 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
/* A loi sits on exactly one of the hp-ready / ready lists (or neither),
 * plus the write/read lists whenever it has pending pages of that kind. */
2043 if (lop_makes_hprpc(&loi->loi_write_lop) ||
2044 lop_makes_hprpc(&loi->loi_read_lop)) {
/* high-priority work trumps the ordinary ready list */
2046 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2047 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2049 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
2050 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2051 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
2052 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
2055 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2056 loi->loi_write_lop.lop_num_pending);
2058 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2059 loi->loi_read_lop.lop_num_pending);
2062 static void lop_update_pending(struct client_obd *cli,
2063 struct loi_oap_pages *lop, int cmd, int delta)
/* Adjust the per-lop pending-page count and the matching per-client
 * read/write pending counters by @delta (may be negative). */
2065 lop->lop_num_pending += delta;
2066 if (cmd & OBD_BRW_WRITE)
2067 cli->cl_pending_w_pages += delta;
2069 cli->cl_pending_r_pages += delta;
2073 * this is called when a sync waiter receives an interruption. Its job is to
2074 * get the caller woken as soon as possible. If its page hasn't been put in an
2075 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
2076 * desiring interruption which will forcefully complete the rpc once the rpc
2079 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2081 struct loi_oap_pages *lop;
2082 struct lov_oinfo *loi;
2086 LASSERT(!oap->oap_interrupted);
2087 oap->oap_interrupted = 1;
2089 /* ok, it's been put in an rpc. only one oap gets a request reference */
2090 if (oap->oap_request != NULL) {
2091 ptlrpc_mark_interrupted(oap->oap_request);
2092 ptlrpcd_wake(oap->oap_request);
2093 ptlrpc_req_finished(oap->oap_request);
2094 oap->oap_request = NULL;
/*
2098 * page completion may be called only if ->cpo_prep() method was
2099 * executed by osc_io_submit(), that also adds page the to pending list
 */
2101 if (!cfs_list_empty(&oap->oap_pending_item)) {
2102 cfs_list_del_init(&oap->oap_pending_item);
2103 cfs_list_del_init(&oap->oap_urgent_item);
/* update pending accounting and complete the page with -EINTR */
2106 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2107 &loi->loi_write_lop : &loi->loi_read_lop;
2108 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2109 loi_list_maint(oap->oap_cli, oap->oap_loi);
2110 rc = oap->oap_caller_ops->ap_completion(env,
2111 oap->oap_caller_data,
2112 oap->oap_cmd, NULL, -EINTR);
2118 /* this is trying to propogate async writeback errors back up to the
2119 * application. As an async write fails we record the error code for later if
2120 * the app does an fsync. As long as errors persist we force future rpcs to be
2121 * sync so that the app can get a sync error and break the cycle of queueing
2122 * pages for which writeback will fail. */
2123 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
/* On a failed write (elided branch), latch ar_force_sync and remember
 * the next xid; once a write with xid >= ar_min_xid succeeds, the
 * forced-sync state is cleared. */
2130 ar->ar_force_sync = 1;
2131 ar->ar_min_xid = ptlrpc_sample_next_xid();
2136 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2137 ar->ar_force_sync = 0;
2140 void osc_oap_to_pending(struct osc_async_page *oap)
/* Queue @oap on its loi's pending list (and urgent list when flagged):
 * ASYNC_HP pages go to the head of the urgent list, ASYNC_URGENT to the
 * tail; all pages are appended to the pending list and accounted. */
2142 struct loi_oap_pages *lop;
2144 if (oap->oap_cmd & OBD_BRW_WRITE)
2145 lop = &oap->oap_loi->loi_write_lop;
2147 lop = &oap->oap_loi->loi_read_lop;
2149 if (oap->oap_async_flags & ASYNC_HP)
2150 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2151 else if (oap->oap_async_flags & ASYNC_URGENT)
2152 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2153 cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2154 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2157 /* this must be called holding the loi list lock to give coverage to exit_cache,
2158 * async_flag maintenance, and oap_request */
2159 static void osc_ap_completion(const struct lu_env *env,
2160 struct client_obd *cli, struct obdo *oa,
2161 struct osc_async_page *oap, int sent, int rc)
/* Finish one async page: drop its request reference, clear its async
 * flags, record write errors for fsync propagation, copy size/time
 * attributes from the reply @oa into the loi lvb, then call the upper
 * layer's ap_completion and release the cache grant. */
2166 if (oap->oap_request != NULL) {
2167 xid = ptlrpc_req_xid(oap->oap_request);
2168 ptlrpc_req_finished(oap->oap_request);
2169 oap->oap_request = NULL;
2172 cfs_spin_lock(&oap->oap_lock);
2173 oap->oap_async_flags = 0;
2174 cfs_spin_unlock(&oap->oap_lock);
2175 oap->oap_interrupted = 0;
2177 if (oap->oap_cmd & OBD_BRW_WRITE) {
/* track async write errors both per-client and per-object */
2178 osc_process_ar(&cli->cl_ar, xid, rc);
2179 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2182 if (rc == 0 && oa != NULL) {
2183 if (oa->o_valid & OBD_MD_FLBLOCKS)
2184 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2185 if (oa->o_valid & OBD_MD_FLMTIME)
2186 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2187 if (oa->o_valid & OBD_MD_FLATIME)
2188 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2189 if (oa->o_valid & OBD_MD_FLCTIME)
2190 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2193 rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2194 oap->oap_cmd, oa, rc);
2196 /* ll_ap_completion (from llite) drops PG_locked. so, a new
2197 * I/O on the page could start, but OSC calls it under lock
2198 * and thus we can add oap back to pending safely */
2200 /* upper layer wants to leave the page on pending queue */
2201 osc_oap_to_pending(oap);
2203 osc_exit_cache(cli, oap, sent);
2207 static int brw_interpret(const struct lu_env *env,
2208 struct ptlrpc_request *req, void *data, int rc)
/* Async BRW completion callback (request interpret function).  Finishes
 * the request, retries recoverable errors via osc_brw_redo_request(),
 * then updates in-flight counters and completes every attached oap (or
 * releases write grants for the async_internal() path). */
2210 struct osc_brw_async_args *aa = data;
2211 struct client_obd *cli;
2215 rc = osc_brw_fini_request(req, rc);
2216 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2217 if (osc_recoverable_error(rc)) {
2218 /* Only retry once for mmaped files since the mmaped page
2219 * might be modified at anytime. We have to retry at least
2220 * once in case there WAS really a corruption of the page
2221 * on the network, that was not caused by mmap() modifying
2222 * the page. Bug11742 */
2223 if ((rc == -EAGAIN) && (aa->aa_resends > 0) &&
2224 aa->aa_oa->o_valid & OBD_MD_FLFLAGS &&
2225 aa->aa_oa->o_flags & OBD_FL_MMAP) {
2228 rc = osc_brw_redo_request(req, aa);
/* done with the capability reference, if any */
2235 capa_put(aa->aa_ocapa);
2236 aa->aa_ocapa = NULL;
2241 client_obd_list_lock(&cli->cl_loi_list_lock);
2243 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2244 * is called so we know whether to go to sync BRWs or wait for more
2245 * RPCs to complete */
2246 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2247 cli->cl_w_in_flight--;
2249 cli->cl_r_in_flight--;
2251 async = cfs_list_empty(&aa->aa_oaps);
2252 if (!async) { /* from osc_send_oap_rpc() */
2253 struct osc_async_page *oap, *tmp;
2254 /* the caller may re-use the oap after the completion call so
2255 * we need to clean it up a little */
2256 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
2258 cfs_list_del_init(&oap->oap_rpc_item);
2259 osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2261 OBDO_FREE(aa->aa_oa);
2262 } else { /* from async_internal() */
2264 for (i = 0; i < aa->aa_page_count; i++)
2265 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2267 osc_wake_cache_waiters(cli);
2268 osc_check_rpcs(env, cli);
2269 client_obd_list_unlock(&cli->cl_loi_list_lock);
2271 cl_req_completion(env, aa->aa_clerq, rc);
2272 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2277 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2278 struct client_obd *cli,
2279 cfs_list_t *rpc_list,
2280 int page_count, int cmd)
/* Turn the oaps on @rpc_list into one BRW ptlrpc request: build a pga
 * array and a cl_req, fill in the obdo attributes, call
 * osc_brw_prep_request(), and move the oaps onto the request's async
 * args.  Returns the request or an ERR_PTR; on failure every queued oap
 * is completed with the error. */
2282 struct ptlrpc_request *req;
2283 struct brw_page **pga = NULL;
2284 struct osc_brw_async_args *aa;
2285 struct obdo *oa = NULL;
2286 const struct obd_async_page_ops *ops = NULL;
2287 void *caller_data = NULL;
2288 struct osc_async_page *oap;
2289 struct osc_async_page *tmp;
2290 struct ost_body *body;
2291 struct cl_req *clerq = NULL;
2292 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2293 struct ldlm_lock *lock = NULL;
2294 struct cl_req_attr crattr;
2295 int i, rc, mpflag = 0;
2298 LASSERT(!cfs_list_empty(rpc_list));
2300 if (cmd & OBD_BRW_MEMALLOC)
2301 mpflag = cfs_memory_pressure_get_and_set();
2303 memset(&crattr, 0, sizeof crattr);
2304 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2306 GOTO(out, req = ERR_PTR(-ENOMEM));
2310 GOTO(out, req = ERR_PTR(-ENOMEM));
2313 cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2314 struct cl_page *page = osc_oap2cl_page(oap);
2316 ops = oap->oap_caller_ops;
2317 caller_data = oap->oap_caller_data;
/* allocate the cl_req on the first iteration */
2319 clerq = cl_req_alloc(env, page, crt,
2320 1 /* only 1-object rpcs for
2323 GOTO(out, req = (void *)clerq);
2324 lock = oap->oap_ldlm_lock;
2326 pga[i] = &oap->oap_brw_page;
2327 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2328 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2329 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2331 cl_req_page_add(env, clerq, page);
2334 /* always get the data for the obdo for the rpc */
2335 LASSERT(ops != NULL);
2337 crattr.cra_capa = NULL;
2338 cl_req_attr_set(env, clerq, &crattr, ~0ULL);
/* carry the dlm lock handle so the server can match it */
2340 oa->o_handle = lock->l_remote_handle;
2341 oa->o_valid |= OBD_MD_FLHANDLE;
2344 rc = cl_req_prep(env, clerq);
2346 CERROR("cl_req_prep failed: %d\n", rc);
2347 GOTO(out, req = ERR_PTR(rc));
2350 sort_brw_pages(pga, page_count);
2351 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2352 pga, &req, crattr.cra_capa, 1, 0);
2354 CERROR("prep_req failed: %d\n", rc);
2355 GOTO(out, req = ERR_PTR(rc));
2358 if (cmd & OBD_BRW_MEMALLOC)
2359 req->rq_memalloc = 1;
2361 /* Need to update the timestamps after the request is built in case
2362 * we race with setattr (locally or in queue at OST). If OST gets
2363 * later setattr before earlier BRW (as determined by the request xid),
2364 * the OST will not use BRW timestamps. Sadly, there is no obvious
2365 * way to do this in a single call. bug 10150 */
2366 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2367 cl_req_attr_set(env, clerq, &crattr,
2368 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2370 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2371 aa = ptlrpc_req_async_args(req);
2372 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2373 cfs_list_splice(rpc_list, &aa->aa_oaps);
2374 CFS_INIT_LIST_HEAD(rpc_list);
2375 aa->aa_clerq = clerq;
/* out: restore memory-pressure state and drop the capa reference */
2377 if (cmd & OBD_BRW_MEMALLOC)
2378 cfs_memory_pressure_restore(mpflag);
2380 capa_put(crattr.cra_capa);
2385 OBD_FREE(pga, sizeof(*pga) * page_count);
2386 /* this should happen rarely and is pretty bad, it makes the
2387 * pending list not follow the dirty order */
2388 client_obd_list_lock(&cli->cl_loi_list_lock);
2389 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2390 cfs_list_del_init(&oap->oap_rpc_item);
2392 /* queued sync pages can be torn down while the pages
2393 * were between the pending list and the rpc */
2394 if (oap->oap_interrupted) {
2395 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2396 osc_ap_completion(env, cli, NULL, oap, 0,
2400 osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2402 if (clerq && !IS_ERR(clerq))
2403 cl_req_completion(env, clerq, PTR_ERR(req));
2409 * prepare pages for ASYNC io and put pages in send queue.
2411 * \param cmd OBD_BRW_* macroses
2412 * \param lop pending pages
2414 * \return zero if no page added to send queue.
2415 * \return 1 if pages successfully added to send queue.
2416 * \return negative on errors.
2419 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2420 struct lov_oinfo *loi,
2421 int cmd, struct loi_oap_pages *lop)
2423 struct ptlrpc_request *req;
2424 obd_count page_count = 0;
2425 struct osc_async_page *oap = NULL, *tmp;
2426 struct osc_brw_async_args *aa;
2427 const struct obd_async_page_ops *ops;
2428 CFS_LIST_HEAD(rpc_list);
2429 CFS_LIST_HEAD(tmp_list);
2430 unsigned int ending_offset;
2431 unsigned starting_offset = 0;
2432 int srvlock = 0, mem_tight = 0;
2433 struct cl_object *clob = NULL;
2436 /* ASYNC_HP pages first. At present, when the lock the pages is
2437 * to be canceled, the pages covered by the lock will be sent out
2438 * with ASYNC_HP. We have to send out them as soon as possible. */
2439 cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2440 if (oap->oap_async_flags & ASYNC_HP)
2441 cfs_list_move(&oap->oap_pending_item, &tmp_list);
2443 cfs_list_move_tail(&oap->oap_pending_item, &tmp_list);
2444 if (++page_count >= cli->cl_max_pages_per_rpc)
2448 cfs_list_splice(&tmp_list, &lop->lop_pending);
2451 /* first we find the pages we're allowed to work with */
2452 cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2454 ops = oap->oap_caller_ops;
2456 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2457 "magic 0x%x\n", oap, oap->oap_magic);
2460 /* pin object in memory, so that completion call-backs
2461 * can be safely called under client_obd_list lock. */
2462 clob = osc_oap2cl_page(oap)->cp_obj;
2463 cl_object_get(clob);
2466 if (page_count != 0 &&
2467 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2468 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2469 " oap %p, page %p, srvlock %u\n",
2470 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2474 /* If there is a gap at the start of this page, it can't merge
2475 * with any previous page, so we'll hand the network a
2476 * "fragmented" page array that it can't transfer in 1 RDMA */
2477 if (page_count != 0 && oap->oap_page_off != 0)
2480 /* in llite being 'ready' equates to the page being locked
2481 * until completion unlocks it. commit_write submits a page
2482 * as not ready because its unlock will happen unconditionally
2483 * as the call returns. if we race with commit_write giving
2484 * us that page we don't want to create a hole in the page
2485 * stream, so we stop and leave the rpc to be fired by
2486 * another dirtier or kupdated interval (the not ready page
2487 * will still be on the dirty list). we could call in
2488 * at the end of ll_file_write to process the queue again. */
2489 if (!(oap->oap_async_flags & ASYNC_READY)) {
2490 int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2493 CDEBUG(D_INODE, "oap %p page %p returned %d "
2494 "instead of ready\n", oap,
2498 /* llite is telling us that the page is still
2499 * in commit_write and that we should try
2500 * and put it in an rpc again later. we
2501 * break out of the loop so we don't create
2502 * a hole in the sequence of pages in the rpc
2507 /* the io isn't needed.. tell the checks
2508 * below to complete the rpc with EINTR */
2509 cfs_spin_lock(&oap->oap_lock);
2510 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2511 cfs_spin_unlock(&oap->oap_lock);
2512 oap->oap_count = -EINTR;
2515 cfs_spin_lock(&oap->oap_lock);
2516 oap->oap_async_flags |= ASYNC_READY;
2517 cfs_spin_unlock(&oap->oap_lock);
2520 LASSERTF(0, "oap %p page %p returned %d "
2521 "from make_ready\n", oap,
2529 * Page submitted for IO has to be locked. Either by
2530 * ->ap_make_ready() or by higher layers.
2532 #if defined(__KERNEL__) && defined(__linux__)
2534 struct cl_page *page;
2536 page = osc_oap2cl_page(oap);
2538 if (page->cp_type == CPT_CACHEABLE &&
2539 !(PageLocked(oap->oap_page) &&
2540 (CheckWriteback(oap->oap_page, cmd)))) {
2541 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2543 (long)oap->oap_page->flags,
2544 oap->oap_async_flags);
2550 /* take the page out of our book-keeping */
2551 cfs_list_del_init(&oap->oap_pending_item);
2552 lop_update_pending(cli, lop, cmd, -1);
2553 cfs_list_del_init(&oap->oap_urgent_item);
2555 if (page_count == 0)
2556 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2557 (PTLRPC_MAX_BRW_SIZE - 1);
2559 /* ask the caller for the size of the io as the rpc leaves. */
2560 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2562 ops->ap_refresh_count(env, oap->oap_caller_data,
2564 LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2566 if (oap->oap_count <= 0) {
2567 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2569 osc_ap_completion(env, cli, NULL,
2570 oap, 0, oap->oap_count);
2574 /* now put the page back in our accounting */
2575 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2576 if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
2578 if (page_count == 0)
2579 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2580 if (++page_count >= cli->cl_max_pages_per_rpc)
2583 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2584 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2585 * have the same alignment as the initial writes that allocated
2586 * extents on the server. */
2587 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2588 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2589 if (ending_offset == 0)
2592 /* If there is a gap at the end of this page, it can't merge
2593 * with any subsequent pages, so we'll hand the network a
2594 * "fragmented" page array that it can't transfer in 1 RDMA */
2595 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2599 osc_wake_cache_waiters(cli);
2601 loi_list_maint(cli, loi);
2603 client_obd_list_unlock(&cli->cl_loi_list_lock);
2606 cl_object_put(env, clob);
2608 if (page_count == 0) {
2609 client_obd_list_lock(&cli->cl_loi_list_lock);
2613 req = osc_build_req(env, cli, &rpc_list, page_count,
2614 mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
2616 LASSERT(cfs_list_empty(&rpc_list));
2617 loi_list_maint(cli, loi);
2618 RETURN(PTR_ERR(req));
2621 aa = ptlrpc_req_async_args(req);
2623 if (cmd == OBD_BRW_READ) {
2624 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2625 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2626 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2627 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2629 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2630 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2631 cli->cl_w_in_flight);
2632 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2633 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2635 ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2637 client_obd_list_lock(&cli->cl_loi_list_lock);
2639 if (cmd == OBD_BRW_READ)
2640 cli->cl_r_in_flight++;
2642 cli->cl_w_in_flight++;
2644 /* queued sync pages can be torn down while the pages
2645 * were between the pending list and the rpc */
2647 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2648 /* only one oap gets a request reference */
2651 if (oap->oap_interrupted && !req->rq_intr) {
2652 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2654 ptlrpc_mark_interrupted(req);
2658 tmp->oap_request = ptlrpc_request_addref(req);
2660 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2661 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2663 req->rq_interpret_reply = brw_interpret;
2664 ptlrpcd_add_req(req, PSCOPE_BRW);
/* Debug helper: dumps an lov_oinfo's readiness and pending read/write page
 * state.  NOTE(review): the macro's final continuation line (the trailing
 * args/format) is not visible in this excerpt; no lines may be inserted
 * inside the backslash-continued body. */
2668 #define LOI_DEBUG(LOI, STR, args...) \
2669 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2670 !cfs_list_empty(&(LOI)->loi_ready_item) || \
2671 !cfs_list_empty(&(LOI)->loi_hp_ready_item), \
2672 (LOI)->loi_write_lop.lop_num_pending, \
2673 !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2674 (LOI)->loi_read_lop.lop_num_pending, \
2675 !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2678 /* This is called by osc_check_rpcs() to find which objects have pages that
2679 * we could be sending. These lists are maintained by lop_makes_rpc(). */
/* Selection order visible below: hp_ready, then ready, then (with cache
 * waiters) queued writes, then — on an invalid import — all queued writes
 * and reads.  The fall-through return is elided from this excerpt. */
2680 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2684 /* First return objects that have blocked locks so that they
2685 * will be flushed quickly and other clients can get the lock,
2686 * then objects which have pages ready to be stuffed into RPCs */
2687 if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
2688 RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
2689 struct lov_oinfo, loi_hp_ready_item));
2690 if (!cfs_list_empty(&cli->cl_loi_ready_list))
2691 RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
2692 struct lov_oinfo, loi_ready_item));
2694 /* then if we have cache waiters, return all objects with queued
2695 * writes. This is especially important when many small files
2696 * have filled up the cache and not been fired into rpcs because
2697 * they don't pass the nr_pending/object threshhold */
2698 if (!cfs_list_empty(&cli->cl_cache_waiters) &&
2699 !cfs_list_empty(&cli->cl_loi_write_list))
2700 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2701 struct lov_oinfo, loi_write_item));
2703 /* then return all queued objects when we have an invalid import
2704 * so that they get flushed */
2705 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2706 if (!cfs_list_empty(&cli->cl_loi_write_list))
2707 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2710 if (!cfs_list_empty(&cli->cl_loi_read_list))
2711 RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
2712 struct lov_oinfo, loi_read_item));
/* Returns non-zero when this client already has its maximum RPCs in flight.
 * A high-priority (ASYNC_HP) urgent page on either the write or read list
 * raises the limit by one, so HP pages can still be sent. */
2717 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2719 struct osc_async_page *oap;
2722 if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
2723 oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
2724 struct osc_async_page, oap_urgent_item);
2725 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2728 if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
2729 oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
2730 struct osc_async_page, oap_urgent_item);
2731 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2734 return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2737 /* called with the loi list lock held */
/* Main RPC dispatch loop: repeatedly picks the next object via
 * osc_next_loi(), fires write then read RPCs for it, and removes it from
 * the ready/write/read lists to rotate between objects.  NOTE(review):
 * several branch bodies and the loop epilogue are elided in this excerpt. */
2738 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2740 struct lov_oinfo *loi;
2741 int rc = 0, race_counter = 0;
2744 while ((loi = osc_next_loi(cli)) != NULL) {
2745 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2747 if (osc_max_rpc_in_flight(cli, loi))
2750 /* attempt some read/write balancing by alternating between
2751 * reads and writes in an object. The makes_rpc checks here
2752 * would be redundant if we were getting read/write work items
2753 * instead of objects. we don't want send_oap_rpc to drain a
2754 * partial read pending queue when we're given this object to
2755 * do io on writes while there are cache waiters */
2756 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2757 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2758 &loi->loi_write_lop);
2760 CERROR("Write request failed with %d\n", rc);
2762 /* osc_send_oap_rpc failed, mostly because of
2765 * It can't break here, because if:
2766 * - a page was submitted by osc_io_submit, so
2768 * - no request in flight
2769 * - no subsequent request
2770 * The system will be in live-lock state,
2771 * because there is no chance to call
2772 * osc_io_unplug() and osc_check_rpcs() any
2773 * more. pdflush can't help in this case,
2774 * because it might be blocked at grabbing
2775 * the page lock as we mentioned.
2777 * Anyway, continue to drain pages. */
2786 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2787 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2788 &loi->loi_read_lop);
2790 CERROR("Read request failed with %d\n", rc);
2798 /* attempt some inter-object balancing by issuing rpcs
2799 * for each object in turn */
2800 if (!cfs_list_empty(&loi->loi_hp_ready_item))
2801 cfs_list_del_init(&loi->loi_hp_ready_item);
2802 if (!cfs_list_empty(&loi->loi_ready_item))
2803 cfs_list_del_init(&loi->loi_ready_item);
2804 if (!cfs_list_empty(&loi->loi_write_item))
2805 cfs_list_del_init(&loi->loi_write_item);
2806 if (!cfs_list_empty(&loi->loi_read_item))
2807 cfs_list_del_init(&loi->loi_read_item);
2809 loi_list_maint(cli, loi);
2811 /* send_oap_rpc fails with 0 when make_ready tells it to
2812 * back off. llite's make_ready does this when it tries
2813 * to lock a page queued for write that is already locked.
2814 * we want to try sending rpcs from many objects, but we
2815 * don't want to spin failing with 0. */
2816 if (race_counter == 10)
2822 /* we're trying to queue a page in the osc so we're subject to the
2823 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2824 * If the osc's queued pages are already at that limit, then we want to sleep
2825 * until there is space in the osc's queue for us. We also may be waiting for
2826 * write credits from the OST if there are RPCs in flight that may return some
2827 * before we fall back to sync writes.
2829 * We need this know our allocation was granted in the presence of signals */
/* Wait predicate for l_wait_event() in osc_enter_cache(): true once our
 * waiter entry was removed (grant given) or no RPCs remain in flight. */
2830 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2834 client_obd_list_lock(&cli->cl_loi_list_lock);
2835 rc = cfs_list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2836 client_obd_list_unlock(&cli->cl_loi_list_lock);
2841 * Non-blocking version of osc_enter_cache() that consumes grant only when it
/* Consumes one page of write grant if cl_avail_grant covers a full page;
 * for transient pages it also bumps the dirty-transit accounting and marks
 * the page OBD_BRW_NOCACHE.  NOTE(review): surrounding braces/return are
 * elided in this excerpt. */
2844 int osc_enter_cache_try(const struct lu_env *env,
2845 struct client_obd *cli, struct lov_oinfo *loi,
2846 struct osc_async_page *oap, int transient)
2850 has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2852 osc_consume_write_grant(cli, &oap->oap_brw_page);
2854 cli->cl_dirty_transit += CFS_PAGE_SIZE;
2855 cfs_atomic_inc(&obd_dirty_transit_pages);
2856 oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2862 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2863 * grant or cache space. */
2864 static int osc_enter_cache(const struct lu_env *env,
2865 struct client_obd *cli, struct lov_oinfo *loi,
2866 struct osc_async_page *oap)
2868 struct osc_cache_waiter ocw;
2869 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2873 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2874 "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
2875 cli->cl_dirty_max, obd_max_dirty_pages,
2876 cli->cl_lost_grant, cli->cl_avail_grant);
2878 /* force the caller to try sync io. this can jump the list
2879 * of queued writes and create a discontiguous rpc stream */
2880 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2881 loi->loi_ar.ar_force_sync)
2884 /* Hopefully normal case - cache space and write credits available */
2885 if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2886 cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2887 osc_enter_cache_try(env, cli, loi, oap, 0))
2890 /* It is safe to block as a cache waiter as long as there is grant
2891 * space available or the hope of additional grant being returned
2892 * when an in flight write completes. Using the write back cache
2893 * if possible is preferable to sending the data synchronously
2894 * because write pages can then be merged in to large requests.
2895 * The addition of this cache waiter will causing pending write
2896 * pages to be sent immediately. */
2897 if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
2898 cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2899 cfs_waitq_init(&ocw.ocw_waitq);
/* Kick pending writes, drop the list lock, and sleep until a grant
 * arrives (ocw_granted) or a signal interrupts us (LWI_INTR). */
2903 loi_list_maint(cli, loi);
2904 osc_check_rpcs(env, cli);
2905 client_obd_list_unlock(&cli->cl_loi_list_lock);
2907 CDEBUG(D_CACHE, "sleeping for cache space\n");
2908 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2910 client_obd_list_lock(&cli->cl_loi_list_lock);
/* Still on the waiter list => we were not granted; unlink ourselves. */
2911 if (!cfs_list_empty(&ocw.ocw_entry)) {
2912 cfs_list_del(&ocw.ocw_entry);
/* Initialize an osc_async_page cookie for the given page/offset.  Called
 * with res == NULL to query the cookie size (cfs_size_round(sizeof(*oap))).
 * NOTE(review): some lines (e.g. the *res assignment and RETURN) are elided
 * in this excerpt. */
2922 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2923 struct lov_oinfo *loi, cfs_page_t *page,
2924 obd_off offset, const struct obd_async_page_ops *ops,
2925 void *data, void **res, int nocache,
2926 struct lustre_handle *lockh)
2928 struct osc_async_page *oap;
2933 return cfs_size_round(sizeof(*oap));
2936 oap->oap_magic = OAP_MAGIC;
2937 oap->oap_cli = &exp->exp_obd->u.cli;
2940 oap->oap_caller_ops = ops;
2941 oap->oap_caller_data = data;
2943 oap->oap_page = page;
2944 oap->oap_obj_off = offset;
/* Local root-capable clients bypass quota for this page. */
2945 if (!client_is_remote(exp) &&
2946 cfs_capable(CFS_CAP_SYS_RESOURCE))
2947 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2949 LASSERT(!(offset & ~CFS_PAGE_MASK));
2951 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2952 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2953 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2954 CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2956 cfs_spin_lock_init(&oap->oap_lock);
2957 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
/* Validate and convert an opaque cookie back to its osc_async_page,
 * returning ERR_PTR(-EINVAL) on a bad magic. */
2961 struct osc_async_page *oap_from_cookie(void *cookie)
2963 struct osc_async_page *oap = cookie;
2964 if (oap->oap_magic != OAP_MAGIC)
2965 return ERR_PTR(-EINVAL);
/* Queue one page for async read/write: quota check for non-NOQUOTA writes,
 * grant/cache admission for writes (osc_enter_cache), then place the oap on
 * the pending lists and kick osc_check_rpcs().  NOTE(review): several
 * branch bodies and the function epilogue are elided in this excerpt. */
2969 int osc_queue_async_io(const struct lu_env *env,
2970 struct obd_export *exp, struct lov_stripe_md *lsm,
2971 struct lov_oinfo *loi, void *cookie,
2972 int cmd, obd_off off, int count,
2973 obd_flag brw_flags, enum async_flags async_flags)
2975 struct client_obd *cli = &exp->exp_obd->u.cli;
2976 struct osc_async_page *oap;
2980 oap = oap_from_cookie(cookie);
2982 RETURN(PTR_ERR(oap));
2984 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
/* The oap must not already be queued anywhere. */
2987 if (!cfs_list_empty(&oap->oap_pending_item) ||
2988 !cfs_list_empty(&oap->oap_urgent_item) ||
2989 !cfs_list_empty(&oap->oap_rpc_item))
2992 /* check if the file's owner/group is over quota */
2993 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2994 struct cl_object *obj;
2995 struct cl_attr attr; /* XXX put attr into thread info */
2996 unsigned int qid[MAXQUOTAS];
2998 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
3000 cl_object_attr_lock(obj);
3001 rc = cl_object_attr_get(env, obj, &attr);
3002 cl_object_attr_unlock(obj);
3004 qid[USRQUOTA] = attr.cat_uid;
3005 qid[GRPQUOTA] = attr.cat_gid;
3007 lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
3014 loi = lsm->lsm_oinfo[0];
3016 client_obd_list_lock(&cli->cl_loi_list_lock);
3018 LASSERT(off + count <= CFS_PAGE_SIZE);
3020 oap->oap_page_off = off;
3021 oap->oap_count = count;
3022 oap->oap_brw_flags = brw_flags;
3023 /* Give a hint to OST that requests are coming from kswapd - bug19529 */
3024 if (cfs_memory_pressure_get())
3025 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
3026 cfs_spin_lock(&oap->oap_lock);
3027 oap->oap_async_flags = async_flags;
3028 cfs_spin_unlock(&oap->oap_lock);
3030 if (cmd & OBD_BRW_WRITE) {
3031 rc = osc_enter_cache(env, cli, loi, oap);
3033 client_obd_list_unlock(&cli->cl_loi_list_lock);
3038 osc_oap_to_pending(oap);
3039 loi_list_maint(cli, loi);
3041 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
3044 osc_check_rpcs(env, cli);
3045 client_obd_list_unlock(&cli->cl_loi_list_lock);
3050 /* aka (~was & now & flag), but this is more clear :) */
3051 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
/* Merge newly-requested async_flags into an already-pending oap.  Newly
 * set ASYNC_URGENT moves the oap onto the urgent list (head for ASYNC_HP,
 * tail otherwise); flags are committed under oap_lock. */
3053 int osc_set_async_flags_base(struct client_obd *cli,
3054 struct lov_oinfo *loi, struct osc_async_page *oap,
3055 obd_flag async_flags)
3057 struct loi_oap_pages *lop;
3061 LASSERT(!cfs_list_empty(&oap->oap_pending_item));
3063 if (oap->oap_cmd & OBD_BRW_WRITE) {
3064 lop = &loi->loi_write_lop;
3066 lop = &loi->loi_read_lop;
/* Nothing to do when every requested flag is already set. */
3069 if ((oap->oap_async_flags & async_flags) == async_flags)
3072 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3073 flags |= ASYNC_READY;
3075 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3076 cfs_list_empty(&oap->oap_rpc_item)) {
3077 if (oap->oap_async_flags & ASYNC_HP)
3078 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
3080 cfs_list_add_tail(&oap->oap_urgent_item,
3082 flags |= ASYNC_URGENT;
3083 loi_list_maint(cli, loi);
3085 cfs_spin_lock(&oap->oap_lock);
3086 oap->oap_async_flags |= flags;
3087 cfs_spin_unlock(&oap->oap_lock);
3089 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3090 oap->oap_async_flags);
/* Remove a queued-but-not-in-flight page from OSC book-keeping: release its
 * cache grant, wake cache waiters, and unlink it from the urgent/pending
 * lists.  Fails with -EBUSY if the page is already part of an RPC. */
3094 int osc_teardown_async_page(struct obd_export *exp,
3095 struct lov_stripe_md *lsm,
3096 struct lov_oinfo *loi, void *cookie)
3098 struct client_obd *cli = &exp->exp_obd->u.cli;
3099 struct loi_oap_pages *lop;
3100 struct osc_async_page *oap;
3104 oap = oap_from_cookie(cookie);
3106 RETURN(PTR_ERR(oap));
3109 loi = lsm->lsm_oinfo[0];
3111 if (oap->oap_cmd & OBD_BRW_WRITE) {
3112 lop = &loi->loi_write_lop;
3114 lop = &loi->loi_read_lop;
3117 client_obd_list_lock(&cli->cl_loi_list_lock);
3119 if (!cfs_list_empty(&oap->oap_rpc_item))
3120 GOTO(out, rc = -EBUSY);
3122 osc_exit_cache(cli, oap, 0);
3123 osc_wake_cache_waiters(cli);
3125 if (!cfs_list_empty(&oap->oap_urgent_item)) {
3126 cfs_list_del_init(&oap->oap_urgent_item);
3127 cfs_spin_lock(&oap->oap_lock);
3128 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3129 cfs_spin_unlock(&oap->oap_lock);
3131 if (!cfs_list_empty(&oap->oap_pending_item)) {
3132 cfs_list_del_init(&oap->oap_pending_item);
3133 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3135 loi_list_maint(cli, loi);
3136 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3138 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Attach einfo->ei_cbdata to the lock's l_ast_data if it is unset or
 * already equal, under both the resource lock and osc_ast_guard.  The
 * asserts verify the lock really carries the expected OSC callbacks. */
3142 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
3143 struct ldlm_enqueue_info *einfo)
3145 void *data = einfo->ei_cbdata;
3148 LASSERT(lock != NULL);
3149 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3150 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3151 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3152 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3154 lock_res_and_lock(lock);
3155 cfs_spin_lock(&osc_ast_guard);
3157 if (lock->l_ast_data == NULL)
3158 lock->l_ast_data = data;
3159 if (lock->l_ast_data == data)
3162 cfs_spin_unlock(&osc_ast_guard);
3163 unlock_res_and_lock(lock);
/* Handle-based wrapper for osc_set_lock_data_with_check(); logs if the
 * handle no longer resolves to a lock (likely eviction). */
3168 static int osc_set_data_with_check(struct lustre_handle *lockh,
3169 struct ldlm_enqueue_info *einfo)
3171 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3175 set = osc_set_lock_data_with_check(lock, einfo);
3176 LDLM_LOCK_PUT(lock);
3178 CERROR("lockh %p, data %p - client evicted?\n",
3179 lockh, einfo->ei_cbdata);
/* Iterate all DLM locks on this stripe's resource, applying 'replace'
 * (an ldlm_iterator_t) with 'data' to each. */
3183 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3184 ldlm_iterator_t replace, void *data)
3186 struct ldlm_res_id res_id;
3187 struct obd_device *obd = class_exp2obd(exp);
3189 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3190 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3194 /* find any ldlm lock of the inode in osc
/* Like osc_change_cbdata() but interprets the iterator's result:
 * LDLM_ITER_STOP means a lock was found, LDLM_ITER_CONTINUE means none.
 * NOTE(review): the corresponding return statements are elided here. */
3198 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3199 ldlm_iterator_t replace, void *data)
3201 struct ldlm_res_id res_id;
3202 struct obd_device *obd = class_exp2obd(exp);
3205 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3206 rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3207 if (rc == LDLM_ITER_STOP)
3209 if (rc == LDLM_ITER_CONTINUE)
/* Common post-enqueue processing: translate an intent ELDLM_LOCK_ABORTED
 * into the server's lock_policy_res1 code, mark the LVB ready on success,
 * and invoke the caller's update callback with the final rc. */
3214 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3215 obd_enqueue_update_f upcall, void *cookie,
3218 int intent = *flags & LDLM_FL_HAS_INTENT;
3222 /* The request was created before ldlm_cli_enqueue call. */
3223 if (rc == ELDLM_LOCK_ABORTED) {
3224 struct ldlm_reply *rep;
3225 rep = req_capsule_server_get(&req->rq_pill,
3228 LASSERT(rep != NULL);
3229 if (rep->lock_policy_res1)
3230 rc = rep->lock_policy_res1;
3234 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3235 *flags |= LDLM_FL_LVB_READY;
3236 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3237 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3240 /* Call the update callback. */
3241 rc = (*upcall)(cookie, rc);
/* Async-enqueue reply interpreter: copies the lock handle/mode out of aa
 * (which may be freed once the upcall runs), pins the lock across the
 * upcall, finishes the enqueue, then drops the extra references. */
3245 static int osc_enqueue_interpret(const struct lu_env *env,
3246 struct ptlrpc_request *req,
3247 struct osc_enqueue_args *aa, int rc)
3249 struct ldlm_lock *lock;
3250 struct lustre_handle handle;
3253 /* Make a local copy of a lock handle and a mode, because aa->oa_*
3254 * might be freed anytime after lock upcall has been called. */
3255 lustre_handle_copy(&handle, aa->oa_lockh);
3256 mode = aa->oa_ei->ei_mode;
3258 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3260 lock = ldlm_handle2lock(&handle);
3262 /* Take an additional reference so that a blocking AST that
3263 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3264 * to arrive after an upcall has been executed by
3265 * osc_enqueue_fini(). */
3266 ldlm_lock_addref(&handle, mode);
3268 /* Let CP AST to grant the lock first. */
3269 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
3271 /* Complete obtaining the lock procedure. */
3272 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3273 mode, aa->oa_flags, aa->oa_lvb,
3274 sizeof(*aa->oa_lvb), &handle, rc);
3275 /* Complete osc stuff. */
3276 rc = osc_enqueue_fini(req, aa->oa_lvb,
3277 aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
3279 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3281 /* Release the lock for async request. */
3282 if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3284 * Releases a reference taken by ldlm_cli_enqueue(), if it is
3285 * not already released by
3286 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3288 ldlm_lock_decref(&handle, mode);
3290 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3291 aa->oa_lockh, req, aa);
3292 ldlm_lock_decref(&handle, mode);
3293 LDLM_LOCK_PUT(lock);
/* After a successful or glimpse enqueue, copy the returned LVB into the
 * lov_oinfo and advance KMS up to (but not past) the end of the granted
 * extent, then allow lock matching. */
3297 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3298 struct lov_oinfo *loi, int flags,
3299 struct ost_lvb *lvb, __u32 mode, int rc)
3301 if (rc == ELDLM_OK) {
3302 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3305 LASSERT(lock != NULL);
3306 loi->loi_lvb = *lvb;
3307 tmp = loi->loi_lvb.lvb_size;
3308 /* Extend KMS up to the end of this lock and no further
3309 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3310 if (tmp > lock->l_policy_data.l_extent.end)
3311 tmp = lock->l_policy_data.l_extent.end + 1;
3312 if (tmp >= loi->loi_kms) {
3313 LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3314 ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3315 loi_kms_set(loi, tmp);
3317 LDLM_DEBUG(lock, "lock acquired, setting rss="
3318 LPU64"; leaving kms="LPU64", end="LPU64,
3319 loi->loi_lvb.lvb_size, loi->loi_kms,
3320 lock->l_policy_data.l_extent.end);
3322 ldlm_lock_allow_match(lock);
3323 LDLM_LOCK_PUT(lock);
3324 } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3325 loi->loi_lvb = *lvb;
3326 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3327 " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3331 EXPORT_SYMBOL(osc_update_enqueue);
/* Sentinel rqset value meaning "hand the request to ptlrpcd" rather than a
 * caller-owned set; compared by pointer identity below. */
3333 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3335 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3336 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3337 * other synchronous requests, however keeping some locks and trying to obtain
3338 * others may take a considerable amount of time in a case of ost failure; and
3339 * when other sync requests do not get released lock from a client, the client
3340 * is excluded from the cluster -- such scenarious make the life difficult, so
3341 * release locks just after they are obtained. */
/* Extent-lock enqueue: first try to match an existing (possibly PW-for-PR)
 * lock, otherwise build and send an LDLM enqueue, async via interpreter or
 * sync via osc_enqueue_fini().  NOTE(review): several branch bodies and the
 * epilogue are elided in this excerpt. */
3342 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3343 int *flags, ldlm_policy_data_t *policy,
3344 struct ost_lvb *lvb, int kms_valid,
3345 obd_enqueue_update_f upcall, void *cookie,
3346 struct ldlm_enqueue_info *einfo,
3347 struct lustre_handle *lockh,
3348 struct ptlrpc_request_set *rqset, int async)
3350 struct obd_device *obd = exp->exp_obd;
3351 struct ptlrpc_request *req = NULL;
3352 int intent = *flags & LDLM_FL_HAS_INTENT;
3357 /* Filesystem lock extents are extended to page boundaries so that
3358 * dealing with the page cache is a little smoother. */
3359 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3360 policy->l_extent.end |= ~CFS_PAGE_MASK;
3363 * kms is not valid when either object is completely fresh (so that no
3364 * locks are cached), or object was evicted. In the latter case cached
3365 * lock cannot be used, because it would prime inode state with
3366 * potentially stale LVB.
3371 /* Next, search for already existing extent locks that will cover us */
3372 /* If we're trying to read, we also search for an existing PW lock. The
3373 * VFS and page cache already protect us locally, so lots of readers/
3374 * writers can share a single PW lock.
3376 * There are problems with conversion deadlocks, so instead of
3377 * converting a read lock to a write lock, we'll just enqueue a new
3380 * At some point we should cancel the read lock instead of making them
3381 * send us a blocking callback, but there are problems with canceling
3382 * locks out from other users right now, too. */
3383 mode = einfo->ei_mode;
3384 if (einfo->ei_mode == LCK_PR)
3386 mode = ldlm_lock_match(obd->obd_namespace,
3387 *flags | LDLM_FL_LVB_READY, res_id,
3388 einfo->ei_type, policy, mode, lockh, 0);
3390 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3392 if (osc_set_lock_data_with_check(matched, einfo)) {
3393 /* addref the lock only if not async requests and PW
3394 * lock is matched whereas we asked for PR. */
3395 if (!rqset && einfo->ei_mode != mode)
3396 ldlm_lock_addref(lockh, LCK_PR);
3398 /* I would like to be able to ASSERT here that
3399 * rss <= kms, but I can't, for reasons which
3400 * are explained in lov_enqueue() */
3403 /* We already have a lock, and it's referenced */
3404 (*upcall)(cookie, ELDLM_OK);
3406 /* For async requests, decref the lock. */
3407 if (einfo->ei_mode != mode)
3408 ldlm_lock_decref(lockh, LCK_PW);
3410 ldlm_lock_decref(lockh, einfo->ei_mode);
3411 LDLM_LOCK_PUT(matched);
3414 ldlm_lock_decref(lockh, mode);
3415 LDLM_LOCK_PUT(matched);
/* No usable match: for intent enqueues, allocate an LVB-carrying request. */
3420 CFS_LIST_HEAD(cancels);
3421 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3422 &RQF_LDLM_ENQUEUE_LVB);
3426 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3428 ptlrpc_request_free(req);
3432 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3434 ptlrpc_request_set_replen(req);
3437 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3438 *flags &= ~LDLM_FL_BLOCK_GRANTED;
3440 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3441 sizeof(*lvb), lockh, async);
3444 struct osc_enqueue_args *aa;
3445 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3446 aa = ptlrpc_req_async_args(req);
3449 aa->oa_flags = flags;
3450 aa->oa_upcall = upcall;
3451 aa->oa_cookie = cookie;
3453 aa->oa_lockh = lockh;
3455 req->rq_interpret_reply =
3456 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3457 if (rqset == PTLRPCD_SET)
3458 ptlrpcd_add_req(req, PSCOPE_OTHER);
3460 ptlrpc_set_add_req(rqset, req);
3461 } else if (intent) {
3462 ptlrpc_req_finished(req);
3467 rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3469 ptlrpc_req_finished(req);
/* obd_ops enqueue entry point: builds the stripe resource id and delegates
 * to osc_enqueue_base(); async iff the caller supplied an rqset. */
3474 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3475 struct ldlm_enqueue_info *einfo,
3476 struct ptlrpc_request_set *rqset)
3478 struct ldlm_res_id res_id;
3482 osc_build_res_name(oinfo->oi_md->lsm_object_id,
3483 oinfo->oi_md->lsm_object_seq, &res_id);
3485 rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3486 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3487 oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3488 oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3489 rqset, rqset != NULL);
/* Match an existing extent lock covering [start,end] (page-aligned below).
 * A matched PW lock may satisfy a PR request; in that non-TEST case the
 * reference is converted from PW to PR.  NOTE(review): some lines around
 * the match-mode setup and returns are elided in this excerpt. */
3493 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3494 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3495 int *flags, void *data, struct lustre_handle *lockh,
3498 struct obd_device *obd = exp->exp_obd;
3499 int lflags = *flags;
3503 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3506 /* Filesystem lock extents are extended to page boundaries so that
3507 * dealing with the page cache is a little smoother */
3508 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3509 policy->l_extent.end |= ~CFS_PAGE_MASK;
3511 /* Next, search for already existing extent locks that will cover us */
3512 /* If we're trying to read, we also search for an existing PW lock. The
3513 * VFS and page cache already protect us locally, so lots of readers/
3514 * writers can share a single PW lock. */
3518 rc = ldlm_lock_match(obd->obd_namespace, lflags,
3519 res_id, type, policy, rc, lockh, unref);
3522 if (!osc_set_data_with_check(lockh, data)) {
3523 if (!(lflags & LDLM_FL_TEST_LOCK))
3524 ldlm_lock_decref(lockh, rc);
3528 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3529 ldlm_lock_addref(lockh, LCK_PR);
3530 ldlm_lock_decref(lockh, LCK_PW);
/* Drop one reference on the lock; GROUP locks are also cancelled
 * immediately since they are not kept around for caching. */
3537 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3541 if (unlikely(mode == LCK_GROUP))
3542 ldlm_lock_decref_and_cancel(lockh, mode);
3544 ldlm_lock_decref(lockh, mode);
/* obd_ops o_cancel entry point: thin wrapper over osc_cancel_base().
 * The export and stripe md arguments are unused here. */
3549 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3550 __u32 mode, struct lustre_handle *lockh)
3553 RETURN(osc_cancel_base(lockh, mode));
/* Cancel all unused locks on this target, optionally restricted to the
 * resource of the given stripe md (resp stays NULL when lsm is absent in
 * the elided code path — TODO confirm against the full source). */
3556 static int osc_cancel_unused(struct obd_export *exp,
3557 struct lov_stripe_md *lsm,
3558 ldlm_cancel_flags_t flags,
3561 struct obd_device *obd = class_exp2obd(exp);
3562 struct ldlm_res_id res_id, *resp = NULL;
3565 resp = osc_build_res_name(lsm->lsm_object_id,
3566 lsm->lsm_object_seq, &res_id);
3569 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
/* Interpret callback for an async OST_STATFS reply: copies the server's
 * obd_statfs into the caller's buffer, updates the object-creator flags
 * (DEGRADED/RDONLY/NOSPC) under oscc_lock, then invokes the caller's
 * oi_cb_up completion callback with the final rc. */
3572 static int osc_statfs_interpret(const struct lu_env *env,
3573 struct ptlrpc_request *req,
3574 struct osc_async_args *aa, int rc)
3576 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3577 struct obd_statfs *msfs;
3582 /* The request has in fact never been sent
3583 * due to issues at a higher level (LOV).
3584 * Exit immediately since the caller is
3585 * aware of the problem and takes care
3586 * of the clean up */
/* NODELAY statfs (e.g. from procfs) tolerates a disconnected import;
 * treat -ENOTCONN/-EAGAIN specially in the elided code below. */
3589 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3590 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3596 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3598 GOTO(out, rc = -EPROTO);
3601 /* Reinitialize the RDONLY and DEGRADED flags at the client
3602 * on each statfs, so they don't stay set permanently. */
3603 cfs_spin_lock(&cli->cl_oscc.oscc_lock);
3605 if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3606 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3607 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3608 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3610 if (unlikely(msfs->os_state & OS_STATE_READONLY))
3611 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3612 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3613 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3615 /* Add a bit of hysteresis so this flag isn't continually flapping,
3616 * and ensure that new files don't get extremely fragmented due to
3617 * only a small amount of available space in the filesystem.
3618 * We want to set the NOSPC flag when there is less than ~0.1% free
3619 * and clear it when there is at least ~0.2% free space, so:
3620 * avail < ~0.1% max max = avail + used
3621 * 1025 * avail < avail + used used = blocks - free
3622 * 1024 * avail < used
3623 * 1024 * avail < blocks - free
3624 * avail < ((blocks - free) >> 10)
3626 * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
3627 * lose that amount of space so in those cases we report no space left
3628 * if there is less than 1 GB left. */
3629 used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
3630 if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3631 ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3632 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3633 else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3634 (msfs->os_ffree > 64) && (msfs->os_bavail > (used << 1))))
3635 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_NOSPC;
3637 cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
3639 *aa->aa_oi->oi_osfs = *msfs;
3641 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Issue an asynchronous OST_STATFS request and add it to the caller's
 * request set; the reply is consumed by osc_statfs_interpret().
 * max_age is currently unused in the RPC itself (see comment below). */
3645 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3646 __u64 max_age, struct ptlrpc_request_set *rqset)
3648 struct ptlrpc_request *req;
3649 struct osc_async_args *aa;
3653 /* We could possibly pass max_age in the request (as an absolute
3654 * timestamp or a "seconds.usec ago") so the target can avoid doing
3655 * extra calls into the filesystem if that isn't necessary (e.g.
3656 * during mount that would help a bit). Having relative timestamps
3657 * is not so great if request processing is slow, while absolute
3658 * timestamps are not ideal because they need time synchronization. */
3659 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3663 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3665 ptlrpc_request_free(req);
3668 ptlrpc_request_set_replen(req);
3669 req->rq_request_portal = OST_CREATE_PORTAL;
3670 ptlrpc_at_set_req_timeout(req);
3672 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
/* procfs-originated requests must not block waiting on a reconnecting
 * import — disable resend/delay to avoid a deadlock. */
3674 req->rq_no_resend = 1;
3675 req->rq_no_delay = 1;
3678 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3679 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3680 aa = ptlrpc_req_async_args(req);
3683 ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_STATFS: grabs a reference on the import under cl_sem
 * (racing with client_disconnect_export, see Bug15684), sends the RPC
 * with ptlrpc_queue_wait() and copies the reply into *osfs. */
3687 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3688 __u64 max_age, __u32 flags)
3690 struct obd_statfs *msfs;
3691 struct ptlrpc_request *req;
3692 struct obd_import *imp = NULL;
3696 /*Since the request might also come from lprocfs, so we need
3697 *sync this with client_disconnect_export Bug15684*/
3698 cfs_down_read(&obd->u.cli.cl_sem);
3699 if (obd->u.cli.cl_import)
3700 imp = class_import_get(obd->u.cli.cl_import);
3701 cfs_up_read(&obd->u.cli.cl_sem);
3705 /* We could possibly pass max_age in the request (as an absolute
3706 * timestamp or a "seconds.usec ago") so the target can avoid doing
3707 * extra calls into the filesystem if that isn't necessary (e.g.
3708 * during mount that would help a bit). Having relative timestamps
3709 * is not so great if request processing is slow, while absolute
3710 * timestamps are not ideal because they need time synchronization. */
3711 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
/* The import reference is only needed for the allocation; drop it. */
3713 class_import_put(imp);
3718 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3720 ptlrpc_request_free(req);
3723 ptlrpc_request_set_replen(req);
3724 req->rq_request_portal = OST_CREATE_PORTAL;
3725 ptlrpc_at_set_req_timeout(req);
3727 if (flags & OBD_STATFS_NODELAY) {
/* procfs-originated requests must not block waiting on a reconnecting
 * import — disable resend/delay to avoid a deadlock. */
3729 req->rq_no_resend = 1;
3730 req->rq_no_delay = 1;
3733 rc = ptlrpc_queue_wait(req);
3737 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3739 GOTO(out, rc = -EPROTO);
3746 ptlrpc_req_finished(req);
3750 /* Retrieve object striping information.
3752 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3753 * the maximum number of OST indices which will fit in the user buffer.
3754 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
/* Copies the v1/v3 header from user space, allocates a kernel-side
 * lov_user_md sized for the requested stripe count, fills in the single
 * OSC stripe's identifiers and copies the result back to user space. */
3756 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3758 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3759 struct lov_user_md_v3 lum, *lumk;
3760 struct lov_user_ost_data_v1 *lmm_objects;
3761 int rc = 0, lum_size;
3767 /* we only need the header part from user space to get lmm_magic and
3768 * lmm_stripe_count, (the header part is common to v1 and v3) */
3769 lum_size = sizeof(struct lov_user_md_v1);
3770 if (cfs_copy_from_user(&lum, lump, lum_size))
3773 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3774 (lum.lmm_magic != LOV_USER_MAGIC_V3))
3777 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3778 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3779 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3780 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3782 /* we can use lov_mds_md_size() to compute lum_size
3783 * because lov_user_md_vX and lov_mds_md_vX have the same size */
3784 if (lum.lmm_stripe_count > 0) {
3785 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3786 OBD_ALLOC(lumk, lum_size);
/* lmm_objects sits at a different offset in v1 vs v3 layouts. */
3790 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3791 lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3793 lmm_objects = &(lumk->lmm_objects[0]);
3794 lmm_objects->l_object_id = lsm->lsm_object_id;
3796 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3800 lumk->lmm_object_id = lsm->lsm_object_id;
3801 lumk->lmm_object_seq = lsm->lsm_object_seq;
3802 lumk->lmm_stripe_count = 1;
3804 if (cfs_copy_to_user(lump, lumk, lum_size))
3808 OBD_FREE(lumk, lum_size);
/* obd_ops o_iocontrol entry point: dispatch userland/kernel ioctls for
 * this OSC device.  Takes a module reference for the duration of the
 * call; unknown commands return -ENOTTY. */
3814 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3815 void *karg, void *uarg)
3817 struct obd_device *obd = exp->exp_obd;
3818 struct obd_ioctl_data *data = karg;
3822 if (!cfs_try_module_get(THIS_MODULE)) {
3823 CERROR("Can't get module. Is it alive?");
3827 case OBD_IOC_LOV_GET_CONFIG: {
3829 struct lov_desc *desc;
3830 struct obd_uuid uuid;
3834 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3835 GOTO(out, err = -EINVAL);
3837 data = (struct obd_ioctl_data *)buf;
/* Validate the caller provided enough room for the descriptor and
 * the uuid before writing into the inline buffers. */
3839 if (sizeof(*desc) > data->ioc_inllen1) {
3840 obd_ioctl_freedata(buf, len);
3841 GOTO(out, err = -EINVAL);
3844 if (data->ioc_inllen2 < sizeof(uuid)) {
3845 obd_ioctl_freedata(buf, len);
3846 GOTO(out, err = -EINVAL);
/* A single OSC looks like a one-target LOV to the caller. */
3849 desc = (struct lov_desc *)data->ioc_inlbuf1;
3850 desc->ld_tgt_count = 1;
3851 desc->ld_active_tgt_count = 1;
3852 desc->ld_default_stripe_count = 1;
3853 desc->ld_default_stripe_size = 0;
3854 desc->ld_default_stripe_offset = 0;
3855 desc->ld_pattern = 0;
3856 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3858 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3860 err = cfs_copy_to_user((void *)uarg, buf, len);
3863 obd_ioctl_freedata(buf, len);
3866 case LL_IOC_LOV_SETSTRIPE:
3867 err = obd_alloc_memmd(exp, karg);
3871 case LL_IOC_LOV_GETSTRIPE:
3872 err = osc_getstripe(karg, uarg);
3874 case OBD_IOC_CLIENT_RECOVER:
3875 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3880 case IOC_OSC_SET_ACTIVE:
3881 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3884 case OBD_IOC_POLL_QUOTACHECK:
3885 err = lquota_poll_check(quota_interface, exp,
3886 (struct if_quotacheck *)karg);
3888 case OBD_IOC_PING_TARGET:
3889 err = ptlrpc_obd_ping(obd);
3892 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3893 cmd, cfs_curproc_comm());
3894 GOTO(out, err = -ENOTTY);
3897 cfs_module_put(THIS_MODULE);
/* obd_ops o_get_info entry point.  Handles three keys:
 *  - KEY_LOCK_TO_STRIPE: answered locally (stripe index for this OSC);
 *  - KEY_LAST_ID:        synchronous OST_GET_INFO RPC returning an obd_id;
 *  - KEY_FIEMAP:         synchronous OST_GET_INFO RPC; 'val' is both the
 *                        request fiemap argument and the reply buffer. */
3901 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3902 void *key, __u32 *vallen, void *val,
3903 struct lov_stripe_md *lsm)
3906 if (!vallen || !val)
3909 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3910 __u32 *stripe = val;
3911 *vallen = sizeof(*stripe);
3914 } else if (KEY_IS(KEY_LAST_ID)) {
3915 struct ptlrpc_request *req;
3920 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3921 &RQF_OST_GET_INFO_LAST_ID);
3925 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3926 RCL_CLIENT, keylen);
3927 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3929 ptlrpc_request_free(req);
3933 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3934 memcpy(tmp, key, keylen);
3936 req->rq_no_delay = req->rq_no_resend = 1;
3937 ptlrpc_request_set_replen(req);
3938 rc = ptlrpc_queue_wait(req);
3942 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3944 GOTO(out, rc = -EPROTO);
3946 *((obd_id *)val) = *reply;
3948 ptlrpc_req_finished(req);
3950 } else if (KEY_IS(KEY_FIEMAP)) {
3951 struct ptlrpc_request *req;
3952 struct ll_user_fiemap *reply;
3956 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3957 &RQF_OST_GET_INFO_FIEMAP);
/* The fiemap value buffer is sized identically for request and
 * reply (*vallen bytes each way). */
3961 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3962 RCL_CLIENT, keylen);
3963 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3964 RCL_CLIENT, *vallen);
3965 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3966 RCL_SERVER, *vallen);
3968 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3970 ptlrpc_request_free(req);
3974 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3975 memcpy(tmp, key, keylen);
3976 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3977 memcpy(tmp, val, *vallen);
3979 ptlrpc_request_set_replen(req);
3980 rc = ptlrpc_queue_wait(req);
3984 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3986 GOTO(out1, rc = -EPROTO);
3988 memcpy(val, reply, *vallen);
3990 ptlrpc_req_finished(req);
/* Called when an MDS connects through this OSC (KEY_MDS_CONN): connect
 * the llog initiator for the origin context and mark the import as a
 * server-side one that we must ping ourselves. */
3998 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
4000 struct llog_ctxt *ctxt;
4004 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
4006 rc = llog_initiator_connect(ctxt);
4007 llog_ctxt_put(ctxt);
4009 /* XXX return an error? skip setting below flags? */
4012 cfs_spin_lock(&imp->imp_lock);
4013 imp->imp_server_timeout = 1;
4014 imp->imp_pingable = 1;
4015 cfs_spin_unlock(&imp->imp_lock);
4016 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
/* Reply interpreter for the KEY_MDS_CONN set_info RPC: on success,
 * finish the MDS-connection setup on this import. */
4021 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
4022 struct ptlrpc_request *req,
4029 RETURN(osc_setinfo_mds_connect_import(req->rq_import));
/* obd_ops o_set_info_async entry point.  Locally-handled keys:
 * KEY_NEXT_ID (bump oscc_next_id), KEY_CHECKSUM, KEY_SPTLRPC_CONF and
 * KEY_FLUSH_CTX.  All other keys are forwarded to the OST via an
 * OST_SET_INFO RPC; KEY_MDS_CONN and KEY_GRANT_SHRINK get dedicated
 * reply interpreters and GRANT_SHRINK goes through ptlrpcd instead of
 * the caller's request set. */
4032 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
4033 void *key, obd_count vallen, void *val,
4034 struct ptlrpc_request_set *set)
4036 struct ptlrpc_request *req;
4037 struct obd_device *obd = exp->exp_obd;
4038 struct obd_import *imp = class_exp2cliimp(exp);
4043 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
4045 if (KEY_IS(KEY_NEXT_ID)) {
4047 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4049 if (vallen != sizeof(obd_id))
4054 if (vallen != sizeof(obd_id))
4057 /* avoid race between allocate new object and set next id
4058 * from ll_sync thread */
4059 cfs_spin_lock(&oscc->oscc_lock);
4060 new_val = *((obd_id*)val) + 1;
4061 if (new_val > oscc->oscc_next_id)
4062 oscc->oscc_next_id = new_val;
4063 cfs_spin_unlock(&oscc->oscc_lock);
4064 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
4065 exp->exp_obd->obd_name,
4066 obd->u.cli.cl_oscc.oscc_next_id);
4071 if (KEY_IS(KEY_CHECKSUM)) {
4072 if (vallen != sizeof(int))
4074 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
4078 if (KEY_IS(KEY_SPTLRPC_CONF)) {
4079 sptlrpc_conf_client_adapt(obd);
4083 if (KEY_IS(KEY_FLUSH_CTX)) {
4084 sptlrpc_import_flush_my_ctx(imp);
/* Every remaining key except GRANT_SHRINK requires a request set. */
4088 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
4091 /* We pass all other commands directly to OST. Since nobody calls osc
4092 methods directly and everybody is supposed to go through LOV, we
4093 assume lov checked invalid values for us.
4094 The only recognised values so far are evict_by_nid and mds_conn.
4095 Even if something bad goes through, we'd get a -EINVAL from OST
4098 if (KEY_IS(KEY_GRANT_SHRINK))
4099 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
4101 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
4106 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
4107 RCL_CLIENT, keylen);
4108 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
4109 RCL_CLIENT, vallen);
4110 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
4112 ptlrpc_request_free(req);
4116 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
4117 memcpy(tmp, key, keylen);
4118 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
4119 memcpy(tmp, val, vallen);
4121 if (KEY_IS(KEY_MDS_CONN)) {
4122 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4124 oscc->oscc_oa.o_seq = (*(__u32 *)val);
4125 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
4126 LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
4127 req->rq_no_delay = req->rq_no_resend = 1;
4128 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4129 } else if (KEY_IS(KEY_GRANT_SHRINK)) {
4130 struct osc_grant_args *aa;
4133 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4134 aa = ptlrpc_req_async_args(req);
4137 ptlrpc_req_finished(req);
4140 *oa = ((struct ost_body *)val)->oa;
4142 req->rq_interpret_reply = osc_shrink_grant_interpret;
4145 ptlrpc_request_set_replen(req);
4146 if (!KEY_IS(KEY_GRANT_SHRINK)) {
4147 LASSERT(set != NULL);
4148 ptlrpc_set_add_req(set, req);
4149 ptlrpc_check_set(NULL, set);
4151 ptlrpcd_add_req(req, PSCOPE_OTHER);
/* llog operations for the size-replication context: only cancel is
 * implemented here.  osc_mds_ost_orig_logops is filled in at module
 * init time (osc_init) from llog_lvfs_ops. */
4157 static struct llog_operations osc_size_repl_logops = {
4158 lop_cancel: llog_obd_repl_cancel
4161 static struct llog_operations osc_mds_ost_orig_logops;
/* Set up the two llog contexts used by the OSC: the MDS->OST origin
 * context (catalog id from *catid) and the size-replication context.
 * On failure of the second setup the first context is cleaned up
 * (elided lines) and diagnostics are printed. */
4163 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4164 struct obd_device *tgt, struct llog_catid *catid)
4169 rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
4170 &catid->lci_logid, &osc_mds_ost_orig_logops);
4172 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4176 rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
4177 NULL, &osc_size_repl_logops);
4179 struct llog_ctxt *ctxt =
4180 llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4183 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4188 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
4189 obd->obd_name, tgt->obd_name, catid, rc);
4190 CERROR("logid "LPX64":0x%x\n",
4191 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
/* obd_ops o_llog_init entry point: reads the catalog id for this index
 * from the CATLIST file on disk, initializes the llog contexts via
 * __osc_llog_init(), and writes the (possibly updated) catalog id back.
 * Serialized by olg_cat_processing. */
4196 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4197 struct obd_device *disk_obd, int *index)
4199 struct llog_catid catid;
4200 static char name[32] = CATLIST;
4204 LASSERT(olg == &obd->obd_olg);
4206 cfs_mutex_down(&olg->olg_cat_processing);
4207 rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
4209 CERROR("rc: %d\n", rc);
4213 CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
4214 obd->obd_name, *index, catid.lci_logid.lgl_oid,
4215 catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen);
4217 rc = __osc_llog_init(obd, olg, disk_obd, &catid);
4219 CERROR("rc: %d\n", rc);
4223 rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
4225 CERROR("rc: %d\n", rc);
4230 cfs_mutex_up(&olg->olg_cat_processing);
/* obd_ops o_llog_finish entry point: tear down both llog contexts set
 * up by __osc_llog_init().  Both cleanups are attempted; rc/rc2 hold
 * their respective results. */
4235 static int osc_llog_finish(struct obd_device *obd, int count)
4237 struct llog_ctxt *ctxt;
4238 int rc = 0, rc2 = 0;
4241 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4243 rc = llog_cleanup(ctxt);
4245 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4247 rc2 = llog_cleanup(ctxt);
/* obd_ops o_reconnect entry point: when the server supports grants,
 * request a grant equal to our current (avail + dirty) amount — or a
 * default of 2 RPCs worth of pages when that is zero — and reset the
 * lost-grant counter under cl_loi_list_lock. */
4254 static int osc_reconnect(const struct lu_env *env,
4255 struct obd_export *exp, struct obd_device *obd,
4256 struct obd_uuid *cluuid,
4257 struct obd_connect_data *data,
4260 struct client_obd *cli = &obd->u.cli;
4262 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4265 client_obd_list_lock(&cli->cl_loi_list_lock);
4266 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
4267 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4268 lost_grant = cli->cl_lost_grant;
4269 cli->cl_lost_grant = 0;
4270 client_obd_list_unlock(&cli->cl_loi_list_lock);
4272 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4273 "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
4274 cli->cl_avail_grant, cli->cl_dirty, lost_grant);
4275 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4276 " ocd_grant: %d\n", data->ocd_connect_flags,
4277 data->ocd_version, data->ocd_grant);
/* obd_ops o_disconnect entry point: flush pending llog cancels on the
 * last disconnect, disconnect the client export, and only afterwards
 * remove this client from the grant-shrink list (see BUG18662 race
 * description below). */
4283 static int osc_disconnect(struct obd_export *exp)
4285 struct obd_device *obd = class_exp2obd(exp);
4286 struct llog_ctxt *ctxt;
4289 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4291 if (obd->u.cli.cl_conn_count == 1) {
4292 /* Flush any remaining cancel messages out to the
4294 llog_sync(ctxt, exp);
4296 llog_ctxt_put(ctxt);
4298 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4302 rc = client_disconnect_export(exp);
4304 * Initially we put del_shrink_grant before disconnect_export, but it
4305 * causes the following problem if setup (connect) and cleanup
4306 * (disconnect) are tangled together.
4307 * connect p1 disconnect p2
4308 * ptlrpc_connect_import
4309 * ............... class_manual_cleanup
4312 * ptlrpc_connect_interrupt
4314 * add this client to shrink list
4316 * Bang! pinger trigger the shrink.
4317 * So the osc should be disconnected from the shrink list, after we
4318 * are sure the import has been destroyed. BUG18662
4320 if (obd->u.cli.cl_import == NULL)
4321 osc_del_shrink_grant(&obd->u.cli);
/* obd_ops o_import_event entry point: react to import state changes.
 * DISCON resets grants (and flags recovery on MDS OSCs); INVALIDATE
 * fails queued pages and wipes local namespace locks; ACTIVE/INACTIVE/
 * ACTIVATE/DEACTIVATE/OCD are propagated to the observer. */
4325 static int osc_import_event(struct obd_device *obd,
4326 struct obd_import *imp,
4327 enum obd_import_event event)
4329 struct client_obd *cli;
4333 LASSERT(imp->imp_obd == obd);
4336 case IMP_EVENT_DISCON: {
4337 /* Only do this on the MDS OSC's */
4338 if (imp->imp_server_timeout) {
4339 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4341 cfs_spin_lock(&oscc->oscc_lock);
4342 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4343 cfs_spin_unlock(&oscc->oscc_lock);
/* Forget all grant state — it is renegotiated on reconnect. */
4346 client_obd_list_lock(&cli->cl_loi_list_lock);
4347 cli->cl_avail_grant = 0;
4348 cli->cl_lost_grant = 0;
4349 client_obd_list_unlock(&cli->cl_loi_list_lock);
4352 case IMP_EVENT_INACTIVE: {
4353 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4356 case IMP_EVENT_INVALIDATE: {
4357 struct ldlm_namespace *ns = obd->obd_namespace;
4361 env = cl_env_get(&refcheck);
4365 client_obd_list_lock(&cli->cl_loi_list_lock);
4366 /* all pages go to failing rpcs due to the invalid
4368 osc_check_rpcs(env, cli);
4369 client_obd_list_unlock(&cli->cl_loi_list_lock);
4371 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4372 cl_env_put(env, &refcheck);
4377 case IMP_EVENT_ACTIVE: {
4378 /* Only do this on the MDS OSC's */
4379 if (imp->imp_server_timeout) {
4380 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4382 cfs_spin_lock(&oscc->oscc_lock);
4383 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
4384 cfs_spin_unlock(&oscc->oscc_lock);
4386 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4389 case IMP_EVENT_OCD: {
4390 struct obd_connect_data *ocd = &imp->imp_connect_data;
4392 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4393 osc_init_grant(&obd->u.cli, ocd);
4396 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4397 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
4399 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4402 case IMP_EVENT_DEACTIVATE: {
4403 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
4406 case IMP_EVENT_ACTIVATE: {
4407 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
4411 CERROR("Unknown import event %d\n", event);
4418 * Determine whether the lock can be canceled before replaying the lock
4419 * during recovery, see bug16774 for detailed information.
4421 * \retval zero the lock can't be canceled
4422 * \retval other ok to cancel
4424 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
4426 check_res_locked(lock->l_resource);
4429 * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
4431 * XXX as a future improvement, we can also cancel unused write lock
4432 * if it doesn't have dirty data and active mmaps.
/* A pageref count of zero means no cached pages depend on this lock. */
4434 if (lock->l_resource->lr_type == LDLM_EXTENT &&
4435 (lock->l_granted_mode == LCK_PR ||
4436 lock->l_granted_mode == LCK_CR) &&
4437 (osc_dlm_lock_pageref(lock) == 0))
/* obd_ops o_setup entry point: take a ptlrpcd reference, run the common
 * client setup, attach lprocfs stats, pre-allocate a small RPC pool for
 * brw_interpret, and register the recovery-time lock-cancel callback. */
4443 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4449 rc = ptlrpcd_addref();
4453 rc = client_obd_setup(obd, lcfg);
4457 struct lprocfs_static_vars lvars = { 0 };
4458 struct client_obd *cli = &obd->u.cli;
4460 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4461 lprocfs_osc_init_vars(&lvars);
4462 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4463 lproc_osc_attach_seqstat(obd);
4464 sptlrpc_lprocfs_cliobd_attach(obd);
4465 ptlrpc_lprocfs_register_obd(obd);
4469 /* We need to allocate a few requests more, because
4470 brw_interpret tries to create new requests before freeing
4471 previous ones. Ideally we want to have 2x max_rpcs_in_flight
4472 reserved, but I'm afraid that might be too much wasted RAM
4473 in fact, so 2 is just my guess and still should work. */
4474 cli->cl_import->imp_rq_pool =
4475 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4477 ptlrpc_add_rqs_to_pool);
4479 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4480 cfs_sema_init(&cli->cl_grant_sem, 1);
4482 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
/* obd_ops o_precleanup entry point, staged teardown:
 *  - OBD_CLEANUP_EARLY:   deactivate the import and stop pinging it;
 *  - OBD_CLEANUP_EXPORTS: destroy an import that never connected
 *                         (invalidate, free the RPC pool) and finish
 *                         the llog subsystems. */
4488 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4494 case OBD_CLEANUP_EARLY: {
4495 struct obd_import *imp;
4496 imp = obd->u.cli.cl_import;
4497 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4498 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4499 ptlrpc_deactivate_import(imp);
4500 cfs_spin_lock(&imp->imp_lock);
4501 imp->imp_pingable = 0;
4502 cfs_spin_unlock(&imp->imp_lock);
4505 case OBD_CLEANUP_EXPORTS: {
4506 /* If we set up but never connected, the
4507 client import will not have been cleaned. */
4508 if (obd->u.cli.cl_import) {
4509 struct obd_import *imp;
4510 cfs_down_write(&obd->u.cli.cl_sem);
4511 imp = obd->u.cli.cl_import;
4512 CDEBUG(D_CONFIG, "%s: client import never connected\n",
4514 ptlrpc_invalidate_import(imp);
4515 if (imp->imp_rq_pool) {
4516 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4517 imp->imp_rq_pool = NULL;
4519 class_destroy_import(imp);
4520 cfs_up_write(&obd->u.cli.cl_sem);
4521 obd->u.cli.cl_import = NULL;
4523 rc = obd_llog_finish(obd, 0);
4525 CERROR("failed to cleanup llogging subsystems\n");
/* obd_ops o_cleanup entry point: unregister lprocfs entries, drop the
 * per-device quota cache, then run the common client cleanup. */
4532 int osc_cleanup(struct obd_device *obd)
4537 ptlrpc_lprocfs_unregister_obd(obd);
4538 lprocfs_obd_cleanup(obd);
4540 /* free memory of osc quota cache */
4541 lquota_cleanup(quota_interface, obd);
4543 rc = client_obd_cleanup(obd);
/* Handle a lustre_cfg record for this OSC; currently only proc-param
 * records are processed (through class_process_proc_param). */
4549 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4551 struct lprocfs_static_vars lvars = { 0 };
4554 lprocfs_osc_init_vars(&lvars);
4556 switch (lcfg->lcfg_command) {
4558 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
/* obd_ops o_process_config entry point: thin wrapper over the base
 * handler above. */
4568 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4570 return osc_process_config_base(obd, buf);
/* Method table wiring the OSC implementations into the generic OBD
 * dispatch layer.  Entries not defined in this file (osc_packmd,
 * osc_create, ...) come from other OSC source files. */
4573 struct obd_ops osc_obd_ops = {
4574 .o_owner = THIS_MODULE,
4575 .o_setup = osc_setup,
4576 .o_precleanup = osc_precleanup,
4577 .o_cleanup = osc_cleanup,
4578 .o_add_conn = client_import_add_conn,
4579 .o_del_conn = client_import_del_conn,
4580 .o_connect = client_connect_import,
4581 .o_reconnect = osc_reconnect,
4582 .o_disconnect = osc_disconnect,
4583 .o_statfs = osc_statfs,
4584 .o_statfs_async = osc_statfs_async,
4585 .o_packmd = osc_packmd,
4586 .o_unpackmd = osc_unpackmd,
4587 .o_precreate = osc_precreate,
4588 .o_create = osc_create,
4589 .o_create_async = osc_create_async,
4590 .o_destroy = osc_destroy,
4591 .o_getattr = osc_getattr,
4592 .o_getattr_async = osc_getattr_async,
4593 .o_setattr = osc_setattr,
4594 .o_setattr_async = osc_setattr_async,
4596 .o_punch = osc_punch,
4598 .o_enqueue = osc_enqueue,
4599 .o_change_cbdata = osc_change_cbdata,
4600 .o_find_cbdata = osc_find_cbdata,
4601 .o_cancel = osc_cancel,
4602 .o_cancel_unused = osc_cancel_unused,
4603 .o_iocontrol = osc_iocontrol,
4604 .o_get_info = osc_get_info,
4605 .o_set_info_async = osc_set_info_async,
4606 .o_import_event = osc_import_event,
4607 .o_llog_init = osc_llog_init,
4608 .o_llog_finish = osc_llog_finish,
4609 .o_process_config = osc_process_config,
4612 extern struct lu_kmem_descr osc_caches[];
4613 extern cfs_spinlock_t osc_ast_guard;
4614 extern cfs_lock_class_key_t osc_ast_guard_class;
/* Module init: set up the lu_kmem caches, hook the quota interface,
 * register the OSC obd type, and initialize the AST guard lock and the
 * origin llog operations (based on llog_lvfs_ops).  On registration
 * failure the quota symbol reference and caches are released. */
4616 int __init osc_init(void)
4618 struct lprocfs_static_vars lvars = { 0 };
4622 /* print an address of _any_ initialized kernel symbol from this
4623 * module, to allow debugging with gdb that doesn't support data
4624 * symbols from modules.*/
4625 CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4627 rc = lu_kmem_init(osc_caches);
4629 lprocfs_osc_init_vars(&lvars);
4631 cfs_request_module("lquota");
4632 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4633 lquota_init(quota_interface);
4634 init_obd_quota_ops(quota_interface, &osc_obd_ops);
4636 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4637 LUSTRE_OSC_NAME, &osc_device_type);
4639 if (quota_interface)
4640 PORTAL_SYMBOL_PUT(osc_quota_interface);
4641 lu_kmem_fini(osc_caches);
4645 cfs_spin_lock_init(&osc_ast_guard);
4646 cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4648 osc_mds_ost_orig_logops = llog_lvfs_ops;
4649 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4650 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4651 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4652 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
/* Module exit: undo osc_init() in reverse order — device type, quota
 * interface reference, OBD type registration, and kmem caches. */
4658 static void /*__exit*/ osc_exit(void)
4660 lu_device_type_fini(&osc_device_type);
4662 lquota_exit(quota_interface);
4663 if (quota_interface)
4664 PORTAL_SYMBOL_PUT(osc_quota_interface);
4666 class_unregister_type(LUSTRE_OSC_NAME);
4667 lu_kmem_fini(osc_caches);
4670 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4671 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4672 MODULE_LICENSE("GPL");
4674 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);