4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Whamcloud, Inc.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_OSC
39 #include <libcfs/libcfs.h>
42 # include <liblustre.h>
45 #include <lustre_dlm.h>
46 #include <lustre_net.h>
47 #include <lustre/lustre_user.h>
48 #include <obd_cksum.h>
56 #include <lustre_ha.h>
57 #include <lprocfs_status.h>
58 #include <lustre_log.h>
59 #include <lustre_debug.h>
60 #include <lustre_param.h>
61 #include "osc_internal.h"
63 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
64 static int brw_interpret(const struct lu_env *env,
65 struct ptlrpc_request *req, void *data, int rc);
66 static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli,
68 int osc_cleanup(struct obd_device *obd);
70 /* Pack OSC object metadata for disk storage (LE byte order). */
/* NOTE(review): this extract skips original lines (gaps in embedded
 * numbering), so the body below is incomplete; code left byte-identical. */
71 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
72 struct lov_stripe_md *lsm)
/* single-stripe OSC layout: the packed md is just one lov_mds_md */
77 lmm_size = sizeof(**lmmp);
/* presumably the free/alloc paths are gated by lsm/*lmmp NULL-ness;
 * the conditions are not visible in this extract — confirm in full source */
82 OBD_FREE(*lmmp, lmm_size);
88 OBD_ALLOC(*lmmp, lmm_size);
/* object id must be set and the sequence must belong to an MDT */
94 LASSERT(lsm->lsm_object_id);
95 LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
/* store id/seq in little-endian on-disk order */
96 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
97 (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
103 /* Unpack OSC object metadata from disk storage (LE byte order). */
/* NOTE(review): extract has line gaps; body incomplete, code unchanged. */
104 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
105 struct lov_mds_md *lmm, int lmm_bytes)
108 struct obd_import *imp = class_exp2cliimp(exp);
/* reject a short buffer before touching any lmm field */
112 if (lmm_bytes < sizeof (*lmm)) {
113 CERROR("lov_mds_md too small: %d, need %d\n",
114 lmm_bytes, (int)sizeof(*lmm));
117 /* XXX LOV_MAGIC etc check? */
/* object id 0 is never valid on the wire/disk */
119 if (lmm->lmm_object_id == 0) {
120 CERROR("lov_mds_md: zero lmm_object_id\n");
/* OSC always deals with exactly one stripe */
125 lsm_size = lov_stripe_md_size(1);
/* *lsmp set but no lmm: caller wants the in-memory md freed */
129 if (*lsmp != NULL && lmm == NULL) {
130 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
131 OBD_FREE(*lsmp, lsm_size);
137 OBD_ALLOC(*lsmp, lsm_size);
140 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
/* partial-allocation failure: undo the lsm allocation */
141 if ((*lsmp)->lsm_oinfo[0] == NULL) {
142 OBD_FREE(*lsmp, lsm_size);
145 loi_init((*lsmp)->lsm_oinfo[0]);
149 /* XXX zero *lsmp? */
/* convert id/seq from little-endian disk order to host order */
150 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
151 (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
152 LASSERT((*lsmp)->lsm_object_id);
153 LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
/* take the server-advertised max object size if the OST supports
 * OBD_CONNECT_MAXBYTES, else fall back to the compiled-in limit */
157 (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
158 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
160 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/* Copy a capability (if any) into the request capsule and flag it in
 * the obdo so the OST knows a capa accompanies the body.
 * NOTE(review): extract has line gaps; body incomplete, code unchanged. */
165 static inline void osc_pack_capa(struct ptlrpc_request *req,
166 struct ost_body *body, void *capa)
168 struct obd_capa *oc = (struct obd_capa *)capa;
169 struct lustre_capa *c;
/* buffer reserved earlier via osc_set_capa_size() */
174 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
177 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
178 DEBUG_CAPA(D_SEC, c, "pack");
/* Fill the OST_BODY of @req from @oinfo: wire-format obdo plus capa. */
181 static inline void osc_pack_req_body(struct ptlrpc_request *req,
182 struct obd_info *oinfo)
184 struct ost_body *body;
186 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
/* convert the in-memory obdo to its wire representation */
189 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
190 osc_pack_capa(req, body, oinfo->oi_capa);
/* Size the capability field in the request capsule before packing:
 * zero when no capa is being sent, otherwise the default (sizeof
 * struct obd_capa). NOTE(review): extract has line gaps. */
193 static inline void osc_set_capa_size(struct ptlrpc_request *req,
194 const struct req_msg_field *field,
198 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
200 /* it is already calculated as sizeof struct obd_capa */
/* Reply interpreter for async getattr: copy the returned attributes
 * into the caller's obdo and invoke the completion callback.
 * NOTE(review): extract has line gaps; body incomplete, code unchanged. */
204 static int osc_getattr_interpret(const struct lu_env *env,
205 struct ptlrpc_request *req,
206 struct osc_async_args *aa, int rc)
208 struct ost_body *body;
214 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
216 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
/* wire -> host conversion into the obd_info the caller supplied */
217 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
219 /* This should really be sent by the OST */
220 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
221 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
/* unpack-failure path: invalidate the attributes */
223 CDEBUG(D_INFO, "can't unpack ost_body\n");
225 aa->aa_oi->oi_oa->o_valid = 0;
/* notify the upper layer of the final status */
228 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Build an OST_GETATTR request and queue it on @set; completion is
 * handled by osc_getattr_interpret().
 * NOTE(review): extract has line gaps; body incomplete, code unchanged. */
232 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
233 struct ptlrpc_request_set *set)
235 struct ptlrpc_request *req;
236 struct osc_async_args *aa;
240 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
/* reserve the capa buffer before packing the request */
244 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
245 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
247 ptlrpc_request_free(req);
251 osc_pack_req_body(req, oinfo);
253 ptlrpc_request_set_replen(req);
254 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
/* async args live in the request's preallocated scratch area */
256 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
257 aa = ptlrpc_req_async_args(req);
260 ptlrpc_set_add_req(set, req);
/* Synchronous OST_GETATTR: send, wait, and copy the returned
 * attributes into oinfo->oi_oa.
 * NOTE(review): extract has line gaps; body incomplete, code unchanged. */
264 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
265 struct obd_info *oinfo)
267 struct ptlrpc_request *req;
268 struct ost_body *body;
272 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
276 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
277 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
279 ptlrpc_request_free(req);
283 osc_pack_req_body(req, oinfo);
285 ptlrpc_request_set_replen(req);
/* blocking send */
287 rc = ptlrpc_queue_wait(req);
291 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
/* missing/short reply body is a protocol error */
293 GOTO(out, rc = -EPROTO);
295 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
296 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
298 /* This should really be sent by the OST */
299 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
300 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
304 ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR: push the attributes in oinfo->oi_oa to the
 * OST and read back the server's view on success.
 * NOTE(review): extract has line gaps; body incomplete, code unchanged. */
308 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
309 struct obd_info *oinfo, struct obd_trans_info *oti)
311 struct ptlrpc_request *req;
312 struct ost_body *body;
/* the group must always be valid for a setattr */
316 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
318 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
322 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
323 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
325 ptlrpc_request_free(req);
329 osc_pack_req_body(req, oinfo);
331 ptlrpc_request_set_replen(req);
333 rc = ptlrpc_queue_wait(req);
337 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
339 GOTO(out, rc = -EPROTO);
/* reflect the attributes the server actually applied */
341 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
345 ptlrpc_req_finished(req);
/* Reply interpreter shared by async setattr/punch: copy the returned
 * obdo back and invoke the registered upcall with the final rc.
 * NOTE(review): extract has line gaps; body incomplete, code unchanged. */
349 static int osc_setattr_interpret(const struct lu_env *env,
350 struct ptlrpc_request *req,
351 struct osc_setattr_args *sa, int rc)
353 struct ost_body *body;
359 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
361 GOTO(out, rc = -EPROTO);
363 lustre_get_wire_obdo(sa->sa_oa, &body->oa);
/* hand the result to whoever queued the request */
365 rc = sa->sa_upcall(sa->sa_cookie, rc);
/* Asynchronous OST_SETATTR: build the request, attach @upcall/@cookie
 * for completion, and either hand it to ptlrpcd or add it to @rqset.
 * NOTE(review): extract has line gaps; body incomplete, code unchanged. */
369 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
370 struct obd_trans_info *oti,
371 obd_enqueue_update_f upcall, void *cookie,
372 struct ptlrpc_request_set *rqset)
374 struct ptlrpc_request *req;
375 struct osc_setattr_args *sa;
379 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
383 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
384 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
386 ptlrpc_request_free(req);
/* carry the llog cookie along when the caller provided one */
390 if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
391 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
393 osc_pack_req_body(req, oinfo);
395 ptlrpc_request_set_replen(req);
397 /* do mds to ost setattr asynchronously */
399 /* Do not wait for response. */
400 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
402 req->rq_interpret_reply =
403 (ptlrpc_interpterer_t)osc_setattr_interpret;
/* completion context stored in the request scratch area */
405 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
406 sa = ptlrpc_req_async_args(req);
407 sa->sa_oa = oinfo->oi_oa;
408 sa->sa_upcall = upcall;
409 sa->sa_cookie = cookie;
/* PTLRPCD_SET is a sentinel meaning "let ptlrpcd drive it" */
411 if (rqset == PTLRPCD_SET)
412 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
414 ptlrpc_set_add_req(rqset, req);
/* Thin wrapper: async setattr using oinfo's own completion callback. */
420 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
421 struct obd_trans_info *oti,
422 struct ptlrpc_request_set *rqset)
424 return osc_setattr_async_base(exp, oinfo, oti,
425 oinfo->oi_cb_up, oinfo, rqset);
/* Synchronously create an object on the OST (OST_CREATE) and build the
 * in-memory stripe md for it; fills transno/llog cookie into @oti.
 * NOTE(review): extract has line gaps; body incomplete, code unchanged. */
428 int osc_real_create(struct obd_export *exp, struct obdo *oa,
429 struct lov_stripe_md **ea, struct obd_trans_info *oti)
431 struct ptlrpc_request *req;
432 struct ost_body *body;
433 struct lov_stripe_md *lsm;
/* allocate a stripe md to return through *ea */
442 rc = obd_alloc_memmd(exp, &lsm);
447 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
449 GOTO(out, rc = -ENOMEM);
451 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
453 ptlrpc_request_free(req);
457 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
459 lustre_set_wire_obdo(&body->oa, oa);
461 ptlrpc_request_set_replen(req);
/* orphan cleanup requests must not be replayed/delayed */
463 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
464 oa->o_flags == OBD_FL_DELORPHAN) {
466 "delorphan from OST integration");
467 /* Don't resend the delorphan req */
468 req->rq_no_resend = req->rq_no_delay = 1;
471 rc = ptlrpc_queue_wait(req);
475 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
477 GOTO(out_req, rc = -EPROTO);
/* copy back the id/seq etc. the OST assigned */
479 lustre_get_wire_obdo(oa, &body->oa);
481 /* This should really be sent by the OST */
482 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
483 oa->o_valid |= OBD_MD_FLBLKSZ;
485 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
486 * have valid lsm_oinfo data structs, so don't go touching that.
487 * This needs to be fixed in a big way.
489 lsm->lsm_object_id = oa->o_id;
490 lsm->lsm_object_seq = oa->o_seq;
/* record the server transaction number for recovery */
494 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
496 if (oa->o_valid & OBD_MD_FLCOOKIE) {
497 if (!oti->oti_logcookies)
498 oti_alloc_cookies(oti, 1);
499 *oti->oti_logcookies = oa->o_lcookie;
503 CDEBUG(D_HA, "transno: "LPD64"\n",
504 lustre_msg_get_transno(req->rq_repmsg));
506 ptlrpc_req_finished(req);
/* error path: release the stripe md we allocated above */
509 obd_free_memmd(exp, &lsm);
/* Asynchronous OST_PUNCH (truncate a byte range on the OST); reply is
 * handled by osc_setattr_interpret which calls @upcall(@cookie, rc).
 * NOTE(review): extract has line gaps; body incomplete, code unchanged. */
513 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
514 obd_enqueue_update_f upcall, void *cookie,
515 struct ptlrpc_request_set *rqset)
517 struct ptlrpc_request *req;
518 struct osc_setattr_args *sa;
519 struct ost_body *body;
523 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
527 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
528 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
530 ptlrpc_request_free(req);
533 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
534 ptlrpc_at_set_req_timeout(req);
536 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
538 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
539 osc_pack_capa(req, body, oinfo->oi_capa);
541 ptlrpc_request_set_replen(req);
/* punch shares the setattr completion path */
543 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
544 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
545 sa = ptlrpc_req_async_args(req);
546 sa->sa_oa = oinfo->oi_oa;
547 sa->sa_upcall = upcall;
548 sa->sa_cookie = cookie;
549 if (rqset == PTLRPCD_SET)
550 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
552 ptlrpc_set_add_req(rqset, req);
/* Truncate helper: encode the punch extent in o_size/o_blocks (the
 * wire convention for OST_PUNCH) and delegate to osc_punch_base. */
557 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
558 struct obd_info *oinfo, struct obd_trans_info *oti,
559 struct ptlrpc_request_set *rqset)
561 oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
562 oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
563 oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
564 return osc_punch_base(exp, oinfo,
565 oinfo->oi_cb_up, oinfo, rqset);
/* Reply interpreter for OST_SYNC: copy the returned obdo and run the
 * caller's completion callback.
 * NOTE(review): extract has line gaps; body incomplete, code unchanged. */
568 static int osc_sync_interpret(const struct lu_env *env,
569 struct ptlrpc_request *req,
572 struct osc_async_args *aa = arg;
573 struct ost_body *body;
579 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
581 CERROR ("can't unpack ost_body\n");
582 GOTO(out, rc = -EPROTO);
/* direct struct copy — reply obdo is already host order here */
585 *aa->aa_oi->oi_oa = body->oa;
587 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an OST_SYNC for [start, end] on @set; o_size/o_blocks carry
 * the range per the wire convention (see comment below).
 * NOTE(review): extract has line gaps; body incomplete, code unchanged. */
591 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
592 struct obd_info *oinfo, obd_size start, obd_size end,
593 struct ptlrpc_request_set *set)
595 struct ptlrpc_request *req;
596 struct ost_body *body;
597 struct osc_async_args *aa;
602 CDEBUG(D_INFO, "oa NULL\n");
606 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
610 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
611 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
613 ptlrpc_request_free(req);
617 /* overload the size and blocks fields in the oa with start/end */
618 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
620 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
621 body->oa.o_size = start;
622 body->oa.o_blocks = end;
623 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
624 osc_pack_capa(req, body, oinfo->oi_capa);
626 ptlrpc_request_set_replen(req);
627 req->rq_interpret_reply = osc_sync_interpret;
629 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
630 aa = ptlrpc_req_async_args(req);
633 ptlrpc_set_add_req(set, req);
637 /* Find and cancel locally locks matched by @mode in the resource found by
638 * @objid. Found locks are added into @cancel list. Returns the amount of
639 * locks added to @cancels list. */
/* NOTE(review): extract has line gaps; body incomplete, code unchanged. */
640 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
642 ldlm_mode_t mode, int lock_flags)
644 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
645 struct ldlm_res_id res_id;
646 struct ldlm_resource *res;
/* map the object id/seq to its LDLM resource name */
650 osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
651 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
/* hold a ref across the local cancel walk */
655 LDLM_RESOURCE_ADDREF(res);
656 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
657 lock_flags, 0, NULL);
658 LDLM_RESOURCE_DELREF(res);
659 ldlm_resource_putref(res);
/* Destroy-reply interpreter: drop the in-flight counter and wake any
 * thread throttled in osc_can_send_destroy(). */
663 static int osc_destroy_interpret(const struct lu_env *env,
664 struct ptlrpc_request *req, void *data,
667 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
669 cfs_atomic_dec(&cli->cl_destroy_in_flight);
670 cfs_waitq_signal(&cli->cl_destroy_waitq);
/* Throttle destroy RPCs to cl_max_rpcs_in_flight: optimistically bump
 * the counter; on overshoot, back it off and re-signal waiters to
 * close the inc/dec race window.
 * NOTE(review): extract has line gaps; body incomplete, code unchanged. */
674 static int osc_can_send_destroy(struct client_obd *cli)
676 if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
677 cli->cl_max_rpcs_in_flight) {
678 /* The destroy request can be sent */
681 if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
682 cli->cl_max_rpcs_in_flight) {
684 * The counter has been modified between the two atomic
687 cfs_waitq_signal(&cli->cl_destroy_waitq);
692 /* Destroy requests can be async always on the client, and we don't even really
693 * care about the return code since the client cannot do anything at all about
695 * When the MDS is unlinking a filename, it saves the file objects into a
696 * recovery llog, and these object records are cancelled when the OST reports
697 * they were destroyed and sync'd to disk (i.e. transaction committed).
698 * If the client dies, or the OST is down when the object should be destroyed,
699 * the records are not cancelled, and when the OST reconnects to the MDS next,
700 * it will retrieve the llog unlink logs and then sends the log cancellation
701 * cookies to the MDS after committing destroy transactions. */
/* NOTE(review): extract has line gaps; body incomplete, code unchanged. */
702 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
703 struct obdo *oa, struct lov_stripe_md *ea,
704 struct obd_trans_info *oti, struct obd_export *md_export,
707 struct client_obd *cli = &exp->exp_obd->u.cli;
708 struct ptlrpc_request *req;
709 struct ost_body *body;
710 CFS_LIST_HEAD(cancels);
715 CDEBUG(D_INFO, "oa NULL\n");
/* cancel our local PW locks first; their data is being discarded */
719 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
720 LDLM_FL_DISCARD_DATA);
722 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* alloc failure: release the lock refs collected above */
724 ldlm_lock_list_put(&cancels, l_bl_ast, count);
728 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
/* piggy-back the early lock cancels on the destroy request */
729 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
732 ptlrpc_request_free(req);
736 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
737 ptlrpc_at_set_req_timeout(req);
739 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
740 oa->o_lcookie = *oti->oti_logcookies;
741 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
743 lustre_set_wire_obdo(&body->oa, oa);
745 osc_pack_capa(req, body, (struct obd_capa *)capa);
746 ptlrpc_request_set_replen(req);
748 /* don't throttle destroy RPCs for the MDT */
749 if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
750 req->rq_interpret_reply = osc_destroy_interpret;
751 if (!osc_can_send_destroy(cli)) {
752 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
756 * Wait until the number of on-going destroy RPCs drops
757 * under max_rpc_in_flight
759 l_wait_event_exclusive(cli->cl_destroy_waitq,
760 osc_can_send_destroy(cli), &lwi);
764 /* Do not wait for response */
765 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
/* Fill the dirty/grant accounting fields of @oa (under the loi list
 * lock) so the next RPC can announce our cache state to the OST.
 * NOTE(review): extract has line gaps; body incomplete, code unchanged. */
769 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
772 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
/* caller must not have set these fields already */
774 LASSERT(!(oa->o_valid & bits));
777 client_obd_list_lock(&cli->cl_loi_list_lock);
778 oa->o_dirty = cli->cl_dirty;
/* sanity checks: per-OSC and system-wide dirty limits */
779 if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
780 CERROR("dirty %lu - %lu > dirty_max %lu\n",
781 cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
783 } else if (cfs_atomic_read(&obd_dirty_pages) -
784 cfs_atomic_read(&obd_dirty_transit_pages) >
785 obd_max_dirty_pages + 1){
786 /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
787 * not covered by a lock thus they may safely race and trip
788 * this CERROR() unless we add in a small fudge factor (+1). */
789 CERROR("dirty %d - %d > system dirty_max %d\n",
790 cfs_atomic_read(&obd_dirty_pages),
791 cfs_atomic_read(&obd_dirty_transit_pages),
792 obd_max_dirty_pages);
794 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
795 CERROR("dirty %lu - dirty_max %lu too big???\n",
796 cli->cl_dirty, cli->cl_dirty_max);
/* undirty = how much more we could still dirty before stalling */
799 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
800 (cli->cl_max_rpcs_in_flight + 1);
801 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
/* report current grant and any grant lost since the last report */
803 oa->o_grant = cli->cl_avail_grant;
804 oa->o_dropped = cli->cl_lost_grant;
805 cli->cl_lost_grant = 0;
806 client_obd_list_unlock(&cli->cl_loi_list_lock);
807 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
808 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Push the next grant-shrink deadline forward by the configured
 * interval; called whenever grant activity shows we still need it. */
812 static void osc_update_next_shrink(struct client_obd *cli)
814 cli->cl_next_shrink_grant =
815 cfs_time_shift(cli->cl_grant_shrink_interval);
816 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
817 cli->cl_next_shrink_grant);
820 /* caller must hold loi_list_lock */
/* Account one page of write grant: mark the page as grant-backed and
 * move one page worth of grant from avail to dirty. */
821 static void osc_consume_write_grant(struct client_obd *cli,
822 struct brw_page *pga)
824 LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
/* a page may consume grant only once */
825 LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
826 cfs_atomic_inc(&obd_dirty_pages);
827 cli->cl_dirty += CFS_PAGE_SIZE;
828 cli->cl_avail_grant -= CFS_PAGE_SIZE;
829 pga->flag |= OBD_BRW_FROM_GRANT;
830 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
831 CFS_PAGE_SIZE, pga, pga->pg);
/* caller guarantees enough grant was available before consuming */
832 LASSERT(cli->cl_avail_grant >= 0);
833 osc_update_next_shrink(cli);
836 /* the companion to osc_consume_write_grant, called when a brw has completed.
837 * must be called with the loi lock held. */
/* NOTE(review): extract has line gaps; body incomplete, code unchanged. */
838 static void osc_release_write_grant(struct client_obd *cli,
839 struct brw_page *pga, int sent)
/* server block size governs short-write grant accounting; 4096 default */
841 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
844 LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
/* pages that never consumed grant have nothing to release */
845 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
850 pga->flag &= ~OBD_BRW_FROM_GRANT;
851 cfs_atomic_dec(&obd_dirty_pages);
852 cli->cl_dirty -= CFS_PAGE_SIZE;
/* transit (nocache) pages keep separate dirty accounting */
853 if (pga->flag & OBD_BRW_NOCACHE) {
854 pga->flag &= ~OBD_BRW_NOCACHE;
855 cfs_atomic_dec(&obd_dirty_transit_pages);
856 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
859 /* Reclaim grant from truncated pages. This is used to solve
860 * write-truncate and grant all gone(to lost_grant) problem.
861 * For a vfs write this problem can be easily solved by a sync
862 * write, however, this is not an option for page_mkwrite()
863 * because grant has to be allocated before a page becomes
865 if (cli->cl_avail_grant < PTLRPC_MAX_BRW_SIZE)
866 cli->cl_avail_grant += CFS_PAGE_SIZE;
868 cli->cl_lost_grant += CFS_PAGE_SIZE;
869 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
870 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
871 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
872 /* For short writes we shouldn't count parts of pages that
873 * span a whole block on the OST side, or our accounting goes
874 * wrong. Should match the code in filter_grant_check. */
875 int offset = pga->off & ~CFS_PAGE_MASK;
876 int count = pga->count + (offset & (blocksize - 1));
877 int end = (offset + pga->count) & (blocksize - 1);
879 count += blocksize - end;
881 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
882 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
883 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
884 cli->cl_avail_grant, cli->cl_dirty);
/* Total read + write BRW RPCs currently in flight for this client. */
890 static unsigned long rpcs_in_flight(struct client_obd *cli)
892 return cli->cl_r_in_flight + cli->cl_w_in_flight;
895 /* caller must hold loi_list_lock */
/* Walk the cache-waiter list and wake waiters that can now proceed:
 * either grant is available (consume it), or no write RPCs remain to
 * return grant, in which case waiters fall back to sync IO (-EDQUOT).
 * NOTE(review): extract has line gaps; body incomplete, code unchanged. */
896 void osc_wake_cache_waiters(struct client_obd *cli)
899 struct osc_cache_waiter *ocw;
902 cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
903 /* if we can't dirty more, we must wait until some is written */
904 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
905 (cfs_atomic_read(&obd_dirty_pages) + 1 >
906 obd_max_dirty_pages)) {
907 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
908 "osc max %ld, sys max %d\n", cli->cl_dirty,
909 cli->cl_dirty_max, obd_max_dirty_pages);
913 /* if still dirty cache but no grant wait for pending RPCs that
914 * may yet return us some grant before doing sync writes */
915 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
916 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
917 cli->cl_w_in_flight);
/* this waiter will be woken one way or the other: unlink it */
921 ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
922 cfs_list_del_init(&ocw->ocw_entry);
923 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
924 /* no more RPCs in flight to return grant, do sync IO */
925 ocw->ocw_rc = -EDQUOT;
926 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
/* grant available: consume it on the waiter's behalf */
928 osc_consume_write_grant(cli,
929 &ocw->ocw_oap->oap_brw_page);
932 CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld\n",
933 ocw, ocw->ocw_oap, cli->cl_avail_grant);
935 cfs_waitq_signal(&ocw->ocw_waitq);
/* Add @grant to the client's available grant, under the loi lock. */
941 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
943 client_obd_list_lock(&cli->cl_loi_list_lock);
944 cli->cl_avail_grant += grant;
945 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Absorb any extra grant the server piggy-backed on an RPC reply. */
948 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
950 if (body->oa.o_valid & OBD_MD_FLGRANT) {
951 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
952 __osc_update_grant(cli, body->oa.o_grant);
956 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
957 obd_count keylen, void *key, obd_count vallen,
958 void *val, struct ptlrpc_request_set *set);
/* Completion of a grant-shrink set_info RPC: on failure give the
 * shrunk grant back locally; on success absorb the server's reply.
 * NOTE(review): extract has line gaps; body incomplete, code unchanged. */
960 static int osc_shrink_grant_interpret(const struct lu_env *env,
961 struct ptlrpc_request *req,
964 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
965 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
966 struct ost_body *body;
/* failure path: restore the grant we tentatively gave up */
969 __osc_update_grant(cli, oa->o_grant);
973 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
975 osc_update_grant(cli, body);
/* Give back a quarter of the available grant via @oa (marked with
 * OBD_FL_SHRINK_GRANT) and reschedule the next shrink check. */
981 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
983 client_obd_list_lock(&cli->cl_loi_list_lock);
984 oa->o_grant = cli->cl_avail_grant / 4;
985 cli->cl_avail_grant -= oa->o_grant;
986 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* make sure o_flags is valid before OR-ing in the shrink flag */
987 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
988 oa->o_valid |= OBD_MD_FLFLAGS;
991 oa->o_flags |= OBD_FL_SHRINK_GRANT;
992 osc_update_next_shrink(cli);
995 /* Shrink the current grant, either from some large amount to enough for a
996 * full set of in-flight RPCs, or if we have already shrunk to that limit
997 * then to enough for a single RPC. This avoids keeping more grant than
998 * needed, and avoids shrinking the grant piecemeal. */
999 static int osc_shrink_grant(struct client_obd *cli)
/* target: enough grant for max_rpcs_in_flight + 1 full RPCs */
1001 long target = (cli->cl_max_rpcs_in_flight + 1) *
1002 cli->cl_max_pages_per_rpc;
1004 client_obd_list_lock(&cli->cl_loi_list_lock);
/* already at/below the full-pipeline target: shrink to one RPC's worth */
1005 if (cli->cl_avail_grant <= target)
1006 target = cli->cl_max_pages_per_rpc;
1007 client_obd_list_unlock(&cli->cl_loi_list_lock);
1009 return osc_shrink_grant_to_target(cli, target);
/* Return grant above @target to the server via a KEY_GRANT_SHRINK
 * set_info RPC; the interpreter restores grant locally on failure.
 * NOTE(review): extract has line gaps; body incomplete, code unchanged. */
1012 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
1015 struct ost_body *body;
1018 client_obd_list_lock(&cli->cl_loi_list_lock);
1019 /* Don't shrink if we are already above or below the desired limit
1020 * We don't want to shrink below a single RPC, as that will negatively
1021 * impact block allocation and long-term performance. */
1022 if (target < cli->cl_max_pages_per_rpc)
1023 target = cli->cl_max_pages_per_rpc;
/* nothing to give back */
1025 if (target >= cli->cl_avail_grant) {
1026 client_obd_list_unlock(&cli->cl_loi_list_lock);
1029 client_obd_list_unlock(&cli->cl_loi_list_lock);
1031 OBD_ALLOC_PTR(body);
1035 osc_announce_cached(cli, &body->oa, 0);
/* move the surplus out of avail_grant and into the RPC body */
1037 client_obd_list_lock(&cli->cl_loi_list_lock);
1038 body->oa.o_grant = cli->cl_avail_grant - target;
1039 cli->cl_avail_grant = target;
1040 client_obd_list_unlock(&cli->cl_loi_list_lock);
1041 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
1042 body->oa.o_valid |= OBD_MD_FLFLAGS;
1043 body->oa.o_flags = 0;
1045 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
1046 osc_update_next_shrink(cli);
1048 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
1049 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
1050 sizeof(*body), body, NULL);
/* send failed: take the grant back locally */
1052 __osc_update_grant(cli, body->oa.o_grant);
/* Decide whether it is time to shrink this client's grant: the server
 * must support GRANT_SHRINK, the shrink deadline must have (nearly)
 * passed, the import must be FULL, and we must hold more grant than
 * GRANT_SHRINK_LIMIT. NOTE(review): extract has line gaps. */
1057 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
1058 static int osc_should_shrink_grant(struct client_obd *client)
1060 cfs_time_t time = cfs_time_current();
1061 cfs_time_t next_shrink = client->cl_next_shrink_grant;
1063 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
1064 OBD_CONNECT_GRANT_SHRINK) == 0)
/* 5-tick slack so we don't miss the deadline by a timer quantum */
1067 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1068 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1069 client->cl_avail_grant > GRANT_SHRINK_LIMIT)
/* not eligible now: push the deadline forward */
1072 osc_update_next_shrink(client);
/* Periodic timeout callback: shrink grant on every registered client
 * that currently qualifies. */
1077 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1079 struct client_obd *client;
1081 cfs_list_for_each_entry(client, &item->ti_obd_list,
1082 cl_grant_shrink_list) {
1083 if (osc_should_shrink_grant(client))
1084 osc_shrink_grant(client);
/* Register this client on the periodic grant-shrink timeout list and
 * arm its first shrink deadline.
 * NOTE(review): extract has line gaps; body incomplete, code unchanged. */
1089 static int osc_add_shrink_grant(struct client_obd *client)
1093 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1095 osc_grant_shrink_grant_cb, NULL,
1096 &client->cl_grant_shrink_list);
1098 CERROR("add grant client %s error %d\n",
1099 client->cl_import->imp_obd->obd_name, rc);
1102 CDEBUG(D_CACHE, "add grant client %s \n",
1103 client->cl_import->imp_obd->obd_name);
1104 osc_update_next_shrink(client);
/* Unregister this client from the periodic grant-shrink timeout list. */
1108 static int osc_del_shrink_grant(struct client_obd *client)
1110 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/* Initialize cl_avail_grant from the connect data at (re)connect time
 * and enroll in grant shrinking when the server supports it.
 * NOTE(review): extract has line gaps; body incomplete, code unchanged. */
1114 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1117 * ocd_grant is the total grant amount we're expect to hold: if we've
1118 * been evicted, it's the new avail_grant amount, cl_dirty will drop
1119 * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1121 * race is tolerable here: if we're evicted, but imp_state already
1122 * left EVICTED state, then cl_dirty must be 0 already.
1124 client_obd_list_lock(&cli->cl_loi_list_lock);
1125 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1126 cli->cl_avail_grant = ocd->ocd_grant;
1128 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
/* defensive fixup for pre-bug20278 (1.6) servers */
1130 if (cli->cl_avail_grant < 0) {
1131 CWARN("%s: available grant < 0, the OSS is probably not running"
1132 " with patch from bug20278 (%ld) \n",
1133 cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
1134 /* workaround for 1.6 servers which do not have
1135 * the patch from bug20278 */
1136 cli->cl_avail_grant = ocd->ocd_grant;
1139 client_obd_list_unlock(&cli->cl_loi_list_lock);
1141 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1142 cli->cl_import->imp_obd->obd_name,
1143 cli->cl_avail_grant, cli->cl_lost_grant);
/* enroll once; list-empty check avoids double registration */
1145 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1146 cfs_list_empty(&cli->cl_grant_shrink_list))
1147 osc_add_shrink_grant(cli);
1150 /* We assume that the reason this OSC got a short read is because it read
1151 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1152 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1153 * this stripe never got written at or beyond this stripe offset yet. */
/* Zero-fill the tail of a short read: skip fully-read pages, clear the
 * remainder of the partially-read page, then zero all later pages.
 * NOTE(review): extract has line gaps; body incomplete, code unchanged. */
1154 static void handle_short_read(int nob_read, obd_count page_count,
1155 struct brw_page **pga)
1160 /* skip bytes read OK */
1161 while (nob_read > 0) {
1162 LASSERT (page_count > 0);
1164 if (pga[i]->count > nob_read) {
1165 /* EOF inside this page */
1166 ptr = cfs_kmap(pga[i]->pg) +
1167 (pga[i]->off & ~CFS_PAGE_MASK);
1168 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1169 cfs_kunmap(pga[i]->pg);
1175 nob_read -= pga[i]->count;
1180 /* zero remaining pages */
1181 while (page_count-- > 0) {
1182 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1183 memset(ptr, 0, pga[i]->count);
1184 cfs_kunmap(pga[i]->pg);
/* Validate the per-niobuf RC vector of a BRW_WRITE reply: every rc
 * must be zero and the bulk must have moved exactly requested_nob.
 * NOTE(review): extract has line gaps; body incomplete, code unchanged. */
1189 static int check_write_rcs(struct ptlrpc_request *req,
1190 int requested_nob, int niocount,
1191 obd_count page_count, struct brw_page **pga)
1196 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1197 sizeof(*remote_rcs) *
1199 if (remote_rcs == NULL) {
1200 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1204 /* return error if any niobuf was in error */
1205 for (i = 0; i < niocount; i++) {
1206 if ((int)remote_rcs[i] < 0)
1207 return(remote_rcs[i]);
/* positive rcs are nonsensical for a write reply */
1209 if (remote_rcs[i] != 0) {
1210 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1211 i, remote_rcs[i], req);
/* partial bulk transfer is a hard error */
1216 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1217 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1218 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Two brw_pages may share a niobuf iff they are file-contiguous and
 * differ only in flags known to be safe to combine. */
1225 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1227 if (p1->flag != p2->flag) {
1228 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1229 OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1231 /* warn if we try to combine flags that we don't know to be
1232 * safe to combine */
1233 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1234 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1235 "report this at http://bugs.whamcloud.com/\n",
1236 p1->flag, p2->flag);
/* contiguity check: p2 starts exactly where p1 ends */
1241 return (p1->off + p1->count == p2->off);
/* Compute the bulk checksum over up to @nob bytes of @pga pages with
 * the requested algorithm; honors checksum fault-injection hooks.
 * NOTE(review): extract has line gaps; body incomplete, code unchanged. */
1244 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1245 struct brw_page **pga, int opc,
1246 cksum_type_t cksum_type)
1251 LASSERT (pg_count > 0);
1252 cksum = init_checksum(cksum_type);
1253 while (nob > 0 && pg_count > 0) {
1254 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1255 int off = pga[i]->off & ~CFS_PAGE_MASK;
/* last page may be partial */
1256 int count = pga[i]->count > nob ? nob : pga[i]->count;
1258 /* corrupt the data before we compute the checksum, to
1259 * simulate an OST->client data error */
1260 if (i == 0 && opc == OST_READ &&
1261 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1262 memcpy(ptr + off, "bad1", min(4, nob));
1263 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1264 cfs_kunmap(pga[i]->pg);
1265 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1268 nob -= pga[i]->count;
1272 /* For sending we only compute the wrong checksum instead
1273 * of corrupting the data so it is still correct on a redo */
1274 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1277 return fini_checksum(cksum, cksum_type);
/* Build a BRW (bulk read/write) ptlrpc request for @page_count pages.
 * Allocates the request (from the import's pool for writes), coalesces
 * contiguous pages into niobufs, registers the pages with the bulk
 * descriptor, packs the obdo/ioobj/capa, optionally attaches a client
 * checksum, and initializes the async args.  On success *reqp owns the
 * request.  NOTE(review): interior lines are elided in this view —
 * verify details against the full file. */
1280 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1281 struct lov_stripe_md *lsm, obd_count page_count,
1282 struct brw_page **pga,
1283 struct ptlrpc_request **reqp,
1284 struct obd_capa *ocapa, int reserve,
1287 struct ptlrpc_request *req;
1288 struct ptlrpc_bulk_desc *desc;
1289 struct ost_body *body;
1290 struct obd_ioobj *ioobj;
1291 struct niobuf_remote *niobuf;
1292 int niocount, i, requested_nob, opc, rc;
1293 struct osc_brw_async_args *aa;
1294 struct req_capsule *pill;
1295 struct brw_page *pg_prev;
/* fault-injection points used by recovery tests */
1298 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1299 RETURN(-ENOMEM); /* Recoverable */
1300 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1301 RETURN(-EINVAL); /* Fatal */
/* writes come from the pre-allocated pool so they can proceed under
 * memory pressure; reads use a normal allocation */
1303 if ((cmd & OBD_BRW_WRITE) != 0) {
1305 req = ptlrpc_request_alloc_pool(cli->cl_import,
1306 cli->cl_import->imp_rq_pool,
1307 &RQF_OST_BRW_WRITE);
1310 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* count niobufs: one per run of mergeable (contiguous, flag-compatible)
 * pages, as decided by can_merge_pages() */
1315 for (niocount = i = 1; i < page_count; i++) {
1316 if (!can_merge_pages(pga[i - 1], pga[i]))
1320 pill = &req->rq_pill;
1321 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1323 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1324 niocount * sizeof(*niobuf));
1325 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1327 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1329 ptlrpc_request_free(req);
1332 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1333 ptlrpc_at_set_req_timeout(req);
/* bulk direction depends on opcode: server pulls for writes,
 * pushes for reads */
1335 if (opc == OST_WRITE)
1336 desc = ptlrpc_prep_bulk_imp(req, page_count,
1337 BULK_GET_SOURCE, OST_BULK_PORTAL);
1339 desc = ptlrpc_prep_bulk_imp(req, page_count,
1340 BULK_PUT_SINK, OST_BULK_PORTAL);
1343 GOTO(out, rc = -ENOMEM);
1344 /* NB request now owns desc and will free it when it gets freed */
1346 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1347 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1348 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1349 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1351 lustre_set_wire_obdo(&body->oa, oa);
1353 obdo_to_ioobj(oa, ioobj);
1354 ioobj->ioo_bufcnt = niocount;
1355 osc_pack_capa(req, body, ocapa);
1356 LASSERT (page_count > 0);
1358 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1359 struct brw_page *pg = pga[i];
1360 int poff = pg->off & ~CFS_PAGE_MASK;
1362 LASSERT(pg->count > 0);
1363 /* make sure there is no gap in the middle of page array */
1364 LASSERTF(page_count == 1 ||
1365 (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
1366 ergo(i > 0 && i < page_count - 1,
1367 poff == 0 && pg->count == CFS_PAGE_SIZE) &&
1368 ergo(i == page_count - 1, poff == 0)),
1369 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1370 i, page_count, pg, pg->off, pg->count);
/* pages must arrive strictly sorted by file offset */
1372 LASSERTF(i == 0 || pg->off > pg_prev->off,
1373 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1374 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1376 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1377 pg_prev->pg, page_private(pg_prev->pg),
1378 pg_prev->pg->index, pg_prev->off);
1380 LASSERTF(i == 0 || pg->off > pg_prev->off,
1381 "i %d p_c %u\n", i, page_count);
/* all pages in one RPC must agree on SRVLOCK */
1383 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1384 (pg->flag & OBD_BRW_SRVLOCK));
1386 ptlrpc_prep_bulk_page(desc, pg->pg, poff, pg->count);
1387 requested_nob += pg->count;
/* extend the current niobuf, or start a new one */
1389 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1391 niobuf->len += pg->count;
1393 niobuf->offset = pg->off;
1394 niobuf->len = pg->count;
1395 niobuf->flags = pg->flag;
/* sanity: we must have consumed exactly niocount niobuf slots */
1400 LASSERTF((void *)(niobuf - niocount) ==
1401 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1402 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1403 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1405 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1407 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1408 body->oa.o_valid |= OBD_MD_FLFLAGS;
1409 body->oa.o_flags = 0;
1411 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1414 if (osc_should_shrink_grant(cli))
1415 osc_shrink_grant_local(cli, &body->oa);
1417 /* size[REQ_REC_OFF] still sizeof (*body) */
1418 if (opc == OST_WRITE) {
/* checksum only when enabled and not already covered by the
 * sptlrpc bulk flavor */
1419 if (cli->cl_checksum &&
1420 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1421 /* store cl_cksum_type in a local variable since
1422 * it can be changed via lprocfs */
1423 cksum_type_t cksum_type = cli->cl_cksum_type;
1425 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1426 oa->o_flags &= OBD_FL_LOCAL_MASK;
1427 body->oa.o_flags = 0;
1429 body->oa.o_flags |= cksum_type_pack(cksum_type);
1430 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1431 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1435 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1437 /* save this in 'oa', too, for later checking */
1438 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1439 oa->o_flags |= cksum_type_pack(cksum_type);
1441 /* clear out the checksum flag, in case this is a
1442 * resend but cl_checksum is no longer set. b=11238 */
1443 oa->o_valid &= ~OBD_MD_FLCKSUM;
1445 oa->o_cksum = body->oa.o_cksum;
1446 /* 1 RC per niobuf */
1447 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1448 sizeof(__u32) * niocount);
/* read path: request a server-side checksum in the reply */
1450 if (cli->cl_checksum &&
1451 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1452 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1453 body->oa.o_flags = 0;
1454 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1455 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1458 ptlrpc_request_set_replen(req);
1460 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1461 aa = ptlrpc_req_async_args(req);
1463 aa->aa_requested_nob = requested_nob;
1464 aa->aa_nio_count = niocount;
1465 aa->aa_page_count = page_count;
1469 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1470 if (ocapa && reserve)
1471 aa->aa_ocapa = capa_get(ocapa);
/* error path: release the request (and the bulk desc it owns) */
1477 ptlrpc_req_finished(req);
/* Verify a write checksum returned by the server against the one the
 * client computed before sending.  On mismatch, recompute the checksum
 * locally to classify the failure (server used wrong type / data changed
 * under us, e.g. mmap IO / corrupted in transit) and log it loudly. */
1481 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1482 __u32 client_cksum, __u32 server_cksum, int nob,
1483 obd_count page_count, struct brw_page **pga,
1484 cksum_type_t client_cksum_type)
1488 cksum_type_t cksum_type;
/* fast path: checksums agree, nothing to do */
1490 if (server_cksum == client_cksum) {
1491 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* recompute with the type the server actually used so we can tell
 * where along the path the data (or checksum) diverged */
1495 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1497 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1500 if (cksum_type != client_cksum_type)
1501 msg = "the server did not use the checksum type specified in "
1502 "the original request - likely a protocol problem";
1503 else if (new_cksum == server_cksum)
1504 msg = "changed on the client after we checksummed it - "
1505 "likely false positive due to mmap IO (bug 11742)";
1506 else if (new_cksum == client_cksum)
1507 msg = "changed in transit before arrival at OST";
1509 msg = "changed in transit AND doesn't match the original - "
1510 "likely false positive due to mmap IO (bug 11742)";
1512 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1513 " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1514 msg, libcfs_nid2str(peer->nid),
1515 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1516 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1517 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1519 oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1521 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1522 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1523 "client csum now %x\n", client_cksum, client_cksum_type,
1524 server_cksum, cksum_type, new_cksum);
1528 /* Note rc enters this function as number of bytes transferred */
/* Common BRW completion: unpack the reply body, update quota flags and
 * grant, verify checksums (write: compare against saved client checksum;
 * read: recompute over received data), validate byte counts, zero-fill
 * short reads, and finally copy the wire obdo back into aa->aa_oa.
 * NOTE(review): interior lines are elided in this view. */
1529 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1531 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1532 const lnet_process_id_t *peer =
1533 &req->rq_import->imp_connection->c_peer;
1534 struct client_obd *cli = aa->aa_cli;
1535 struct ost_body *body;
1536 __u32 client_cksum = 0;
/* -EDQUOT still carries a reply we must process (quota flags below) */
1539 if (rc < 0 && rc != -EDQUOT) {
1540 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1544 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1545 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1547 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1551 /* set/clear over quota flag for a uid/gid */
1552 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1553 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1554 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1556 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1557 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1559 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1562 osc_update_grant(cli, body);
/* the checksum we sent with the write, if any */
1567 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1568 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1570 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
/* writes must not return a positive byte count */
1572 CERROR("Unexpected +ve rc %d\n", rc);
1575 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1577 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1580 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1581 check_write_checksum(&body->oa, peer, client_cksum,
1582 body->oa.o_cksum, aa->aa_requested_nob,
1583 aa->aa_page_count, aa->aa_ppga,
1584 cksum_type_unpack(aa->aa_oa->o_flags)))
1587 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1588 aa->aa_page_count, aa->aa_ppga);
1592 /* The rest of this function executes only for OST_READs */
1594 /* if unwrap_bulk failed, return -EAGAIN to retry */
1595 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1597 GOTO(out, rc = -EAGAIN);
/* server cannot legitimately return more bytes than requested */
1599 if (rc > aa->aa_requested_nob) {
1600 CERROR("Unexpected rc %d (%d requested)\n", rc,
1601 aa->aa_requested_nob);
1605 if (rc != req->rq_bulk->bd_nob_transferred) {
1606 CERROR ("Unexpected rc %d (%d transferred)\n",
1607 rc, req->rq_bulk->bd_nob_transferred);
/* short read (e.g. EOF): zero the pages past the received data */
1611 if (rc < aa->aa_requested_nob)
1612 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1614 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1615 static int cksum_counter;
1616 __u32 server_cksum = body->oa.o_cksum;
1619 cksum_type_t cksum_type;
1621 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1622 body->oa.o_flags : 0);
1623 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1624 aa->aa_ppga, OST_READ,
/* note whether the data came direct or via an LNET router */
1627 if (peer->nid == req->rq_bulk->bd_sender) {
1631 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1634 if (server_cksum == ~0 && rc > 0) {
1635 CERROR("Protocol error: server %s set the 'checksum' "
1636 "bit, but didn't send a checksum. Not fatal, "
1637 "but please notify on http://bugs.whamcloud.com/\n",
1638 libcfs_nid2str(peer->nid));
1639 } else if (server_cksum != client_cksum) {
1640 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1641 "%s%s%s inode "DFID" object "
1642 LPU64"/"LPU64" extent "
1643 "["LPU64"-"LPU64"]\n",
1644 req->rq_import->imp_obd->obd_name,
1645 libcfs_nid2str(peer->nid),
1647 body->oa.o_valid & OBD_MD_FLFID ?
1648 body->oa.o_parent_seq : (__u64)0,
1649 body->oa.o_valid & OBD_MD_FLFID ?
1650 body->oa.o_parent_oid : 0,
1651 body->oa.o_valid & OBD_MD_FLFID ?
1652 body->oa.o_parent_ver : 0,
1654 body->oa.o_valid & OBD_MD_FLGROUP ?
1655 body->oa.o_seq : (__u64)0,
1656 aa->aa_ppga[0]->off,
1657 aa->aa_ppga[aa->aa_page_count-1]->off +
1658 aa->aa_ppga[aa->aa_page_count-1]->count -
1660 CERROR("client %x, server %x, cksum_type %x\n",
1661 client_cksum, server_cksum, cksum_type);
1663 aa->aa_oa->o_cksum = client_cksum;
1667 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* we asked for a checksum but the server did not send one */
1670 } else if (unlikely(client_cksum)) {
1671 static int cksum_missed;
/* rate-limit: log only at power-of-two miss counts */
1674 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1675 CERROR("Checksum %u requested from %s but not sent\n",
1676 cksum_missed, libcfs_nid2str(peer->nid));
/* propagate server-updated attributes back to the caller's obdo */
1682 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
/* Synchronous BRW: build the request, queue it, and wait.  Resends on
 * recoverable errors (always for -EINPROGRESS, otherwise bounded by
 * client_should_resend()), backing off 'resends' seconds between tries,
 * and gives up if the import generation changed (eviction). */
1687 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1688 struct lov_stripe_md *lsm,
1689 obd_count page_count, struct brw_page **pga,
1690 struct obd_capa *ocapa)
1692 struct ptlrpc_request *req;
1695 int generation, resends = 0;
1696 struct l_wait_info lwi;
1700 cfs_waitq_init(&waitq);
/* remember the import generation so we can detect eviction later */
1701 generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1704 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1705 page_count, pga, &req, ocapa, 0, resends);
1710 req->rq_generation_set = 1;
1711 req->rq_import_generation = generation;
/* delay the (re)send by 'resends' seconds as a simple backoff */
1712 req->rq_sent = cfs_time_current_sec() + resends;
1715 rc = ptlrpc_queue_wait(req);
1717 if (rc == -ETIMEDOUT && req->rq_resend) {
1718 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1719 ptlrpc_req_finished(req);
1723 rc = osc_brw_fini_request(req, rc);
1725 ptlrpc_req_finished(req);
1726 /* When server return -EINPROGRESS, client should always retry
1727 * regardless of the number of times the bulk was resent already.*/
1728 if (osc_recoverable_error(rc)) {
1730 if (rc != -EINPROGRESS &&
1731 !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1732 CERROR("%s: too many resend retries for object: "
1733 ""LPU64":"LPU64", rc = %d.\n",
1734 exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
/* import generation changed: the client was evicted, stop */
1738 exp->exp_obd->u.cli.cl_import->imp_generation) {
1739 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1740 ""LPU64":"LPU64", rc = %d.\n",
1741 exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
/* sleep before retrying (interruptible timed wait) */
1745 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1747 l_wait_event(waitq, 0, &lwi);
1752 if (rc == -EAGAIN || rc == -EINPROGRESS)
/* Re-issue an async BRW after a recoverable error.  Builds a fresh
 * request reusing the old request's pga/oaps/async-args, moves the oap
 * list and capa over to the new request, re-points each oap's request
 * reference, and adds the new request to the original request set.
 * Aborts (finishing the new request) if any oap was interrupted. */
1757 int osc_brw_redo_request(struct ptlrpc_request *request,
1758 struct osc_brw_async_args *aa)
1760 struct ptlrpc_request *new_req;
1761 struct ptlrpc_request_set *set = request->rq_set;
1762 struct osc_brw_async_args *new_aa;
1763 struct osc_async_page *oap;
1767 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1769 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1770 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1771 aa->aa_cli, aa->aa_oa,
1772 NULL /* lsm unused by osc currently */,
1773 aa->aa_page_count, aa->aa_ppga,
1774 &new_req, aa->aa_ocapa, 0, 1);
1778 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1780 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1781 if (oap->oap_request != NULL) {
1782 LASSERTF(request == oap->oap_request,
1783 "request %p != oap_request %p\n",
1784 request, oap->oap_request);
/* a sync waiter was interrupted: abandon the redo */
1785 if (oap->oap_interrupted) {
1786 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1787 ptlrpc_req_finished(new_req);
1792 /* New request takes over pga and oaps from old request.
1793 * Note that copying a list_head doesn't work, need to move it... */
1795 new_req->rq_interpret_reply = request->rq_interpret_reply;
1796 new_req->rq_async_args = request->rq_async_args;
/* back off by the number of resends so far */
1797 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1798 new_req->rq_generation_set = 1;
1799 new_req->rq_import_generation = request->rq_import_generation;
1801 new_aa = ptlrpc_req_async_args(new_req);
/* splice the oap list across; list_heads cannot be struct-copied */
1803 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1804 cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1805 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1807 cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1808 if (oap->oap_request) {
1809 ptlrpc_req_finished(oap->oap_request);
1810 oap->oap_request = ptlrpc_request_addref(new_req);
/* capa ownership moves to the new request's args */
1814 new_aa->aa_ocapa = aa->aa_ocapa;
1815 aa->aa_ocapa = NULL;
1817 /* use ptlrpc_set_add_req is safe because interpret functions work
1818 * in check_set context. only one way exist with access to request
1819 * from different thread got -EINTR - this way protected with
1820 * cl_loi_list_lock */
1821 ptlrpc_set_add_req(set, new_req);
1823 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1825 DEBUG_REQ(D_INFO, new_req, "new request");
1830 * ugh, we want disk allocation on the target to happen in offset order. we'll
1831 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1832 * fine for our small page arrays and doesn't require allocation. it's an
1833 * insertion sort that swaps elements that are strides apart, shrinking the
1834 * stride down until it's '1' and the array is sorted.
/* In-place shellsort of @array by brw_page->off, ascending. */
1836 static void sort_brw_pages(struct brw_page **array, int num)
1839 struct brw_page *tmp;
/* grow the stride using Knuth's 3h+1 sequence, then shrink */
1843 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1848 for (i = stride ; i < num ; i++) {
/* insertion step: shift larger elements up by one stride */
1851 while (j >= stride && array[j - stride]->off > tmp->off) {
1852 array[j] = array[j - stride];
1857 } while (stride > 1);
/* Return how many leading pages of @pg form an unfragmented run: every
 * interior page must be full (start and end on page boundaries), so the
 * run can go into a single RPC without gaps. */
1860 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1866 LASSERT (pages > 0);
1867 offset = pg[i]->off & ~CFS_PAGE_MASK;
1871 if (pages == 0) /* that's all */
1874 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1875 return count; /* doesn't end on page boundary */
1878 offset = pg[i]->off & ~CFS_PAGE_MASK;
1879 if (offset != 0) /* doesn't start on page boundary */
/* Build an array of pointers into the caller's flat brw_page array so
 * the pages can be sorted/partitioned without copying them.  Returns
 * NULL on allocation failure; free with osc_release_ppga(). */
1886 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1888 struct brw_page **ppga;
1891 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1895 for (i = 0; i < count; i++)
/* Free a pointer array previously built by osc_build_ppga().  Frees only
 * the pointer array, not the brw_pages it points at. */
1900 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1902 LASSERT(ppga != NULL);
1903 OBD_FREE(ppga, sizeof(*ppga) * count);
/* Top-level synchronous BRW entry point.  Sorts the pages by offset,
 * then issues them in chunks bounded by cl_max_pages_per_rpc and by
 * max_unfragmented_pages(), saving/restoring the caller's obdo across
 * chunks since osc_brw_internal() clobbers it.  OBD_BRW_CHECK only
 * probes whether I/O could currently succeed. */
1906 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1907 obd_count page_count, struct brw_page *pga,
1908 struct obd_trans_info *oti)
1910 struct obdo *saved_oa = NULL;
1911 struct brw_page **ppga, **orig;
1912 struct obd_import *imp = class_exp2cliimp(exp);
1913 struct client_obd *cli;
1914 int rc, page_count_orig;
1917 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1918 cli = &imp->imp_obd->u.cli;
1920 if (cmd & OBD_BRW_CHECK) {
1921 /* The caller just wants to know if there's a chance that this
1922 * I/O can succeed */
1924 if (imp->imp_invalid)
1929 /* test_brw with a failed create can trip this, maybe others. */
1930 LASSERT(cli->cl_max_pages_per_rpc);
1934 orig = ppga = osc_build_ppga(pga, page_count);
/* keep the original pointer and count for the final release */
1937 page_count_orig = page_count;
1939 sort_brw_pages(ppga, page_count);
1940 while (page_count) {
1941 obd_count pages_per_brw;
1943 if (page_count > cli->cl_max_pages_per_rpc)
1944 pages_per_brw = cli->cl_max_pages_per_rpc;
1946 pages_per_brw = page_count;
/* shrink the chunk further so it has no interior gaps */
1948 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1950 if (saved_oa != NULL) {
1951 /* restore previously saved oa */
1952 *oinfo->oi_oa = *saved_oa;
1953 } else if (page_count > pages_per_brw) {
1954 /* save a copy of oa (brw will clobber it) */
1955 OBDO_ALLOC(saved_oa);
1956 if (saved_oa == NULL)
1957 GOTO(out, rc = -ENOMEM);
1958 *saved_oa = *oinfo->oi_oa;
1961 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1962 pages_per_brw, ppga, oinfo->oi_capa);
/* advance to the next chunk */
1967 page_count -= pages_per_brw;
1968 ppga += pages_per_brw;
1972 osc_release_ppga(orig, page_count_orig);
1974 if (saved_oa != NULL)
1975 OBDO_FREE(saved_oa);
1980 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1981 * the dirty accounting. Writeback completes or truncate happens before
1982 * writing starts. Must be called with the loi lock held. */
1983 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
/* delegate the grant bookkeeping to the write-grant release helper */
1986 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1990 /* This maintains the lists of pending pages to read/write for a given object
1991 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1992 * to quickly find objects that are ready to send an RPC. */
/* Return non-zero when this lop has enough (or urgent enough) pending
 * pages to justify building an RPC now. */
1993 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
/* nothing pending -> nothing to send */
1998 if (lop->lop_num_pending == 0)
2001 /* if we have an invalid import we want to drain the queued pages
2002 * by forcing them through rpcs that immediately fail and complete
2003 * the pages. recovery relies on this to empty the queued pages
2004 * before canceling the locks and evicting down the llite pages */
2005 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2008 /* stream rpcs in queue order as long as as there is an urgent page
2009 * queued. this is our cheap solution for good batching in the case
2010 * where writepage marks some random page in the middle of the file
2011 * as urgent because of, say, memory pressure */
2012 if (!cfs_list_empty(&lop->lop_urgent)) {
2013 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
2017 if (cmd & OBD_BRW_WRITE) {
2018 /* trigger a write rpc stream as long as there are dirtiers
2019 * waiting for space. as they're waiting, they're not going to
2020 * create more pages to coalesce with what's waiting.. */
2021 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
2022 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
/* enough pages accumulated to fill a whole RPC */
2026 if (lop->lop_num_pending >= cli->cl_max_pages_per_rpc)
/* Return non-zero when the first urgent page on this lop is flagged
 * high-priority (ASYNC_HP) and therefore warrants an immediate RPC. */
2032 static int lop_makes_hprpc(struct loi_oap_pages *lop)
2034 struct osc_async_page *oap;
2037 if (cfs_list_empty(&lop->lop_urgent))
/* only the head of the urgent list is examined */
2040 oap = cfs_list_entry(lop->lop_urgent.next,
2041 struct osc_async_page, oap_urgent_item);
2043 if (oap->oap_async_flags & ASYNC_HP) {
2044 CDEBUG(D_CACHE, "hp request forcing RPC\n");
/* Keep @item's membership of @list in sync with @should_be_on:
 * add it when it should be listed but isn't, remove it otherwise. */
2051 static void on_list(cfs_list_t *item, cfs_list_t *list,
2054 if (cfs_list_empty(item) && should_be_on)
2055 cfs_list_add_tail(item, list);
2056 else if (!cfs_list_empty(item) && !should_be_on)
2057 cfs_list_del_init(item);
2060 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2061 * can find pages to build into rpcs quickly */
2062 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
/* high-priority work moves the loi to the hp-ready list and off the
 * normal ready list (the two are mutually exclusive) */
2064 if (lop_makes_hprpc(&loi->loi_write_lop) ||
2065 lop_makes_hprpc(&loi->loi_read_lop)) {
2067 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2068 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2070 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
2071 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2072 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
2073 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
/* track read/write membership purely by pending-page counts */
2076 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2077 loi->loi_write_lop.lop_num_pending);
2079 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2080 loi->loi_read_lop.lop_num_pending);
/* Adjust the lop's pending-page count by @delta and mirror the change
 * into the client-wide pending read/write page counters. */
2083 static void lop_update_pending(struct client_obd *cli,
2084 struct loi_oap_pages *lop, int cmd, int delta)
2086 lop->lop_num_pending += delta;
2087 if (cmd & OBD_BRW_WRITE)
2088 cli->cl_pending_w_pages += delta;
2090 cli->cl_pending_r_pages += delta;
2094 * this is called when a sync waiter receives an interruption. Its job is to
2095 * get the caller woken as soon as possible. If its page hasn't been put in an
2096 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
2097 * desiring interruption which will forcefully complete the rpc once the rpc
2100 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2102 struct loi_oap_pages *lop;
2103 struct lov_oinfo *loi;
2107 LASSERT(!oap->oap_interrupted);
2108 oap->oap_interrupted = 1;
2110 /* ok, it's been put in an rpc. only one oap gets a request reference */
2111 if (oap->oap_request != NULL) {
/* ask ptlrpc to abort the in-flight request and drop our ref */
2112 ptlrpc_mark_interrupted(oap->oap_request);
2113 ptlrpcd_wake(oap->oap_request);
2114 ptlrpc_req_finished(oap->oap_request);
2115 oap->oap_request = NULL;
2119 * page completion may be called only if ->cpo_prep() method was
2120 * executed by osc_io_submit(), that also adds page the to pending list
2122 if (!cfs_list_empty(&oap->oap_pending_item)) {
2123 cfs_list_del_init(&oap->oap_pending_item);
2124 cfs_list_del_init(&oap->oap_urgent_item);
2127 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2128 &loi->loi_write_lop : &loi->loi_read_lop;
/* drop the page from pending accounting and fix list membership */
2129 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2130 loi_list_maint(oap->oap_cli, oap->oap_loi);
/* complete the page back to the caller with -EINTR */
2131 rc = oap->oap_caller_ops->ap_completion(env,
2132 oap->oap_caller_data,
2133 oap->oap_cmd, NULL, -EINTR);
2139 /* this is trying to propagate async writeback errors back up to the
2140 * application. As an async write fails we record the error code for later if
2141 * the app does an fsync. As long as errors persist we force future rpcs to be
2142 * sync so that the app can get a sync error and break the cycle of queueing
2143 * pages for which writeback will fail. */
2144 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
/* on error: force sync mode and remember the xid threshold */
2151 ar->ar_force_sync = 1;
2152 ar->ar_min_xid = ptlrpc_sample_next_xid();
/* once a request at/above the threshold succeeds, lift sync mode */
2157 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2158 ar->ar_force_sync = 0;
/* Queue @oap on its object's read or write pending list, and on the
 * urgent list as well when flagged: HP pages go to the front of the
 * urgent list, merely-urgent pages to the back. */
2161 void osc_oap_to_pending(struct osc_async_page *oap)
2163 struct loi_oap_pages *lop;
2165 if (oap->oap_cmd & OBD_BRW_WRITE)
2166 lop = &oap->oap_loi->loi_write_lop;
2168 lop = &oap->oap_loi->loi_read_lop;
2170 if (oap->oap_async_flags & ASYNC_HP)
2171 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2172 else if (oap->oap_async_flags & ASYNC_URGENT)
2173 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2174 cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
/* account the newly pending page */
2175 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2178 /* this must be called holding the loi list lock to give coverage to exit_cache,
2179 * async_flag maintenance, and oap_request */
/* Finish one async page: drop its request ref, clear its flags, record
 * write errors for fsync propagation, refresh cached lvb attributes
 * from the reply obdo, and invoke the caller's completion hook. */
2180 static void osc_ap_completion(const struct lu_env *env,
2181 struct client_obd *cli, struct obdo *oa,
2182 struct osc_async_page *oap, int sent, int rc)
2187 if (oap->oap_request != NULL) {
2188 xid = ptlrpc_req_xid(oap->oap_request);
2189 ptlrpc_req_finished(oap->oap_request);
2190 oap->oap_request = NULL;
/* flag updates are serialized by oap_lock */
2193 cfs_spin_lock(&oap->oap_lock);
2194 oap->oap_async_flags = 0;
2195 cfs_spin_unlock(&oap->oap_lock);
2196 oap->oap_interrupted = 0;
/* record write result for later fsync error reporting */
2198 if (oap->oap_cmd & OBD_BRW_WRITE) {
2199 osc_process_ar(&cli->cl_ar, xid, rc);
2200 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2203 if (rc == 0 && oa != NULL) {
2204 if (oa->o_valid & OBD_MD_FLBLOCKS)
2205 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2206 if (oa->o_valid & OBD_MD_FLMTIME)
2207 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2208 if (oa->o_valid & OBD_MD_FLATIME)
2209 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2210 if (oa->o_valid & OBD_MD_FLCTIME)
2211 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2214 rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2215 oap->oap_cmd, oa, rc);
2217 /* cl_page_completion() drops PG_locked. so, a new I/O on the page could
2218 * start, but OSC calls it under lock and thus we can add oap back to
2221 /* upper layer wants to leave the page on pending queue */
2222 osc_oap_to_pending(oap);
2224 osc_exit_cache(cli, oap, sent);
/* Writeback work callback: kick the RPC engine for this client under
 * the loi list lock. */
2228 static int brw_queue_work(const struct lu_env *env, void *data)
2230 struct client_obd *cli = data;
2232 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2234 client_obd_list_lock(&cli->cl_loi_list_lock);
2235 osc_check_rpcs0(env, cli, 1);
2236 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Async BRW interpret callback.  Finalizes the reply, redoes the request
 * on recoverable errors (unlimited for -EINPROGRESS, otherwise bounded),
 * releases the capa, updates in-flight counters, completes or releases
 * each page depending on which submission path queued it, wakes cache
 * waiters, and triggers more RPCs. */
2240 static int brw_interpret(const struct lu_env *env,
2241 struct ptlrpc_request *req, void *data, int rc)
2243 struct osc_brw_async_args *aa = data;
2244 struct client_obd *cli;
2248 rc = osc_brw_fini_request(req, rc);
2249 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2250 /* When server return -EINPROGRESS, client should always retry
2251 * regardless of the number of times the bulk was resent already. */
2252 if (osc_recoverable_error(rc)) {
/* do not resend across an eviction (generation change) */
2253 if (req->rq_import_generation !=
2254 req->rq_import->imp_generation) {
2255 CDEBUG(D_HA, "%s: resend cross eviction for object: "
2256 ""LPU64":"LPU64", rc = %d.\n",
2257 req->rq_import->imp_obd->obd_name,
2258 aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
2259 } else if (rc == -EINPROGRESS ||
2260 client_should_resend(aa->aa_resends, aa->aa_cli)) {
2261 rc = osc_brw_redo_request(req, aa);
2263 CERROR("%s: too many resent retries for object: "
2264 ""LPU64":"LPU64", rc = %d.\n",
2265 req->rq_import->imp_obd->obd_name,
2266 aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
2271 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2276 capa_put(aa->aa_ocapa);
2277 aa->aa_ocapa = NULL;
2281 client_obd_list_lock(&cli->cl_loi_list_lock);
2283 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2284 * is called so we know whether to go to sync BRWs or wait for more
2285 * RPCs to complete */
2286 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2287 cli->cl_w_in_flight--;
2289 cli->cl_r_in_flight--;
/* an empty oap list marks the async_internal() submission path */
2291 async = cfs_list_empty(&aa->aa_oaps);
2292 if (!async) { /* from osc_send_oap_rpc() */
2293 struct osc_async_page *oap, *tmp;
2294 /* the caller may re-use the oap after the completion call so
2295 * we need to clean it up a little */
2296 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
2298 cfs_list_del_init(&oap->oap_rpc_item);
2299 osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2301 OBDO_FREE(aa->aa_oa);
2302 } else { /* from async_internal() */
2304 for (i = 0; i < aa->aa_page_count; i++)
2305 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2307 osc_wake_cache_waiters(cli);
2308 osc_check_rpcs0(env, cli, 1);
2309 client_obd_list_unlock(&cli->cl_loi_list_lock);
2312 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2313 req->rq_bulk->bd_nob_transferred);
2314 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2315 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
/* Build a ptlrpc request from a list of queued async pages: allocate the
 * pga and obdo, create a cl_req covering all pages, collect request
 * attributes (capa, jobid, lock handle), sort the pages, and call
 * osc_brw_prep_request().  On failure every queued oap is completed with
 * the error so the pages are not leaked.  Returns the request or an
 * ERR_PTR.  NOTE(review): interior lines are elided in this view. */
2320 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2321 struct client_obd *cli,
2322 cfs_list_t *rpc_list,
2323 int page_count, int cmd)
2325 struct ptlrpc_request *req;
2326 struct brw_page **pga = NULL;
2327 struct osc_brw_async_args *aa;
2328 struct obdo *oa = NULL;
2329 const struct obd_async_page_ops *ops = NULL;
2330 struct osc_async_page *oap;
2331 struct osc_async_page *tmp;
2332 struct cl_req *clerq = NULL;
2333 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2334 struct ldlm_lock *lock = NULL;
2335 struct cl_req_attr crattr;
2336 int i, rc, mpflag = 0;
2339 LASSERT(!cfs_list_empty(rpc_list));
/* under memory pressure, mark the task so allocations may dip into
 * reserves; restored before returning */
2341 if (cmd & OBD_BRW_MEMALLOC)
2342 mpflag = cfs_memory_pressure_get_and_set();
2344 memset(&crattr, 0, sizeof crattr);
2345 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2347 GOTO(out, req = ERR_PTR(-ENOMEM));
2351 GOTO(out, req = ERR_PTR(-ENOMEM));
2354 cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2355 struct cl_page *page = osc_oap2cl_page(oap);
2357 ops = oap->oap_caller_ops;
/* lazily create the cl_req on the first page */
2359 clerq = cl_req_alloc(env, page, crt,
2360 1 /* only 1-object rpcs for
2363 GOTO(out, req = (void *)clerq);
2364 lock = oap->oap_ldlm_lock;
2366 pga[i] = &oap->oap_brw_page;
2367 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2368 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2369 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2371 cl_req_page_add(env, clerq, page);
2374 /* always get the data for the obdo for the rpc */
2375 LASSERT(ops != NULL);
2377 crattr.cra_capa = NULL;
2378 memset(crattr.cra_jobid, 0, JOBSTATS_JOBID_SIZE);
2379 cl_req_attr_set(env, clerq, &crattr, ~0ULL);
/* pass the DLM lock handle so the server can match the lock */
2381 oa->o_handle = lock->l_remote_handle;
2382 oa->o_valid |= OBD_MD_FLHANDLE;
2385 rc = cl_req_prep(env, clerq);
2387 CERROR("cl_req_prep failed: %d\n", rc);
2388 GOTO(out, req = ERR_PTR(rc));
2391 sort_brw_pages(pga, page_count);
2392 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2393 pga, &req, crattr.cra_capa, 1, 0);
2395 CERROR("prep_req failed: %d\n", rc);
2396 GOTO(out, req = ERR_PTR(rc));
2399 if (cmd & OBD_BRW_MEMALLOC)
2400 req->rq_memalloc = 1;
2402 /* Need to update the timestamps after the request is built in case
2403 * we race with setattr (locally or in queue at OST). If OST gets
2404 * later setattr before earlier BRW (as determined by the request xid),
2405 * the OST will not use BRW timestamps. Sadly, there is no obvious
2406 * way to do this in a single call. bug 10150 */
2407 cl_req_attr_set(env, clerq, &crattr,
2408 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2410 lustre_msg_set_jobid(req->rq_reqmsg, crattr.cra_jobid);
2412 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2413 aa = ptlrpc_req_async_args(req);
/* move (not copy) the oap list into the request's async args */
2414 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2415 cfs_list_splice(rpc_list, &aa->aa_oaps);
2416 CFS_INIT_LIST_HEAD(rpc_list);
2417 aa->aa_clerq = clerq;
2419 if (cmd & OBD_BRW_MEMALLOC)
2420 cfs_memory_pressure_restore(mpflag);
2422 capa_put(crattr.cra_capa);
/* error path: undo partial state and complete all queued pages */
2427 OBD_FREE(pga, sizeof(*pga) * page_count);
2428 /* this should happen rarely and is pretty bad, it makes the
2429 * pending list not follow the dirty order */
2430 client_obd_list_lock(&cli->cl_loi_list_lock);
2431 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2432 cfs_list_del_init(&oap->oap_rpc_item);
2434 /* queued sync pages can be torn down while the pages
2435 * were between the pending list and the rpc */
2436 if (oap->oap_interrupted) {
2437 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2438 osc_ap_completion(env, cli, NULL, oap, 0,
2442 osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2444 if (clerq && !IS_ERR(clerq))
2445 cl_req_completion(env, clerq, PTR_ERR(req));
2451 * prepare pages for ASYNC io and put pages in send queue.
2453 * \param cmd OBD_BRW_* macros
2454 * \param lop pending pages
2456 * \return zero if no page added to send queue.
2457 * \return 1 if pages successfully added to send queue.
2458 * \return negative on errors.
/*
 * Build and dispatch one bulk read/write RPC for this object from the pages
 * queued on @lop.  ASYNC_HP (urgent) pages are moved to the RPC list first,
 * then regular pending pages, up to cl_max_pages_per_rpc or until an RPC
 * boundary/gap forces a stop.  Caller holds cli->cl_loi_list_lock; the lock
 * is dropped while the request is constructed and re-taken for accounting.
 * NOTE(review): this listing is incomplete - interior lines are elided, so
 * the comments below describe only the statements that are visible.
 */
2461 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2462 struct lov_oinfo *loi, int cmd,
2463 struct loi_oap_pages *lop, pdl_policy_t pol)
2465 struct ptlrpc_request *req;
2466 obd_count page_count = 0;
2467 struct osc_async_page *oap = NULL, *tmp;
2468 struct osc_brw_async_args *aa;
2469 const struct obd_async_page_ops *ops;
2470 CFS_LIST_HEAD(rpc_list);
2471 int srvlock = 0, mem_tight = 0;
2472 struct cl_object *clob = NULL;
2473 obd_off starting_offset = OBD_OBJECT_EOF;
2474 unsigned int ending_offset;
2475 int starting_page_off = 0;
2478 /* ASYNC_HP pages first. At present, when the lock the pages is
2479 * to be canceled, the pages covered by the lock will be sent out
2480 * with ASYNC_HP. We have to send out them as soon as possible. */
2481 cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2482 if (oap->oap_async_flags & ASYNC_HP)
2483 cfs_list_move(&oap->oap_pending_item, &rpc_list);
2484 else if (!(oap->oap_brw_flags & OBD_BRW_SYNC))
2485 /* only do this for writeback pages. */
2486 cfs_list_move_tail(&oap->oap_pending_item, &rpc_list);
2487 if (++page_count >= cli->cl_max_pages_per_rpc)
/* re-queue the selected urgent pages at the head of lop_pending so the
 * scan below picks them up first */
2490 cfs_list_splice_init(&rpc_list, &lop->lop_pending);
2493 /* first we find the pages we're allowed to work with */
2494 cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2496 ops = oap->oap_caller_ops;
2498 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2499 "magic 0x%x\n", oap, oap->oap_magic);
2502 /* pin object in memory, so that completion call-backs
2503 * can be safely called under client_obd_list lock. */
2504 clob = osc_oap2cl_page(oap)->cp_obj;
2505 cl_object_get(clob);
/* an RPC must be uniformly SRVLOCK or not; stop at a mismatch */
2508 if (page_count != 0 &&
2509 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2510 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2511 " oap %p, page %p, srvlock %u\n",
2512 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2516 /* If there is a gap at the start of this page, it can't merge
2517 * with any previous page, so we'll hand the network a
2518 * "fragmented" page array that it can't transfer in 1 RDMA */
2519 if (oap->oap_obj_off < starting_offset) {
2520 if (starting_page_off != 0)
2523 starting_page_off = oap->oap_page_off;
2524 starting_offset = oap->oap_obj_off + starting_page_off;
2525 } else if (oap->oap_page_off != 0)
2528 /* in llite being 'ready' equates to the page being locked
2529 * until completion unlocks it. commit_write submits a page
2530 * as not ready because its unlock will happen unconditionally
2531 * as the call returns. if we race with commit_write giving
2532 * us that page we don't want to create a hole in the page
2533 * stream, so we stop and leave the rpc to be fired by
2534 * another dirtier or kupdated interval (the not ready page
2535 * will still be on the dirty list). we could call in
2536 * at the end of ll_file_write to process the queue again. */
2537 if (!(oap->oap_async_flags & ASYNC_READY)) {
2538 int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2541 CDEBUG(D_INODE, "oap %p page %p returned %d "
2542 "instead of ready\n", oap,
2546 /* llite is telling us that the page is still
2547 * in commit_write and that we should try
2548 * and put it in an rpc again later. we
2549 * break out of the loop so we don't create
2550 * a hole in the sequence of pages in the rpc
2555 /* the io isn't needed.. tell the checks
2556 * below to complete the rpc with EINTR */
2557 cfs_spin_lock(&oap->oap_lock);
2558 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2559 cfs_spin_unlock(&oap->oap_lock);
2560 oap->oap_count = -EINTR;
2563 cfs_spin_lock(&oap->oap_lock);
2564 oap->oap_async_flags |= ASYNC_READY;
2565 cfs_spin_unlock(&oap->oap_lock);
/* any other make_ready return value is a caller bug */
2568 LASSERTF(0, "oap %p page %p returned %d "
2569 "from make_ready\n", oap,
2577 /* take the page out of our book-keeping */
2578 cfs_list_del_init(&oap->oap_pending_item);
2579 lop_update_pending(cli, lop, cmd, -1);
2580 cfs_list_del_init(&oap->oap_urgent_item);
2582 /* ask the caller for the size of the io as the rpc leaves. */
2583 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2585 ops->ap_refresh_count(env, oap->oap_caller_data,
2587 LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
/* zero/negative count (e.g. -EINTR above): complete the page now
 * instead of putting it into the RPC */
2589 if (oap->oap_count <= 0) {
2590 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2592 osc_ap_completion(env, cli, NULL,
2593 oap, 0, oap->oap_count);
2597 /* now put the page back in our accounting */
2598 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2599 if (page_count++ == 0)
2600 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
/* mem_tight is presumably set here when a page carries
 * OBD_BRW_MEMALLOC (assignment elided in this listing) */
2602 if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
2605 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2606 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2607 * have the same alignment as the initial writes that allocated
2608 * extents on the server. */
2609 ending_offset = oap->oap_obj_off + oap->oap_page_off +
2611 if (!(ending_offset & (PTLRPC_MAX_BRW_SIZE - 1)))
2614 if (page_count >= cli->cl_max_pages_per_rpc)
2617 /* If there is a gap at the end of this page, it can't merge
2618 * with any subsequent pages, so we'll hand the network a
2619 * "fragmented" page array that it can't transfer in 1 RDMA */
2620 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2624 loi_list_maint(cli, loi);
/* drop the list lock while the RPC is assembled */
2626 client_obd_list_unlock(&cli->cl_loi_list_lock);
2629 cl_object_put(env, clob);
/* nothing was collected: re-take the lock (return path elided) */
2631 if (page_count == 0) {
2632 client_obd_list_lock(&cli->cl_loi_list_lock);
2636 req = osc_build_req(env, cli, &rpc_list, page_count,
2637 mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
2639 LASSERT(cfs_list_empty(&rpc_list));
2640 loi_list_maint(cli, loi);
2641 RETURN(PTR_ERR(req));
2644 aa = ptlrpc_req_async_args(req);
/* lprocfs histograms: pages per RPC, RPCs in flight, start offset */
2646 starting_offset &= PTLRPC_MAX_BRW_SIZE - 1;
2647 if (cmd == OBD_BRW_READ) {
2648 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2649 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2650 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2651 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2653 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2654 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2655 cli->cl_w_in_flight);
2656 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2657 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2660 client_obd_list_lock(&cli->cl_loi_list_lock);
2662 if (cmd == OBD_BRW_READ)
2663 cli->cl_r_in_flight++;
2665 cli->cl_w_in_flight++;
2667 /* queued sync pages can be torn down while the pages
2668 * were between the pending list and the rpc */
2670 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2671 /* only one oap gets a request reference */
2674 if (oap->oap_interrupted && !req->rq_intr) {
2675 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2677 ptlrpc_mark_interrupted(req);
2681 tmp->oap_request = ptlrpc_request_addref(req);
2683 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2684 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2686 req->rq_interpret_reply = brw_interpret;
2688 /* XXX: Maybe the caller can check the RPC bulk descriptor to see which
2689 * CPU/NUMA node the majority of pages were allocated on, and try
2690 * to assign the async RPC to the CPU core (PDL_POLICY_PREFERRED)
2691 * to reduce cross-CPU memory traffic.
2693 * But on the other hand, we expect that multiple ptlrpcd threads
2694 * and the initial write sponsor can run in parallel, especially
2695 * when data checksum is enabled, which is CPU-bound operation and
2696 * single ptlrpcd thread cannot process in time. So more ptlrpcd
2697 * threads sharing BRW load (with PDL_POLICY_ROUND) seems better.
2699 ptlrpcd_add_req(req, pol, -1);
/* Debug helper: dump an object's (lov_oinfo) readiness plus its pending and
 * urgent read/write page counts via CDEBUG(D_INODE).
 * NOTE(review): the trailing line(s) of this macro (the "args" expansion)
 * are elided in this listing. */
2703 #define LOI_DEBUG(LOI, STR, args...) \
2704 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2705 !cfs_list_empty(&(LOI)->loi_ready_item) || \
2706 !cfs_list_empty(&(LOI)->loi_hp_ready_item), \
2707 (LOI)->loi_write_lop.lop_num_pending, \
2708 !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2709 (LOI)->loi_read_lop.lop_num_pending, \
2710 !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2713 /* This is called by osc_check_rpcs() to find which objects have pages that
2714 * we could be sending. These lists are maintained by lop_makes_rpc(). */
/* Selection priority: high-priority ready objects, then ready objects, then
 * (when there are cache waiters) any object with queued writes, and finally,
 * if the import is invalid, anything queued for read or write so it drains.
 * Returns NULL (fall-through, elided here) when nothing is eligible. */
2715 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2719 /* First return objects that have blocked locks so that they
2720 * will be flushed quickly and other clients can get the lock,
2721 * then objects which have pages ready to be stuffed into RPCs */
2722 if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
2723 RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
2724 struct lov_oinfo, loi_hp_ready_item));
2725 if (!cfs_list_empty(&cli->cl_loi_ready_list))
2726 RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
2727 struct lov_oinfo, loi_ready_item));
2729 /* then if we have cache waiters, return all objects with queued
2730 * writes. This is especially important when many small files
2731 * have filled up the cache and not been fired into rpcs because
2732 * they don't pass the nr_pending/object threshold */
2733 if (!cfs_list_empty(&cli->cl_cache_waiters) &&
2734 !cfs_list_empty(&cli->cl_loi_write_list))
2735 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2736 struct lov_oinfo, loi_write_item));
2738 /* then return all queued objects when we have an invalid import
2739 * so that they get flushed */
2740 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2741 if (!cfs_list_empty(&cli->cl_loi_write_list))
2742 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2745 if (!cfs_list_empty(&cli->cl_loi_read_list))
2746 RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
2747 struct lov_oinfo, loi_read_item));
/* Return non-zero when the client is already at its RPCs-in-flight limit.
 * If the first urgent page on either the write or read list is ASYNC_HP,
 * the limit is raised by one so high-priority work can still be sent. */
2752 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2754 struct osc_async_page *oap;
2757 if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
2758 oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
2759 struct osc_async_page, oap_urgent_item);
2760 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2763 if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
2764 oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
2765 struct osc_async_page, oap_urgent_item);
2766 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2769 return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2772 /* called with the loi list lock held */
/* Main RPC-dispatch loop: walk eligible objects (osc_next_loi) and fire
 * write then read RPCs for each via osc_send_oap_rpc(), until the in-flight
 * limit is hit or nothing is eligible.  @ptlrpc selects the ptlrpcd add
 * policy: PDL_POLICY_SAME when called from a ptlrpcd thread, otherwise
 * PDL_POLICY_ROUND.  NOTE(review): interior lines are elided here. */
2773 static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli, int ptlrpc)
2775 struct lov_oinfo *loi;
2776 int rc = 0, race_counter = 0;
2780 pol = ptlrpc ? PDL_POLICY_SAME : PDL_POLICY_ROUND;
2782 while ((loi = osc_next_loi(cli)) != NULL) {
2783 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2785 if (osc_max_rpc_in_flight(cli, loi))
2788 /* attempt some read/write balancing by alternating between
2789 * reads and writes in an object. The makes_rpc checks here
2790 * would be redundant if we were getting read/write work items
2791 * instead of objects. we don't want send_oap_rpc to drain a
2792 * partial read pending queue when we're given this object to
2793 * do io on writes while there are cache waiters */
2794 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2795 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2796 &loi->loi_write_lop, pol);
2798 CERROR("Write request failed with %d\n", rc);
2800 /* osc_send_oap_rpc failed, mostly because of
2803 * It can't break here, because if:
2804 * - a page was submitted by osc_io_submit, so
2806 * - no request in flight
2807 * - no subsequent request
2808 * The system will be in live-lock state,
2809 * because there is no chance to call
2810 * osc_io_unplug() and osc_check_rpcs() any
2811 * more. pdflush can't help in this case,
2812 * because it might be blocked at grabbing
2813 * the page lock as we mentioned.
2815 * Anyway, continue to drain pages. */
2824 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2825 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2826 &loi->loi_read_lop, pol);
2828 CERROR("Read request failed with %d\n", rc);
2836 /* attempt some inter-object balancing by issuing rpcs
2837 * for each object in turn */
2838 if (!cfs_list_empty(&loi->loi_hp_ready_item))
2839 cfs_list_del_init(&loi->loi_hp_ready_item);
2840 if (!cfs_list_empty(&loi->loi_ready_item))
2841 cfs_list_del_init(&loi->loi_ready_item);
2842 if (!cfs_list_empty(&loi->loi_write_item))
2843 cfs_list_del_init(&loi->loi_write_item);
2844 if (!cfs_list_empty(&loi->loi_read_item))
2845 cfs_list_del_init(&loi->loi_read_item);
2847 loi_list_maint(cli, loi);
2849 /* send_oap_rpc fails with 0 when make_ready tells it to
2850 * back off. llite's make_ready does this when it tries
2851 * to lock a page queued for write that is already locked.
2852 * we want to try sending rpcs from many objects, but we
2853 * don't want to spin failing with 0. */
2854 if (race_counter == 10)
/* Public wrapper: dispatch pending RPCs from a non-ptlrpcd context
 * (ptlrpc == 0 selects the round-robin ptlrpcd policy). */
2859 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2861 osc_check_rpcs0(env, cli, 0);
2865 * Non-blocking version of osc_enter_cache() that consumes grant only when it
/* Consume one page of write grant if available; with @transient set, also
 * account the page as dirty-transit and mark it OBD_BRW_NOCACHE.
 * NOTE(review): the return statement(s) are elided in this listing -
 * presumably returns whether the grant was consumed. */
2868 int osc_enter_cache_try(const struct lu_env *env,
2869 struct client_obd *cli, struct lov_oinfo *loi,
2870 struct osc_async_page *oap, int transient)
2874 has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2876 osc_consume_write_grant(cli, &oap->oap_brw_page);
2878 cli->cl_dirty_transit += CFS_PAGE_SIZE;
2879 cfs_atomic_inc(&obd_dirty_transit_pages);
2880 oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2886 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2887 * grant or cache space. */
/* Blocking admission control for a dirty page: try the fast path first,
 * otherwise register as a cache waiter, kick RPC dispatch to free space,
 * and sleep (interruptibly) until woken or the dirty count drains to 0. */
2888 static int osc_enter_cache(const struct lu_env *env,
2889 struct client_obd *cli, struct lov_oinfo *loi,
2890 struct osc_async_page *oap)
2892 struct osc_cache_waiter ocw;
2893 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2897 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2898 "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
2899 cli->cl_dirty_max, obd_max_dirty_pages,
2900 cli->cl_lost_grant, cli->cl_avail_grant);
2902 /* force the caller to try sync io. this can jump the list
2903 * of queued writes and create a discontiguous rpc stream */
2904 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
2905 cli->cl_dirty_max < CFS_PAGE_SIZE ||
2906 cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync)
2909 /* Hopefully normal case - cache space and write credits available */
2910 if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2911 cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2912 osc_enter_cache_try(env, cli, loi, oap, 0))
2915 /* We can get here for two reasons: too many dirty pages in cache, or
2916 * run out of grants. In both cases we should write dirty pages out.
2917 * Adding a cache waiter will trigger urgent write-out no matter what
2919 * The exiting condition is no avail grants and no dirty pages caching,
2920 * that really means there is no space on the OST. */
2921 cfs_waitq_init(&ocw.ocw_waitq);
2923 while (cli->cl_dirty > 0) {
2924 cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2927 loi_list_maint(cli, loi);
2928 osc_check_rpcs(env, cli);
/* drop the list lock before sleeping; the waiter entry is removed
 * by whoever wakes us (or by us, below, on interrupt) */
2929 client_obd_list_unlock(&cli->cl_loi_list_lock);
2931 CDEBUG(D_CACHE, "%s: sleeping for cache space @ %p for %p\n",
2932 cli->cl_import->imp_obd->obd_name, &ocw, oap);
2934 rc = l_wait_event(ocw.ocw_waitq, cfs_list_empty(&ocw.ocw_entry), &lwi);
2936 client_obd_list_lock(&cli->cl_loi_list_lock);
2937 cfs_list_del_init(&ocw.ocw_entry);
/* Initialize the osc_async_page embedded at *res for @page at object offset
 * @offset.  Called with res == NULL (elided branch) it returns the rounded
 * size of an osc_async_page so the caller can reserve space for it. */
2950 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2951 struct lov_oinfo *loi, cfs_page_t *page,
2952 obd_off offset, const struct obd_async_page_ops *ops,
2953 void *data, void **res, int nocache,
2954 struct lustre_handle *lockh)
2956 struct osc_async_page *oap;
2961 return cfs_size_round(sizeof(*oap));
2964 oap->oap_magic = OAP_MAGIC;
2965 oap->oap_cli = &exp->exp_obd->u.cli;
2968 oap->oap_caller_ops = ops;
2969 oap->oap_caller_data = data;
2971 oap->oap_page = page;
2972 oap->oap_obj_off = offset;
/* a locally-mounted, resource-capable client bypasses quota checks */
2973 if (!client_is_remote(exp) &&
2974 cfs_capable(CFS_CAP_SYS_RESOURCE))
2975 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2977 LASSERT(!(offset & ~CFS_PAGE_MASK));
2979 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2980 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2981 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2982 CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2984 cfs_spin_lock_init(&oap->oap_lock);
2985 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
/* Queue a prepared async page for read or write.  Validates the oap, checks
 * quota for non-NOQUOTA writes, enters the dirty-page cache for writes
 * (may block in osc_enter_cache), then places the page on the object's
 * pending list and kicks the writeback worker if an RPC can be formed.
 * NOTE(review): interior lines are elided in this listing. */
2989 int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp,
2990 struct lov_stripe_md *lsm, struct lov_oinfo *loi,
2991 struct osc_async_page *oap, int cmd, int off,
2992 int count, obd_flag brw_flags, enum async_flags async_flags)
2994 struct client_obd *cli = &exp->exp_obd->u.cli;
2998 if (oap->oap_magic != OAP_MAGIC)
3001 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
/* the page must not already be queued anywhere */
3004 if (!cfs_list_empty(&oap->oap_pending_item) ||
3005 !cfs_list_empty(&oap->oap_urgent_item) ||
3006 !cfs_list_empty(&oap->oap_rpc_item))
3009 /* check if the file's owner/group is over quota */
3010 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
3011 struct cl_object *obj;
3012 struct cl_attr attr; /* XXX put attr into thread info */
3013 unsigned int qid[MAXQUOTAS];
3015 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
3017 cl_object_attr_lock(obj);
3018 rc = cl_object_attr_get(env, obj, &attr);
3019 cl_object_attr_unlock(obj);
3021 qid[USRQUOTA] = attr.cat_uid;
3022 qid[GRPQUOTA] = attr.cat_gid;
3024 osc_quota_chkdq(cli, qid) == NO_QUOTA)
/* default to the first (only) stripe object when none given */
3031 loi = lsm->lsm_oinfo[0];
3033 client_obd_list_lock(&cli->cl_loi_list_lock);
3035 LASSERT(off + count <= CFS_PAGE_SIZE);
3037 oap->oap_page_off = off;
3038 oap->oap_count = count;
3039 oap->oap_brw_flags = brw_flags;
3040 /* Give a hint to OST that requests are coming from kswapd - bug19529 */
3041 if (cfs_memory_pressure_get())
3042 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
3043 cfs_spin_lock(&oap->oap_lock);
3044 oap->oap_async_flags = async_flags;
3045 cfs_spin_unlock(&oap->oap_lock);
3047 if (cmd & OBD_BRW_WRITE) {
3048 rc = osc_enter_cache(env, cli, loi, oap);
3050 client_obd_list_unlock(&cli->cl_loi_list_lock);
3055 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
3058 osc_oap_to_pending(oap);
3059 loi_list_maint(cli, loi);
/* if an RPC can already be formed, schedule the writeback worker */
3060 if (!osc_max_rpc_in_flight(cli, loi) &&
3061 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
3062 LASSERT(cli->cl_writeback_work != NULL);
3063 rc = ptlrpcd_queue_work(cli->cl_writeback_work);
3065 CDEBUG(D_CACHE, "Queued writeback work for client obd %p/%d.\n",
3068 client_obd_list_unlock(&cli->cl_loi_list_lock);
3073 /* aka (~was & now & flag), but this is more clear :) */
3074 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
/* Raise async flags on an already-pending page.  Newly-set ASYNC_URGENT
 * moves the page onto the urgent list (head for ASYNC_HP pages, tail
 * otherwise).  Flags are only ever added here, never cleared. */
3076 int osc_set_async_flags_base(struct client_obd *cli,
3077 struct lov_oinfo *loi, struct osc_async_page *oap,
3078 obd_flag async_flags)
3080 struct loi_oap_pages *lop;
3084 LASSERT(!cfs_list_empty(&oap->oap_pending_item));
3086 if (oap->oap_cmd & OBD_BRW_WRITE) {
3087 lop = &loi->loi_write_lop;
3089 lop = &loi->loi_read_lop;
/* nothing new to set - early exit (return elided) */
3092 if ((oap->oap_async_flags & async_flags) == async_flags)
3095 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3096 flags |= ASYNC_READY;
3098 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3099 cfs_list_empty(&oap->oap_rpc_item)) {
3100 if (oap->oap_async_flags & ASYNC_HP)
3101 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
3103 cfs_list_add_tail(&oap->oap_urgent_item,
3105 flags |= ASYNC_URGENT;
3106 loi_list_maint(cli, loi);
3108 cfs_spin_lock(&oap->oap_lock);
3109 oap->oap_async_flags |= flags;
3110 cfs_spin_unlock(&oap->oap_lock);
3112 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3113 oap->oap_async_flags);
/* Remove a queued async page from all OSC book-keeping: release its cache
 * grant, wake cache waiters, and unlink it from the urgent and pending
 * lists.  Fails with -EBUSY if the page is already part of an RPC. */
3117 int osc_teardown_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
3118 struct lov_oinfo *loi, struct osc_async_page *oap)
3120 struct client_obd *cli = &exp->exp_obd->u.cli;
3121 struct loi_oap_pages *lop;
3125 if (oap->oap_magic != OAP_MAGIC)
3129 loi = lsm->lsm_oinfo[0];
3131 if (oap->oap_cmd & OBD_BRW_WRITE) {
3132 lop = &loi->loi_write_lop;
3134 lop = &loi->loi_read_lop;
3137 client_obd_list_lock(&cli->cl_loi_list_lock);
/* a page already claimed by an in-flight RPC cannot be torn down */
3139 if (!cfs_list_empty(&oap->oap_rpc_item))
3140 GOTO(out, rc = -EBUSY);
3142 osc_exit_cache(cli, oap, 0);
3143 osc_wake_cache_waiters(cli);
3145 if (!cfs_list_empty(&oap->oap_urgent_item)) {
3146 cfs_list_del_init(&oap->oap_urgent_item);
3147 cfs_spin_lock(&oap->oap_lock);
3148 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3149 cfs_spin_unlock(&oap->oap_lock);
3151 if (!cfs_list_empty(&oap->oap_pending_item)) {
3152 cfs_list_del_init(&oap->oap_pending_item);
3153 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3155 loi_list_maint(cli, loi);
3156 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3158 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Attach einfo->ei_cbdata to the lock's l_ast_data under the lock's
 * resource lock and osc_ast_guard, but only if l_ast_data is unset or
 * already points at the same data.  The result flag (set when the data
 * matches) is computed between the two visible l_ast_data tests; its
 * assignment and the return are elided in this listing. */
3162 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
3163 struct ldlm_enqueue_info *einfo)
3165 void *data = einfo->ei_cbdata;
/* sanity: the lock's ASTs and type must match what the enqueuer expects */
3168 LASSERT(lock != NULL);
3169 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3170 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3171 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3172 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3174 lock_res_and_lock(lock);
3175 cfs_spin_lock(&osc_ast_guard);
3177 if (lock->l_ast_data == NULL)
3178 lock->l_ast_data = data;
3179 if (lock->l_ast_data == data)
3182 cfs_spin_unlock(&osc_ast_guard);
3183 unlock_res_and_lock(lock);
/* Handle-based wrapper around osc_set_lock_data_with_check(): resolve the
 * handle to a lock, attach the cbdata, and drop the reference.  The
 * lock == NULL branch (elided) logs the "client evicted?" error below. */
3188 static int osc_set_data_with_check(struct lustre_handle *lockh,
3189 struct ldlm_enqueue_info *einfo)
3191 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3195 set = osc_set_lock_data_with_check(lock, einfo);
3196 LDLM_LOCK_PUT(lock);
3198 CERROR("lockh %p, data %p - client evicted?\n",
3199 lockh, einfo->ei_cbdata);
/* Iterate over all DLM locks on the object's resource, applying @replace
 * with @data to each (typically to swap or clear l_ast_data). */
3203 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3204 ldlm_iterator_t replace, void *data)
3206 struct ldlm_res_id res_id;
3207 struct obd_device *obd = class_exp2obd(exp);
3209 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3210 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3214 /* find any ldlm lock of the inode in osc
/* Iterate the object's resource looking for a matching lock; the iterator's
 * LDLM_ITER_STOP result means "found" (return values elided here). */
3218 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3219 ldlm_iterator_t replace, void *data)
3221 struct ldlm_res_id res_id;
3222 struct obd_device *obd = class_exp2obd(exp);
3225 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3226 rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3227 if (rc == LDLM_ITER_STOP)
3229 if (rc == LDLM_ITER_CONTINUE)
/* Finish an enqueue: translate an intent ELDLM_LOCK_ABORTED reply into the
 * server's policy result, mark the LVB ready when appropriate, and invoke
 * the caller's update callback with the final status. */
3234 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3235 obd_enqueue_update_f upcall, void *cookie,
3236 int *flags, int agl, int rc)
3238 int intent = *flags & LDLM_FL_HAS_INTENT;
3242 /* The request was created before ldlm_cli_enqueue call. */
3243 if (rc == ELDLM_LOCK_ABORTED) {
3244 struct ldlm_reply *rep;
3245 rep = req_capsule_server_get(&req->rq_pill,
3248 LASSERT(rep != NULL);
3249 if (rep->lock_policy_res1)
3250 rc = rep->lock_policy_res1;
/* glimpse-style success: the LVB carries valid size/blocks/mtime */
3254 if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
3256 *flags |= LDLM_FL_LVB_READY;
3257 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3258 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3261 /* Call the update callback. */
3262 rc = (*upcall)(cookie, rc);
/* Reply interpreter for asynchronous lock enqueues: completes the LDLM
 * enqueue, runs osc_enqueue_fini() (which fires the upcall), and carefully
 * manages lock references so a blocking AST cannot race the upcall. */
3266 static int osc_enqueue_interpret(const struct lu_env *env,
3267 struct ptlrpc_request *req,
3268 struct osc_enqueue_args *aa, int rc)
3270 struct ldlm_lock *lock;
3271 struct lustre_handle handle;
3273 struct ost_lvb *lvb;
3275 int *flags = aa->oa_flags;
3277 /* Make a local copy of a lock handle and a mode, because aa->oa_*
3278 * might be freed anytime after lock upcall has been called. */
3279 lustre_handle_copy(&handle, aa->oa_lockh);
3280 mode = aa->oa_ei->ei_mode;
3282 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3284 lock = ldlm_handle2lock(&handle);
3286 /* Take an additional reference so that a blocking AST that
3287 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3288 * to arrive after an upcall has been executed by
3289 * osc_enqueue_fini(). */
3290 ldlm_lock_addref(&handle, mode);
3292 /* Let CP AST to grant the lock first. */
3293 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
/* AGL glimpse aborted: elided branch presumably skips LVB unpacking */
3295 if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
3300 lvb_len = sizeof(*aa->oa_lvb);
3303 /* Complete obtaining the lock procedure. */
3304 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3305 mode, flags, lvb, lvb_len, &handle, rc);
3306 /* Complete osc stuff. */
3307 rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
3308 flags, aa->oa_agl, rc);
3310 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3312 /* Release the lock for async request. */
3313 if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3315 * Releases a reference taken by ldlm_cli_enqueue(), if it is
3316 * not already released by
3317 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3319 ldlm_lock_decref(&handle, mode);
3321 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3322 aa->oa_lockh, req, aa);
/* drop the extra reference taken above, and the handle2lock ref */
3323 ldlm_lock_decref(&handle, mode);
3324 LDLM_LOCK_PUT(lock);
/* Update per-stripe state after an enqueue completes: on success, copy the
 * LVB and extend KMS up to the end of the granted extent; on an aborted
 * intent (glimpse), copy the LVB but leave KMS alone; on failure (elided
 * branch), fail pending matches on the lock. */
3328 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3329 struct lov_oinfo *loi, int flags,
3330 struct ost_lvb *lvb, __u32 mode, int rc)
3332 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3334 if (rc == ELDLM_OK) {
3337 LASSERT(lock != NULL);
3338 loi->loi_lvb = *lvb;
3339 tmp = loi->loi_lvb.lvb_size;
3340 /* Extend KMS up to the end of this lock and no further
3341 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3342 if (tmp > lock->l_policy_data.l_extent.end)
3343 tmp = lock->l_policy_data.l_extent.end + 1;
3344 if (tmp >= loi->loi_kms) {
3345 LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3346 ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3347 loi_kms_set(loi, tmp);
3349 LDLM_DEBUG(lock, "lock acquired, setting rss="
3350 LPU64"; leaving kms="LPU64", end="LPU64,
3351 loi->loi_lvb.lvb_size, loi->loi_kms,
3352 lock->l_policy_data.l_extent.end);
3354 ldlm_lock_allow_match(lock);
3355 } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3356 LASSERT(lock != NULL);
3357 loi->loi_lvb = *lvb;
3358 ldlm_lock_allow_match(lock);
3359 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3360 " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3366 ldlm_lock_fail_match(lock);
3368 LDLM_LOCK_PUT(lock);
3371 EXPORT_SYMBOL(osc_update_enqueue);
/* Sentinel request set: tells osc_enqueue_base() to hand the async request
 * straight to a ptlrpcd thread instead of a caller-owned set. */
3373 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3375 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3376 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3377 * other synchronous requests, however keeping some locks and trying to obtain
3378 * others may take a considerable amount of time in a case of ost failure; and
3379 * when other sync requests do not get released lock from a client, the client
3380 * is excluded from the cluster -- such scenarios make the life difficult, so
3381 * release locks just after they are obtained. */
/* Core extent-lock enqueue: first try to match an existing cached lock
 * (reusing a PW lock for PR readers); otherwise send an LDLM enqueue RPC,
 * synchronously or via @rqset / ptlrpcd when @async.
 * NOTE(review): interior lines are elided in this listing. */
3382 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3383 int *flags, ldlm_policy_data_t *policy,
3384 struct ost_lvb *lvb, int kms_valid,
3385 obd_enqueue_update_f upcall, void *cookie,
3386 struct ldlm_enqueue_info *einfo,
3387 struct lustre_handle *lockh,
3388 struct ptlrpc_request_set *rqset, int async, int agl)
3390 struct obd_device *obd = exp->exp_obd;
3391 struct ptlrpc_request *req = NULL;
3392 int intent = *flags & LDLM_FL_HAS_INTENT;
3393 int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
3398 /* Filesystem lock extents are extended to page boundaries so that
3399 * dealing with the page cache is a little smoother. */
3400 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3401 policy->l_extent.end |= ~CFS_PAGE_MASK;
3404 * kms is not valid when either object is completely fresh (so that no
3405 * locks are cached), or object was evicted. In the latter case cached
3406 * lock cannot be used, because it would prime inode state with
3407 * potentially stale LVB.
3412 /* Next, search for already existing extent locks that will cover us */
3413 /* If we're trying to read, we also search for an existing PW lock. The
3414 * VFS and page cache already protect us locally, so lots of readers/
3415 * writers can share a single PW lock.
3417 * There are problems with conversion deadlocks, so instead of
3418 * converting a read lock to a write lock, we'll just enqueue a new
3421 * At some point we should cancel the read lock instead of making them
3422 * send us a blocking callback, but there are problems with canceling
3423 * locks out from other users right now, too. */
3424 mode = einfo->ei_mode;
3425 if (einfo->ei_mode == LCK_PR)
3427 mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
3428 einfo->ei_type, policy, mode, lockh, 0);
3430 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3432 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
3433 /* For AGL, if enqueue RPC is sent but the lock is not
3434 * granted, then skip to process this stripe.
3435 * Return -ECANCELED to tell the caller. */
3436 ldlm_lock_decref(lockh, mode);
3437 LDLM_LOCK_PUT(matched);
3439 } else if (osc_set_lock_data_with_check(matched, einfo)) {
3440 *flags |= LDLM_FL_LVB_READY;
3441 /* addref the lock only if not async requests and PW
3442 * lock is matched whereas we asked for PR. */
3443 if (!rqset && einfo->ei_mode != mode)
3444 ldlm_lock_addref(lockh, LCK_PR);
3446 /* I would like to be able to ASSERT here that
3447 * rss <= kms, but I can't, for reasons which
3448 * are explained in lov_enqueue() */
3451 /* We already have a lock, and it's referenced */
3452 (*upcall)(cookie, ELDLM_OK);
3454 if (einfo->ei_mode != mode)
3455 ldlm_lock_decref(lockh, LCK_PW);
3457 /* For async requests, decref the lock. */
3458 ldlm_lock_decref(lockh, einfo->ei_mode);
3459 LDLM_LOCK_PUT(matched);
/* data attach failed: drop the matched lock and fall through to
 * a fresh enqueue */
3462 ldlm_lock_decref(lockh, mode);
3463 LDLM_LOCK_PUT(matched);
/* intent path: build an LDLM_ENQUEUE_LVB request by hand */
3469 CFS_LIST_HEAD(cancels);
3470 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3471 &RQF_LDLM_ENQUEUE_LVB);
3475 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3477 ptlrpc_request_free(req);
3481 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3483 ptlrpc_request_set_replen(req);
3486 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3487 *flags &= ~LDLM_FL_BLOCK_GRANTED;
3489 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3490 sizeof(*lvb), lockh, async);
/* async: stash the completion context and let the interpreter
 * finish the enqueue when the reply arrives */
3493 struct osc_enqueue_args *aa;
3494 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3495 aa = ptlrpc_req_async_args(req);
3498 aa->oa_flags = flags;
3499 aa->oa_upcall = upcall;
3500 aa->oa_cookie = cookie;
3502 aa->oa_lockh = lockh;
3505 req->rq_interpret_reply =
3506 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3507 if (rqset == PTLRPCD_SET)
3508 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3510 ptlrpc_set_add_req(rqset, req);
3511 } else if (intent) {
3512 ptlrpc_req_finished(req);
3517 rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
3519 ptlrpc_req_finished(req);
/* osc_enqueue(): obd_ops entry point for lock enqueue on a single OSC
 * object.  Builds the DLM resource name from the object id/seq in the
 * stripe metadata and delegates to osc_enqueue_base(), using stripe 0's
 * lvb/kms state.  The request is issued asynchronously iff a request
 * set was supplied (note the "rqset != NULL" argument below).
 * NOTE(review): this excerpt of the listing is gappy -- several original
 * lines (ENTRY/RETURN, braces) are missing from view. */
3524 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3525 struct ldlm_enqueue_info *einfo,
3526 struct ptlrpc_request_set *rqset)
3528 struct ldlm_res_id res_id;
/* Resource id is derived from the LOV stripe object id + sequence. */
3532 osc_build_res_name(oinfo->oi_md->lsm_object_id,
3533 oinfo->oi_md->lsm_object_seq, &res_id);
/* Common enqueue path; last args: rqset, async = (rqset != NULL), agl=0 */
3535 rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3536 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3537 oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3538 oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3539 rqset, rqset != NULL, 0);
/* osc_match_base(): look for an already-granted DLM lock that covers the
 * requested extent, without sending an enqueue RPC.  On success the lock
 * handle is returned in *lockh with a reference held (unless
 * LDLM_FL_TEST_LOCK was set in *flags).
 * NOTE(review): gappy listing -- the OBD_FAIL early return, the mode
 * widening before ldlm_lock_match() and the final RETURN are not visible
 * in this excerpt. */
3543 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3544 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3545 int *flags, void *data, struct lustre_handle *lockh,
3548 struct obd_device *obd = exp->exp_obd;
3549 int lflags = *flags;
/* Fault-injection hook used by tests to force a match miss. */
3553 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3556 /* Filesystem lock extents are extended to page boundaries so that
3557 * dealing with the page cache is a little smoother */
3558 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3559 policy->l_extent.end |= ~CFS_PAGE_MASK;
3561 /* Next, search for already existing extent locks that will cover us */
3562 /* If we're trying to read, we also search for an existing PW lock. The
3563 * VFS and page cache already protect us locally, so lots of readers/
3564 * writers can share a single PW lock. */
/* rc carries the (possibly widened) mode in, and the matched mode out. */
3568 rc = ldlm_lock_match(obd->obd_namespace, lflags,
3569 res_id, type, policy, rc, lockh, unref);
/* Attach caller's data to the lock; on mismatch drop the ref we took. */
3572 if (!osc_set_data_with_check(lockh, data)) {
3573 if (!(lflags & LDLM_FL_TEST_LOCK))
3574 ldlm_lock_decref(lockh, rc);
/* Asked for PR but matched PW: hold a PR ref and release the PW ref so
 * the caller's later decref(mode) balances correctly. */
3578 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3579 ldlm_lock_addref(lockh, LCK_PR);
3580 ldlm_lock_decref(lockh, LCK_PW);
/* osc_cancel_base(): release one reference on the lock identified by
 * lockh at the given mode.  GROUP locks are never dropped lazily, so
 * for LCK_GROUP the lock is cancelled together with the decref. */
3587 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3591 if (unlikely(mode == LCK_GROUP))
3592 ldlm_lock_decref_and_cancel(lockh, mode);
3594 ldlm_lock_decref(lockh, mode);
/* osc_cancel(): obd_ops cancel entry point; the stripe md and export are
 * unused here -- everything needed is in the lock handle + mode. */
3599 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3600 __u32 mode, struct lustre_handle *lockh)
3603 RETURN(osc_cancel_base(lockh, mode));
/* osc_cancel_unused(): cancel all unused locks in this OSC's namespace.
 * If a stripe md is supplied the cancel is restricted to that object's
 * resource; with resp left NULL the whole namespace is swept.
 * NOTE(review): the "if (lsm != NULL)" guard around the res-name build
 * is not visible in this gappy excerpt -- confirm against full source. */
3606 static int osc_cancel_unused(struct obd_export *exp,
3607 struct lov_stripe_md *lsm,
3608 ldlm_cancel_flags_t flags,
3611 struct obd_device *obd = class_exp2obd(exp);
3612 struct ldlm_res_id res_id, *resp = NULL;
3615 resp = osc_build_res_name(lsm->lsm_object_id,
3616 lsm->lsm_object_seq, &res_id);
3619 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
/* osc_statfs_interpret(): reply callback for the async OST_STATFS RPC
 * sent by osc_statfs_async().  Copies the server's obd_statfs into the
 * caller's buffer, maintains the object-creator (oscc) DEGRADED/RDONLY/
 * NOSPC state flags from the reported fs state, then invokes the
 * caller's oi_cb_up() upcall with the final rc. */
3622 static int osc_statfs_interpret(const struct lu_env *env,
3623 struct ptlrpc_request *req,
3624 struct osc_async_args *aa, int rc)
3626 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3627 struct obd_statfs *msfs;
3632 /* The request has in fact never been sent
3633 * due to issues at a higher level (LOV).
3634 * Exit immediately since the caller is
3635 * aware of the problem and takes care
3636 * of the clean up */
/* NODELAY statfs callers tolerate transient connection errors. */
3639 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3640 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3646 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3648 GOTO(out, rc = -EPROTO);
3651 /* Reinitialize the RDONLY and DEGRADED flags at the client
3652 * on each statfs, so they don't stay set permanently. */
3653 cfs_spin_lock(&cli->cl_oscc.oscc_lock);
3655 if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3656 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3657 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3658 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3660 if (unlikely(msfs->os_state & OS_STATE_READONLY))
3661 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3662 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3663 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3665 /* Add a bit of hysteresis so this flag isn't continually flapping,
3666 * and ensure that new files don't get extremely fragmented due to
3667 * only a small amount of available space in the filesystem.
3668 * We want to set the NOSPC flag when there is less than ~0.1% free
3669 * and clear it when there is at least ~0.2% free space, so:
3670 * avail < ~0.1% max max = avail + used
3671 * 1025 * avail < avail + used used = blocks - free
3672 * 1024 * avail < used
3673 * 1024 * avail < blocks - free
3674 * avail < ((blocks - free) >> 10)
3676 * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
3677 * lose that amount of space so in those cases we report no space left
3678 * if their is less than 1 GB left. */
3679 used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
3680 if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3681 ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3682 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3683 else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3684 (msfs->os_ffree > 64) &&
3685 (msfs->os_bavail > (used << 1)))) {
3686 cli->cl_oscc.oscc_flags &= ~(OSCC_FLAG_NOSPC |
3687 OSCC_FLAG_NOSPC_BLK);
/* Still NOSPC on blocks specifically (inodes may have recovered). */
3690 if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3691 (msfs->os_bavail < used)))
3692 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC_BLK;
3694 cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
/* Hand the statfs result back to the waiting obd_info. */
3696 *aa->aa_oi->oi_osfs = *msfs;
3698 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* osc_statfs_async(): send OST_STATFS without blocking; the reply is
 * processed by osc_statfs_interpret() which forwards the result through
 * oinfo->oi_cb_up.  The request is added to the caller-provided rqset.
 * NOTE(review): max_age is currently not transmitted (see comment
 * below); it is only meaningful to local cache-freshness logic. */
3702 static int osc_statfs_async(struct obd_export *exp,
3703 struct obd_info *oinfo, __u64 max_age,
3704 struct ptlrpc_request_set *rqset)
3706 struct obd_device *obd = class_exp2obd(exp);
3707 struct ptlrpc_request *req;
3708 struct osc_async_args *aa;
3712 /* We could possibly pass max_age in the request (as an absolute
3713 * timestamp or a "seconds.usec ago") so the target can avoid doing
3714 * extra calls into the filesystem if that isn't necessary (e.g.
3715 * during mount that would help a bit). Having relative timestamps
3716 * is not so great if request processing is slow, while absolute
3717 * timestamps are not ideal because they need time synchronization. */
3718 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3722 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3724 ptlrpc_request_free(req);
3727 ptlrpc_request_set_replen(req);
/* statfs is served by the CREATE portal on the OST. */
3728 req->rq_request_portal = OST_CREATE_PORTAL;
3729 ptlrpc_at_set_req_timeout(req);
3731 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3732 /* procfs requests not want stat in wait for avoid deadlock */
3733 req->rq_no_resend = 1;
3734 req->rq_no_delay = 1;
3737 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3738 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
/* Stash the obd_info so the interpret callback can find it. */
3739 aa = ptlrpc_req_async_args(req);
3742 ptlrpc_set_add_req(rqset, req);
/* osc_statfs(): synchronous statfs of the OST backing this export.
 * Takes a reference on the import under cl_sem (it can race with
 * client_disconnect_export, see comment below), sends OST_STATFS and
 * waits for the reply, copying the result into *osfs.
 * NOTE(review): the copy of the reply into *osfs and the import put on
 * the exit path are in lines missing from this gappy excerpt. */
3746 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
3747 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
3749 struct obd_device *obd = class_exp2obd(exp);
3750 struct obd_statfs *msfs;
3751 struct ptlrpc_request *req;
3752 struct obd_import *imp = NULL;
3756 /*Since the request might also come from lprocfs, so we need
3757 *sync this with client_disconnect_export Bug15684*/
3758 cfs_down_read(&obd->u.cli.cl_sem);
3759 if (obd->u.cli.cl_import)
3760 imp = class_import_get(obd->u.cli.cl_import);
3761 cfs_up_read(&obd->u.cli.cl_sem);
3765 /* We could possibly pass max_age in the request (as an absolute
3766 * timestamp or a "seconds.usec ago") so the target can avoid doing
3767 * extra calls into the filesystem if that isn't necessary (e.g.
3768 * during mount that would help a bit). Having relative timestamps
3769 * is not so great if request processing is slow, while absolute
3770 * timestamps are not ideal because they need time synchronization. */
3771 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
/* Import reference is only needed for the alloc; drop it here. */
3773 class_import_put(imp);
3778 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3780 ptlrpc_request_free(req);
3783 ptlrpc_request_set_replen(req);
3784 req->rq_request_portal = OST_CREATE_PORTAL;
3785 ptlrpc_at_set_req_timeout(req);
3787 if (flags & OBD_STATFS_NODELAY) {
3788 /* procfs requests not want stat in wait for avoid deadlock */
3789 req->rq_no_resend = 1;
3790 req->rq_no_delay = 1;
3793 rc = ptlrpc_queue_wait(req);
3797 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3799 GOTO(out, rc = -EPROTO);
3806 ptlrpc_req_finished(req);
3810 /* Retrieve object striping information.
3812 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3813 * the maximum number of OST indices which will fit in the user buffer.
3814 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
/* Serves LL_IOC_LOV_GETSTRIPE: copy the single-stripe layout of this OSC
 * object out to the user buffer in lov_user_md v1/v3 format. */
3816 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3818 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3819 struct lov_user_md_v3 lum, *lumk;
3820 struct lov_user_ost_data_v1 *lmm_objects;
3821 int rc = 0, lum_size;
3827 /* we only need the header part from user space to get lmm_magic and
3828 * lmm_stripe_count, (the header part is common to v1 and v3) */
3829 lum_size = sizeof(struct lov_user_md_v1);
3830 if (cfs_copy_from_user(&lum, lump, lum_size))
3833 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3834 (lum.lmm_magic != LOV_USER_MAGIC_V3))
3837 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3838 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3839 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3840 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3842 /* we can use lov_mds_md_size() to compute lum_size
3843 * because lov_user_md_vX and lov_mds_md_vX have the same size */
3844 if (lum.lmm_stripe_count > 0) {
3845 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3846 OBD_ALLOC(lumk, lum_size);
/* v1 and v3 place lmm_objects at different offsets; pick the right one. */
3850 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3851 lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3853 lmm_objects = &(lumk->lmm_objects[0]);
3854 lmm_objects->l_object_id = lsm->lsm_object_id;
/* stripe_count == 0: user only wants the header, no object entries. */
3856 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3860 lumk->lmm_object_id = lsm->lsm_object_id;
3861 lumk->lmm_object_seq = lsm->lsm_object_seq;
/* An OSC object is by definition a single stripe. */
3862 lumk->lmm_stripe_count = 1;
3864 if (cfs_copy_to_user(lump, lumk, lum_size))
3868 OBD_FREE(lumk, lum_size);
/* osc_iocontrol(): ioctl dispatcher for the OSC device.  Pins the module
 * for the duration of the call, handles the OSC-level commands below and
 * returns -ENOTTY for anything unrecognised. */
3874 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3875 void *karg, void *uarg)
3877 struct obd_device *obd = exp->exp_obd;
3878 struct obd_ioctl_data *data = karg;
/* Prevent module unload while an ioctl is in flight. */
3882 if (!cfs_try_module_get(THIS_MODULE)) {
3883 CERROR("Can't get module. Is it alive?");
3887 case OBD_IOC_LOV_GET_CONFIG: {
3889 struct lov_desc *desc;
3890 struct obd_uuid uuid;
3894 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3895 GOTO(out, err = -EINVAL);
3897 data = (struct obd_ioctl_data *)buf;
/* Validate user-supplied buffer sizes before writing into them. */
3899 if (sizeof(*desc) > data->ioc_inllen1) {
3900 obd_ioctl_freedata(buf, len);
3901 GOTO(out, err = -EINVAL);
3904 if (data->ioc_inllen2 < sizeof(uuid)) {
3905 obd_ioctl_freedata(buf, len);
3906 GOTO(out, err = -EINVAL);
/* An OSC presents itself as a one-target, one-stripe "LOV". */
3909 desc = (struct lov_desc *)data->ioc_inlbuf1;
3910 desc->ld_tgt_count = 1;
3911 desc->ld_active_tgt_count = 1;
3912 desc->ld_default_stripe_count = 1;
3913 desc->ld_default_stripe_size = 0;
3914 desc->ld_default_stripe_offset = 0;
3915 desc->ld_pattern = 0;
3916 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3918 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3920 err = cfs_copy_to_user((void *)uarg, buf, len);
3923 obd_ioctl_freedata(buf, len);
3926 case LL_IOC_LOV_SETSTRIPE:
3927 err = obd_alloc_memmd(exp, karg);
3931 case LL_IOC_LOV_GETSTRIPE:
3932 err = osc_getstripe(karg, uarg);
3934 case OBD_IOC_CLIENT_RECOVER:
3935 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3936 data->ioc_inlbuf1, 0);
3940 case IOC_OSC_SET_ACTIVE:
3941 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3944 case OBD_IOC_POLL_QUOTACHECK:
3945 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
3947 case OBD_IOC_PING_TARGET:
3948 err = ptlrpc_obd_ping(obd);
3951 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3952 cmd, cfs_curproc_comm());
3953 GOTO(out, err = -ENOTTY);
/* Balance the cfs_try_module_get() taken on entry. */
3956 cfs_module_put(THIS_MODULE);
/* osc_get_info(): obd_ops get_info handler.  Supports:
 *  - KEY_LOCK_TO_STRIPE: trivially returns stripe 0 (one stripe per OSC);
 *  - KEY_LAST_ID: synchronous OST_GET_INFO RPC returning the last
 *    allocated object id on the OST;
 *  - KEY_FIEMAP: synchronous OST_GET_INFO RPC passing a fiemap request
 *    through to the OST and copying the mapping back into *val. */
3960 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
3961 obd_count keylen, void *key, __u32 *vallen, void *val,
3962 struct lov_stripe_md *lsm)
3965 if (!vallen || !val)
3968 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3969 __u32 *stripe = val;
3970 *vallen = sizeof(*stripe);
3973 } else if (KEY_IS(KEY_LAST_ID)) {
3974 struct ptlrpc_request *req;
3979 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3980 &RQF_OST_GET_INFO_LAST_ID);
3984 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3985 RCL_CLIENT, keylen);
3986 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3988 ptlrpc_request_free(req);
3992 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3993 memcpy(tmp, key, keylen);
/* last_id is advisory; don't stall on a slow/unreachable OST. */
3995 req->rq_no_delay = req->rq_no_resend = 1;
3996 ptlrpc_request_set_replen(req);
3997 rc = ptlrpc_queue_wait(req);
4001 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
4003 GOTO(out, rc = -EPROTO);
4005 *((obd_id *)val) = *reply;
4007 ptlrpc_req_finished(req);
4009 } else if (KEY_IS(KEY_FIEMAP)) {
4010 struct ptlrpc_request *req;
4011 struct ll_user_fiemap *reply;
4015 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
4016 &RQF_OST_GET_INFO_FIEMAP);
/* The fiemap value buffer travels in both directions, hence the
 * client AND server sizes are set to *vallen. */
4020 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
4021 RCL_CLIENT, keylen);
4022 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
4023 RCL_CLIENT, *vallen);
4024 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
4025 RCL_SERVER, *vallen);
4027 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
4029 ptlrpc_request_free(req);
4033 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
4034 memcpy(tmp, key, keylen);
4035 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
4036 memcpy(tmp, val, *vallen);
4038 ptlrpc_request_set_replen(req);
4039 rc = ptlrpc_queue_wait(req);
4043 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
4045 GOTO(out1, rc = -EPROTO);
4047 memcpy(val, reply, *vallen);
4049 ptlrpc_req_finished(req);
/* osc_setinfo_mds_connect_import(): run on the MDS-side OSC once the OST
 * connection is established (KEY_MDS_CONN reply).  Connects the llog
 * initiator for the MDS->OST originator context, then marks the import
 * as server-style (pingable, server timeouts). */
4057 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
4059 struct llog_ctxt *ctxt;
4063 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
4065 rc = llog_initiator_connect(ctxt);
4066 llog_ctxt_put(ctxt);
4068 /* XXX return an error? skip setting below flags? */
/* imp_lock protects the import flag updates. */
4071 cfs_spin_lock(&imp->imp_lock);
4072 imp->imp_server_timeout = 1;
4073 imp->imp_pingable = 1;
4074 cfs_spin_unlock(&imp->imp_lock);
4075 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
/* osc_setinfo_mds_conn_interpret(): reply callback for the KEY_MDS_CONN
 * set_info RPC; on success it finishes MDS-side import setup. */
4080 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
4081 struct ptlrpc_request *req,
4088 RETURN(osc_setinfo_mds_connect_import(req->rq_import));
/* osc_set_info_async(): obd_ops set_info handler.  Some keys are consumed
 * locally (NEXT_ID, CHECKSUM, SPTLRPC_CONF, FLUSH_CTX); everything else
 * is packed into an OST_SET_INFO RPC.  KEY_MDS_CONN and KEY_GRANT_SHRINK
 * get dedicated interpret callbacks; GRANT_SHRINK requests go through
 * ptlrpcd instead of the caller's set. */
4091 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
4092 obd_count keylen, void *key, obd_count vallen,
4093 void *val, struct ptlrpc_request_set *set)
4095 struct ptlrpc_request *req;
4096 struct obd_device *obd = exp->exp_obd;
4097 struct obd_import *imp = class_exp2cliimp(exp);
4102 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
4104 if (KEY_IS(KEY_NEXT_ID)) {
4106 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4108 if (vallen != sizeof(obd_id))
4113 if (vallen != sizeof(obd_id))
4116 /* avoid race between allocate new object and set next id
4117 * from ll_sync thread */
4118 cfs_spin_lock(&oscc->oscc_lock);
/* next_id only moves forward; never shrink it under a racing bump. */
4119 new_val = *((obd_id*)val) + 1;
4120 if (new_val > oscc->oscc_next_id)
4121 oscc->oscc_next_id = new_val;
4122 cfs_spin_unlock(&oscc->oscc_lock);
4123 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
4124 exp->exp_obd->obd_name,
4125 obd->u.cli.cl_oscc.oscc_next_id);
4130 if (KEY_IS(KEY_CHECKSUM)) {
4131 if (vallen != sizeof(int))
4133 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
4137 if (KEY_IS(KEY_SPTLRPC_CONF)) {
4138 sptlrpc_conf_client_adapt(obd);
4142 if (KEY_IS(KEY_FLUSH_CTX)) {
4143 sptlrpc_import_flush_my_ctx(imp);
/* All remaining keys need an RPC; insist on a request set except for
 * grant shrink, which is handled by ptlrpcd below. */
4147 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
4150 /* We pass all other commands directly to OST. Since nobody calls osc
4151 methods directly and everybody is supposed to go through LOV, we
4152 assume lov checked invalid values for us.
4153 The only recognised values so far are evict_by_nid and mds_conn.
4154 Even if something bad goes through, we'd get a -EINVAL from OST
4157 if (KEY_IS(KEY_GRANT_SHRINK))
4158 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
4160 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
4165 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
4166 RCL_CLIENT, keylen);
4167 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
4168 RCL_CLIENT, vallen);
4169 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
4171 ptlrpc_request_free(req);
4175 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
4176 memcpy(tmp, key, keylen);
4177 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
4178 memcpy(tmp, val, vallen);
4180 if (KEY_IS(KEY_MDS_CONN)) {
4181 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
/* Record the MDT sequence for objects created via this OSC. */
4183 oscc->oscc_oa.o_seq = (*(__u32 *)val);
4184 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
4185 LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
4186 req->rq_no_delay = req->rq_no_resend = 1;
4187 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4188 } else if (KEY_IS(KEY_GRANT_SHRINK)) {
4189 struct osc_grant_args *aa;
4192 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4193 aa = ptlrpc_req_async_args(req);
4196 ptlrpc_req_finished(req);
4199 *oa = ((struct ost_body *)val)->oa;
4201 req->rq_interpret_reply = osc_shrink_grant_interpret;
4204 ptlrpc_request_set_replen(req);
4205 if (!KEY_IS(KEY_GRANT_SHRINK)) {
4206 LASSERT(set != NULL);
4207 ptlrpc_set_add_req(set, req);
4208 ptlrpc_check_set(NULL, set);
4210 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
/* llog operations for the size-replication context: only cancel is
 * needed on the client side. */
4216 static struct llog_operations osc_size_repl_logops = {
4217 lop_cancel: llog_obd_repl_cancel
/* Originator ops are filled in at module init (see osc_init). */
4220 static struct llog_operations osc_mds_ost_orig_logops;
/* __osc_llog_init(): set up the two llog contexts this OSC uses --
 * the MDS->OST originator catalog (identified by *catid) and the size
 * replication context.  On failure of the second, the first context is
 * looked up again so it can be torn down (cleanup path partly outside
 * this gappy excerpt). */
4222 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4223 struct obd_device *tgt, struct llog_catid *catid)
4228 rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
4229 &catid->lci_logid, &osc_mds_ost_orig_logops);
4231 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4235 rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
4236 NULL, &osc_size_repl_logops);
4238 struct llog_ctxt *ctxt =
4239 llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4242 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4247 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
4248 obd->obd_name, tgt->obd_name, catid, rc);
4249 CERROR("logid "LPX64":0x%x\n",
4250 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
/* osc_llog_init(): obd_ops llog_init entry point.  Reads the catalog id
 * for this OSC's slot (*index) from the CATLIST file on disk_obd, calls
 * __osc_llog_init() with it, and writes back the (possibly updated)
 * catalog id.  Serialized by olg_cat_processing. */
4255 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4256 struct obd_device *disk_obd, int *index)
4258 struct llog_catid catid;
4259 static char name[32] = CATLIST;
4263 LASSERT(olg == &obd->obd_olg);
4265 cfs_mutex_lock(&olg->olg_cat_processing);
4266 rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
4268 CERROR("rc: %d\n", rc);
4272 CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
4273 obd->obd_name, *index, catid.lci_logid.lgl_oid,
4274 catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen);
4276 rc = __osc_llog_init(obd, olg, disk_obd, &catid);
4278 CERROR("rc: %d\n", rc);
/* Persist the catalog id so a restart finds the same catalog. */
4282 rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
4284 CERROR("rc: %d\n", rc);
4289 cfs_mutex_unlock(&olg->olg_cat_processing);
/* osc_llog_finish(): tear down both llog contexts created by
 * __osc_llog_init().  Both cleanups are attempted even if the first
 * fails (rc/rc2 collected separately). */
4294 static int osc_llog_finish(struct obd_device *obd, int count)
4296 struct llog_ctxt *ctxt;
4297 int rc = 0, rc2 = 0;
4300 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4302 rc = llog_cleanup(ctxt);
4304 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4306 rc2 = llog_cleanup(ctxt);
/* osc_reconnect(): obd_ops reconnect hook.  If the server supports the
 * grant protocol, request our current grant need (outstanding dirty plus
 * available grant, or two full RPCs' worth if zero) and fold any grant
 * lost over the disconnect into the accounting. */
4313 static int osc_reconnect(const struct lu_env *env,
4314 struct obd_export *exp, struct obd_device *obd,
4315 struct obd_uuid *cluuid,
4316 struct obd_connect_data *data,
4319 struct client_obd *cli = &obd->u.cli;
4321 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
/* cl_loi_list_lock protects the grant counters. */
4324 client_obd_list_lock(&cli->cl_loi_list_lock);
4325 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
4326 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4327 lost_grant = cli->cl_lost_grant;
4328 cli->cl_lost_grant = 0;
4329 client_obd_list_unlock(&cli->cl_loi_list_lock);
4331 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4332 "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
4333 cli->cl_avail_grant, cli->cl_dirty, lost_grant);
4334 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4335 " ocd_grant: %d\n", data->ocd_connect_flags,
4336 data->ocd_version, data->ocd_grant);
/* osc_disconnect(): obd_ops disconnect hook.  On the last connection,
 * flush outstanding size-replication llog cancels to the OST, then do
 * the generic client disconnect.  Removal from the grant-shrink list is
 * deliberately deferred until the import is gone (see comment below). */
4344 static int osc_disconnect(struct obd_export *exp)
4345 struct obd_device *obd = class_exp2obd(exp);
4345 struct llog_ctxt *ctxt;
4348 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4350 if (obd->u.cli.cl_conn_count == 1) {
4351 /* Flush any remaining cancel messages out to the
4353 llog_sync(ctxt, exp);
4355 llog_ctxt_put(ctxt);
4357 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4361 rc = client_disconnect_export(exp);
4363 * Initially we put del_shrink_grant before disconnect_export, but it
4364 * causes the following problem if setup (connect) and cleanup
4365 * (disconnect) are tangled together.
4366 * connect p1 disconnect p2
4367 * ptlrpc_connect_import
4368 * ............... class_manual_cleanup
4371 * ptlrpc_connect_interrupt
4373 * add this client to shrink list
4375 * Bang! pinger trigger the shrink.
4376 * So the osc should be disconnected from the shrink list, after we
4377 * are sure the import has been destroyed. BUG18662
4379 if (obd->u.cli.cl_import == NULL)
4380 osc_del_shrink_grant(&obd->u.cli);
/* osc_import_event(): react to import state transitions.  Resets grant
 * accounting on disconnect, discards cached pages and DLM locks on
 * invalidate, clears creator NOSPC state when the import comes back, and
 * forwards ACTIVE/INACTIVE/(DE)ACTIVATE/OCD events to the observer. */
4384 static int osc_import_event(struct obd_device *obd,
4385 struct obd_import *imp,
4386 enum obd_import_event event)
4388 struct client_obd *cli;
4392 LASSERT(imp->imp_obd == obd);
4395 case IMP_EVENT_DISCON: {
4396 /* Only do this on the MDS OSC's */
4397 if (imp->imp_server_timeout) {
4398 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4400 cfs_spin_lock(&oscc->oscc_lock);
4401 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4402 cfs_spin_unlock(&oscc->oscc_lock);
/* Any grant we held is void once the connection drops. */
4405 client_obd_list_lock(&cli->cl_loi_list_lock);
4406 cli->cl_avail_grant = 0;
4407 cli->cl_lost_grant = 0;
4408 client_obd_list_unlock(&cli->cl_loi_list_lock);
4411 case IMP_EVENT_INACTIVE: {
4412 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4415 case IMP_EVENT_INVALIDATE: {
4416 struct ldlm_namespace *ns = obd->obd_namespace;
4420 env = cl_env_get(&refcheck);
4424 client_obd_list_lock(&cli->cl_loi_list_lock);
4425 /* all pages go to failing rpcs due to the invalid
4427 osc_check_rpcs(env, cli);
4428 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Drop all locks locally; the server state is gone anyway. */
4430 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4431 cl_env_put(env, &refcheck);
4436 case IMP_EVENT_ACTIVE: {
4437 /* Only do this on the MDS OSC's */
4438 if (imp->imp_server_timeout) {
4439 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4441 cfs_spin_lock(&oscc->oscc_lock);
4442 oscc->oscc_flags &= ~(OSCC_FLAG_NOSPC |
4443 OSCC_FLAG_NOSPC_BLK);
4444 cfs_spin_unlock(&oscc->oscc_lock);
4446 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4449 case IMP_EVENT_OCD: {
4450 struct obd_connect_data *ocd = &imp->imp_connect_data;
4452 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4453 osc_init_grant(&obd->u.cli, ocd);
4456 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4457 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
4459 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4462 case IMP_EVENT_DEACTIVATE: {
4463 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
4466 case IMP_EVENT_ACTIVATE: {
4467 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
4471 CERROR("Unknown import event %d\n", event);
4478 * Determine whether the lock can be canceled before replaying the lock
4479 * during recovery, see bug16774 for detailed information.
4481 * \retval zero the lock can't be canceled
4482 * \retval other ok to cancel
4484 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
/* Caller must hold the resource lock (asserted here). */
4486 check_res_locked(lock->l_resource);
4489 * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
4491 * XXX as a future improvement, we can also cancel unused write lock
4492 * if it doesn't have dirty data and active mmaps.
/* A pageref of zero means no cached pages depend on this lock. */
4494 if (lock->l_resource->lr_type == LDLM_EXTENT &&
4495 (lock->l_granted_mode == LCK_PR ||
4496 lock->l_granted_mode == LCK_CR) &&
4497 (osc_dlm_lock_pageref(lock) == 0))
/* osc_setup(): device setup.  Takes a ptlrpcd reference, runs the common
 * client setup, creates the writeback work item, registers lprocfs
 * entries, pre-allocates a small request pool for brw, and registers the
 * recovery-cancel callback on the namespace. */
4503 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4505 struct client_obd *cli = &obd->u.cli;
4510 rc = ptlrpcd_addref();
4514 rc = client_obd_setup(obd, lcfg);
4517 handler = ptlrpcd_alloc_work(cli->cl_import,
4518 brw_queue_work, cli);
4519 if (!IS_ERR(handler))
4520 cli->cl_writeback_work = handler;
4522 rc = PTR_ERR(handler);
4526 struct lprocfs_static_vars lvars = { 0 };
4528 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4529 lprocfs_osc_init_vars(&lvars);
4530 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4531 lproc_osc_attach_seqstat(obd);
4532 sptlrpc_lprocfs_cliobd_attach(obd);
4533 ptlrpc_lprocfs_register_obd(obd);
4537 /* We need to allocate a few requests more, because
4538 brw_interpret tries to create new requests before freeing
4539 previous ones. Ideally we want to have 2x max_rpcs_in_flight
4540 reserved, but I afraid that might be too much wasted RAM
4541 in fact, so 2 is just my guess and still should work. */
4542 cli->cl_import->imp_rq_pool =
4543 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4545 ptlrpc_add_rqs_to_pool);
4547 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
/* Let the DLM cancel unused read locks instead of replaying them. */
4549 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
/* osc_precleanup(): staged teardown.  EARLY deactivates the import and
 * stops pinging; EXPORTS waits for zombie exports, destroys the
 * writeback work item, cleans up the import, lprocfs and llog state. */
4557 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4563 case OBD_CLEANUP_EARLY: {
4564 struct obd_import *imp;
4565 imp = obd->u.cli.cl_import;
4566 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4567 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4568 ptlrpc_deactivate_import(imp);
4569 cfs_spin_lock(&imp->imp_lock);
4570 imp->imp_pingable = 0;
4571 cfs_spin_unlock(&imp->imp_lock);
4574 case OBD_CLEANUP_EXPORTS: {
4575 struct client_obd *cli = &obd->u.cli;
4577 * for echo client, export may be on zombie list, wait for
4578 * zombie thread to cull it, because cli.cl_import will be
4579 * cleared in client_disconnect_export():
4580 * class_export_destroy() -> obd_cleanup() ->
4581 * echo_device_free() -> echo_client_cleanup() ->
4582 * obd_disconnect() -> osc_disconnect() ->
4583 * client_disconnect_export()
4585 obd_zombie_barrier();
4586 if (cli->cl_writeback_work) {
4587 ptlrpcd_destroy_work(cli->cl_writeback_work);
4588 cli->cl_writeback_work = NULL;
4590 obd_cleanup_client_import(obd);
4591 ptlrpc_lprocfs_unregister_obd(obd);
4592 lprocfs_obd_cleanup(obd);
4593 rc = obd_llog_finish(obd, 0);
4595 CERROR("failed to cleanup llogging subsystems\n");
4602 int osc_cleanup(struct obd_device *obd)
4608 /* free memory of osc quota cache */
4609 osc_quota_cleanup(obd);
4611 rc = client_obd_cleanup(obd);
/* osc_process_config_base(): handle config-log records for this device;
 * the visible branch processes PARAM_OSC proc tunables. */
4617 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4619 struct lprocfs_static_vars lvars = { 0 };
4622 lprocfs_osc_init_vars(&lvars);
4624 switch (lcfg->lcfg_command) {
4626 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
/* osc_process_config(): obd_ops wrapper; len is unused here. */
4636 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4638 return osc_process_config_base(obd, buf);
/* Method table exported by the OSC: connection management, object
 * lifecycle, attributes, locking, ioctls, llog and quota hooks. */
4641 struct obd_ops osc_obd_ops = {
4642 .o_owner = THIS_MODULE,
4643 .o_setup = osc_setup,
4644 .o_precleanup = osc_precleanup,
4645 .o_cleanup = osc_cleanup,
4646 .o_add_conn = client_import_add_conn,
4647 .o_del_conn = client_import_del_conn,
4648 .o_connect = client_connect_import,
4649 .o_reconnect = osc_reconnect,
4650 .o_disconnect = osc_disconnect,
4651 .o_statfs = osc_statfs,
4652 .o_statfs_async = osc_statfs_async,
4653 .o_packmd = osc_packmd,
4654 .o_unpackmd = osc_unpackmd,
4655 .o_precreate = osc_precreate,
4656 .o_create = osc_create,
4657 .o_create_async = osc_create_async,
4658 .o_destroy = osc_destroy,
4659 .o_getattr = osc_getattr,
4660 .o_getattr_async = osc_getattr_async,
4661 .o_setattr = osc_setattr,
4662 .o_setattr_async = osc_setattr_async,
4664 .o_punch = osc_punch,
4666 .o_enqueue = osc_enqueue,
4667 .o_change_cbdata = osc_change_cbdata,
4668 .o_find_cbdata = osc_find_cbdata,
4669 .o_cancel = osc_cancel,
4670 .o_cancel_unused = osc_cancel_unused,
4671 .o_iocontrol = osc_iocontrol,
4672 .o_get_info = osc_get_info,
4673 .o_set_info_async = osc_set_info_async,
4674 .o_import_event = osc_import_event,
4675 .o_llog_init = osc_llog_init,
4676 .o_llog_finish = osc_llog_finish,
4677 .o_process_config = osc_process_config,
4678 .o_quotactl = osc_quotactl,
4679 .o_quotacheck = osc_quotacheck,
4680 .o_quota_adjust_qunit = osc_quota_adjust_qunit,
4683 extern struct lu_kmem_descr osc_caches[];
4684 extern cfs_spinlock_t osc_ast_guard;
4685 extern cfs_lock_class_key_t osc_ast_guard_class;
/* osc_init(): module init.  Creates the slab caches, registers the OSC
 * obd type with its lprocfs vars, initializes the AST guard lock, and
 * builds the MDS->OST originator llog ops from the lvfs defaults. */
4687 int __init osc_init(void)
4689 struct lprocfs_static_vars lvars = { 0 };
4693 /* print an address of _any_ initialized kernel symbol from this
4694 * module, to allow debugging with gdb that doesn't support data
4695 * symbols from modules.*/
4696 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
4698 rc = lu_kmem_init(osc_caches);
4700 lprocfs_osc_init_vars(&lvars);
4703 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4704 LUSTRE_OSC_NAME, &osc_device_type);
/* Undo the cache init if type registration failed. */
4706 lu_kmem_fini(osc_caches);
4710 cfs_spin_lock_init(&osc_ast_guard);
4711 cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
/* Start from the lvfs ops, then override the origin-side hooks. */
4713 osc_mds_ost_orig_logops = llog_lvfs_ops;
4714 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4715 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4716 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4717 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
/* osc_exit(): module unload -- unregister the obd type and free caches. */
4723 static void /*__exit*/ osc_exit(void)
4726 class_unregister_type(LUSTRE_OSC_NAME);
4727 lu_kmem_fini(osc_caches);
4730 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4731 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4732 MODULE_LICENSE("GPL");
4734 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);