1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_OSC
42 #include <libcfs/libcfs.h>
45 # include <liblustre.h>
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include "osc_internal.h"
/* File-scope state and forward declarations for functions defined later
 * in this file (OSC quota hook, page-array release, BRW completion). */
66 static quota_interface_t *quota_interface = NULL;
67 extern quota_interface_t osc_quota_interface;
69 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
70 static int brw_interpret(const struct lu_env *env,
71 struct ptlrpc_request *req, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
74 /* Pack OSC object metadata for disk storage (LE byte order). */
/* NOTE(review): this extract elides several lines (NULL checks around the
 * free/alloc branches and the RETURN paths); only visible statements are
 * annotated below. */
75 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
76 struct lov_stripe_md *lsm)
/* An OSC manages a single object, so the wire md is one lov_mds_md. */
81 lmm_size = sizeof(**lmmp);
/* Presumably the "free existing *lmmp when lsm == NULL" branch. */
86 OBD_FREE(*lmmp, lmm_size);
/* Presumably the "allocate *lmmp when caller passed none" branch. */
92 OBD_ALLOC(*lmmp, lmm_size);
/* Object id must be set and the sequence must be an MDT sequence;
 * stored little-endian so the on-disk format is endian-independent. */
98 LASSERT(lsm->lsm_object_id);
99 LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
100 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
101 (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
107 /* Unpack OSC object metadata from disk storage (LE byte order). */
/* NOTE(review): error-return and some conditional lines are elided in this
 * extract; comments describe visible statements only. */
108 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
109 struct lov_mds_md *lmm, int lmm_bytes)
/* Reject a wire buffer too small to hold one lov_mds_md. */
115 if (lmm_bytes < sizeof (*lmm)) {
116 CERROR("lov_mds_md too small: %d, need %d\n",
117 lmm_bytes, (int)sizeof(*lmm));
120 /* XXX LOV_MAGIC etc check? */
/* A zero object id is never valid on disk. */
122 if (lmm->lmm_object_id == 0) {
123 CERROR("lov_mds_md: zero lmm_object_id\n");
/* Single-stripe md: OSC always uses stripe count 1. */
128 lsm_size = lov_stripe_md_size(1);
/* Caller passed an lsm but no wire md: free the lsm and its oinfo. */
132 if (*lsmp != NULL && lmm == NULL) {
133 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
134 OBD_FREE(*lsmp, lsm_size);
/* Allocate the in-memory lsm plus its single lov_oinfo; on oinfo
 * allocation failure, undo the lsm allocation. */
140 OBD_ALLOC(*lsmp, lsm_size);
143 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
144 if ((*lsmp)->lsm_oinfo[0] == NULL) {
145 OBD_FREE(*lsmp, lsm_size);
148 loi_init((*lsmp)->lsm_oinfo[0]);
152 /* XXX zero *lsmp? */
/* Convert id/seq from on-disk little-endian to host order. */
153 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
154 (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
155 LASSERT((*lsmp)->lsm_object_id);
156 LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
/* Per-stripe byte limit for a single OST object. */
159 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/* Copy the capability @capa into the request's RMF_CAPA1 field and flag
 * its presence in the OST body.  NOTE(review): the NULL-capa early return
 * and the actual capa copy appear elided in this extract. */
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165 struct ost_body *body, void *capa)
167 struct obd_capa *oc = (struct obd_capa *)capa;
168 struct lustre_capa *c;
/* Wire buffer for the capability inside the request capsule. */
173 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
/* Tell the server a capability accompanies this request. */
176 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177 DEBUG_CAPA(D_SEC, c, "pack");
/* Fill the request's OST body from @oinfo: copy the obdo in wire format
 * and pack the accompanying capability (if any). */
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181 struct obd_info *oinfo)
183 struct ost_body *body;
185 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
188 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
189 osc_pack_capa(req, body, oinfo->oi_capa);
/* Size the capability field of the request capsule before packing:
 * zero when no capability is supplied (the visible branch); otherwise the
 * default sizeof(struct lustre_capa) is kept.  NOTE(review): the oc==NULL
 * condition line is elided in this extract. */
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193 const struct req_msg_field *field,
197 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
199 /* it is already calculated as sizeof struct obd_capa */
/* Reply interpreter for async getattr: unpack the server's OST body into
 * the caller's obdo, fake the block size (which the OST does not send),
 * then invoke the caller's completion callback.  NOTE(review): the
 * rc-check and swab-failure branches are elided in this extract. */
203 static int osc_getattr_interpret(const struct lu_env *env,
204 struct ptlrpc_request *req,
205 struct osc_async_args *aa, int rc)
207 struct ost_body *body;
213 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
215 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
216 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
218 /* This should really be sent by the OST */
219 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
220 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
/* Unpack failure path: invalidate the obdo so callers see no attrs. */
222 CDEBUG(D_INFO, "can't unpack ost_body\n");
224 aa->aa_oi->oi_oa->o_valid = 0;
227 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an OST_GETATTR RPC on @set without waiting; the reply is handled
 * by osc_getattr_interpret().  NOTE(review): NULL-req and rc-error return
 * lines are elided in this extract. */
231 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
232 struct ptlrpc_request_set *set)
234 struct ptlrpc_request *req;
235 struct osc_async_args *aa;
239 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
243 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
244 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
246 ptlrpc_request_free(req);
250 osc_pack_req_body(req, oinfo);
252 ptlrpc_request_set_replen(req);
253 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
/* Stash the obd_info in the request's async-args scratch space. */
255 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
256 aa = ptlrpc_req_async_args(req);
259 ptlrpc_set_add_req(set, req);
/* Synchronous OST_GETATTR: build, send and wait for the RPC, then copy
 * the returned attributes into oinfo->oi_oa.  NOTE(review): error-return
 * and GOTO target lines are elided in this extract. */
263 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
265 struct ptlrpc_request *req;
266 struct ost_body *body;
270 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
274 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
275 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
277 ptlrpc_request_free(req);
281 osc_pack_req_body(req, oinfo);
283 ptlrpc_request_set_replen(req);
/* Send and block until the reply (or error) arrives. */
285 rc = ptlrpc_queue_wait(req);
289 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
291 GOTO(out, rc = -EPROTO);
293 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
294 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
296 /* This should really be sent by the OST */
297 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
298 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
302 ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR: send the attributes in @oinfo and copy the
 * server's resulting obdo back.  NOTE(review): error-return lines are
 * elided in this extract. */
306 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
307 struct obd_trans_info *oti)
309 struct ptlrpc_request *req;
310 struct ost_body *body;
/* The group must always be valid on setattr requests. */
314 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
316 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
320 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
321 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
323 ptlrpc_request_free(req);
327 osc_pack_req_body(req, oinfo);
329 ptlrpc_request_set_replen(req);
331 rc = ptlrpc_queue_wait(req);
335 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
337 GOTO(out, rc = -EPROTO);
339 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
343 ptlrpc_req_finished(req);
/* Reply interpreter shared by async setattr and punch: unpack the reply
 * obdo into sa->sa_oa and invoke the caller's upcall with the result.
 * NOTE(review): the rc-check before unpacking is elided in this extract. */
347 static int osc_setattr_interpret(const struct lu_env *env,
348 struct ptlrpc_request *req,
349 struct osc_setattr_args *sa, int rc)
351 struct ost_body *body;
357 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
359 GOTO(out, rc = -EPROTO);
361 lustre_get_wire_obdo(sa->sa_oa, &body->oa);
363 rc = sa->sa_upcall(sa->sa_cookie, rc);
/* Asynchronous OST_SETATTR: build the request and either hand it to the
 * ptlrpcd daemon (no rqset / PTLRPCD_SET) or add it to the caller's set.
 * @upcall/@cookie are invoked from osc_setattr_interpret() on completion.
 * NOTE(review): the rqset==NULL condition and several return lines are
 * elided in this extract. */
367 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
368 struct obd_trans_info *oti,
369 obd_enqueue_update_f upcall, void *cookie,
370 struct ptlrpc_request_set *rqset)
372 struct ptlrpc_request *req;
373 struct osc_setattr_args *sa;
377 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
381 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
382 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
384 ptlrpc_request_free(req);
/* Carry the MDS llog cookie along when the caller supplied one. */
388 if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
389 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
391 osc_pack_req_body(req, oinfo);
393 ptlrpc_request_set_replen(req);
395 /* do mds to ost setattr asynchronously */
397 /* Do not wait for response. */
398 ptlrpcd_add_req(req, PSCOPE_OTHER);
400 req->rq_interpret_reply =
401 (ptlrpc_interpterer_t)osc_setattr_interpret;
/* Completion context lives in the request's async-args area. */
403 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
404 sa = ptlrpc_req_async_args(req);
405 sa->sa_oa = oinfo->oi_oa;
406 sa->sa_upcall = upcall;
407 sa->sa_cookie = cookie;
409 if (rqset == PTLRPCD_SET)
410 ptlrpcd_add_req(req, PSCOPE_OTHER);
412 ptlrpc_set_add_req(rqset, req);
/* Thin wrapper: async setattr using oinfo's own completion callback. */
418 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
419 struct obd_trans_info *oti,
420 struct ptlrpc_request_set *rqset)
422 return osc_setattr_async_base(exp, oinfo, oti,
423 oinfo->oi_cb_up, oinfo, rqset);
/* Synchronous OST_CREATE: allocate a temporary single-stripe lsm, send the
 * create, and copy the returned object id/seq into it; also records the
 * transno and llog cookie for the caller when requested.  NOTE(review):
 * several argument checks, GOTO targets and return lines are elided in
 * this extract. */
426 int osc_real_create(struct obd_export *exp, struct obdo *oa,
427 struct lov_stripe_md **ea, struct obd_trans_info *oti)
429 struct ptlrpc_request *req;
430 struct ost_body *body;
431 struct lov_stripe_md *lsm;
/* Allocate the in-memory md that will receive the new object id. */
440 rc = obd_alloc_memmd(exp, &lsm);
445 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
447 GOTO(out, rc = -ENOMEM);
449 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
451 ptlrpc_request_free(req);
455 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
457 lustre_set_wire_obdo(&body->oa, oa);
459 ptlrpc_request_set_replen(req);
/* Orphan cleanup requests must not be replayed or delayed. */
461 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
462 oa->o_flags == OBD_FL_DELORPHAN) {
464 "delorphan from OST integration");
465 /* Don't resend the delorphan req */
466 req->rq_no_resend = req->rq_no_delay = 1;
469 rc = ptlrpc_queue_wait(req);
473 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
475 GOTO(out_req, rc = -EPROTO);
477 lustre_get_wire_obdo(oa, &body->oa);
479 /* This should really be sent by the OST */
480 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
481 oa->o_valid |= OBD_MD_FLBLKSZ;
483 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
484 * have valid lsm_oinfo data structs, so don't go touching that.
485 * This needs to be fixed in a big way.
487 lsm->lsm_object_id = oa->o_id;
488 lsm->lsm_object_seq = oa->o_seq;
/* Record server transno so recovery can replay/verify this create. */
492 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
494 if (oa->o_valid & OBD_MD_FLCOOKIE) {
495 if (!oti->oti_logcookies)
496 oti_alloc_cookies(oti, 1);
497 *oti->oti_logcookies = oa->o_lcookie;
501 CDEBUG(D_HA, "transno: "LPD64"\n",
502 lustre_msg_get_transno(req->rq_repmsg));
504 ptlrpc_req_finished(req);
/* Error path: release the temporary md. */
507 obd_free_memmd(exp, &lsm);
/* Asynchronous OST_PUNCH (truncate a byte range on the OST): builds the
 * request, reuses osc_setattr_interpret() for the reply, and queues it to
 * ptlrpcd or the caller's rqset.  NOTE(review): NULL-req and error-return
 * lines are elided in this extract. */
511 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
512 obd_enqueue_update_f upcall, void *cookie,
513 struct ptlrpc_request_set *rqset)
515 struct ptlrpc_request *req;
516 struct osc_setattr_args *sa;
517 struct ost_body *body;
521 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
525 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
526 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
528 ptlrpc_request_free(req);
531 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
532 ptlrpc_at_set_req_timeout(req);
534 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
536 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
537 osc_pack_capa(req, body, oinfo->oi_capa);
539 ptlrpc_request_set_replen(req);
542 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
543 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
544 sa = ptlrpc_req_async_args(req);
545 sa->sa_oa = oinfo->oi_oa;
546 sa->sa_upcall = upcall;
547 sa->sa_cookie = cookie;
548 if (rqset == PTLRPCD_SET)
549 ptlrpcd_add_req(req, PSCOPE_OTHER);
551 ptlrpc_set_add_req(rqset, req);
/* OBD punch entry point: encode the extent [start, end] into the obdo's
 * size/blocks fields (the wire convention for punch) and delegate to
 * osc_punch_base() with oinfo's own callback. */
556 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
557 struct obd_trans_info *oti,
558 struct ptlrpc_request_set *rqset)
560 oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
561 oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
562 oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
563 return osc_punch_base(exp, oinfo,
564 oinfo->oi_cb_up, oinfo, rqset);
/* Reply interpreter for OST_SYNC: copy the reply obdo back and run the
 * caller's completion callback.  NOTE(review): the rc-check before the
 * unpack is elided in this extract. */
567 static int osc_sync_interpret(const struct lu_env *env,
568 struct ptlrpc_request *req,
571 struct osc_async_args *aa = arg;
572 struct ost_body *body;
578 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
580 CERROR ("can't unpack ost_body\n");
581 GOTO(out, rc = -EPROTO);
/* Plain struct copy — reply obdo is already in host order here. */
584 *aa->aa_oi->oi_oa = body->oa;
586 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an asynchronous OST_SYNC for the byte range [start, end].  The
 * range is smuggled in the obdo's size/blocks fields per wire convention.
 * NOTE(review): NULL-oa and error-return lines are elided in this
 * extract. */
590 static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
591 obd_size start, obd_size end,
592 struct ptlrpc_request_set *set)
594 struct ptlrpc_request *req;
595 struct ost_body *body;
596 struct osc_async_args *aa;
601 CDEBUG(D_INFO, "oa NULL\n");
605 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
609 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
610 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
612 ptlrpc_request_free(req);
616 /* overload the size and blocks fields in the oa with start/end */
617 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
619 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
620 body->oa.o_size = start;
621 body->oa.o_blocks = end;
622 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
623 osc_pack_capa(req, body, oinfo->oi_capa);
625 ptlrpc_request_set_replen(req);
626 req->rq_interpret_reply = osc_sync_interpret;
628 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
629 aa = ptlrpc_req_async_args(req);
632 ptlrpc_set_add_req(set, req);
636 /* Find and cancel locally locks matched by @mode in the resource found by
637 * @objid. Found locks are added into @cancel list. Returns the amount of
638 * locks added to @cancels list. */
/* NOTE(review): the NULL-resource early-return line is elided in this
 * extract. */
639 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
641 ldlm_mode_t mode, int lock_flags)
643 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
644 struct ldlm_res_id res_id;
645 struct ldlm_resource *res;
/* Resource name is derived from the object id/sequence. */
649 osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
650 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
654 LDLM_RESOURCE_ADDREF(res);
655 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
656 lock_flags, 0, NULL);
657 LDLM_RESOURCE_DELREF(res);
658 ldlm_resource_putref(res);
/* Completion of a throttled destroy RPC: drop the in-flight counter and
 * wake any sender blocked in osc_destroy() waiting for a slot. */
662 static int osc_destroy_interpret(const struct lu_env *env,
663 struct ptlrpc_request *req, void *data,
666 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
668 cfs_atomic_dec(&cli->cl_destroy_in_flight);
669 cfs_waitq_signal(&cli->cl_destroy_waitq);
/* Try to reserve a destroy-RPC slot (bounded by cl_max_rpcs_in_flight).
 * On failure the optimistic increment is undone; if the counter dropped
 * below the limit between the two atomics, re-wake waiters so no one
 * sleeps on a now-free slot.  NOTE(review): the RETURN(0)/RETURN(1)
 * lines are elided in this extract. */
673 static int osc_can_send_destroy(struct client_obd *cli)
675 if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
676 cli->cl_max_rpcs_in_flight) {
677 /* The destroy request can be sent */
680 if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
681 cli->cl_max_rpcs_in_flight) {
683 * The counter has been modified between the two atomic
686 cfs_waitq_signal(&cli->cl_destroy_waitq);
691 /* Destroy requests can be async always on the client, and we don't even really
692 * care about the return code since the client cannot do anything at all about
694 * When the MDS is unlinking a filename, it saves the file objects into a
695 * recovery llog, and these object records are cancelled when the OST reports
696 * they were destroyed and sync'd to disk (i.e. transaction committed).
697 * If the client dies, or the OST is down when the object should be destroyed,
698 * the records are not cancelled, and when the OST reconnects to the MDS next,
699 * it will retrieve the llog unlink logs and then sends the log cancellation
700 * cookies to the MDS after committing destroy transactions. */
/* NOTE(review): NULL-oa / NULL-req return lines are elided in this
 * extract. */
701 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
702 struct lov_stripe_md *ea, struct obd_trans_info *oti,
703 struct obd_export *md_export, void *capa)
705 struct client_obd *cli = &exp->exp_obd->u.cli;
706 struct ptlrpc_request *req;
707 struct ost_body *body;
708 CFS_LIST_HEAD(cancels);
713 CDEBUG(D_INFO, "oa NULL\n");
/* Cancel local PW locks first so cached dirty data is discarded. */
717 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
718 LDLM_FL_DISCARD_DATA);
720 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
722 ldlm_lock_list_put(&cancels, l_bl_ast, count);
726 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
/* Piggy-back the local cancels (ELC) on the destroy request. */
727 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
730 ptlrpc_request_free(req);
734 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
735 ptlrpc_at_set_req_timeout(req);
737 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
738 oa->o_lcookie = *oti->oti_logcookies;
739 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
741 lustre_set_wire_obdo(&body->oa, oa);
743 osc_pack_capa(req, body, (struct obd_capa *)capa);
744 ptlrpc_request_set_replen(req);
746 /* don't throttle destroy RPCs for the MDT */
747 if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
748 req->rq_interpret_reply = osc_destroy_interpret;
749 if (!osc_can_send_destroy(cli)) {
750 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
754 * Wait until the number of on-going destroy RPCs drops
755 * under max_rpc_in_flight
757 l_wait_event_exclusive(cli->cl_destroy_waitq,
758 osc_can_send_destroy(cli), &lwi);
762 /* Do not wait for response */
763 ptlrpcd_add_req(req, PSCOPE_OTHER);
/* Report this client's dirty-cache and grant state to the server inside
 * @oa (dirty bytes, how much more it could dirty, current grant, and
 * grant lost since the last report).  Runs under cl_loi_list_lock so the
 * counters are read consistently; sanity-checks flag accounting bugs.
 * NOTE(review): the o_valid |= bits line and some else-branch lines are
 * elided in this extract. */
767 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
770 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
772 LASSERT(!(oa->o_valid & bits));
775 client_obd_list_lock(&cli->cl_loi_list_lock);
776 oa->o_dirty = cli->cl_dirty;
777 if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
778 CERROR("dirty %lu - %lu > dirty_max %lu\n",
779 cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
781 } else if (cfs_atomic_read(&obd_dirty_pages) -
782 cfs_atomic_read(&obd_dirty_transit_pages) >
783 obd_max_dirty_pages + 1){
784 /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
785 * not covered by a lock thus they may safely race and trip
786 * this CERROR() unless we add in a small fudge factor (+1). */
787 CERROR("dirty %d - %d > system dirty_max %d\n",
788 cfs_atomic_read(&obd_dirty_pages),
789 cfs_atomic_read(&obd_dirty_transit_pages),
790 obd_max_dirty_pages);
792 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
793 CERROR("dirty %lu - dirty_max %lu too big???\n",
794 cli->cl_dirty, cli->cl_dirty_max);
/* Normal case: undirty headroom is the larger of dirty_max and what a
 * full pipeline of max-size RPCs could carry. */
797 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
798 (cli->cl_max_rpcs_in_flight + 1);
799 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
801 oa->o_grant = cli->cl_avail_grant;
802 oa->o_dropped = cli->cl_lost_grant;
/* Lost grant has now been reported; reset the accumulator. */
803 cli->cl_lost_grant = 0;
804 client_obd_list_unlock(&cli->cl_loi_list_lock);
805 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
806 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Push the next grant-shrink deadline cl_grant_shrink_interval seconds
 * into the future (called whenever grant is consumed or shrunk). */
810 static void osc_update_next_shrink(struct client_obd *cli)
812 cli->cl_next_shrink_grant =
813 cfs_time_shift(cli->cl_grant_shrink_interval);
814 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
815 cli->cl_next_shrink_grant);
818 /* caller must hold loi_list_lock */
/* Account one page of dirty cache against the client's grant: bump the
 * global and per-client dirty counters, consume one page of avail grant,
 * and tag the page as grant-backed so the release path can undo this. */
819 static void osc_consume_write_grant(struct client_obd *cli,
820 struct brw_page *pga)
822 LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
823 LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
824 cfs_atomic_inc(&obd_dirty_pages);
825 cli->cl_dirty += CFS_PAGE_SIZE;
826 cli->cl_avail_grant -= CFS_PAGE_SIZE;
827 pga->flag |= OBD_BRW_FROM_GRANT;
828 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
829 CFS_PAGE_SIZE, pga, pga->pg);
830 LASSERT(cli->cl_avail_grant >= 0);
/* Consuming grant postpones the next voluntary shrink. */
831 osc_update_next_shrink(cli);
834 /* the companion to osc_consume_write_grant, called when a brw has completed.
835 * must be called with the loi lock held. */
/* NOTE(review): the branch structure around the "sent" cases is partially
 * elided in this extract; comments describe the visible statements. */
836 static void osc_release_write_grant(struct client_obd *cli,
837 struct brw_page *pga, int sent)
/* OST filesystem block size; fall back to 4096 if stats not yet known. */
839 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
842 LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
/* Page never consumed grant — nothing to release. */
843 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
848 pga->flag &= ~OBD_BRW_FROM_GRANT;
849 cfs_atomic_dec(&obd_dirty_pages);
850 cli->cl_dirty -= CFS_PAGE_SIZE;
/* Transit (nocache) pages are additionally tracked separately. */
851 if (pga->flag & OBD_BRW_NOCACHE) {
852 pga->flag &= ~OBD_BRW_NOCACHE;
853 cfs_atomic_dec(&obd_dirty_transit_pages);
854 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
/* Page not sent: its whole grant is lost until re-reported. */
857 cli->cl_lost_grant += CFS_PAGE_SIZE;
858 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
859 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
860 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
861 /* For short writes we shouldn't count parts of pages that
862 * span a whole block on the OST side, or our accounting goes
863 * wrong. Should match the code in filter_grant_check. */
864 int offset = pga->off & ~CFS_PAGE_MASK;
865 int count = pga->count + (offset & (blocksize - 1));
866 int end = (offset + pga->count) & (blocksize - 1);
868 count += blocksize - end;
870 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
871 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
872 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
873 cli->cl_avail_grant, cli->cl_dirty);
/* Total read + write RPCs currently in flight for this client. */
879 static unsigned long rpcs_in_flight(struct client_obd *cli)
881 return cli->cl_r_in_flight + cli->cl_w_in_flight;
884 /* caller must hold loi_list_lock */
/* Walk the cache-waiter list and wake waiters that can now proceed:
 * either grant a page of dirty cache, or — when no grant can arrive —
 * wake them with -EDQUOT so they fall back to sync IO.  Stops early while
 * writes in flight might still return grant. */
885 void osc_wake_cache_waiters(struct client_obd *cli)
888 struct osc_cache_waiter *ocw;
891 cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
892 /* if we can't dirty more, we must wait until some is written */
893 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
894 (cfs_atomic_read(&obd_dirty_pages) + 1 >
895 obd_max_dirty_pages)) {
896 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
897 "osc max %ld, sys max %d\n", cli->cl_dirty,
898 cli->cl_dirty_max, obd_max_dirty_pages);
902 /* if still dirty cache but no grant wait for pending RPCs that
903 * may yet return us some grant before doing sync writes */
904 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
905 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
906 cli->cl_w_in_flight);
910 ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
911 cfs_list_del_init(&ocw->ocw_entry);
912 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
913 /* no more RPCs in flight to return grant, do sync IO */
914 ocw->ocw_rc = -EDQUOT;
915 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
/* Grant available: charge the waiter's page against it. */
917 osc_consume_write_grant(cli,
918 &ocw->ocw_oap->oap_brw_page);
921 cfs_waitq_signal(&ocw->ocw_waitq);
/* Add @grant bytes to the client's available grant under the list lock. */
927 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
929 client_obd_list_lock(&cli->cl_loi_list_lock);
930 cli->cl_avail_grant += grant;
931 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Absorb any extra grant the server returned in an RPC reply body. */
934 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
936 if (body->oa.o_valid & OBD_MD_FLGRANT) {
937 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
938 __osc_update_grant(cli, body->oa.o_grant);
/* Forward declaration — used below by the grant-shrink path; defined
 * later in this file. */
942 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
943 void *key, obd_count vallen, void *val,
944 struct ptlrpc_request_set *set);
/* Completion of a grant-shrink RPC.  On failure the grant we tentatively
 * gave back (oa->o_grant) is restored locally; on success any grant in
 * the reply body is absorbed.  NOTE(review): the rc-check and OBD_FREE of
 * the body are elided in this extract. */
946 static int osc_shrink_grant_interpret(const struct lu_env *env,
947 struct ptlrpc_request *req,
950 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
951 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
952 struct ost_body *body;
/* Error path: take the shrunk grant back. */
955 __osc_update_grant(cli, oa->o_grant);
959 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
961 osc_update_grant(cli, body);
/* Give back a quarter of the available grant without contacting the
 * server: move it from cl_avail_grant into oa->o_grant and mark the obdo
 * with OBD_FL_SHRINK_GRANT so the next RPC carries it back. */
967 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
969 client_obd_list_lock(&cli->cl_loi_list_lock);
970 oa->o_grant = cli->cl_avail_grant / 4;
971 cli->cl_avail_grant -= oa->o_grant;
972 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Ensure flags are valid before OR-ing in the shrink flag. */
973 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
974 oa->o_valid |= OBD_MD_FLFLAGS;
977 oa->o_flags |= OBD_FL_SHRINK_GRANT;
978 osc_update_next_shrink(cli);
981 /* Shrink the current grant, either from some large amount to enough for a
982 * full set of in-flight RPCs, or if we have already shrunk to that limit
983 * then to enough for a single RPC. This avoids keeping more grant than
984 * needed, and avoids shrinking the grant piecemeal. */
985 static int osc_shrink_grant(struct client_obd *cli)
/* First target: a full pipeline of max-size RPCs (in pages). */
987 long target = (cli->cl_max_rpcs_in_flight + 1) *
988 cli->cl_max_pages_per_rpc;
990 client_obd_list_lock(&cli->cl_loi_list_lock);
/* Already at/below the pipeline target: fall back to one RPC's worth. */
991 if (cli->cl_avail_grant <= target)
992 target = cli->cl_max_pages_per_rpc;
993 client_obd_list_unlock(&cli->cl_loi_list_lock);
995 return osc_shrink_grant_to_target(cli, target);
/* Shrink available grant down to @target and report the surplus to the
 * server via a KEY_GRANT_SHRINK set_info RPC; on send failure the surplus
 * is restored locally.  NOTE(review): early-RETURN and OBD_FREE lines are
 * elided in this extract. */
998 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
1001 struct ost_body *body;
1004 client_obd_list_lock(&cli->cl_loi_list_lock);
1005 /* Don't shrink if we are already above or below the desired limit
1006 * We don't want to shrink below a single RPC, as that will negatively
1007 * impact block allocation and long-term performance. */
1008 if (target < cli->cl_max_pages_per_rpc)
1009 target = cli->cl_max_pages_per_rpc;
1011 if (target >= cli->cl_avail_grant) {
1012 client_obd_list_unlock(&cli->cl_loi_list_lock);
1015 client_obd_list_unlock(&cli->cl_loi_list_lock);
1017 OBD_ALLOC_PTR(body);
/* Snapshot current cache state into the body we will send. */
1021 osc_announce_cached(cli, &body->oa, 0);
1023 client_obd_list_lock(&cli->cl_loi_list_lock);
1024 body->oa.o_grant = cli->cl_avail_grant - target;
1025 cli->cl_avail_grant = target;
1026 client_obd_list_unlock(&cli->cl_loi_list_lock);
1027 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
1028 body->oa.o_valid |= OBD_MD_FLFLAGS;
1029 body->oa.o_flags = 0;
1031 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
1032 osc_update_next_shrink(cli);
1034 rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
1035 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
1036 sizeof(*body), body, NULL);
/* Send failed: restore the grant we tried to give back. */
1038 __osc_update_grant(cli, body->oa.o_grant);
/* Only bother shrinking while we hold more than one full BRW of grant. */
1043 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
/* Decide whether it is time to shrink this client's grant: the server
 * must support grant shrink, the deadline must have passed (with a small
 * CFS_TICK slack), the import must be FULL, and we must hold more than
 * GRANT_SHRINK_LIMIT.  NOTE(review): RETURN(0)/RETURN(1) lines are elided
 * in this extract. */
1044 static int osc_should_shrink_grant(struct client_obd *client)
1046 cfs_time_t time = cfs_time_current();
1047 cfs_time_t next_shrink = client->cl_next_shrink_grant;
1049 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
1050 OBD_CONNECT_GRANT_SHRINK) == 0)
1053 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1054 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1055 client->cl_avail_grant > GRANT_SHRINK_LIMIT)
/* Not eligible now: rearm the deadline. */
1058 osc_update_next_shrink(client);
/* Periodic timeout callback: walk every client on the shrink list and
 * shrink the grant of those that are due. */
1063 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1065 struct client_obd *client;
1067 cfs_list_for_each_entry(client, &item->ti_obd_list,
1068 cl_grant_shrink_list) {
1069 if (osc_should_shrink_grant(client))
1070 osc_shrink_grant(client);
/* Register this client with the periodic grant-shrink timeout and arm
 * its first shrink deadline.  NOTE(review): the error-return line after
 * the CERROR is elided in this extract. */
1075 static int osc_add_shrink_grant(struct client_obd *client)
1079 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1081 osc_grant_shrink_grant_cb, NULL,
1082 &client->cl_grant_shrink_list);
1084 CERROR("add grant client %s error %d\n",
1085 client->cl_import->imp_obd->obd_name, rc);
1088 CDEBUG(D_CACHE, "add grant client %s \n",
1089 client->cl_import->imp_obd->obd_name);
1090 osc_update_next_shrink(client);
/* Unregister this client from the periodic grant-shrink timeout. */
1094 static int osc_del_shrink_grant(struct client_obd *client)
1096 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/* Initialize cl_avail_grant from the connect data at (re)connect time,
 * compensating for dirty cache unless we were evicted, working around
 * old (pre-bug20278) servers that can yield a negative value, and
 * enrolling in periodic grant shrinking when the server supports it. */
1100 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1103 * ocd_grant is the total grant amount we're expect to hold: if we've
1104 * been evicted, it's the new avail_grant amount, cl_dirty will drop
1105 * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1107 * race is tolerable here: if we're evicted, but imp_state already
1108 * left EVICTED state, then cl_dirty must be 0 already.
1110 client_obd_list_lock(&cli->cl_loi_list_lock);
1111 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1112 cli->cl_avail_grant = ocd->ocd_grant;
1114 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1116 if (cli->cl_avail_grant < 0) {
1117 CWARN("%s: available grant < 0, the OSS is probably not running"
1118 " with patch from bug20278 (%ld) \n",
1119 cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
1120 /* workaround for 1.6 servers which do not have
1121 * the patch from bug20278 */
1122 cli->cl_avail_grant = ocd->ocd_grant;
1125 client_obd_list_unlock(&cli->cl_loi_list_lock);
1127 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1128 cli->cl_import->imp_obd->obd_name,
1129 cli->cl_avail_grant, cli->cl_lost_grant);
/* Join the shrink list once; subsequent reconnects find it non-empty. */
1131 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1132 cfs_list_empty(&cli->cl_grant_shrink_list))
1133 osc_add_shrink_grant(cli);
1136 /* We assume that the reason this OSC got a short read is because it read
1137 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1138 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1139 * this stripe never got written at or beyond this stripe offset yet. */
/* Zero-fill the unread tail of the page array: skip fully-read pages,
 * zero the tail of the partially-read page, then zero all remaining
 * pages.  NOTE(review): the i++/page_count-- advance lines inside the
 * first loop are elided in this extract. */
1140 static void handle_short_read(int nob_read, obd_count page_count,
1141 struct brw_page **pga)
1146 /* skip bytes read OK */
1147 while (nob_read > 0) {
1148 LASSERT (page_count > 0);
1150 if (pga[i]->count > nob_read) {
1151 /* EOF inside this page */
1152 ptr = cfs_kmap(pga[i]->pg) +
1153 (pga[i]->off & ~CFS_PAGE_MASK);
1154 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1155 cfs_kunmap(pga[i]->pg);
1161 nob_read -= pga[i]->count;
1166 /* zero remaining pages */
1167 while (page_count-- > 0) {
1168 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1169 memset(ptr, 0, pga[i]->count);
1170 cfs_kunmap(pga[i]->pg);
/* Validate a BRW_WRITE reply: fetch the per-niobuf rc vector, fail on a
 * missing/short vector, propagate the first negative rc, reject unknown
 * positive rcs, and verify the bulk transferred exactly the requested
 * byte count.  NOTE(review): the -EPROTO return lines are elided in this
 * extract. */
1175 static int check_write_rcs(struct ptlrpc_request *req,
1176 int requested_nob, int niocount,
1177 obd_count page_count, struct brw_page **pga)
1182 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1183 sizeof(*remote_rcs) *
1185 if (remote_rcs == NULL) {
1186 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1190 /* return error if any niobuf was in error */
1191 for (i = 0; i < niocount; i++) {
1192 if (remote_rcs[i] < 0)
1193 return(remote_rcs[i]);
1195 if (remote_rcs[i] != 0) {
1196 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1197 i, remote_rcs[i], req);
1202 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1203 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1204 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Two brw_pages can share one niobuf iff they are byte-contiguous and
 * differ only in flags known to be safe to combine (grant/cache/sync
 * bookkeeping bits); any other flag mismatch is logged and refused. */
1211 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1213 if (p1->flag != p2->flag) {
1214 unsigned mask = ~(OBD_BRW_FROM_GRANT|
1215 OBD_BRW_NOCACHE|OBD_BRW_SYNC|OBD_BRW_ASYNC);
1217 /* warn if we try to combine flags that we don't know to be
1218 * safe to combine */
1219 if ((p1->flag & mask) != (p2->flag & mask))
1220 CERROR("is it ok to have flags 0x%x and 0x%x in the "
1221 "same brw?\n", p1->flag, p2->flag);
1225 return (p1->off + p1->count == p2->off);
/* Compute the bulk checksum over up to @nob bytes of @pga using
 * @cksum_type, kmapping each page in turn.  Fault-injection hooks corrupt
 * read data (OSC_CHECKSUM_RECEIVE) or just the computed checksum on
 * writes (OSC_CHECKSUM_SEND) to test server/client detection.
 * NOTE(review): the i++ advance and the final checksum-mangling line are
 * elided in this extract. */
1228 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1229 struct brw_page **pga, int opc,
1230 cksum_type_t cksum_type)
1235 LASSERT (pg_count > 0);
1236 cksum = init_checksum(cksum_type);
1237 while (nob > 0 && pg_count > 0) {
1238 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1239 int off = pga[i]->off & ~CFS_PAGE_MASK;
1240 int count = pga[i]->count > nob ? nob : pga[i]->count;
1242 /* corrupt the data before we compute the checksum, to
1243 * simulate an OST->client data error */
1244 if (i == 0 && opc == OST_READ &&
1245 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1246 memcpy(ptr + off, "bad1", min(4, nob));
1247 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1248 cfs_kunmap(pga[i]->pg);
1249 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1252 nob -= pga[i]->count;
1256 /* For sending we only compute the wrong checksum instead
1257 * of corrupting the data so it is still correct on a redo */
1258 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/*
 * Build (but do not send) a bulk read/write PTLRPC request.
 *
 * @cmd        OBD_BRW_READ or OBD_BRW_WRITE (plus modifier flags)
 * @cli        client_obd the request is issued through
 * @oa         obdo describing the object; clobbered with checksum state
 * @lsm        stripe md (unused by OSC itself, kept for the interface)
 * @page_count number of entries in @pga (must be sorted by offset)
 * @pga        page array to transfer
 * @reqp       out: the prepared request
 * @ocapa      capability, packed into the request; referenced if @reserve
 * @reserve    if non-zero, take a capa reference into aa->aa_ocapa
 *
 * Returns 0 and sets *reqp on success; negative errno on failure.
 * Contiguous pages with compatible flags are merged into a single niobuf.
 * NOTE(review): partial listing — opc assignment, several closing braces,
 * the final RETURN and the out: error label body are elided.
 */
1264 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1265 struct lov_stripe_md *lsm, obd_count page_count,
1266 struct brw_page **pga,
1267 struct ptlrpc_request **reqp,
1268 struct obd_capa *ocapa, int reserve,
1271 struct ptlrpc_request *req;
1272 struct ptlrpc_bulk_desc *desc;
1273 struct ost_body *body;
1274 struct obd_ioobj *ioobj;
1275 struct niobuf_remote *niobuf;
1276 int niocount, i, requested_nob, opc, rc;
1277 struct osc_brw_async_args *aa;
1278 struct req_capsule *pill;
1279 struct brw_page *pg_prev;
/* fault-injection points for recoverable / fatal prep failures */
1282 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1283 RETURN(-ENOMEM); /* Recoverable */
1284 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1285 RETURN(-EINVAL); /* Fatal */
/* writes come from the preallocated pool so cached dirty data can always
 * be flushed even under memory pressure; reads allocate normally */
1287 if ((cmd & OBD_BRW_WRITE) != 0) {
1289 req = ptlrpc_request_alloc_pool(cli->cl_import,
1290 cli->cl_import->imp_rq_pool,
1291 &RQF_OST_BRW_WRITE);
1294 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* count distinct niobufs: adjacent mergeable pages share one */
1299 for (niocount = i = 1; i < page_count; i++) {
1300 if (!can_merge_pages(pga[i - 1], pga[i]))
1304 pill = &req->rq_pill;
1305 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1307 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1308 niocount * sizeof(*niobuf));
1309 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1311 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1313 ptlrpc_request_free(req);
1316 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1317 ptlrpc_at_set_req_timeout(req);
/* bulk direction: server pulls pages for a write, pushes for a read */
1319 if (opc == OST_WRITE)
1320 desc = ptlrpc_prep_bulk_imp(req, page_count,
1321 BULK_GET_SOURCE, OST_BULK_PORTAL);
1323 desc = ptlrpc_prep_bulk_imp(req, page_count,
1324 BULK_PUT_SINK, OST_BULK_PORTAL);
1327 GOTO(out, rc = -ENOMEM);
1328 /* NB request now owns desc and will free it when it gets freed */
1330 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1331 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1332 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1333 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1335 lustre_set_wire_obdo(&body->oa, oa);
1337 obdo_to_ioobj(oa, ioobj);
1338 ioobj->ioo_bufcnt = niocount;
1339 osc_pack_capa(req, body, ocapa);
1340 LASSERT (page_count > 0);
/* walk the sorted page array, adding each page to the bulk descriptor
 * and coalescing contiguous pages into the current niobuf */
1342 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1343 struct brw_page *pg = pga[i];
1345 LASSERT(pg->count > 0);
1346 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1347 "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1348 pg->off, pg->count);
1350 LASSERTF(i == 0 || pg->off > pg_prev->off,
1351 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1352 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1354 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1355 pg_prev->pg, page_private(pg_prev->pg),
1356 pg_prev->pg->index, pg_prev->off);
/* NOTE(review): duplicate-looking assert below; presumably the two
 * variants sit in opposite arms of an elided #ifdef — confirm */
1358 LASSERTF(i == 0 || pg->off > pg_prev->off,
1359 "i %d p_c %u\n", i, page_count);
1361 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1362 (pg->flag & OBD_BRW_SRVLOCK));
1364 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1366 requested_nob += pg->count;
1368 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1370 niobuf->len += pg->count;
1372 niobuf->offset = pg->off;
1373 niobuf->len = pg->count;
1374 niobuf->flags = pg->flag;
/* sanity: we must have consumed exactly niocount niobuf slots */
1379 LASSERTF((void *)(niobuf - niocount) ==
1380 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1381 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1382 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1384 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1386 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1387 body->oa.o_valid |= OBD_MD_FLFLAGS;
1388 body->oa.o_flags = 0;
1390 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1393 if (osc_should_shrink_grant(cli))
1394 osc_shrink_grant_local(cli, &body->oa);
1396 /* size[REQ_REC_OFF] still sizeof (*body) */
1397 if (opc == OST_WRITE) {
1398 if (unlikely(cli->cl_checksum) &&
1399 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1400 /* store cl_cksum_type in a local variable since
1401 * it can be changed via lprocfs */
1402 cksum_type_t cksum_type = cli->cl_cksum_type;
1404 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1405 oa->o_flags &= OBD_FL_LOCAL_MASK;
1406 body->oa.o_flags = 0;
1408 body->oa.o_flags |= cksum_type_pack(cksum_type);
1409 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1410 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1414 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1416 /* save this in 'oa', too, for later checking */
1417 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1418 oa->o_flags |= cksum_type_pack(cksum_type);
1420 /* clear out the checksum flag, in case this is a
1421 * resend but cl_checksum is no longer set. b=11238 */
1422 oa->o_valid &= ~OBD_MD_FLCKSUM;
1424 oa->o_cksum = body->oa.o_cksum;
1425 /* 1 RC per niobuf */
1426 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1427 sizeof(__u32) * niocount);
1429 if (unlikely(cli->cl_checksum) &&
1430 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1431 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1432 body->oa.o_flags = 0;
1433 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1434 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1437 ptlrpc_request_set_replen(req);
/* stash bookkeeping in the request's async-args for the completion path */
1439 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1440 aa = ptlrpc_req_async_args(req);
1442 aa->aa_requested_nob = requested_nob;
1443 aa->aa_nio_count = niocount;
1444 aa->aa_page_count = page_count;
1448 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1449 if (ocapa && reserve)
1450 aa->aa_ocapa = capa_get(ocapa);
1456 ptlrpc_req_finished(req);
/*
 * Diagnose a write-checksum mismatch reported by the server.
 * Recomputes the checksum locally over @pga (using the server's checksum
 * type if it sent one) and, by comparing the three values (original client,
 * server, freshly recomputed), classifies where the corruption happened.
 * Mmapped files are exempt: their pages may legitimately change mid-flight.
 * Returns non-zero (per caller usage in osc_brw_fini_request) when the
 * mismatch is a genuine error — the exact return lines are elided here.
 */
1460 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1461 __u32 client_cksum, __u32 server_cksum, int nob,
1462 obd_count page_count, struct brw_page **pga,
1463 cksum_type_t client_cksum_type)
1467 cksum_type_t cksum_type;
1469 if (server_cksum == client_cksum) {
1470 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1474 /* If this is mmaped file - it can be changed at any time */
1475 if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags & OBD_FL_MMAP)
1478 if (oa->o_valid & OBD_MD_FLFLAGS)
1479 cksum_type = cksum_type_unpack(oa->o_flags)
1481 cksum_type = OBD_CKSUM_CRC32;
1483 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
/* triage: which of the three checksums disagree tells us where the
 * data changed */
1486 if (cksum_type != client_cksum_type)
1487 msg = "the server did not use the checksum type specified in "
1488 "the original request - likely a protocol problem";
1489 else if (new_cksum == server_cksum)
1490 msg = "changed on the client after we checksummed it - "
1491 "likely false positive due to mmap IO (bug 11742)";
1492 else if (new_cksum == client_cksum)
1493 msg = "changed in transit before arrival at OST";
1495 msg = "changed in transit AND doesn't match the original - "
1496 "likely false positive due to mmap IO (bug 11742)";
1498 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1499 " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1500 msg, libcfs_nid2str(peer->nid),
1501 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1502 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1503 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1505 oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1507 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1508 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1509 "client csum now %x\n", client_cksum, client_cksum_type,
1510 server_cksum, cksum_type, new_cksum);
1514 /* Note rc enters this function as number of bytes transferred */
/*
 * Post-process a completed BRW request: unpack the reply body, update
 * grant/quota state, and verify bulk data integrity.
 * For writes: validate per-niobuf RCs and the write checksum.
 * For reads: sanity-check the transferred byte count, fix up short reads,
 * and verify the server-supplied checksum against a local recomputation.
 * Copies the wire obdo back into aa->aa_oa on success.
 * NOTE(review): partial listing — several RETURN/GOTO lines, the 'out:'
 * label and some closing braces are elided.
 */
1515 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1517 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1518 const lnet_process_id_t *peer =
1519 &req->rq_import->imp_connection->c_peer;
1520 struct client_obd *cli = aa->aa_cli;
1521 struct ost_body *body;
1522 __u32 client_cksum = 0;
1525 if (rc < 0 && rc != -EDQUOT) {
1526 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1530 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1531 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1533 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1537 #ifdef HAVE_QUOTA_SUPPORT
1538 /* set/clear over quota flag for a uid/gid */
1539 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1540 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1541 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1543 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1544 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1546 lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
1551 osc_update_grant(cli, body);
1556 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1557 client_cksum = aa->aa_oa->o_cksum; /* save for later */
/* ---- write path: verify RCs and (optionally) the write checksum ---- */
1559 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1561 CERROR("Unexpected +ve rc %d\n", rc);
1564 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1566 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1569 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1570 check_write_checksum(&body->oa, peer, client_cksum,
1571 body->oa.o_cksum, aa->aa_requested_nob,
1572 aa->aa_page_count, aa->aa_ppga,
1573 cksum_type_unpack(aa->aa_oa->o_flags)))
1576 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1577 aa->aa_page_count, aa->aa_ppga);
1581 /* The rest of this function executes only for OST_READs */
1583 /* if unwrap_bulk failed, return -EAGAIN to retry */
1584 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1586 GOTO(out, rc = -EAGAIN);
1588 if (rc > aa->aa_requested_nob) {
1589 CERROR("Unexpected rc %d (%d requested)\n", rc,
1590 aa->aa_requested_nob);
1594 if (rc != req->rq_bulk->bd_nob_transferred) {
1595 CERROR ("Unexpected rc %d (%d transferred)\n",
1596 rc, req->rq_bulk->bd_nob_transferred);
/* short read: zero-fill the tail pages so callers see defined data */
1600 if (rc < aa->aa_requested_nob)
1601 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1603 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1604 static int cksum_counter;
1605 __u32 server_cksum = body->oa.o_cksum;
1608 cksum_type_t cksum_type;
1610 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1611 cksum_type = cksum_type_unpack(body->oa.o_flags);
1613 cksum_type = OBD_CKSUM_CRC32;
1614 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1615 aa->aa_ppga, OST_READ,
/* note whether the bulk came via an LNET router (affects diagnosis) */
1618 if (peer->nid == req->rq_bulk->bd_sender) {
1622 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1625 if (server_cksum == ~0 && rc > 0) {
1626 CERROR("Protocol error: server %s set the 'checksum' "
1627 "bit, but didn't send a checksum. Not fatal, "
1628 "but please notify on http://bugzilla.lustre.org/\n",
1629 libcfs_nid2str(peer->nid));
1630 } else if (server_cksum != client_cksum) {
1631 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1632 "%s%s%s inode "DFID" object "
1633 LPU64"/"LPU64" extent "
1634 "["LPU64"-"LPU64"]\n",
1635 req->rq_import->imp_obd->obd_name,
1636 libcfs_nid2str(peer->nid),
1638 body->oa.o_valid & OBD_MD_FLFID ?
1639 body->oa.o_parent_seq : (__u64)0,
1640 body->oa.o_valid & OBD_MD_FLFID ?
1641 body->oa.o_parent_oid : 0,
1642 body->oa.o_valid & OBD_MD_FLFID ?
1643 body->oa.o_parent_ver : 0,
1645 body->oa.o_valid & OBD_MD_FLGROUP ?
1646 body->oa.o_seq : (__u64)0,
1647 aa->aa_ppga[0]->off,
1648 aa->aa_ppga[aa->aa_page_count-1]->off +
1649 aa->aa_ppga[aa->aa_page_count-1]->count -
1651 CERROR("client %x, server %x, cksum_type %x\n",
1652 client_cksum, server_cksum, cksum_type);
1654 aa->aa_oa->o_cksum = client_cksum;
1658 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1661 } else if (unlikely(client_cksum)) {
1662 static int cksum_missed;
/* rate-limit: log only when cksum_missed is a power of two */
1665 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1666 CERROR("Checksum %u requested from %s but not sent\n",
1667 cksum_missed, libcfs_nid2str(peer->nid));
1673 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
/*
 * Synchronous BRW: build the request, queue it, wait for completion and
 * finish it.  On recoverable errors the whole sequence is retried (the
 * restart label/loop is elided from this listing), with a back-off wait
 * of 'resends' seconds between attempts and a cap enforced by
 * osc_should_resend().  A bulk timeout with rq_resend set restarts
 * immediately without counting against the resend budget.
 */
1678 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1679 struct lov_stripe_md *lsm,
1680 obd_count page_count, struct brw_page **pga,
1681 struct obd_capa *ocapa)
1683 struct ptlrpc_request *req;
1687 struct l_wait_info lwi;
1691 cfs_waitq_init(&waitq);
1694 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1695 page_count, pga, &req, ocapa, 0, resends);
1699 rc = ptlrpc_queue_wait(req);
1701 if (rc == -ETIMEDOUT && req->rq_resend) {
1702 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1703 ptlrpc_req_finished(req);
1707 rc = osc_brw_fini_request(req, rc);
1709 ptlrpc_req_finished(req);
1710 if (osc_recoverable_error(rc)) {
1712 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1713 CERROR("too many resend retries, returning error\n");
/* interruptible sleep for 'resends' seconds before retrying */
1717 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1718 l_wait_event(waitq, 0, &lwi);
/*
 * Rebuild and re-queue an async BRW request that failed with a
 * recoverable error.  A brand-new request is prepared from the old one's
 * async args; the oap list and async args are transferred to it, each
 * oap's request reference is swapped to the new request, and it is added
 * to the original request's set.  Aborts (returning -EINTR path, elided)
 * if any oap was already interrupted.  Caller-visible state: aa->aa_oaps
 * is emptied into the new request's args; aa->aa_ocapa ownership moves.
 */
1726 int osc_brw_redo_request(struct ptlrpc_request *request,
1727 struct osc_brw_async_args *aa)
1729 struct ptlrpc_request *new_req;
1730 struct ptlrpc_request_set *set = request->rq_set;
1731 struct osc_brw_async_args *new_aa;
1732 struct osc_async_page *oap;
1736 if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1737 CERROR("too many resent retries, returning error\n");
1741 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1743 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1744 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1745 aa->aa_cli, aa->aa_oa,
1746 NULL /* lsm unused by osc currently */,
1747 aa->aa_page_count, aa->aa_ppga,
1748 &new_req, aa->aa_ocapa, 0, 1);
1752 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
/* every oap in the old request must point back at it; bail out if the
 * caller was interrupted while we were rebuilding */
1754 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1755 if (oap->oap_request != NULL) {
1756 LASSERTF(request == oap->oap_request,
1757 "request %p != oap_request %p\n",
1758 request, oap->oap_request);
1759 if (oap->oap_interrupted) {
1760 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1761 ptlrpc_req_finished(new_req);
1766 /* New request takes over pga and oaps from old request.
1767 * Note that copying a list_head doesn't work, need to move it... */
1769 new_req->rq_interpret_reply = request->rq_interpret_reply;
1770 new_req->rq_async_args = request->rq_async_args;
/* delay the resend to spread retries over time */
1771 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1773 new_aa = ptlrpc_req_async_args(new_req);
1775 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1776 cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1777 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1779 cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1780 if (oap->oap_request) {
1781 ptlrpc_req_finished(oap->oap_request);
1782 oap->oap_request = ptlrpc_request_addref(new_req);
1786 new_aa->aa_ocapa = aa->aa_ocapa;
1787 aa->aa_ocapa = NULL;
1789 /* use ptlrpc_set_add_req is safe because interpret functions work
1790 * in check_set context. only one way exist with access to request
1791 * from different thread got -EINTR - this way protected with
1792 * cl_loi_list_lock */
1793 ptlrpc_set_add_req(set, new_req);
1795 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1797 DEBUG_REQ(D_INFO, new_req, "new request");
1802 * ugh, we want disk allocation on the target to happen in offset order. we'll
1803 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1804 * fine for our small page arrays and doesn't require allocation. its an
1805 * insertion sort that swaps elements that are strides apart, shrinking the
1806 * stride down until its '1' and the array is sorted.
/* Sort @array of @num brw_pages in ascending 'off' order (shellsort).
 * NOTE(review): local i/j/tmp declarations, the do-loop head and the
 * stride shrink step are elided from this listing. */
1808 static void sort_brw_pages(struct brw_page **array, int num)
1811 struct brw_page *tmp;
/* grow stride through the 3n+1 sequence, then shrink back down */
1815 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1820 for (i = stride ; i < num ; i++) {
1823 while (j >= stride && array[j - stride]->off > tmp->off) {
1824 array[j] = array[j - stride];
1829 } while (stride > 1);
/*
 * Return how many leading pages of @pg form a run the network can send
 * as a single unfragmented RDMA: after the first page, a page only
 * continues the run if the previous one ended on a page boundary and it
 * starts on one.  NOTE(review): loop head, 'count' bookkeeping and return
 * lines are elided from this listing.
 */
1832 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1838 LASSERT (pages > 0);
1839 offset = pg[i]->off & ~CFS_PAGE_MASK;
1843 if (pages == 0) /* that's all */
1846 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1847 return count; /* doesn't end on page boundary */
1850 offset = pg[i]->off & ~CFS_PAGE_MASK;
1851 if (offset != 0) /* doesn't start on page boundary */
/*
 * Allocate and return an array of pointers into the caller's contiguous
 * @pga array (so it can be sorted/split without moving the brw_pages).
 * Returns NULL on allocation failure; freed by osc_release_ppga().
 * NOTE(review): the NULL check, element assignment and return are elided.
 */
1858 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1860 struct brw_page **ppga;
1863 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1867 for (i = 0; i < count; i++)
/* Free a pointer array previously allocated by osc_build_ppga().
 * @count must match the count passed at allocation time. */
1872 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1874 LASSERT(ppga != NULL);
1875 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * Top-level synchronous BRW entry point.  Splits @pga into chunks no
 * larger than cl_max_pages_per_rpc (further limited to unfragmented runs),
 * sorts pages by offset, and issues one osc_brw_internal() call per chunk.
 * Because each BRW clobbers the obdo, a copy is saved/restored across
 * chunks.  OBD_BRW_CHECK only asks whether I/O could succeed (import
 * valid) without doing any transfer.
 * NOTE(review): RETURN paths, the 'out:' label line and some braces are
 * elided from this listing.
 */
1878 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1879 obd_count page_count, struct brw_page *pga,
1880 struct obd_trans_info *oti)
1882 struct obdo *saved_oa = NULL;
1883 struct brw_page **ppga, **orig;
1884 struct obd_import *imp = class_exp2cliimp(exp);
1885 struct client_obd *cli;
1886 int rc, page_count_orig;
1889 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1890 cli = &imp->imp_obd->u.cli;
1892 if (cmd & OBD_BRW_CHECK) {
1893 /* The caller just wants to know if there's a chance that this
1894 * I/O can succeed */
1896 if (imp->imp_invalid)
1901 /* test_brw with a failed create can trip this, maybe others. */
1902 LASSERT(cli->cl_max_pages_per_rpc);
1906 orig = ppga = osc_build_ppga(pga, page_count);
/* remember the original count: ppga/page_count are consumed as we go */
1909 page_count_orig = page_count;
1911 sort_brw_pages(ppga, page_count);
1912 while (page_count) {
1913 obd_count pages_per_brw;
1915 if (page_count > cli->cl_max_pages_per_rpc)
1916 pages_per_brw = cli->cl_max_pages_per_rpc;
1918 pages_per_brw = page_count;
1920 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1922 if (saved_oa != NULL) {
1923 /* restore previously saved oa */
1924 *oinfo->oi_oa = *saved_oa;
1925 } else if (page_count > pages_per_brw) {
1926 /* save a copy of oa (brw will clobber it) */
1927 OBDO_ALLOC(saved_oa);
1928 if (saved_oa == NULL)
1929 GOTO(out, rc = -ENOMEM);
1930 *saved_oa = *oinfo->oi_oa;
1933 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1934 pages_per_brw, ppga, oinfo->oi_capa);
1939 page_count -= pages_per_brw;
1940 ppga += pages_per_brw;
1944 osc_release_ppga(orig, page_count_orig);
1946 if (saved_oa != NULL)
1947 OBDO_FREE(saved_oa);
1952 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1953 * the dirty accounting. Writeback completes or truncate happens before
1954 * writing starts. Must be called with the loi lock held. */
1955 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
/* returns the page's write grant to the client's grant accounting;
 * @sent distinguishes completed writeback from a cancelled page */
1958 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1962 /* This maintains the lists of pending pages to read/write for a given object
1963 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1964 * to quickly find objects that are ready to send an RPC. */
/* Returns non-zero when @lop has enough (or urgent enough) pending pages
 * to justify firing an RPC for direction @cmd.  The actual RETURN lines
 * are elided from this listing. */
1965 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1971 if (lop->lop_num_pending == 0)
1974 /* if we have an invalid import we want to drain the queued pages
1975 * by forcing them through rpcs that immediately fail and complete
1976 * the pages. recovery relies on this to empty the queued pages
1977 * before canceling the locks and evicting down the llite pages */
1978 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1981 /* stream rpcs in queue order as long as as there is an urgent page
1982 * queued. this is our cheap solution for good batching in the case
1983 * where writepage marks some random page in the middle of the file
1984 * as urgent because of, say, memory pressure */
1985 if (!cfs_list_empty(&lop->lop_urgent)) {
1986 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1989 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1990 optimal = cli->cl_max_pages_per_rpc;
1991 if (cmd & OBD_BRW_WRITE) {
1992 /* trigger a write rpc stream as long as there are dirtiers
1993 * waiting for space. as they're waiting, they're not going to
1994 * create more pages to coalesce with what's waiting.. */
1995 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
1996 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1999 /* +16 to avoid triggering rpcs that would want to include pages
2000 * that are being queued but which can't be made ready until
2001 * the queuer finishes with the page. this is a wart for
2002 * llite::commit_write() */
2005 if (lop->lop_num_pending >= optimal)
/* Returns non-zero when the first urgent page on @lop is flagged ASYNC_HP,
 * i.e. a high-priority RPC should be generated immediately (return lines
 * elided from this listing). */
2011 static int lop_makes_hprpc(struct loi_oap_pages *lop)
2013 struct osc_async_page *oap;
2016 if (cfs_list_empty(&lop->lop_urgent))
2019 oap = cfs_list_entry(lop->lop_urgent.next,
2020 struct osc_async_page, oap_urgent_item);
2022 if (oap->oap_async_flags & ASYNC_HP) {
2023 CDEBUG(D_CACHE, "hp request forcing RPC\n");
/* Idempotently add @item to @list when @should_be_on is true, or unlink
 * it when false; a no-op if it is already in the desired state. */
2030 static void on_list(cfs_list_t *item, cfs_list_t *list,
2033 if (cfs_list_empty(item) && should_be_on)
2034 cfs_list_add_tail(item, list);
2035 else if (!cfs_list_empty(item) && !should_be_on)
2036 cfs_list_del_init(item);
2039 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2040 * can find pages to build into rpcs quickly */
2041 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
/* an loi is on either the HP-ready or ready list, never both */
2043 if (lop_makes_hprpc(&loi->loi_write_lop) ||
2044 lop_makes_hprpc(&loi->loi_read_lop)) {
2046 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2047 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2049 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
2050 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2051 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
2052 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
/* write/read membership simply tracks whether any pages are pending */
2055 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2056 loi->loi_write_lop.lop_num_pending);
2058 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2059 loi->loi_read_lop.lop_num_pending);
/* Adjust @lop's pending-page count by @delta (may be negative) and mirror
 * the change into the client-wide pending read/write counters. */
2062 static void lop_update_pending(struct client_obd *cli,
2063 struct loi_oap_pages *lop, int cmd, int delta)
2065 lop->lop_num_pending += delta;
2066 if (cmd & OBD_BRW_WRITE)
2067 cli->cl_pending_w_pages += delta;
2069 cli->cl_pending_r_pages += delta;
2073 * this is called when a sync waiter receives an interruption. Its job is to
2074 * get the caller woken as soon as possible. If its page hasn't been put in an
2075 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
2076 * desiring interruption which will forcefully complete the rpc once the rpc
/* NOTE(review): loi assignment and the final RETURN are elided from this
 * partial listing. */
2079 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2081 struct loi_oap_pages *lop;
2082 struct lov_oinfo *loi;
2086 LASSERT(!oap->oap_interrupted);
2087 oap->oap_interrupted = 1;
2089 /* ok, it's been put in an rpc. only one oap gets a request reference */
2090 if (oap->oap_request != NULL) {
2091 ptlrpc_mark_interrupted(oap->oap_request);
2092 ptlrpcd_wake(oap->oap_request);
2093 ptlrpc_req_finished(oap->oap_request);
2094 oap->oap_request = NULL;
2098 * page completion may be called only if ->cpo_prep() method was
2099 * executed by osc_io_submit(), that also adds page the to pending list
2101 if (!cfs_list_empty(&oap->oap_pending_item)) {
2102 cfs_list_del_init(&oap->oap_pending_item);
2103 cfs_list_del_init(&oap->oap_urgent_item);
2106 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2107 &loi->loi_write_lop : &loi->loi_read_lop;
2108 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2109 loi_list_maint(oap->oap_cli, oap->oap_loi);
/* wake the waiter with -EINTR via the caller's completion callback */
2110 rc = oap->oap_caller_ops->ap_completion(env,
2111 oap->oap_caller_data,
2112 oap->oap_cmd, NULL, -EINTR);
2118 /* this is trying to propogate async writeback errors back up to the
2119 * application. As an async write fails we record the error code for later if
2120 * the app does an fsync. As long as errors persist we force future rpcs to be
2121 * sync so that the app can get a sync error and break the cycle of queueing
2122 * pages for which writeback will fail. */
/* NOTE(review): the rc-recording branch and function braces are elided
 * from this partial listing. */
2123 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
/* on error: force sync mode and remember the xid at which it started */
2130 ar->ar_force_sync = 1;
2131 ar->ar_min_xid = ptlrpc_sample_next_xid();
/* a success at/after the recorded xid clears the forced-sync state */
2136 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2137 ar->ar_force_sync = 0;
/*
 * Queue @oap on its object's pending list (write or read side per
 * oap_cmd) and, if flagged urgent, on the urgent list too: ASYNC_HP goes
 * to the head of the urgent list, ASYNC_URGENT to its tail.  Updates the
 * pending-page accounting.
 */
2140 void osc_oap_to_pending(struct osc_async_page *oap)
2142 struct loi_oap_pages *lop;
2144 if (oap->oap_cmd & OBD_BRW_WRITE)
2145 lop = &oap->oap_loi->loi_write_lop;
2147 lop = &oap->oap_loi->loi_read_lop;
2149 if (oap->oap_async_flags & ASYNC_HP)
2150 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2151 else if (oap->oap_async_flags & ASYNC_URGENT)
2152 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2153 cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2154 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2157 /* this must be called holding the loi list lock to give coverage to exit_cache,
2158 * async_flag maintenance, and oap_request */
/* Complete one async page: drop its request reference, clear async flags,
 * feed the xid/rc into the async-error tracking for writes, copy object
 * attributes from @oa into the loi's lvb, and invoke the caller's
 * completion callback.  NOTE(review): xid declaration and the final
 * exit-cache call ordering lines are partially elided in this listing. */
2159 static void osc_ap_completion(const struct lu_env *env,
2160 struct client_obd *cli, struct obdo *oa,
2161 struct osc_async_page *oap, int sent, int rc)
2166 if (oap->oap_request != NULL) {
2167 xid = ptlrpc_req_xid(oap->oap_request);
2168 ptlrpc_req_finished(oap->oap_request);
2169 oap->oap_request = NULL;
/* oap_lock serializes async_flags updates */
2172 cfs_spin_lock(&oap->oap_lock);
2173 oap->oap_async_flags = 0;
2174 cfs_spin_unlock(&oap->oap_lock);
2175 oap->oap_interrupted = 0;
2177 if (oap->oap_cmd & OBD_BRW_WRITE) {
2178 osc_process_ar(&cli->cl_ar, xid, rc);
2179 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
/* refresh cached object attributes from the reply obdo */
2182 if (rc == 0 && oa != NULL) {
2183 if (oa->o_valid & OBD_MD_FLBLOCKS)
2184 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2185 if (oa->o_valid & OBD_MD_FLMTIME)
2186 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2187 if (oa->o_valid & OBD_MD_FLATIME)
2188 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2189 if (oa->o_valid & OBD_MD_FLCTIME)
2190 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2193 rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2194 oap->oap_cmd, oa, rc);
2196 /* ll_ap_completion (from llite) drops PG_locked. so, a new
2197 * I/O on the page could start, but OSC calls it under lock
2198 * and thus we can add oap back to pending safely */
2200 /* upper layer wants to leave the page on pending queue */
2201 osc_oap_to_pending(oap);
2203 osc_exit_cache(cli, oap, sent);
/*
 * Interpret callback for async BRW requests (runs from ptlrpcd).
 * Finishes the request, retries recoverable errors via
 * osc_brw_redo_request() (with a single-retry policy for mmapped files),
 * releases the capability, decrements the in-flight counter, completes or
 * releases every page, wakes cache waiters and kicks off further RPCs.
 * NOTE(review): cli assignment, the redo early-return and the final
 * RETURN are elided from this partial listing.
 */
2207 static int brw_interpret(const struct lu_env *env,
2208 struct ptlrpc_request *req, void *data, int rc)
2210 struct osc_brw_async_args *aa = data;
2211 struct client_obd *cli;
2215 rc = osc_brw_fini_request(req, rc);
2216 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2217 if (osc_recoverable_error(rc)) {
2218 /* Only retry once for mmaped files since the mmaped page
2219 * might be modified at anytime. We have to retry at least
2220 * once in case there WAS really a corruption of the page
2221 * on the network, that was not caused by mmap() modifying
2222 * the page. Bug11742 */
2223 if ((rc == -EAGAIN) && (aa->aa_resends > 0) &&
2224 aa->aa_oa->o_valid & OBD_MD_FLFLAGS &&
2225 aa->aa_oa->o_flags & OBD_FL_MMAP) {
2228 rc = osc_brw_redo_request(req, aa);
2235 capa_put(aa->aa_ocapa);
2236 aa->aa_ocapa = NULL;
2241 client_obd_list_lock(&cli->cl_loi_list_lock);
2243 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2244 * is called so we know whether to go to sync BRWs or wait for more
2245 * RPCs to complete */
2246 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2247 cli->cl_w_in_flight--;
2249 cli->cl_r_in_flight--;
/* empty aa_oaps means the request came from async_internal(), which
 * tracks pages by grant rather than by oap list */
2251 async = cfs_list_empty(&aa->aa_oaps);
2252 if (!async) { /* from osc_send_oap_rpc() */
2253 struct osc_async_page *oap, *tmp;
2254 /* the caller may re-use the oap after the completion call so
2255 * we need to clean it up a little */
2256 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
2258 cfs_list_del_init(&oap->oap_rpc_item);
2259 osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2261 OBDO_FREE(aa->aa_oa);
2262 } else { /* from async_internal() */
2264 for (i = 0; i < aa->aa_page_count; i++)
2265 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2267 osc_wake_cache_waiters(cli);
2268 osc_check_rpcs(env, cli);
2269 client_obd_list_unlock(&cli->cl_loi_list_lock);
2271 cl_req_completion(env, aa->aa_clerq, rc);
2272 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
/*
 * Turn a list of queued osc_async_pages into a ready-to-send BRW request.
 * Allocates a pga pointer array and an obdo, creates a cl_req covering the
 * pages, fills request attributes (capa, lock handle, timestamps), sorts
 * the pages and calls osc_brw_prep_request().  On success the oap list is
 * spliced into the request's async args and the request is returned; on
 * any failure every oap is completed with the error and an ERR_PTR is
 * returned.  NOTE(review): obdo allocation, the 'out:' label, RETURN and
 * several braces are elided from this partial listing.
 */
2277 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2278 struct client_obd *cli,
2279 cfs_list_t *rpc_list,
2280 int page_count, int cmd)
2282 struct ptlrpc_request *req;
2283 struct brw_page **pga = NULL;
2284 struct osc_brw_async_args *aa;
2285 struct obdo *oa = NULL;
2286 const struct obd_async_page_ops *ops = NULL;
2287 void *caller_data = NULL;
2288 struct osc_async_page *oap;
2289 struct osc_async_page *tmp;
2290 struct ost_body *body;
2291 struct cl_req *clerq = NULL;
2292 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2293 struct ldlm_lock *lock = NULL;
2294 struct cl_req_attr crattr;
2295 int i, rc, mpflag = 0;
2298 LASSERT(!cfs_list_empty(rpc_list));
2300 if (cmd & OBD_BRW_MEMALLOC)
2301 mpflag = cfs_memory_pressure_get_and_set();
2303 memset(&crattr, 0, sizeof crattr);
2304 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2306 GOTO(out, req = ERR_PTR(-ENOMEM));
2310 GOTO(out, req = ERR_PTR(-ENOMEM));
/* collect the pages into pga[] and attach them to a new cl_req */
2313 cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2314 struct cl_page *page = osc_oap2cl_page(oap);
2316 ops = oap->oap_caller_ops;
2317 caller_data = oap->oap_caller_data;
2319 clerq = cl_req_alloc(env, page, crt,
2320 1 /* only 1-object rpcs for
2323 GOTO(out, req = (void *)clerq);
2324 lock = oap->oap_ldlm_lock;
2326 pga[i] = &oap->oap_brw_page;
2327 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2328 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2329 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2331 cl_req_page_add(env, clerq, page);
2334 /* always get the data for the obdo for the rpc */
2335 LASSERT(ops != NULL);
2337 crattr.cra_capa = NULL;
2338 cl_req_attr_set(env, clerq, &crattr, ~0ULL);
/* pass the DLM lock handle so the server can match the client lock */
2340 oa->o_handle = lock->l_remote_handle;
2341 oa->o_valid |= OBD_MD_FLHANDLE;
2344 rc = cl_req_prep(env, clerq);
2346 CERROR("cl_req_prep failed: %d\n", rc);
2347 GOTO(out, req = ERR_PTR(rc));
2350 sort_brw_pages(pga, page_count);
2351 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2352 pga, &req, crattr.cra_capa, 1, 0);
2354 CERROR("prep_req failed: %d\n", rc);
2355 GOTO(out, req = ERR_PTR(rc));
2358 if (cmd & OBD_BRW_MEMALLOC)
2359 req->rq_memalloc = 1;
2361 /* Need to update the timestamps after the request is built in case
2362 * we race with setattr (locally or in queue at OST). If OST gets
2363 * later setattr before earlier BRW (as determined by the request xid),
2364 * the OST will not use BRW timestamps. Sadly, there is no obvious
2365 * way to do this in a single call. bug 10150 */
2366 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2367 cl_req_attr_set(env, clerq, &crattr,
2368 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
/* hand the oap list over to the request's async args */
2370 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2371 aa = ptlrpc_req_async_args(req);
2372 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2373 cfs_list_splice(rpc_list, &aa->aa_oaps);
2374 CFS_INIT_LIST_HEAD(rpc_list);
2375 aa->aa_clerq = clerq;
2377 if (cmd & OBD_BRW_MEMALLOC)
2378 cfs_memory_pressure_restore(mpflag);
2380 capa_put(crattr.cra_capa);
/* ---- error path: fail every queued page with the prep error ---- */
2385 OBD_FREE(pga, sizeof(*pga) * page_count);
2386 /* this should happen rarely and is pretty bad, it makes the
2387 * pending list not follow the dirty order */
2388 client_obd_list_lock(&cli->cl_loi_list_lock);
2389 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2390 cfs_list_del_init(&oap->oap_rpc_item);
2392 /* queued sync pages can be torn down while the pages
2393 * were between the pending list and the rpc */
2394 if (oap->oap_interrupted) {
2395 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2396 osc_ap_completion(env, cli, NULL, oap, 0,
2400 osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2402 if (clerq && !IS_ERR(clerq))
2403 cl_req_completion(env, clerq, PTR_ERR(req));
2409 * prepare pages for ASYNC io and put pages in send queue.
2411 * \param cmd OBD_BRW_* macros
2412 * \param lop pending pages
2414 * \return zero if no page added to send queue.
2415 * \return 1 if pages successfully added to send queue.
2416 * \return negative on errors.
2419 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2420 struct lov_oinfo *loi,
2421 int cmd, struct loi_oap_pages *lop)
2423 struct ptlrpc_request *req;
2424 obd_count page_count = 0;
2425 struct osc_async_page *oap = NULL, *tmp;
2426 struct osc_brw_async_args *aa;
2427 const struct obd_async_page_ops *ops;
2428 CFS_LIST_HEAD(rpc_list);
2429 CFS_LIST_HEAD(tmp_list);
2430 unsigned int ending_offset;
2431 unsigned starting_offset = 0;
2432 int srvlock = 0, mem_tight = 0;
2433 struct cl_object *clob = NULL;
2436 /* ASYNC_HP pages first. At present, when the lock the pages is
2437 * to be canceled, the pages covered by the lock will be sent out
2438 * with ASYNC_HP. We have to send out them as soon as possible. */
2439 cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2440 if (oap->oap_async_flags & ASYNC_HP)
2441 cfs_list_move(&oap->oap_pending_item, &tmp_list);
2443 cfs_list_move_tail(&oap->oap_pending_item, &tmp_list);
2444 if (++page_count >= cli->cl_max_pages_per_rpc)
2448 cfs_list_splice(&tmp_list, &lop->lop_pending);
2451 /* first we find the pages we're allowed to work with */
2452 cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2454 ops = oap->oap_caller_ops;
2456 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2457 "magic 0x%x\n", oap, oap->oap_magic);
2460 /* pin object in memory, so that completion call-backs
2461 * can be safely called under client_obd_list lock. */
2462 clob = osc_oap2cl_page(oap)->cp_obj;
2463 cl_object_get(clob);
2466 if (page_count != 0 &&
2467 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2468 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2469 " oap %p, page %p, srvlock %u\n",
2470 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2474 /* If there is a gap at the start of this page, it can't merge
2475 * with any previous page, so we'll hand the network a
2476 * "fragmented" page array that it can't transfer in 1 RDMA */
2477 if (page_count != 0 && oap->oap_page_off != 0)
2480 /* in llite being 'ready' equates to the page being locked
2481 * until completion unlocks it. commit_write submits a page
2482 * as not ready because its unlock will happen unconditionally
2483 * as the call returns. if we race with commit_write giving
2484 * us that page we don't want to create a hole in the page
2485 * stream, so we stop and leave the rpc to be fired by
2486 * another dirtier or kupdated interval (the not ready page
2487 * will still be on the dirty list). we could call in
2488 * at the end of ll_file_write to process the queue again. */
2489 if (!(oap->oap_async_flags & ASYNC_READY)) {
2490 int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2493 CDEBUG(D_INODE, "oap %p page %p returned %d "
2494 "instead of ready\n", oap,
2498 /* llite is telling us that the page is still
2499 * in commit_write and that we should try
2500 * and put it in an rpc again later. we
2501 * break out of the loop so we don't create
2502 * a hole in the sequence of pages in the rpc
2507 /* the io isn't needed.. tell the checks
2508 * below to complete the rpc with EINTR */
2509 cfs_spin_lock(&oap->oap_lock);
2510 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2511 cfs_spin_unlock(&oap->oap_lock);
2512 oap->oap_count = -EINTR;
2515 cfs_spin_lock(&oap->oap_lock);
2516 oap->oap_async_flags |= ASYNC_READY;
2517 cfs_spin_unlock(&oap->oap_lock);
2520 LASSERTF(0, "oap %p page %p returned %d "
2521 "from make_ready\n", oap,
2529 * Page submitted for IO has to be locked. Either by
2530 * ->ap_make_ready() or by higher layers.
2532 #if defined(__KERNEL__) && defined(__linux__)
2534 struct cl_page *page;
2536 page = osc_oap2cl_page(oap);
2538 if (page->cp_type == CPT_CACHEABLE &&
2539 !(PageLocked(oap->oap_page) &&
2540 (CheckWriteback(oap->oap_page, cmd)))) {
2541 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2543 (long)oap->oap_page->flags,
2544 oap->oap_async_flags);
2550 /* take the page out of our book-keeping */
2551 cfs_list_del_init(&oap->oap_pending_item);
2552 lop_update_pending(cli, lop, cmd, -1);
2553 cfs_list_del_init(&oap->oap_urgent_item);
2555 if (page_count == 0)
2556 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2557 (PTLRPC_MAX_BRW_SIZE - 1);
2559 /* ask the caller for the size of the io as the rpc leaves. */
2560 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2562 ops->ap_refresh_count(env, oap->oap_caller_data,
2564 LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2566 if (oap->oap_count <= 0) {
2567 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2569 osc_ap_completion(env, cli, NULL,
2570 oap, 0, oap->oap_count);
2574 /* now put the page back in our accounting */
2575 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2576 if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
2578 if (page_count == 0)
2579 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2580 if (++page_count >= cli->cl_max_pages_per_rpc)
2583 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2584 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2585 * have the same alignment as the initial writes that allocated
2586 * extents on the server. */
2587 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2588 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2589 if (ending_offset == 0)
2592 /* If there is a gap at the end of this page, it can't merge
2593 * with any subsequent pages, so we'll hand the network a
2594 * "fragmented" page array that it can't transfer in 1 RDMA */
2595 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2599 osc_wake_cache_waiters(cli);
2601 loi_list_maint(cli, loi);
2603 client_obd_list_unlock(&cli->cl_loi_list_lock);
2606 cl_object_put(env, clob);
2608 if (page_count == 0) {
2609 client_obd_list_lock(&cli->cl_loi_list_lock);
2613 req = osc_build_req(env, cli, &rpc_list, page_count,
2614 mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
2616 LASSERT(cfs_list_empty(&rpc_list));
2617 loi_list_maint(cli, loi);
2618 RETURN(PTR_ERR(req));
2621 aa = ptlrpc_req_async_args(req);
2623 if (cmd == OBD_BRW_READ) {
2624 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2625 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2626 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2627 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2629 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2630 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2631 cli->cl_w_in_flight);
2632 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2633 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2635 ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2637 client_obd_list_lock(&cli->cl_loi_list_lock);
2639 if (cmd == OBD_BRW_READ)
2640 cli->cl_r_in_flight++;
2642 cli->cl_w_in_flight++;
2644 /* queued sync pages can be torn down while the pages
2645 * were between the pending list and the rpc */
2647 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2648 /* only one oap gets a request reference */
2651 if (oap->oap_interrupted && !req->rq_intr) {
2652 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2654 ptlrpc_mark_interrupted(req);
2658 tmp->oap_request = ptlrpc_request_addref(req);
2660 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2661 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2663 req->rq_interpret_reply = brw_interpret;
2664 ptlrpcd_add_req(req, PSCOPE_BRW);
/* Debug helper: log an loi's readiness plus pending/urgent read and
 * write page counts under D_INODE.  (The macro tail -- trailing args
 * and closing line -- is truncated in this listing.) */
2668 #define LOI_DEBUG(LOI, STR, args...) \
2669 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2670 !cfs_list_empty(&(LOI)->loi_ready_item) || \
2671 !cfs_list_empty(&(LOI)->loi_hp_ready_item), \
2672 (LOI)->loi_write_lop.lop_num_pending, \
2673 !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2674 (LOI)->loi_read_lop.lop_num_pending, \
2675 !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent), \
/*
 * osc_next_loi(): pick the next object (lov_oinfo) to send RPCs for,
 * in strict priority order: HP-ready objects, then ready objects, then
 * (when there are cache waiters) objects with queued writes, and
 * finally -- when the import is missing/invalid -- any object with
 * queued writes or reads so everything gets flushed.
 * NOTE(review): listing is incomplete; the "no object" return path is
 * not visible here.
 */
2678 /* This is called by osc_check_rpcs() to find which objects have pages that
2679 * we could be sending. These lists are maintained by lop_makes_rpc(). */
2680 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2684 /* First return objects that have blocked locks so that they
2685 * will be flushed quickly and other clients can get the lock,
2686 * then objects which have pages ready to be stuffed into RPCs */
2687 if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
2688 RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
2689 struct lov_oinfo, loi_hp_ready_item));
2690 if (!cfs_list_empty(&cli->cl_loi_ready_list))
2691 RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
2692 struct lov_oinfo, loi_ready_item));
2694 /* then if we have cache waiters, return all objects with queued
2695 * writes. This is especially important when many small files
2696 * have filled up the cache and not been fired into rpcs because
2697 * they don't pass the nr_pending/object threshhold */
2698 if (!cfs_list_empty(&cli->cl_cache_waiters) &&
2699 !cfs_list_empty(&cli->cl_loi_write_list))
2700 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2701 struct lov_oinfo, loi_write_item));
2703 /* then return all queued objects when we have an invalid import
2704 * so that they get flushed */
2705 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2706 if (!cfs_list_empty(&cli->cl_loi_write_list))
2707 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2710 if (!cfs_list_empty(&cli->cl_loi_read_list))
2711 RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
2712 struct lov_oinfo, loi_read_item));
/*
 * osc_max_rpc_in_flight(): return non-zero when the client is already
 * at its RPC concurrency limit for this object.  If the first urgent
 * oap on either the write or read list is ASYNC_HP, one extra RPC slot
 * is allowed (cl_max_rpcs_in_flight + hprpc), so high-priority lock
 * cancellation traffic is never starved.
 */
2717 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2719 struct osc_async_page *oap;
2722 if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
2723 oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
2724 struct osc_async_page, oap_urgent_item);
2725 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2728 if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
2729 oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
2730 struct osc_async_page, oap_urgent_item);
2731 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2734 return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
/*
 * osc_check_rpcs(): drive the RPC engine.  Loop over objects returned
 * by osc_next_loi() and fire write then read RPCs for each, until the
 * in-flight limit is hit.  Alternating read/write per object gives some
 * fairness; removing the loi from the ready lists after servicing gives
 * inter-object fairness.  Must be called with cl_loi_list_lock held
 * (per the original comment at line 2737).
 * NOTE(review): listing is incomplete -- the race_counter increment
 * paths and loop exit are not fully visible.
 */
2737 /* called with the loi list lock held */
2738 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2740 struct lov_oinfo *loi;
2741 int rc = 0, race_counter = 0;
2744 while ((loi = osc_next_loi(cli)) != NULL) {
2745 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2747 if (osc_max_rpc_in_flight(cli, loi))
2750 /* attempt some read/write balancing by alternating between
2751 * reads and writes in an object. The makes_rpc checks here
2752 * would be redundant if we were getting read/write work items
2753 * instead of objects. we don't want send_oap_rpc to drain a
2754 * partial read pending queue when we're given this object to
2755 * do io on writes while there are cache waiters */
2756 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2757 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2758 &loi->loi_write_lop);
2760 CERROR("Write request failed with %d\n", rc);
2762 /* osc_send_oap_rpc failed, mostly because of
2765 * It can't break here, because if:
2766 * - a page was submitted by osc_io_submit, so
2768 * - no request in flight
2769 * - no subsequent request
2770 * The system will be in live-lock state,
2771 * because there is no chance to call
2772 * osc_io_unplug() and osc_check_rpcs() any
2773 * more. pdflush can't help in this case,
2774 * because it might be blocked at grabbing
2775 * the page lock as we mentioned.
2777 * Anyway, continue to drain pages. */
2786 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2787 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2788 &loi->loi_read_lop);
2790 CERROR("Read request failed with %d\n", rc);
2798 /* attempt some inter-object balancing by issuing rpcs
2799 * for each object in turn */
2800 if (!cfs_list_empty(&loi->loi_hp_ready_item))
2801 cfs_list_del_init(&loi->loi_hp_ready_item);
2802 if (!cfs_list_empty(&loi->loi_ready_item))
2803 cfs_list_del_init(&loi->loi_ready_item);
2804 if (!cfs_list_empty(&loi->loi_write_item))
2805 cfs_list_del_init(&loi->loi_write_item);
2806 if (!cfs_list_empty(&loi->loi_read_item))
2807 cfs_list_del_init(&loi->loi_read_item);
2809 loi_list_maint(cli, loi);
2811 /* send_oap_rpc fails with 0 when make_ready tells it to
2812 * back off. llite's make_ready does this when it tries
2813 * to lock a page queued for write that is already locked.
2814 * we want to try sending rpcs from many objects, but we
2815 * don't want to spin failing with 0. */
2816 if (race_counter == 10)
/*
 * ocw_granted(): wait condition for osc_enter_cache()'s l_wait_event.
 * Returns true once the waiter has been removed from the cache-waiter
 * list (grant/space was given) or there are no RPCs in flight (so no
 * more grant can be expected and the caller must give up).  Takes and
 * drops cl_loi_list_lock around the check.
 */
2822 /* we're trying to queue a page in the osc so we're subject to the
2823 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2824 * If the osc's queued pages are already at that limit, then we want to sleep
2825 * until there is space in the osc's queue for us. We also may be waiting for
2826 * write credits from the OST if there are RPCs in flight that may return some
2827 * before we fall back to sync writes.
2829 * We need this know our allocation was granted in the presence of signals */
2830 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2834 client_obd_list_lock(&cli->cl_loi_list_lock);
2835 rc = cfs_list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2836 client_obd_list_unlock(&cli->cl_loi_list_lock);
/*
 * osc_enter_cache_try(): non-blocking grant consumption.  If at least
 * one page (CFS_PAGE_SIZE) of grant is available, consume it for this
 * oap; when 'transient' is set the page is additionally accounted as
 * dirty-transit and marked OBD_BRW_NOCACHE.  Returns whether grant was
 * taken (per the original header comment).
 * NOTE(review): the surrounding braces/return are truncated in this
 * listing.
 */
2841 * Non-blocking version of osc_enter_cache() that consumes grant only when it
2844 int osc_enter_cache_try(const struct lu_env *env,
2845 struct client_obd *cli, struct lov_oinfo *loi,
2846 struct osc_async_page *oap, int transient)
2850 has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2852 osc_consume_write_grant(cli, &oap->oap_brw_page);
2854 cli->cl_dirty_transit += CFS_PAGE_SIZE;
2855 cfs_atomic_inc(&obd_dirty_transit_pages);
2856 oap->oap_brw_flags |= OBD_BRW_NOCACHE;
/*
 * osc_enter_cache(): blocking admission of a write page into the osc
 * dirty cache.  Fast path: enough local dirty headroom and grant ->
 * osc_enter_cache_try() succeeds.  Otherwise, if grant may become
 * available (writes in flight or grant >= one page), register as a
 * cache waiter, kick osc_check_rpcs() to push out pending writes, and
 * sleep interruptibly in ocw_granted().  Forces the caller to sync I/O
 * when dirty caching is disabled or ar_force_sync is set.
 * NOTE(review): listing is incomplete; the post-wakeup result handling
 * after removing ocw_entry is not visible.
 */
2862 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2863 * grant or cache space. */
2864 static int osc_enter_cache(const struct lu_env *env,
2865 struct client_obd *cli, struct lov_oinfo *loi,
2866 struct osc_async_page *oap)
2868 struct osc_cache_waiter ocw;
2869 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2873 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2874 "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
2875 cli->cl_dirty_max, obd_max_dirty_pages,
2876 cli->cl_lost_grant, cli->cl_avail_grant);
2878 /* force the caller to try sync io. this can jump the list
2879 * of queued writes and create a discontiguous rpc stream */
2880 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2881 loi->loi_ar.ar_force_sync)
2884 /* Hopefully normal case - cache space and write credits available */
2885 if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2886 cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2887 osc_enter_cache_try(env, cli, loi, oap, 0))
2890 /* It is safe to block as a cache waiter as long as there is grant
2891 * space available or the hope of additional grant being returned
2892 * when an in flight write completes. Using the write back cache
2893 * if possible is preferable to sending the data synchronously
2894 * because write pages can then be merged in to large requests.
2895 * The addition of this cache waiter will causing pending write
2896 * pages to be sent immediately. */
2897 if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
2898 cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2899 cfs_waitq_init(&ocw.ocw_waitq);
2903 loi_list_maint(cli, loi);
2904 osc_check_rpcs(env, cli);
2905 client_obd_list_unlock(&cli->cl_loi_list_lock);
2907 CDEBUG(D_CACHE, "sleeping for cache space\n");
2908 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2910 client_obd_list_lock(&cli->cl_loi_list_lock);
/* still on the waiter list => we were woken without being granted */
2911 if (!cfs_list_empty(&ocw.ocw_entry)) {
2912 cfs_list_del(&ocw.ocw_entry);
/*
 * osc_prep_async_page(): initialize the osc_async_page cookie embedded
 * in a higher-layer page structure.  When called without a result slot
 * it returns the rounded size the caller must reserve (line 2933).
 * Pages owned by a local root with CAP_SYS_RESOURCE bypass quota via
 * OBD_BRW_NOQUOTA.  'offset' must be page-aligned (asserted).
 */
2922 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2923 struct lov_oinfo *loi, cfs_page_t *page,
2924 obd_off offset, const struct obd_async_page_ops *ops,
2925 void *data, void **res, int nocache,
2926 struct lustre_handle *lockh)
2928 struct osc_async_page *oap;
2933 return cfs_size_round(sizeof(*oap));
2936 oap->oap_magic = OAP_MAGIC;
2937 oap->oap_cli = &exp->exp_obd->u.cli;
2940 oap->oap_caller_ops = ops;
2941 oap->oap_caller_data = data;
2943 oap->oap_page = page;
2944 oap->oap_obj_off = offset;
2945 if (!client_is_remote(exp) &&
2946 cfs_capable(CFS_CAP_SYS_RESOURCE))
2947 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2949 LASSERT(!(offset & ~CFS_PAGE_MASK));
2951 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2952 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2953 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2954 CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2956 cfs_spin_lock_init(&oap->oap_lock);
2957 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
/* oap_from_cookie(): validate and convert an opaque cookie back into
 * an osc_async_page pointer; -EINVAL on magic mismatch. */
2961 struct osc_async_page *oap_from_cookie(void *cookie)
2963 struct osc_async_page *oap = cookie;
2964 if (oap->oap_magic != OAP_MAGIC)
2965 return ERR_PTR(-EINVAL);
/*
 * osc_queue_async_io(): queue one prepared async page for read or
 * write.  Rejects pages already on a pending/urgent/rpc list and bails
 * out when the import is invalid.  Non-NOQUOTA writes are checked
 * against the owner's/group's quota; writes then pass through
 * osc_enter_cache() for dirty accounting before the oap is moved to
 * pending and osc_check_rpcs() is kicked.  Called with no lock held;
 * takes cl_loi_list_lock itself.
 * NOTE(review): listing is incomplete -- some early-return paths are
 * not visible.
 */
2969 int osc_queue_async_io(const struct lu_env *env,
2970 struct obd_export *exp, struct lov_stripe_md *lsm,
2971 struct lov_oinfo *loi, void *cookie,
2972 int cmd, obd_off off, int count,
2973 obd_flag brw_flags, enum async_flags async_flags)
2975 struct client_obd *cli = &exp->exp_obd->u.cli;
2976 struct osc_async_page *oap;
2980 oap = oap_from_cookie(cookie);
2982 RETURN(PTR_ERR(oap));
2984 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2987 if (!cfs_list_empty(&oap->oap_pending_item) ||
2988 !cfs_list_empty(&oap->oap_urgent_item) ||
2989 !cfs_list_empty(&oap->oap_rpc_item))
2992 /* check if the file's owner/group is over quota */
2993 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2994 struct cl_object *obj;
2995 struct cl_attr attr; /* XXX put attr into thread info */
2996 unsigned int qid[MAXQUOTAS];
2998 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
3000 cl_object_attr_lock(obj);
3001 rc = cl_object_attr_get(env, obj, &attr);
3002 cl_object_attr_unlock(obj);
3004 qid[USRQUOTA] = attr.cat_uid;
3005 qid[GRPQUOTA] = attr.cat_gid;
3007 lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
/* default to the first stripe's oinfo when none was supplied --
 * NOTE(review): the guarding condition is not visible in this listing */
3014 loi = lsm->lsm_oinfo[0];
3016 client_obd_list_lock(&cli->cl_loi_list_lock);
3018 LASSERT(off + count <= CFS_PAGE_SIZE);
3020 oap->oap_page_off = off;
3021 oap->oap_count = count;
3022 oap->oap_brw_flags = brw_flags;
3023 /* Give a hint to OST that requests are coming from kswapd - bug19529 */
3024 if (cfs_memory_pressure_get())
3025 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
3026 cfs_spin_lock(&oap->oap_lock);
3027 oap->oap_async_flags = async_flags;
3028 cfs_spin_unlock(&oap->oap_lock);
3030 if (cmd & OBD_BRW_WRITE) {
3031 rc = osc_enter_cache(env, cli, loi, oap);
3033 client_obd_list_unlock(&cli->cl_loi_list_lock);
3038 osc_oap_to_pending(oap);
3039 loi_list_maint(cli, loi);
3041 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
3044 osc_check_rpcs(env, cli);
3045 client_obd_list_unlock(&cli->cl_loi_list_lock);
/*
 * osc_set_async_flags_base(): merge new async flags into an oap that is
 * already on a pending list.  Only flags being newly SET are acted on:
 * ASYNC_READY is recorded, and a newly-urgent page (not yet in an RPC)
 * is placed on the urgent list -- at the head if it is ASYNC_HP, at the
 * tail otherwise.  The oap_async_flags update is done under oap_lock.
 */
3050 /* aka (~was & now & flag), but this is more clear :) */
3051 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
3053 int osc_set_async_flags_base(struct client_obd *cli,
3054 struct lov_oinfo *loi, struct osc_async_page *oap,
3055 obd_flag async_flags)
3057 struct loi_oap_pages *lop;
3061 LASSERT(!cfs_list_empty(&oap->oap_pending_item));
3063 if (oap->oap_cmd & OBD_BRW_WRITE) {
3064 lop = &loi->loi_write_lop;
3066 lop = &loi->loi_read_lop;
/* nothing to do if every requested flag is already set */
3069 if ((oap->oap_async_flags & async_flags) == async_flags)
3072 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3073 flags |= ASYNC_READY;
3075 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3076 cfs_list_empty(&oap->oap_rpc_item)) {
3077 if (oap->oap_async_flags & ASYNC_HP)
3078 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
3080 cfs_list_add_tail(&oap->oap_urgent_item,
3082 flags |= ASYNC_URGENT;
3083 loi_list_maint(cli, loi);
3085 cfs_spin_lock(&oap->oap_lock);
3086 oap->oap_async_flags |= flags;
3087 cfs_spin_unlock(&oap->oap_lock);
3089 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3090 oap->oap_async_flags);
/*
 * osc_teardown_async_page(): remove a queued async page from the osc's
 * book-keeping before it was sent.  Fails with -EBUSY if the page is
 * already part of an RPC.  Otherwise releases its cache/grant
 * accounting, wakes cache waiters, clears URGENT/HP, and drops it from
 * the pending list (adjusting lop_num_pending).
 */
3094 int osc_teardown_async_page(struct obd_export *exp,
3095 struct lov_stripe_md *lsm,
3096 struct lov_oinfo *loi, void *cookie)
3098 struct client_obd *cli = &exp->exp_obd->u.cli;
3099 struct loi_oap_pages *lop;
3100 struct osc_async_page *oap;
3104 oap = oap_from_cookie(cookie);
3106 RETURN(PTR_ERR(oap));
/* default to the first stripe's oinfo when none was supplied --
 * NOTE(review): the guarding condition is not visible in this listing */
3109 loi = lsm->lsm_oinfo[0];
3111 if (oap->oap_cmd & OBD_BRW_WRITE) {
3112 lop = &loi->loi_write_lop;
3114 lop = &loi->loi_read_lop;
3117 client_obd_list_lock(&cli->cl_loi_list_lock);
3119 if (!cfs_list_empty(&oap->oap_rpc_item))
3120 GOTO(out, rc = -EBUSY);
3122 osc_exit_cache(cli, oap, 0);
3123 osc_wake_cache_waiters(cli);
3125 if (!cfs_list_empty(&oap->oap_urgent_item)) {
3126 cfs_list_del_init(&oap->oap_urgent_item);
3127 cfs_spin_lock(&oap->oap_lock);
3128 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3129 cfs_spin_unlock(&oap->oap_lock);
3131 if (!cfs_list_empty(&oap->oap_pending_item)) {
3132 cfs_list_del_init(&oap->oap_pending_item);
3133 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3135 loi_list_maint(cli, loi);
3136 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3138 client_obd_list_unlock(&cli->cl_loi_list_lock);
/*
 * osc_set_lock_data_with_check(): attach the caller's cbdata to an LDLM
 * lock after asserting that the lock's callbacks/type match the
 * enqueue info (i.e. it really is an osc lock).  l_ast_data must be
 * unset or already equal to the new value; updated under
 * lock_res_and_lock + osc_ast_guard.
 */
3142 static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
3143 struct ldlm_enqueue_info *einfo,
3146 void *data = einfo->ei_cbdata;
3148 LASSERT(lock != NULL);
3149 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3150 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3151 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3152 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3154 lock_res_and_lock(lock);
3155 cfs_spin_lock(&osc_ast_guard);
3156 LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
3157 lock->l_ast_data = data;
3158 cfs_spin_unlock(&osc_ast_guard);
3159 unlock_res_and_lock(lock);
/* osc_set_data_with_check(): handle-based wrapper -- resolve the lock
 * handle, delegate to osc_set_lock_data_with_check(), drop the ref.
 * A stale handle (client evicted?) is only logged. */
3162 static void osc_set_data_with_check(struct lustre_handle *lockh,
3163 struct ldlm_enqueue_info *einfo,
3166 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3169 osc_set_lock_data_with_check(lock, einfo, flags);
3170 LDLM_LOCK_PUT(lock);
3172 CERROR("lockh %p, data %p - client evicted?\n",
3173 lockh, einfo->ei_cbdata);
/*
 * osc_change_cbdata(): iterate over all LDLM locks on the object's
 * resource (built from the lsm's object id/seq) and apply 'replace' to
 * each with 'data'.
 */
3176 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3177 ldlm_iterator_t replace, void *data)
3179 struct ldlm_res_id res_id;
3180 struct obd_device *obd = class_exp2obd(exp);
3182 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3183 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
/* osc_find_cbdata(): like osc_change_cbdata() but the iterator's
 * LDLM_ITER_STOP / LDLM_ITER_CONTINUE result is translated into a
 * found / not-found return for the caller. */
3187 /* find any ldlm lock of the inode in osc
3191 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3192 ldlm_iterator_t replace, void *data)
3194 struct ldlm_res_id res_id;
3195 struct obd_device *obd = class_exp2obd(exp);
3198 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3199 rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3200 if (rc == LDLM_ITER_STOP)
3202 if (rc == LDLM_ITER_CONTINUE)
/*
 * osc_enqueue_fini(): post-process an enqueue result.  For an aborted
 * intent enqueue, extract the real status from the server's
 * lock_policy_res1.  On success (or aborted intent) mark the LVB as
 * ready and log the size/blocks/mtime it carried, then invoke the
 * caller's upcall with the final rc.
 */
3207 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3208 obd_enqueue_update_f upcall, void *cookie,
3211 int intent = *flags & LDLM_FL_HAS_INTENT;
3215 /* The request was created before ldlm_cli_enqueue call. */
3216 if (rc == ELDLM_LOCK_ABORTED) {
3217 struct ldlm_reply *rep;
3218 rep = req_capsule_server_get(&req->rq_pill,
3221 LASSERT(rep != NULL);
3222 if (rep->lock_policy_res1)
3223 rc = rep->lock_policy_res1;
3227 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3228 *flags |= LDLM_FL_LVB_READY;
3229 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3230 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3233 /* Call the update callback. */
3234 rc = (*upcall)(cookie, rc);
/*
 * osc_enqueue_interpret(): reply interpreter for async lock enqueues.
 * Copies the handle/mode locally (aa->* may be freed once the upcall
 * runs), takes an extra lock reference so a blocking AST posted by
 * ldlm_cli_enqueue_fini() for a failed lock arrives only after the
 * upcall, completes the enqueue via ldlm_cli_enqueue_fini() and
 * osc_enqueue_fini(), then drops the enqueue reference (locks are
 * released right after being obtained -- see the comment above
 * osc_enqueue_base) and the extra reference.
 */
3238 static int osc_enqueue_interpret(const struct lu_env *env,
3239 struct ptlrpc_request *req,
3240 struct osc_enqueue_args *aa, int rc)
3242 struct ldlm_lock *lock;
3243 struct lustre_handle handle;
3246 /* Make a local copy of a lock handle and a mode, because aa->oa_*
3247 * might be freed anytime after lock upcall has been called. */
3248 lustre_handle_copy(&handle, aa->oa_lockh);
3249 mode = aa->oa_ei->ei_mode;
3251 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3253 lock = ldlm_handle2lock(&handle);
3255 /* Take an additional reference so that a blocking AST that
3256 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3257 * to arrive after an upcall has been executed by
3258 * osc_enqueue_fini(). */
3259 ldlm_lock_addref(&handle, mode);
3261 /* Let CP AST to grant the lock first. */
3262 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
3264 /* Complete obtaining the lock procedure. */
3265 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3266 mode, aa->oa_flags, aa->oa_lvb,
3267 sizeof(*aa->oa_lvb), &handle, rc);
3268 /* Complete osc stuff. */
3269 rc = osc_enqueue_fini(req, aa->oa_lvb,
3270 aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
3272 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3274 /* Release the lock for async request. */
3275 if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3277 * Releases a reference taken by ldlm_cli_enqueue(), if it is
3278 * not already released by
3279 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3281 ldlm_lock_decref(&handle, mode);
3283 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3284 aa->oa_lockh, req, aa);
3285 ldlm_lock_decref(&handle, mode);
3286 LDLM_LOCK_PUT(lock);
/*
 * osc_update_enqueue(): fold a lock-enqueue result into the stripe's
 * cached attributes.  On ELDLM_OK the LVB is stored and KMS (known
 * minimum size) is raised toward lvb_size, but never past the end of
 * the acquired extent + 1; the lock is then allowed to match.  On an
 * aborted glimpse intent only the LVB is stored (no lock was granted).
 */
3290 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3291 struct lov_oinfo *loi, int flags,
3292 struct ost_lvb *lvb, __u32 mode, int rc)
3294 if (rc == ELDLM_OK) {
3295 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3298 LASSERT(lock != NULL);
3299 loi->loi_lvb = *lvb;
3300 tmp = loi->loi_lvb.lvb_size;
3301 /* Extend KMS up to the end of this lock and no further
3302 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3303 if (tmp > lock->l_policy_data.l_extent.end)
3304 tmp = lock->l_policy_data.l_extent.end + 1;
3305 if (tmp >= loi->loi_kms) {
3306 LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3307 ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3308 loi_kms_set(loi, tmp);
3310 LDLM_DEBUG(lock, "lock acquired, setting rss="
3311 LPU64"; leaving kms="LPU64", end="LPU64,
3312 loi->loi_lvb.lvb_size, loi->loi_kms,
3313 lock->l_policy_data.l_extent.end);
3315 ldlm_lock_allow_match(lock);
3316 LDLM_LOCK_PUT(lock);
3317 } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3318 loi->loi_lvb = *lvb;
3319 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3320 " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3324 EXPORT_SYMBOL(osc_update_enqueue);
/* Sentinel rqset value meaning "queue the request on ptlrpcd" rather
 * than on a caller-owned request set. */
3326 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
/*
 * osc_enqueue_base(): enqueue an extent lock on res_id.  Extents are
 * widened to page boundaries; an existing compatible granted lock
 * (including a PW lock satisfying a PR request) is matched and reused
 * without talking to the server.  Otherwise an LDLM enqueue is sent --
 * with an LVB-carrying intent request when kms is not valid -- either
 * asynchronously (interpreted by osc_enqueue_interpret) or
 * synchronously (finished via osc_enqueue_fini).
 * NOTE(review): listing is incomplete; several branch/else lines are
 * missing between the numbered lines below.
 */
3328 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3329 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3330 * other synchronous requests, however keeping some locks and trying to obtain
3331 * others may take a considerable amount of time in a case of ost failure; and
3332 * when other sync requests do not get released lock from a client, the client
3333 * is excluded from the cluster -- such scenarious make the life difficult, so
3334 * release locks just after they are obtained. */
3335 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3336 int *flags, ldlm_policy_data_t *policy,
3337 struct ost_lvb *lvb, int kms_valid,
3338 obd_enqueue_update_f upcall, void *cookie,
3339 struct ldlm_enqueue_info *einfo,
3340 struct lustre_handle *lockh,
3341 struct ptlrpc_request_set *rqset, int async)
3343 struct obd_device *obd = exp->exp_obd;
3344 struct ptlrpc_request *req = NULL;
3345 int intent = *flags & LDLM_FL_HAS_INTENT;
3350 /* Filesystem lock extents are extended to page boundaries so that
3351 * dealing with the page cache is a little smoother. */
3352 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3353 policy->l_extent.end |= ~CFS_PAGE_MASK;
3356 * kms is not valid when either object is completely fresh (so that no
3357 * locks are cached), or object was evicted. In the latter case cached
3358 * lock cannot be used, because it would prime inode state with
3359 * potentially stale LVB.
3364 /* Next, search for already existing extent locks that will cover us */
3365 /* If we're trying to read, we also search for an existing PW lock. The
3366 * VFS and page cache already protect us locally, so lots of readers/
3367 * writers can share a single PW lock.
3369 * There are problems with conversion deadlocks, so instead of
3370 * converting a read lock to a write lock, we'll just enqueue a new
3373 * At some point we should cancel the read lock instead of making them
3374 * send us a blocking callback, but there are problems with canceling
3375 * locks out from other users right now, too. */
3376 mode = einfo->ei_mode;
3377 if (einfo->ei_mode == LCK_PR)
3379 mode = ldlm_lock_match(obd->obd_namespace,
3380 *flags | LDLM_FL_LVB_READY, res_id,
3381 einfo->ei_type, policy, mode, lockh, 0);
3383 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3385 if (matched->l_ast_data == NULL ||
3386 matched->l_ast_data == einfo->ei_cbdata) {
3387 /* addref the lock only if not async requests and PW
3388 * lock is matched whereas we asked for PR. */
3389 if (!rqset && einfo->ei_mode != mode)
3390 ldlm_lock_addref(lockh, LCK_PR)
3391 osc_set_lock_data_with_check(matched, einfo, *flags);
3393 /* I would like to be able to ASSERT here that
3394 * rss <= kms, but I can't, for reasons which
3395 * are explained in lov_enqueue() */
3398 /* We already have a lock, and it's referenced */
3399 (*upcall)(cookie, ELDLM_OK);
3401 /* For async requests, decref the lock. */
3402 if (einfo->ei_mode != mode)
3403 ldlm_lock_decref(lockh, LCK_PW);
3405 ldlm_lock_decref(lockh, einfo->ei_mode);
3406 LDLM_LOCK_PUT(matched);
/* matched lock carries someone else's ast_data: can't reuse it */
3409 ldlm_lock_decref(lockh, mode);
3410 LDLM_LOCK_PUT(matched);
/* no usable cached lock: build an enqueue request; with an intent,
 * reserve server-side LVB space in the reply */
3415 CFS_LIST_HEAD(cancels);
3416 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3417 &RQF_LDLM_ENQUEUE_LVB);
3421 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3423 ptlrpc_request_free(req);
3427 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3429 ptlrpc_request_set_replen(req);
3432 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3433 *flags &= ~LDLM_FL_BLOCK_GRANTED;
3435 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3436 sizeof(*lvb), lockh, async);
3439 struct osc_enqueue_args *aa;
3440 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3441 aa = ptlrpc_req_async_args(req);
3444 aa->oa_flags = flags;
3445 aa->oa_upcall = upcall;
3446 aa->oa_cookie = cookie;
3448 aa->oa_lockh = lockh;
3450 req->rq_interpret_reply =
3451 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3452 if (rqset == PTLRPCD_SET)
3453 ptlrpcd_add_req(req, PSCOPE_OTHER);
3455 ptlrpc_set_add_req(rqset, req);
3456 } else if (intent) {
3457 ptlrpc_req_finished(req);
/* synchronous path: complete the enqueue inline */
3462 rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3464 ptlrpc_req_finished(req);
/*
 * osc_enqueue(): obd_ops entry point -- build the resource name from
 * the stripe md and delegate to osc_enqueue_base() with the first
 * stripe's LVB/kms state; async iff a request set was supplied.
 */
3469 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3470 struct ldlm_enqueue_info *einfo,
3471 struct ptlrpc_request_set *rqset)
3473 struct ldlm_res_id res_id;
3477 osc_build_res_name(oinfo->oi_md->lsm_object_id,
3478 oinfo->oi_md->lsm_object_seq, &res_id);
3480 rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3481 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3482 oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3483 oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3484 rqset, rqset != NULL);
/* Try to match an already-granted DLM extent lock covering @policy on
 * @res_id instead of enqueueing a new one.  On a match the lock handle is
 * returned in @lockh.  A read request may be satisfied by an existing PW
 * lock (see comment below); in that case the PW reference is converted to
 * a PR reference so the caller's decref mode is consistent.
 * NOTE(review): excerpt is non-contiguous; the declarations of rc/unref,
 * the mode-widening setup, and the RETURN paths are not visible here. */
3488 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3489 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3490 int *flags, void *data, struct lustre_handle *lockh,
3493 struct obd_device *obd = exp->exp_obd;
3494 int lflags = *flags;
/* Fault-injection hook for testing: pretend no lock matched. */
3498 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3501 /* Filesystem lock extents are extended to page boundaries so that
3502 * dealing with the page cache is a little smoother */
3503 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3504 policy->l_extent.end |= ~CFS_PAGE_MASK;
3506 /* Next, search for already existing extent locks that will cover us */
3507 /* If we're trying to read, we also search for an existing PW lock. The
3508 * VFS and page cache already protect us locally, so lots of readers/
3509 * writers can share a single PW lock. */
/* NOTE(review): rc is passed as the mode set here — presumably widened to
 * include LCK_PW for reads in a line not shown; confirm against full file. */
3513 rc = ldlm_lock_match(obd->obd_namespace, lflags,
3514 res_id, type, policy, rc, lockh, unref);
3517 osc_set_data_with_check(lockh, data, lflags);
/* Matched a PW lock for a PR request: take a PR ref and drop the PW ref
 * (skipped for TEST_LOCK, which must not leave references behind). */
3518 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3519 ldlm_lock_addref(lockh, LCK_PR);
3520 ldlm_lock_decref(lockh, LCK_PW);
/* Drop one reference on the lock identified by @lockh in mode @mode.
 * Group locks are also cancelled immediately (they are not kept cached),
 * other modes just decref and may stay in the LRU.
 * NOTE(review): the else/RETURN lines are not visible in this excerpt. */
3527 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3531 if (unlikely(mode == LCK_GROUP))
3532 ldlm_lock_decref_and_cancel(lockh, mode);
3534 ldlm_lock_decref(lockh, mode);
/* obd_ops o_cancel entry point: thin wrapper around osc_cancel_base().
 * @exp and @md are unused here; only the lock handle and mode matter. */
3539 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3540 __u32 mode, struct lustre_handle *lockh)
3543 RETURN(osc_cancel_base(lockh, mode));
/* Cancel all unused locks in this OSC's namespace.  If @lsm is given the
 * cancellation is restricted to that object's resource, otherwise (resp
 * stays NULL) every resource in the namespace is swept.
 * NOTE(review): excerpt is non-contiguous; the NULL-lsm guard around the
 * osc_build_res_name() call is presumably in a line not shown — confirm. */
3546 static int osc_cancel_unused(struct obd_export *exp,
3547 struct lov_stripe_md *lsm,
3548 ldlm_cancel_flags_t flags,
3551 struct obd_device *obd = class_exp2obd(exp);
3552 struct ldlm_res_id res_id, *resp = NULL;
3555 resp = osc_build_res_name(lsm->lsm_object_id,
3556 lsm->lsm_object_seq, &res_id);
3559 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
/* Reply interpreter for the async OST_STATFS RPC issued by
 * osc_statfs_async().  Copies the server's obd_statfs into the caller's
 * buffer, refreshes the object-creator (oscc) DEGRADED/RDONLY/NOSPC flags
 * from the reported state, and finally invokes the caller's oi_cb_up
 * callback with the outcome.
 * NOTE(review): excerpt is non-contiguous; ENTRY/GOTO targets and the
 * declaration of 'used' are not visible here. */
3562 static int osc_statfs_interpret(const struct lu_env *env,
3563 struct ptlrpc_request *req,
3564 struct osc_async_args *aa, int rc)
3566 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3567 struct obd_statfs *msfs;
3572 /* The request has in fact never been sent
3573 * due to issues at a higher level (LOV).
3574 * Exit immediately since the caller is
3575 * aware of the problem and takes care
3576 * of the clean up */
/* NODELAY statfs (e.g. from procfs) treats connection trouble as a soft
 * failure rather than retrying. */
3579 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3580 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3586 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3588 GOTO(out, rc = -EPROTO);
3591 /* Reinitialize the RDONLY and DEGRADED flags at the client
3592 * on each statfs, so they don't stay set permanently. */
3593 cfs_spin_lock(&cli->cl_oscc.oscc_lock);
3595 if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3596 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3597 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3598 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3600 if (unlikely(msfs->os_state & OS_STATE_READONLY))
3601 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3602 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3603 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3605 /* Add a bit of hysteresis so this flag isn't continually flapping,
3606 * and ensure that new files don't get extremely fragmented due to
3607 * only a small amount of available space in the filesystem.
3608 * We want to set the NOSPC flag when there is less than ~0.1% free
3609 * and clear it when there is at least ~0.2% free space, so:
3610 * avail < ~0.1% max max = avail + used
3611 * 1025 * avail < avail + used used = blocks - free
3612 * 1024 * avail < used
3613 * 1024 * avail < blocks - free
3614 * avail < ((blocks - free) >> 10)
3616 * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
3617 * lose that amount of space so in those cases we report no space left
3618 * if there is less than 1 GB left. */
3619 used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
/* Also require a minimum number of free inodes (32 set / 64 clear). */
3620 if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3621 ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3622 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3623 else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3624 (msfs->os_ffree > 64) && (msfs->os_bavail > (used << 1))))
3625 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_NOSPC;
3627 cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
/* Hand the stats back to the caller, then fire its completion callback. */
3629 *aa->aa_oi->oi_osfs = *msfs;
3631 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Issue an OST_STATFS RPC asynchronously: pack the request, route it to
 * the CREATE portal, attach osc_statfs_interpret() as the reply handler
 * and add it to @rqset.  @max_age is currently only documented as a
 * possible optimization (see comment below); it is not sent on the wire.
 * NOTE(review): excerpt is non-contiguous; the aa->aa_oi assignment and
 * RETURN lines are not visible here. */
3635 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3636 __u64 max_age, struct ptlrpc_request_set *rqset)
3638 struct ptlrpc_request *req;
3639 struct osc_async_args *aa;
3643 /* We could possibly pass max_age in the request (as an absolute
3644 * timestamp or a "seconds.usec ago") so the target can avoid doing
3645 * extra calls into the filesystem if that isn't necessary (e.g.
3646 * during mount that would help a bit). Having relative timestamps
3647 * is not so great if request processing is slow, while absolute
3648 * timestamps are not ideal because they need time synchronization. */
3649 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3653 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3655 ptlrpc_request_free(req);
3658 ptlrpc_request_set_replen(req);
/* statfs is served by the OST create portal, with AT-based timeout. */
3659 req->rq_request_portal = OST_CREATE_PORTAL;
3660 ptlrpc_at_set_req_timeout(req);
3662 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3663 /* procfs requests not want stat in wait for avoid deadlock */
3664 req->rq_no_resend = 1;
3665 req->rq_no_delay = 1;
3668 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3669 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3670 aa = ptlrpc_req_async_args(req);
3673 ptlrpc_set_add_req(rqset, req);
/* Synchronous statfs: send OST_STATFS and wait for the reply, copying the
 * server's obd_statfs into @osfs.  Takes its own reference on the import
 * under cl_sem because the call may race with disconnect (bug 15684).
 * NOTE(review): excerpt is non-contiguous; the *osfs = *msfs copy, GOTO
 * targets and the import release at the end are not visible here. */
3677 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3678 __u64 max_age, __u32 flags)
3680 struct obd_statfs *msfs;
3681 struct ptlrpc_request *req;
3682 struct obd_import *imp = NULL;
3686 /*Since the request might also come from lprocfs, so we need
3687 *sync this with client_disconnect_export Bug15684*/
3688 cfs_down_read(&obd->u.cli.cl_sem);
3689 if (obd->u.cli.cl_import)
3690 imp = class_import_get(obd->u.cli.cl_import);
3691 cfs_up_read(&obd->u.cli.cl_sem);
3695 /* We could possibly pass max_age in the request (as an absolute
3696 * timestamp or a "seconds.usec ago") so the target can avoid doing
3697 * extra calls into the filesystem if that isn't necessary (e.g.
3698 * during mount that would help a bit). Having relative timestamps
3699 * is not so great if request processing is slow, while absolute
3700 * timestamps are not ideal because they need time synchronization. */
3701 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
/* The local import reference is only needed for the allocation above. */
3703 class_import_put(imp);
3708 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3710 ptlrpc_request_free(req);
3713 ptlrpc_request_set_replen(req);
3714 req->rq_request_portal = OST_CREATE_PORTAL;
3715 ptlrpc_at_set_req_timeout(req);
3717 if (flags & OBD_STATFS_NODELAY) {
3718 /* procfs requests not want stat in wait for avoid deadlock */
3719 req->rq_no_resend = 1;
3720 req->rq_no_delay = 1;
3723 rc = ptlrpc_queue_wait(req);
3727 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3729 GOTO(out, rc = -EPROTO);
3736 ptlrpc_req_finished(req);
3740 /* Retrieve object striping information.
3742 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3743 * the maximum number of OST indices which will fit in the user buffer.
3744 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
/* Copies a single-stripe lov_user_md (v1 or v3, per the user's magic) from
 * @lsm back to user space.  Only the common header is read from the user
 * buffer; the reply is built in a kernel buffer sized by lov_mds_md_size()
 * and copied out.
 * NOTE(review): excerpt is non-contiguous; error returns (-EFAULT/-EINVAL),
 * the else branches and RETURN are not visible here. */
3746 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3748 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3749 struct lov_user_md_v3 lum, *lumk;
3750 struct lov_user_ost_data_v1 *lmm_objects;
3751 int rc = 0, lum_size;
3757 /* we only need the header part from user space to get lmm_magic and
3758 * lmm_stripe_count, (the header part is common to v1 and v3) */
3759 lum_size = sizeof(struct lov_user_md_v1);
3760 if (cfs_copy_from_user(&lum, lump, lum_size))
3763 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3764 (lum.lmm_magic != LOV_USER_MAGIC_V3))
3767 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3768 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3769 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3770 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3772 /* we can use lov_mds_md_size() to compute lum_size
3773 * because lov_user_md_vX and lov_mds_md_vX have the same size */
3774 if (lum.lmm_stripe_count > 0) {
3775 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3776 OBD_ALLOC(lumk, lum_size);
/* v1 and v3 place lmm_objects[] at different offsets; pick the right one. */
3780 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3781 lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3783 lmm_objects = &(lumk->lmm_objects[0]);
3784 lmm_objects->l_object_id = lsm->lsm_object_id;
/* stripe_count == 0: caller only wants the header, no objects array. */
3786 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3790 lumk->lmm_object_id = lsm->lsm_object_id;
3791 lumk->lmm_object_seq = lsm->lsm_object_seq;
3792 lumk->lmm_stripe_count = 1;
3794 if (cfs_copy_to_user(lump, lumk, lum_size))
3798 OBD_FREE(lumk, lum_size);
/* ioctl dispatcher for the OSC device.  Pins the module for the duration
 * of the call, then switches on @cmd: LOV config queries, stripe get/set,
 * import recovery/activation, quota check polling and target ping.
 * Unknown commands return -ENOTTY.
 * NOTE(review): excerpt is non-contiguous; the switch statement opener,
 * several 'break's/GOTOs and the final RETURN are not visible here. */
3804 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3805 void *karg, void *uarg)
3807 struct obd_device *obd = exp->exp_obd;
3808 struct obd_ioctl_data *data = karg;
3812 if (!cfs_try_module_get(THIS_MODULE)) {
3813 CERROR("Can't get module. Is it alive?");
3817 case OBD_IOC_LOV_GET_CONFIG: {
3819 struct lov_desc *desc;
3820 struct obd_uuid uuid;
3824 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3825 GOTO(out, err = -EINVAL);
3827 data = (struct obd_ioctl_data *)buf;
/* Validate the user-supplied inline buffers before writing into them. */
3829 if (sizeof(*desc) > data->ioc_inllen1) {
3830 obd_ioctl_freedata(buf, len);
3831 GOTO(out, err = -EINVAL);
3834 if (data->ioc_inllen2 < sizeof(uuid)) {
3835 obd_ioctl_freedata(buf, len);
3836 GOTO(out, err = -EINVAL);
/* An OSC presents itself as a trivial one-target, one-stripe LOV. */
3839 desc = (struct lov_desc *)data->ioc_inlbuf1;
3840 desc->ld_tgt_count = 1;
3841 desc->ld_active_tgt_count = 1;
3842 desc->ld_default_stripe_count = 1;
3843 desc->ld_default_stripe_size = 0;
3844 desc->ld_default_stripe_offset = 0;
3845 desc->ld_pattern = 0;
3846 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3848 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3850 err = cfs_copy_to_user((void *)uarg, buf, len);
3853 obd_ioctl_freedata(buf, len);
3856 case LL_IOC_LOV_SETSTRIPE:
3857 err = obd_alloc_memmd(exp, karg);
3861 case LL_IOC_LOV_GETSTRIPE:
3862 err = osc_getstripe(karg, uarg);
3864 case OBD_IOC_CLIENT_RECOVER:
3865 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3870 case IOC_OSC_SET_ACTIVE:
3871 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3874 case OBD_IOC_POLL_QUOTACHECK:
3875 err = lquota_poll_check(quota_interface, exp,
3876 (struct if_quotacheck *)karg);
3878 case OBD_IOC_PING_TARGET:
3879 err = ptlrpc_obd_ping(obd);
3882 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3883 cmd, cfs_curproc_comm());
3884 GOTO(out, err = -ENOTTY);
/* Balance the cfs_try_module_get() at entry. */
3887 cfs_module_put(THIS_MODULE);
/* obd_ops o_get_info: answer key/value queries.  KEY_LOCK_TO_STRIPE is
 * answered locally (an OSC has a single stripe); KEY_LAST_ID and
 * KEY_FIEMAP are forwarded to the OST as synchronous OST_GET_INFO RPCs
 * and the reply is copied into @val.
 * NOTE(review): excerpt is non-contiguous; ENTRY/RETURN, several local
 * declarations (tmp, reply, rc) and GOTO label bodies are not visible. */
3891 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3892 void *key, __u32 *vallen, void *val,
3893 struct lov_stripe_md *lsm)
3896 if (!vallen || !val)
3899 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3900 __u32 *stripe = val;
3901 *vallen = sizeof(*stripe);
3904 } else if (KEY_IS(KEY_LAST_ID)) {
3905 struct ptlrpc_request *req;
3910 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3911 &RQF_OST_GET_INFO_LAST_ID);
3915 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3916 RCL_CLIENT, keylen);
3917 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3919 ptlrpc_request_free(req);
3923 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3924 memcpy(tmp, key, keylen);
/* last_id is used during recovery/orphan handling; don't block on it. */
3926 req->rq_no_delay = req->rq_no_resend = 1;
3927 ptlrpc_request_set_replen(req);
3928 rc = ptlrpc_queue_wait(req);
3932 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3934 GOTO(out, rc = -EPROTO);
3936 *((obd_id *)val) = *reply;
3938 ptlrpc_req_finished(req);
3940 } else if (KEY_IS(KEY_FIEMAP)) {
3941 struct ptlrpc_request *req;
3942 struct ll_user_fiemap *reply;
3946 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3947 &RQF_OST_GET_INFO_FIEMAP);
/* fiemap value buffer travels in both directions: request and reply. */
3951 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3952 RCL_CLIENT, keylen);
3953 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3954 RCL_CLIENT, *vallen);
3955 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3956 RCL_SERVER, *vallen);
3958 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3960 ptlrpc_request_free(req);
3964 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3965 memcpy(tmp, key, keylen);
3966 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3967 memcpy(tmp, val, *vallen);
3969 ptlrpc_request_set_replen(req);
3970 rc = ptlrpc_queue_wait(req);
3974 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3976 GOTO(out1, rc = -EPROTO);
3978 memcpy(val, reply, *vallen);
3980 ptlrpc_req_finished(req);
/* Called when this OSC is the MDS's OSC (mds_conn): connect the llog
 * initiator for the MDS->OST origin context, then mark the import as
 * server-timeout and pingable so the MDS pings its OSTs.
 * NOTE(review): excerpt is non-contiguous; the ctxt NULL check and RETURN
 * are not visible here. */
3988 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
3990 struct llog_ctxt *ctxt;
3994 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3996 rc = llog_initiator_connect(ctxt);
3997 llog_ctxt_put(ctxt);
3999 /* XXX return an error? skip setting below flags? */
4002 cfs_spin_lock(&imp->imp_lock);
4003 imp->imp_server_timeout = 1;
4004 imp->imp_pingable = 1;
4005 cfs_spin_unlock(&imp->imp_lock);
4006 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
/* Reply interpreter for the KEY_MDS_CONN set_info RPC: once the OST has
 * acknowledged, finish the MDS-connect setup on this import. */
4011 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
4012 struct ptlrpc_request *req,
4019 RETURN(osc_setinfo_mds_connect_import(req->rq_import));
/* obd_ops o_set_info_async: handle key/value updates.  NEXT_ID, CHECKSUM,
 * SPTLRPC_CONF and FLUSH_CTX are handled locally; everything else is
 * packed into an OST_SET_INFO RPC.  MDS_CONN gets a special interpreter,
 * GRANT_SHRINK is queued on ptlrpcd, all other keys require @set.
 * NOTE(review): excerpt is non-contiguous; ENTRY/RETURN, several
 * declarations (new_val, oa, tmp, rc) and error returns are not visible. */
4022 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
4023 void *key, obd_count vallen, void *val,
4024 struct ptlrpc_request_set *set)
4026 struct ptlrpc_request *req;
4027 struct obd_device *obd = exp->exp_obd;
4028 struct obd_import *imp = class_exp2cliimp(exp);
4033 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
4035 if (KEY_IS(KEY_NEXT_ID)) {
4037 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4039 if (vallen != sizeof(obd_id))
4044 if (vallen != sizeof(obd_id))
4047 /* avoid race between allocate new object and set next id
4048 * from ll_sync thread */
4049 cfs_spin_lock(&oscc->oscc_lock);
/* next_id only ever moves forward; never shrink it. */
4050 new_val = *((obd_id*)val) + 1;
4051 if (new_val > oscc->oscc_next_id)
4052 oscc->oscc_next_id = new_val;
4053 cfs_spin_unlock(&oscc->oscc_lock);
4054 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
4055 exp->exp_obd->obd_name,
4056 obd->u.cli.cl_oscc.oscc_next_id);
4061 if (KEY_IS(KEY_CHECKSUM)) {
4062 if (vallen != sizeof(int))
4064 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
4068 if (KEY_IS(KEY_SPTLRPC_CONF)) {
4069 sptlrpc_conf_client_adapt(obd);
4073 if (KEY_IS(KEY_FLUSH_CTX)) {
4074 sptlrpc_import_flush_my_ctx(imp);
/* GRANT_SHRINK is the only remotely-handled key allowed without a set. */
4078 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
4081 /* We pass all other commands directly to OST. Since nobody calls osc
4082 methods directly and everybody is supposed to go through LOV, we
4083 assume lov checked invalid values for us.
4084 The only recognised values so far are evict_by_nid and mds_conn.
4085 Even if something bad goes through, we'd get a -EINVAL from OST
4088 if (KEY_IS(KEY_GRANT_SHRINK))
4089 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
4091 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
4096 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
4097 RCL_CLIENT, keylen);
4098 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
4099 RCL_CLIENT, vallen);
4100 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
4102 ptlrpc_request_free(req);
4106 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
4107 memcpy(tmp, key, keylen);
4108 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
4109 memcpy(tmp, val, vallen);
4111 if (KEY_IS(KEY_MDS_CONN)) {
4112 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
/* Record the MDT sequence for object creation on this OST. */
4114 oscc->oscc_oa.o_seq = (*(__u32 *)val);
4115 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
4116 LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
4117 req->rq_no_delay = req->rq_no_resend = 1;
4118 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4119 } else if (KEY_IS(KEY_GRANT_SHRINK)) {
4120 struct osc_grant_args *aa;
4123 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4124 aa = ptlrpc_req_async_args(req);
4127 ptlrpc_req_finished(req);
4130 *oa = ((struct ost_body *)val)->oa;
4132 req->rq_interpret_reply = osc_shrink_grant_interpret;
4135 ptlrpc_request_set_replen(req);
4136 if (!KEY_IS(KEY_GRANT_SHRINK)) {
4137 LASSERT(set != NULL);
4138 ptlrpc_set_add_req(set, req);
4139 ptlrpc_check_set(NULL, set);
/* grant shrink is fire-and-forget via the ptlrpcd daemon. */
4141 ptlrpcd_add_req(req, PSCOPE_OTHER);
/* llog operations for the size-replication context: only cancel is
 * needed on the client side.  The MDS->OST origin ops table below is
 * filled in at module init (see osc_init). */
4147 static struct llog_operations osc_size_repl_logops = {
4148 lop_cancel: llog_obd_repl_cancel
4151 static struct llog_operations osc_mds_ost_orig_logops;
/* Set up the two llog contexts this OSC uses: the MDS->OST origin
 * context (catalog id from @catid) and the size-replication context.
 * On failure of the second, the first is torn down and diagnostics are
 * logged with the catalog id.
 * NOTE(review): excerpt is non-contiguous; GOTO/cleanup lines between the
 * visible statements are not shown here. */
4153 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4154 struct obd_device *tgt, struct llog_catid *catid)
4159 rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
4160 &catid->lci_logid, &osc_mds_ost_orig_logops);
4162 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4166 rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
4167 NULL, &osc_size_repl_logops);
4169 struct llog_ctxt *ctxt =
4170 llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4173 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4178 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
4179 obd->obd_name, tgt->obd_name, catid, rc);
4180 CERROR("logid "LPX64":0x%x\n",
4181 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
/* obd_ops o_llog_init: read this OSC's catalog id from the CATLIST file
 * on @disk_obd, initialize the llog contexts via __osc_llog_init(), and
 * write the (possibly updated) catalog id back.  Serialized by the
 * olg_cat_processing mutex.
 * NOTE(review): excerpt is non-contiguous; GOTO labels and the final
 * RETURN are not visible here. */
4186 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4187 struct obd_device *disk_obd, int *index)
4189 struct llog_catid catid;
4190 static char name[32] = CATLIST;
4194 LASSERT(olg == &obd->obd_olg);
4196 cfs_mutex_down(&olg->olg_cat_processing);
4197 rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
4199 CERROR("rc: %d\n", rc);
4203 CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
4204 obd->obd_name, *index, catid.lci_logid.lgl_oid,
4205 catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen);
4207 rc = __osc_llog_init(obd, olg, disk_obd, &catid);
4209 CERROR("rc: %d\n", rc);
4213 rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
4215 CERROR("rc: %d\n", rc);
4220 cfs_mutex_up(&olg->olg_cat_processing);
/* obd_ops o_llog_finish: tear down both llog contexts set up by
 * __osc_llog_init().  Both cleanups are attempted; the second result is
 * kept in rc2 (presumably folded into the return in a line not shown). */
4225 static int osc_llog_finish(struct obd_device *obd, int count)
4227 struct llog_ctxt *ctxt;
4228 int rc = 0, rc2 = 0;
4231 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4233 rc = llog_cleanup(ctxt);
4235 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4237 rc2 = llog_cleanup(ctxt);
/* obd_ops o_reconnect: recompute the grant to request from the server on
 * reconnect.  If the server supports grants, ask for current
 * avail+dirty (or, when that is zero, two full-sized RPCs worth) and
 * zero the lost-grant counter under the loi list lock.
 * NOTE(review): excerpt is non-contiguous; the lost_grant declaration and
 * the RETURN are not visible here. */
4244 static int osc_reconnect(const struct lu_env *env,
4245 struct obd_export *exp, struct obd_device *obd,
4246 struct obd_uuid *cluuid,
4247 struct obd_connect_data *data,
4250 struct client_obd *cli = &obd->u.cli;
4252 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4255 client_obd_list_lock(&cli->cl_loi_list_lock);
4256 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
4257 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4258 lost_grant = cli->cl_lost_grant;
4259 cli->cl_lost_grant = 0;
4260 client_obd_list_unlock(&cli->cl_loi_list_lock);
4262 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4263 "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
4264 cli->cl_avail_grant, cli->cl_dirty, lost_grant);
4265 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4266 " ocd_grant: %d\n", data->ocd_connect_flags,
4267 data->ocd_version, data->ocd_grant);
/* obd_ops o_disconnect: on the last connection, flush pending llog size
 * cancels to the OST, then disconnect the export.  The grant-shrink list
 * removal deliberately happens AFTER client_disconnect_export — see the
 * race description below (bug 18662).
 * NOTE(review): excerpt is non-contiguous; the else-branch line for the
 * missing-ctxt CDEBUG and the final RETURN are not visible here. */
4273 static int osc_disconnect(struct obd_export *exp)
4275 struct obd_device *obd = class_exp2obd(exp);
4276 struct llog_ctxt *ctxt;
4279 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4281 if (obd->u.cli.cl_conn_count == 1) {
4282 /* Flush any remaining cancel messages out to the
4284 llog_sync(ctxt, exp);
4286 llog_ctxt_put(ctxt);
4288 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4292 rc = client_disconnect_export(exp);
4294 * Initially we put del_shrink_grant before disconnect_export, but it
4295 * causes the following problem if setup (connect) and cleanup
4296 * (disconnect) are tangled together.
4297 * connect p1 disconnect p2
4298 * ptlrpc_connect_import
4299 * ............... class_manual_cleanup
4302 * ptlrpc_connect_interrupt
4304 * add this client to shrink list
4306 * Bang! pinger trigger the shrink.
4307 * So the osc should be disconnected from the shrink list, after we
4308 * are sure the import has been destroyed. BUG18662
4310 if (obd->u.cli.cl_import == NULL)
4311 osc_del_shrink_grant(&obd->u.cli);
/* obd_ops o_import_event: react to import state transitions.  On DISCON
 * the object creator is marked recovering (MDS OSCs only) and grants are
 * dropped; on INVALIDATE cached pages are flushed and the namespace
 * cleaned; on ACTIVE the NOSPC creator flag is cleared; OCD re-reads the
 * negotiated connect flags.  Most events also notify the observer obd.
 * NOTE(review): excerpt is non-contiguous; the switch opener, 'break's,
 * the cli assignment and several declarations are not visible here. */
4315 static int osc_import_event(struct obd_device *obd,
4316 struct obd_import *imp,
4317 enum obd_import_event event)
4319 struct client_obd *cli;
4323 LASSERT(imp->imp_obd == obd);
4326 case IMP_EVENT_DISCON: {
4327 /* Only do this on the MDS OSC's */
4328 if (imp->imp_server_timeout) {
4329 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4331 cfs_spin_lock(&oscc->oscc_lock);
4332 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4333 cfs_spin_unlock(&oscc->oscc_lock);
/* Grants from the previous connection are no longer valid. */
4336 client_obd_list_lock(&cli->cl_loi_list_lock);
4337 cli->cl_avail_grant = 0;
4338 cli->cl_lost_grant = 0;
4339 client_obd_list_unlock(&cli->cl_loi_list_lock);
4342 case IMP_EVENT_INACTIVE: {
4343 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4346 case IMP_EVENT_INVALIDATE: {
4347 struct ldlm_namespace *ns = obd->obd_namespace;
4351 env = cl_env_get(&refcheck);
4355 client_obd_list_lock(&cli->cl_loi_list_lock);
4356 /* all pages go to failing rpcs due to the invalid
4358 osc_check_rpcs(env, cli);
4359 client_obd_list_unlock(&cli->cl_loi_list_lock);
4361 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4362 cl_env_put(env, &refcheck);
4367 case IMP_EVENT_ACTIVE: {
4368 /* Only do this on the MDS OSC's */
4369 if (imp->imp_server_timeout) {
4370 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4372 cfs_spin_lock(&oscc->oscc_lock);
4373 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
4374 cfs_spin_unlock(&oscc->oscc_lock);
4376 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4379 case IMP_EVENT_OCD: {
4380 struct obd_connect_data *ocd = &imp->imp_connect_data;
4382 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4383 osc_init_grant(&obd->u.cli, ocd);
4386 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4387 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
4389 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4392 case IMP_EVENT_DEACTIVATE: {
4393 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
4396 case IMP_EVENT_ACTIVATE: {
4397 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
4401 CERROR("Unknown import event %d\n", event);
4408 * Determine whether the lock can be canceled before replaying the lock
4409 * during recovery, see bug16774 for detailed information.
4411 * \retval zero the lock can't be canceled
4412 * \retval other ok to cancel
/* Registered via ns_register_cancel() in osc_setup(); called with the
 * resource locked (asserted below). */
4414 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
4416 check_res_locked(lock->l_resource);
4419 * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
4421 * XXX as a future improvement, we can also cancel unused write lock
4422 * if it doesn't have dirty data and active mmaps.
/* Only read-mode extent locks with no pinned pages are safe to drop. */
4424 if (lock->l_resource->lr_type == LDLM_EXTENT &&
4425 (lock->l_granted_mode == LCK_PR ||
4426 lock->l_granted_mode == LCK_CR) &&
4427 (osc_dlm_lock_pageref(lock) == 0))
/* obd_ops o_setup: take a ptlrpcd reference, perform generic client obd
 * setup, wire up procfs/seqstat/sptlrpc stats, pre-allocate the request
 * pool used by brw_interpret, and register the recovery cancel policy.
 * NOTE(review): excerpt is non-contiguous; ENTRY/RETURN, error unwinding
 * and the request-pool size argument are not visible here. */
4433 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4439 rc = ptlrpcd_addref();
4443 rc = client_obd_setup(obd, lcfg);
4447 struct lprocfs_static_vars lvars = { 0 };
4448 struct client_obd *cli = &obd->u.cli;
4450 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4451 lprocfs_osc_init_vars(&lvars);
4452 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4453 lproc_osc_attach_seqstat(obd);
4454 sptlrpc_lprocfs_cliobd_attach(obd);
4455 ptlrpc_lprocfs_register_obd(obd);
4459 /* We need to allocate a few requests more, because
4460 brw_interpret tries to create new requests before freeing
4461 previous ones. Ideally we want to have 2x max_rpcs_in_flight
4462 reserved, but I afraid that might be too much wasted RAM
4463 in fact, so 2 is just my guess and still should work. */
4464 cli->cl_import->imp_rq_pool =
4465 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4467 ptlrpc_add_rqs_to_pool);
4469 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4470 cfs_sema_init(&cli->cl_grant_sem, 1);
/* Allow the LDLM to cancel idle read locks before replay (bug 16774). */
4472 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
/* obd_ops o_precleanup: staged teardown.  EARLY deactivates the import
 * and stops pinging; EXPORTS invalidates and destroys a never-connected
 * import (freeing its request pool) and finishes the llog subsystems.
 * NOTE(review): excerpt is non-contiguous; the switch opener, 'break's
 * and the final RETURN are not visible here. */
4478 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4484 case OBD_CLEANUP_EARLY: {
4485 struct obd_import *imp;
4486 imp = obd->u.cli.cl_import;
4487 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4488 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4489 ptlrpc_deactivate_import(imp);
4490 cfs_spin_lock(&imp->imp_lock);
4491 imp->imp_pingable = 0;
4492 cfs_spin_unlock(&imp->imp_lock);
4495 case OBD_CLEANUP_EXPORTS: {
4496 /* If we set up but never connected, the
4497 client import will not have been cleaned. */
4498 if (obd->u.cli.cl_import) {
4499 struct obd_import *imp;
/* cl_sem write lock serializes against osc_statfs() import users. */
4500 cfs_down_write(&obd->u.cli.cl_sem);
4501 imp = obd->u.cli.cl_import;
4502 CDEBUG(D_CONFIG, "%s: client import never connected\n",
4504 ptlrpc_invalidate_import(imp);
4505 if (imp->imp_rq_pool) {
4506 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4507 imp->imp_rq_pool = NULL;
4509 class_destroy_import(imp);
4510 cfs_up_write(&obd->u.cli.cl_sem);
4511 obd->u.cli.cl_import = NULL;
4513 rc = obd_llog_finish(obd, 0);
4515 CERROR("failed to cleanup llogging subsystems\n");
/* obd_ops o_cleanup: unregister procfs entries, drop the quota cache and
 * run the generic client obd cleanup.
 * NOTE(review): the RETURN/ptlrpcd_decref tail is not visible in this
 * excerpt. */
4522 int osc_cleanup(struct obd_device *obd)
4527 ptlrpc_lprocfs_unregister_obd(obd);
4528 lprocfs_obd_cleanup(obd);
4530 /* free memory of osc quota cache */
4531 lquota_cleanup(quota_interface, obd);
4533 rc = client_obd_cleanup(obd);
/* Apply a configuration log record to this OSC; currently only proc
 * parameter updates (PARAM_OSC prefix) are handled via the lprocfs vars.
 * NOTE(review): the switch case label, default branch and RETURN are not
 * visible in this excerpt. */
4539 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4541 struct lprocfs_static_vars lvars = { 0 };
4544 lprocfs_osc_init_vars(&lvars);
4546 switch (lcfg->lcfg_command) {
4548 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
/* obd_ops o_process_config: thin static wrapper so the ops table can
 * reference a function with the generic (obd, len, buf) signature. */
4558 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4560 return osc_process_config_base(obd, buf);
/* Method table exported to the obd class layer; this is the OSC's public
 * interface.  Entries not defined in this excerpt (osc_packmd, osc_punch,
 * etc.) live earlier in the file. */
4563 struct obd_ops osc_obd_ops = {
4564 .o_owner = THIS_MODULE,
4565 .o_setup = osc_setup,
4566 .o_precleanup = osc_precleanup,
4567 .o_cleanup = osc_cleanup,
4568 .o_add_conn = client_import_add_conn,
4569 .o_del_conn = client_import_del_conn,
4570 .o_connect = client_connect_import,
4571 .o_reconnect = osc_reconnect,
4572 .o_disconnect = osc_disconnect,
4573 .o_statfs = osc_statfs,
4574 .o_statfs_async = osc_statfs_async,
4575 .o_packmd = osc_packmd,
4576 .o_unpackmd = osc_unpackmd,
4577 .o_precreate = osc_precreate,
4578 .o_create = osc_create,
4579 .o_create_async = osc_create_async,
4580 .o_destroy = osc_destroy,
4581 .o_getattr = osc_getattr,
4582 .o_getattr_async = osc_getattr_async,
4583 .o_setattr = osc_setattr,
4584 .o_setattr_async = osc_setattr_async,
4586 .o_punch = osc_punch,
4588 .o_enqueue = osc_enqueue,
4589 .o_change_cbdata = osc_change_cbdata,
4590 .o_find_cbdata = osc_find_cbdata,
4591 .o_cancel = osc_cancel,
4592 .o_cancel_unused = osc_cancel_unused,
4593 .o_iocontrol = osc_iocontrol,
4594 .o_get_info = osc_get_info,
4595 .o_set_info_async = osc_set_info_async,
4596 .o_import_event = osc_import_event,
4597 .o_llog_init = osc_llog_init,
4598 .o_llog_finish = osc_llog_finish,
4599 .o_process_config = osc_process_config,
4602 extern struct lu_kmem_descr osc_caches[];
4603 extern cfs_spinlock_t osc_ast_guard;
4604 extern cfs_lock_class_key_t osc_ast_guard_class;
/* Module init: set up lu caches, hook in the quota interface, register
 * the OSC obd type, initialize the AST guard lock and build the
 * MDS->OST origin llog ops from the lvfs defaults.
 * NOTE(review): excerpt is non-contiguous; error checks after
 * lu_kmem_init/class_register_type and the final RETURN are not visible. */
4606 int __init osc_init(void)
4608 struct lprocfs_static_vars lvars = { 0 };
4612 /* print an address of _any_ initialized kernel symbol from this
4613 * module, to allow debugging with gdb that doesn't support data
4614 * symbols from modules.*/
4615 CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4617 rc = lu_kmem_init(osc_caches);
4619 lprocfs_osc_init_vars(&lvars);
4621 cfs_request_module("lquota");
4622 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4623 lquota_init(quota_interface);
4624 init_obd_quota_ops(quota_interface, &osc_obd_ops);
4626 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4627 LUSTRE_OSC_NAME, &osc_device_type);
/* Registration failed: release the quota symbol and the caches. */
4629 if (quota_interface)
4630 PORTAL_SYMBOL_PUT(osc_quota_interface);
4631 lu_kmem_fini(osc_caches);
4635 cfs_spin_lock_init(&osc_ast_guard);
4636 cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
/* Origin ops = lvfs defaults with origin-specific setup/cleanup/add. */
4638 osc_mds_ost_orig_logops = llog_lvfs_ops;
4639 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4640 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4641 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4642 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
/* Module exit: undo osc_init in reverse order — device type, quota
 * interface, obd type registration and lu caches. */
4648 static void /*__exit*/ osc_exit(void)
4650 lu_device_type_fini(&osc_device_type);
4652 lquota_exit(quota_interface);
4653 if (quota_interface)
4654 PORTAL_SYMBOL_PUT(osc_quota_interface);
4656 class_unregister_type(LUSTRE_OSC_NAME);
4657 lu_kmem_fini(osc_caches);
4660 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4661 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4662 MODULE_LICENSE("GPL");
4664 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);