1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_OSC
42 #include <libcfs/libcfs.h>
45 # include <liblustre.h>
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include "osc_internal.h"
66 static quota_interface_t *quota_interface = NULL;
67 extern quota_interface_t osc_quota_interface;
69 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
70 static int brw_interpret(const struct lu_env *env,
71 struct ptlrpc_request *req, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
74 /* Pack OSC object metadata for disk storage (LE byte order). */
/* osc_packmd() - write the object id and group of @lsm into *lmmp.
 * The packed buffer is exactly one struct lov_mds_md (lmm_size below); this
 * function both releases (OBD_FREE) and obtains (OBD_ALLOC) *lmmp with that
 * size. */
75 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
76 struct lov_stripe_md *lsm)
/* The only size ever packed here: a single lov_mds_md. */
81 lmm_size = sizeof(**lmmp);
86 OBD_FREE(*lmmp, lmm_size);
92 OBD_ALLOC(*lmmp, lmm_size);
/* A stripe without an object id, or with a group LASSERT_MDS_GROUP()
 * rejects, must never be packed. */
98 LASSERT(lsm->lsm_object_id);
99 LASSERT_MDS_GROUP(lsm->lsm_object_gr);
/* Both fields are converted from CPU order to little-endian. */
100 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
101 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
107 /* Unpack OSC object metadata from disk storage (LE byte order). */
/* osc_unpackmd() - build or release the in-memory stripe descriptor *lsmp.
 * @lmm:       packed metadata to decode; compared against NULL below
 * @lmm_bytes: size of the buffer behind @lmm
 * The descriptor is sized for one stripe (lov_stripe_md_size(1)) and owns a
 * separately allocated lsm_oinfo[0]. */
108 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
109 struct lov_mds_md *lmm, int lmm_bytes)
/* NOTE(review): lmm_bytes is a signed int compared against a sizeof()
 * result, so a negative lmm_bytes is converted to a large unsigned value
 * and is not reported as too small - confirm callers never pass one. */
115 if (lmm_bytes < sizeof (*lmm)) {
116 CERROR("lov_mds_md too small: %d, need %d\n",
117 lmm_bytes, (int)sizeof(*lmm));
120 /* XXX LOV_MAGIC etc check? */
/* An object id of zero is logged as an error. */
122 if (lmm->lmm_object_id == 0) {
123 CERROR("lov_mds_md: zero lmm_object_id\n");
128 lsm_size = lov_stripe_md_size(1);
/* Existing descriptor and nothing to unpack: free the per-stripe info
 * first, then the descriptor itself. */
132 if (*lsmp != NULL && lmm == NULL) {
133 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
134 OBD_FREE(*lsmp, lsm_size);
140 OBD_ALLOC(*lsmp, lsm_size);
143 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
/* Second allocation failed: undo the first so nothing leaks. */
144 if ((*lsmp)->lsm_oinfo[0] == NULL) {
145 OBD_FREE(*lsmp, lsm_size);
148 loi_init((*lsmp)->lsm_oinfo[0]);
152 /* XXX zero *lsmp? */
/* Id and group are converted from little-endian to CPU order. */
153 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
154 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
155 LASSERT((*lsmp)->lsm_object_id);
156 LASSERT_MDS_GROUP((*lsmp)->lsm_object_gr);
159 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/* osc_pack_capa() - attach capability @capa to request @req.
 * @capa is passed as void * and is treated as a struct obd_capa.
 * @body has OBD_MD_FLOSSCAPA added to its oa.o_valid. */
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165 struct ost_body *body, void *capa)
167 struct obd_capa *oc = (struct obd_capa *)capa;
168 struct lustre_capa *c;
/* c is the RMF_CAPA1 field of the client (request) side of the capsule. */
173 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
176 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177 DEBUG_CAPA(D_SEC, c, "pack");
/* osc_pack_req_body() - fill the RMF_OST_BODY field of @req from @oinfo.
 * The obdo is copied by value, so changes made to *oinfo->oi_oa after this
 * call are not reflected in the request. The capability oinfo->oi_capa is
 * then packed through osc_pack_capa(). */
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181 struct obd_info *oinfo)
183 struct ost_body *body;
185 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
/* Structure copy of the caller's obdo into the request buffer. */
188 body->oa = *oinfo->oi_oa;
189 osc_pack_capa(req, body, oinfo->oi_capa);
/* osc_set_capa_size() - adjust the client-side size of capsule field @field
 * in @req. One path sets the size to 0; the condition selecting it is not
 * visible here (NOTE(review): presumably "no capability supplied"). */
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193 const struct req_msg_field *field,
/* Size 0 on the client (RCL_CLIENT) side of the capsule. */
197 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
199 /* it is already calculated as sizeof struct obd_capa */
/* osc_getattr_interpret() - reply handler installed by osc_getattr_async().
 * @aa: async args; aa->aa_oi is the obd_info that receives the attributes
 * @rc: status so far; the final value is whatever aa_oi->oi_cb_up() returns
 * On a usable reply the returned obdo is copied into aa_oi->oi_oa; when the
 * body cannot be unpacked the obdo's o_valid is cleared instead. */
203 static int osc_getattr_interpret(const struct lu_env *env,
204 struct ptlrpc_request *req,
205 struct osc_async_args *aa, int rc)
207 struct ost_body *body;
/* Fetch the reply body, byte-swapping with lustre_swab_ost_body. */
213 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
214 lustre_swab_ost_body);
216 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
217 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
219 /* This should really be sent by the OST */
/* Block size is overridden locally and marked valid. */
220 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
221 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
223 CDEBUG(D_INFO, "can't unpack ost_body\n");
/* No attribute in the caller's obdo may be trusted on this path. */
225 aa->aa_oi->oi_oa->o_valid = 0;
/* Hand the result to the caller-supplied completion callback. */
228 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* osc_getattr_async() - queue an OST_GETATTR request on @set.
 * The reply is processed by osc_getattr_interpret(), which uses the
 * per-request async args obtained below. */
232 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
233 struct ptlrpc_request_set *set)
235 struct ptlrpc_request *req;
236 struct osc_async_args *aa;
240 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
/* The capability field must be sized before the request is packed. */
244 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
245 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
/* A request that was allocated but not packed is released with
 * ptlrpc_request_free(). */
247 ptlrpc_request_free(req);
251 osc_pack_req_body(req, oinfo);
253 ptlrpc_request_set_replen(req);
254 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
/* Compile-time check that the args fit in the request's inline storage. */
256 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
257 aa = ptlrpc_req_async_args(req);
260 ptlrpc_set_add_req(set, req);
/* osc_getattr() - synchronous OST_GETATTR.
 * Sends the request with ptlrpc_queue_wait() and copies the obdo from the
 * reply into *oinfo->oi_oa. A -EPROTO exit exists after the reply body is
 * fetched (NOTE(review): presumably taken when the body is missing). */
264 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
266 struct ptlrpc_request *req;
267 struct ost_body *body;
271 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
275 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
276 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
278 ptlrpc_request_free(req);
282 osc_pack_req_body(req, oinfo);
284 ptlrpc_request_set_replen(req);
/* Send the request and wait for its completion. */
286 rc = ptlrpc_queue_wait(req);
290 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
292 GOTO(out, rc = -EPROTO);
294 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
/* Structure copy of the server's obdo into the caller's. */
295 *oinfo->oi_oa = body->oa;
297 /* This should really be sent by the OST */
298 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
299 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
/* Drop the reference on the request. */
303 ptlrpc_req_finished(req);
/* osc_setattr() - synchronous OST_SETATTR.
 * The attributes in *oinfo->oi_oa are sent to the OST and then overwritten
 * with the obdo found in the reply. @oti is not referenced in the visible
 * part of this function. */
307 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
308 struct obd_trans_info *oti)
310 struct ptlrpc_request *req;
311 struct ost_body *body;
/* If the caller marks the group as valid it must pass CHECK_MDS_GROUP(). */
315 LASSERTF(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
316 CHECK_MDS_GROUP(oinfo->oi_oa->o_gr),
317 "oinfo->oi_oa->o_valid="LPU64" oinfo->oi_oa->o_gr="LPU64"\n",
318 oinfo->oi_oa->o_valid, oinfo->oi_oa->o_gr);
320 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
324 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
325 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
327 ptlrpc_request_free(req);
331 osc_pack_req_body(req, oinfo);
333 ptlrpc_request_set_replen(req);
335 rc = ptlrpc_queue_wait(req);
339 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
341 GOTO(out, rc = -EPROTO);
/* Return the attributes as the server reported them. */
343 *oinfo->oi_oa = body->oa;
347 ptlrpc_req_finished(req);
/* osc_setattr_interpret() - reply handler installed by osc_setattr_async().
 * Copies the obdo from the reply into aa->aa_oi->oi_oa and passes the status
 * to aa->aa_oi->oi_cb_up(); the callback's return value becomes rc. */
351 static int osc_setattr_interpret(const struct lu_env *env,
352 struct ptlrpc_request *req,
353 struct osc_async_args *aa, int rc)
355 struct ost_body *body;
361 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
363 GOTO(out, rc = -EPROTO);
365 *aa->aa_oi->oi_oa = body->oa;
367 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* osc_setattr_async() - asynchronous OST_SETATTR.
 * Two dispatch paths are visible: the request is either handed to ptlrpcd
 * without waiting for a response, or given osc_setattr_interpret() as reply
 * handler and added to @rqset. */
371 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
372 struct obd_trans_info *oti,
373 struct ptlrpc_request_set *rqset)
375 struct ptlrpc_request *req;
376 struct osc_async_args *aa;
380 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
384 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
385 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
387 ptlrpc_request_free(req);
391 osc_pack_req_body(req, oinfo);
393 ptlrpc_request_set_replen(req);
/* NOTE(review): osc_pack_req_body() above has already copied
 * *oinfo->oi_oa into the request body, so the cookie stored below
 * changes only the caller's obdo, not the packed request - confirm
 * whether the assignment should come before the pack. */
395 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
397 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
400 /* do mds to ost setattr asynchronously */
402 /* Do not wait for response. */
403 ptlrpcd_add_req(req, PSCOPE_OTHER);
405 req->rq_interpret_reply =
406 (ptlrpc_interpterer_t)osc_setattr_interpret;
/* Compile-time check that the args fit in the request's inline storage. */
408 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
409 aa = ptlrpc_req_async_args(req);
412 ptlrpc_set_add_req(rqset, req);
/* osc_real_create() - synchronous OST_CREATE.
 * @oa:  attributes for the new object; o_blksize/o_valid are updated after
 *       the reply arrives
 * @ea:  stripe descriptor in/out (its use is not visible in this part)
 * @oti: transaction info; receives the reply transno and, when
 *       OBD_MD_FLCOOKIE is set in @oa, the llog cookie
 * A local descriptor lsm is obtained with obd_alloc_memmd() and released
 * with obd_free_memmd() on an exit path. */
418 int osc_real_create(struct obd_export *exp, struct obdo *oa,
419 struct lov_stripe_md **ea, struct obd_trans_info *oti)
421 struct ptlrpc_request *req;
422 struct ost_body *body;
423 struct lov_stripe_md *lsm;
432 rc = obd_alloc_memmd(exp, &lsm);
437 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
439 GOTO(out, rc = -ENOMEM);
441 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
443 ptlrpc_request_free(req);
447 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
451 ptlrpc_request_set_replen(req);
/* Orphan-deletion requests (flags exactly OBD_FL_DELORPHAN) are sent
 * once only: both rq_no_resend and rq_no_delay are set. */
453 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
454 oa->o_flags == OBD_FL_DELORPHAN) {
456 "delorphan from OST integration");
457 /* Don't resend the delorphan req */
458 req->rq_no_resend = req->rq_no_delay = 1;
461 rc = ptlrpc_queue_wait(req);
465 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
467 GOTO(out_req, rc = -EPROTO);
471 /* This should really be sent by the OST */
472 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
473 oa->o_valid |= OBD_MD_FLBLKSZ;
475 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
476 * have valid lsm_oinfo data structs, so don't go touching that.
477 * This needs to be fixed in a big way.
479 lsm->lsm_object_id = oa->o_id;
480 lsm->lsm_object_gr = oa->o_gr;
484 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
486 if (oa->o_valid & OBD_MD_FLCOOKIE) {
487 if (!oti->oti_logcookies)
488 oti_alloc_cookies(oti, 1);
489 *oti->oti_logcookies = oa->o_lcookie;
493 CDEBUG(D_HA, "transno: "LPD64"\n",
494 lustre_msg_get_transno(req->rq_repmsg));
496 ptlrpc_req_finished(req);
499 obd_free_memmd(exp, &lsm);
/* osc_punch_interpret() - reply handler installed by osc_punch_base().
 * Copies the obdo from the reply into *aa->pa_oa and calls
 * aa->pa_upcall(aa->pa_cookie, rc); the upcall's return value becomes rc. */
503 static int osc_punch_interpret(const struct lu_env *env,
504 struct ptlrpc_request *req,
505 struct osc_punch_args *aa, int rc)
507 struct ost_body *body;
513 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
515 GOTO(out, rc = -EPROTO);
517 *aa->pa_oa = body->oa;
519 rc = aa->pa_upcall(aa->pa_cookie, rc);
/* osc_punch_base() - build and queue an asynchronous OST_PUNCH request.
 * @upcall/@cookie: completion callback and its argument, stored in the
 *                  request's async args for osc_punch_interpret()
 * @rqset:          request set to add to; the special value PTLRPCD_SET
 *                  hands the request to ptlrpcd instead */
523 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
524 struct obd_capa *capa,
525 obd_enqueue_update_f upcall, void *cookie,
526 struct ptlrpc_request_set *rqset)
528 struct ptlrpc_request *req;
529 struct osc_punch_args *aa;
530 struct ost_body *body;
534 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
538 osc_set_capa_size(req, &RMF_CAPA1, capa);
539 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
541 ptlrpc_request_free(req);
/* The request is sent to the OST I/O portal. */
544 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
545 ptlrpc_at_set_req_timeout(req);
547 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
550 osc_pack_capa(req, body, capa);
552 ptlrpc_request_set_replen(req);
555 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
/* Compile-time check that the args fit in the request's inline storage. */
556 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
557 aa = ptlrpc_req_async_args(req);
559 aa->pa_upcall = upcall;
560 aa->pa_cookie = cookie;
561 if (rqset == PTLRPCD_SET)
562 ptlrpcd_add_req(req, PSCOPE_OTHER);
564 ptlrpc_set_add_req(rqset, req);
/* osc_punch() - punch the extent described by oinfo->oi_policy.
 * The extent start and end are carried in the obdo's o_size and o_blocks
 * fields, both marked valid, and the work is delegated to osc_punch_base()
 * with oinfo->oi_cb_up as upcall and @oinfo itself as cookie.
 * @oti is not used. */
569 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
570 struct obd_trans_info *oti,
571 struct ptlrpc_request_set *rqset)
573 oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
574 oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
575 oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
576 return osc_punch_base(exp, oinfo->oi_oa, oinfo->oi_capa,
577 oinfo->oi_cb_up, oinfo, rqset);
/* osc_sync() - synchronous OST_SYNC for the byte range [@start, @end].
 * The range travels in the request obdo's o_size and o_blocks fields.
 * A debug message is logged for a NULL @oa. */
580 static int osc_sync(struct obd_export *exp, struct obdo *oa,
581 struct lov_stripe_md *md, obd_size start, obd_size end,
584 struct ptlrpc_request *req;
585 struct ost_body *body;
590 CDEBUG(D_INFO, "oa NULL\n");
594 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
598 osc_set_capa_size(req, &RMF_CAPA1, capa);
599 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
601 ptlrpc_request_free(req);
605 /* overload the size and blocks fields in the oa with start/end */
606 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
609 body->oa.o_size = start;
610 body->oa.o_blocks = end;
611 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
612 osc_pack_capa(req, body, capa);
614 ptlrpc_request_set_replen(req);
/* Send the request and wait for its completion. */
616 rc = ptlrpc_queue_wait(req);
620 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
622 GOTO(out, rc = -EPROTO);
628 ptlrpc_req_finished(req);
632 /* Find and locally cancel the locks matched by @mode in the resource found by
633 * @objid. Found locks are added to the @cancels list. Returns the number of
634 * locks added to the @cancels list. */
/* osc_resource_get_unused() - the resource is looked up in the namespace of
 * exp->exp_obd under the name built from oa->o_id and oa->o_gr; the count
 * comes from ldlm_cancel_resource_local(). */
635 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
636 struct list_head *cancels, ldlm_mode_t mode,
639 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
640 struct ldlm_res_id res_id;
641 struct ldlm_resource *res;
645 osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
646 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
/* The ADDREF/DELREF pair brackets the cancel call; the resource
 * reference taken by ldlm_resource_get() is dropped last. */
650 LDLM_RESOURCE_ADDREF(res);
651 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
652 lock_flags, 0, NULL);
653 LDLM_RESOURCE_DELREF(res);
654 ldlm_resource_putref(res);
/* osc_destroy_interpret() - reply handler for throttled destroy RPCs.
 * Decrements the client's count of destroys in flight and signals
 * cl_destroy_waitq, on which osc_destroy() waits for a free slot. */
658 static int osc_destroy_interpret(const struct lu_env *env,
659 struct ptlrpc_request *req, void *data,
662 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
664 atomic_dec(&cli->cl_destroy_in_flight);
665 cfs_waitq_signal(&cli->cl_destroy_waitq);
/* osc_can_send_destroy() - try to reserve a slot for one destroy RPC.
 * The in-flight counter is incremented first; the slot is kept when the new
 * value does not exceed cl_max_rpcs_in_flight. Otherwise the increment is
 * undone and, if the counter is then below the limit, cl_destroy_waitq is
 * signalled. */
669 static int osc_can_send_destroy(struct client_obd *cli)
671 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
672 cli->cl_max_rpcs_in_flight) {
673 /* The destroy request can be sent */
676 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
677 cli->cl_max_rpcs_in_flight) {
679 * The counter has been modified between the two atomic
682 cfs_waitq_signal(&cli->cl_destroy_waitq);
687 /* Destroy requests can be async always on the client, and we don't even really
688 * care about the return code since the client cannot do anything at all about
689 * a destroy failure.
690 * When the MDS is unlinking a filename, it saves the file objects into a
691 * recovery llog, and these object records are cancelled when the OST reports
692 * they were destroyed and sync'd to disk (i.e. transaction committed).
693 * If the client dies, or the OST is down when the object should be destroyed,
694 * the records are not cancelled, and when the OST reconnects to the MDS next,
695 * it will retrieve the llog unlink logs and then send the log cancellation
696 * cookies to the MDS after committing destroy transactions. */
/* osc_destroy() - send an OST_DESTROY request for the object in @oa.
 * @oti:  when non-NULL and OBD_MD_FLCOOKIE is set in @oa, supplies the llog
 *        cookie stored in oa->o_lcookie
 * @capa: capability, passed as void * and used as struct obd_capa *
 * Unused LCK_PW locks on the object are collected first with
 * LDLM_FL_DISCARD_DATA. The request is handed to ptlrpcd without waiting
 * for the reply. */
697 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
698 struct lov_stripe_md *ea, struct obd_trans_info *oti,
699 struct obd_export *md_export, void *capa)
701 struct client_obd *cli = &exp->exp_obd->u.cli;
702 struct ptlrpc_request *req;
703 struct ost_body *body;
704 CFS_LIST_HEAD(cancels);
709 CDEBUG(D_INFO, "oa NULL\n");
713 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
714 LDLM_FL_DISCARD_DATA);
716 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* The collected locks are released again with ldlm_lock_list_put(). */
718 ldlm_lock_list_put(&cancels, l_bl_ast, count);
722 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
723 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
726 ptlrpc_request_free(req);
730 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
731 ptlrpc_at_set_req_timeout(req);
733 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
734 oa->o_lcookie = *oti->oti_logcookies;
735 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
739 osc_pack_capa(req, body, (struct obd_capa *)capa);
740 ptlrpc_request_set_replen(req);
742 /* don't throttle destroy RPCs for the MDT */
/* Imports whose original connect flags lack OBD_CONNECT_MDS are
 * throttled: osc_destroy_interpret() releases the slot that
 * osc_can_send_destroy() reserves. */
743 if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
744 req->rq_interpret_reply = osc_destroy_interpret;
745 if (!osc_can_send_destroy(cli)) {
746 struct l_wait_info lwi = { 0 };
749 * Wait until the number of on-going destroy RPCs drops
750 * under max_rpc_in_flight
752 l_wait_event_exclusive(cli->cl_destroy_waitq,
753 osc_can_send_destroy(cli), &lwi);
757 /* Do not wait for response */
758 ptlrpcd_add_req(req, PSCOPE_OTHER);
/* osc_announce_cached() - report this client's cache and grant state in @oa.
 * Fills o_dirty, o_undirty, o_grant and o_dropped while holding
 * cl_loi_list_lock, and resets cl_lost_grant once it has been reported.
 * @oa must not already have OBD_MD_FLBLOCKS or OBD_MD_FLGRANT set. */
762 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
765 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
767 LASSERT(!(oa->o_valid & bits));
770 client_obd_list_lock(&cli->cl_loi_list_lock);
771 oa->o_dirty = cli->cl_dirty;
/* Three sanity checks, each logging an error: per-client dirty above
 * its limit, system-wide dirty pages above their limit, and an
 * implausibly large gap between the limit and the dirty count.
 * Pages counted as "transit" are subtracted in the first two. */
772 if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
773 CERROR("dirty %lu - %lu > dirty_max %lu\n",
774 cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
776 } else if (atomic_read(&obd_dirty_pages) -
777 atomic_read(&obd_dirty_transit_pages) > obd_max_dirty_pages){
778 CERROR("dirty %d - %d > system dirty_max %d\n",
779 atomic_read(&obd_dirty_pages),
780 atomic_read(&obd_dirty_transit_pages),
781 obd_max_dirty_pages);
783 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
784 CERROR("dirty %lu - dirty_max %lu too big???\n",
785 cli->cl_dirty, cli->cl_dirty_max);
/* Bytes that (cl_max_rpcs_in_flight + 1) full-sized RPCs can carry;
 * o_undirty is the larger of that and cl_dirty_max. */
788 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
789 (cli->cl_max_rpcs_in_flight + 1);
790 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
792 oa->o_grant = cli->cl_avail_grant;
793 oa->o_dropped = cli->cl_lost_grant;
794 cli->cl_lost_grant = 0;
795 client_obd_list_unlock(&cli->cl_loi_list_lock);
796 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
797 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
800 /* caller must hold loi_list_lock */
/* osc_consume_write_grant() - charge one page of grant to @pga.
 * Moves CFS_PAGE_SIZE bytes from cl_avail_grant to cl_dirty, bumps the
 * global obd_dirty_pages counter and tags the page OBD_BRW_FROM_GRANT.
 * The page must not already carry that flag. */
801 static void osc_consume_write_grant(struct client_obd *cli,
802 struct brw_page *pga)
804 LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
805 atomic_inc(&obd_dirty_pages);
806 cli->cl_dirty += CFS_PAGE_SIZE;
807 cli->cl_avail_grant -= CFS_PAGE_SIZE;
808 pga->flag |= OBD_BRW_FROM_GRANT;
809 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
810 CFS_PAGE_SIZE, pga, pga->pg);
/* Checked only after the subtraction: available grant must not have
 * gone negative. */
811 LASSERT(cli->cl_avail_grant >= 0);
814 /* the companion to osc_consume_write_grant, called when a brw has completed.
815 * must be called with the loi lock held. */
/* osc_release_write_grant() - undo the accounting done for @pga by
 * osc_consume_write_grant().
 * @sent: supplied by the caller; NOTE(review): the branch that tests it is
 *        not visible here - presumably it selects the "lost grant" path. */
816 static void osc_release_write_grant(struct client_obd *cli,
817 struct brw_page *pga, int sent)
/* OST block size, falling back to 4096 when os_bsize is zero
 * (GNU "?:" with omitted middle operand). */
819 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
/* Pages that never consumed grant are handled separately. */
822 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
827 pga->flag &= ~OBD_BRW_FROM_GRANT;
828 atomic_dec(&obd_dirty_pages);
829 cli->cl_dirty -= CFS_PAGE_SIZE;
/* OBD_BRW_NOCACHE pages were also counted as "in transit". */
830 if (pga->flag & OBD_BRW_NOCACHE) {
831 pga->flag &= ~OBD_BRW_NOCACHE;
832 atomic_dec(&obd_dirty_transit_pages);
833 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
836 cli->cl_lost_grant += CFS_PAGE_SIZE;
837 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
838 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
839 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
840 /* For short writes we shouldn't count parts of pages that
841 * span a whole block on the OST side, or our accounting goes
842 * wrong. Should match the code in filter_grant_check. */
/* offset: position of the data inside its page.
 * count:  data length extended back to the start of its block.
 * end:    how far into a block the data ends. */
843 int offset = pga->off & ~CFS_PAGE_MASK;
844 int count = pga->count + (offset & (blocksize - 1));
845 int end = (offset + pga->count) & (blocksize - 1);
847 count += blocksize - end;
/* Whatever part of the page is not covered by count is lost. */
849 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
850 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
851 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
852 cli->cl_avail_grant, cli->cl_dirty);
/* rpcs_in_flight() - sum of the client's read (cl_r_in_flight) and write
 * (cl_w_in_flight) in-flight counters. */
858 static unsigned long rpcs_in_flight(struct client_obd *cli)
860 return cli->cl_r_in_flight + cli->cl_w_in_flight;
863 /* caller must hold loi_list_lock */
/* osc_wake_cache_waiters() - walk cli->cl_cache_waiters and wake waiters.
 * Each woken waiter is removed from the list and either granted one page
 * (osc_consume_write_grant() on its oap's brw page) or given
 * ocw_rc = -EDQUOT when less than a page of grant is available. */
864 void osc_wake_cache_waiters(struct client_obd *cli)
866 struct list_head *l, *tmp;
867 struct osc_cache_waiter *ocw;
/* The _safe iterator is needed: entries are unlinked while walking. */
870 list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
871 /* if we can't dirty more, we must wait until some is written */
872 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
873 (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
874 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
875 "osc max %ld, sys max %d\n", cli->cl_dirty,
876 cli->cl_dirty_max, obd_max_dirty_pages);
880 /* if still dirty cache but no grant wait for pending RPCs that
881 * may yet return us some grant before doing sync writes */
882 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
883 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
884 cli->cl_w_in_flight);
888 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
889 list_del_init(&ocw->ocw_entry);
890 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
891 /* no more RPCs in flight to return grant, do sync IO */
892 ocw->ocw_rc = -EDQUOT;
893 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
895 osc_consume_write_grant(cli,
896 &ocw->ocw_oap->oap_brw_page);
899 cfs_waitq_signal(&ocw->ocw_waitq);
/* osc_init_grant() - set the client's available grant to the value in the
 * connect data (@ocd->ocd_grant), under cl_loi_list_lock. */
905 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
907 client_obd_list_lock(&cli->cl_loi_list_lock);
908 cli->cl_avail_grant = ocd->ocd_grant;
909 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* The debug print and the assertion read the grant fields after the
 * lock has been dropped. */
911 CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
912 cli->cl_avail_grant, cli->cl_lost_grant);
913 LASSERT(cli->cl_avail_grant >= 0);
/* osc_update_grant() - add the grant carried in a reply body to the
 * client's available grant, under cl_loi_list_lock. The grant is added only
 * when the reply marks it valid (OBD_MD_FLGRANT); the debug message prints
 * o_grant before that validity check. */
916 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
918 client_obd_list_lock(&cli->cl_loi_list_lock);
919 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
920 if (body->oa.o_valid & OBD_MD_FLGRANT)
921 cli->cl_avail_grant += body->oa.o_grant;
922 /* waiters are woken in brw_interpret */
923 client_obd_list_unlock(&cli->cl_loi_list_lock);
926 /* We assume that the reason this OSC got a short read is because it read
927 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
928 * via the LOV, and it _knows_ it's reading inside the file, it's just that
929 * this stripe never got written at or beyond this stripe offset yet. */
/* handle_short_read() - zero-fill the part of a read that was not returned.
 * @nob_read:   number of bytes actually read
 * @page_count: number of entries in @pga
 * @pga:        pages of the request, in transfer order
 * Pages fully covered by @nob_read are skipped, the page containing the end
 * of data has its tail zeroed, and all remaining pages are zeroed. */
930 static void handle_short_read(int nob_read, obd_count page_count,
931 struct brw_page **pga)
936 /* skip bytes read OK */
937 while (nob_read > 0) {
938 LASSERT (page_count > 0);
940 if (pga[i]->count > nob_read) {
941 /* EOF inside this page */
/* Map the page, zero from the end of valid data to the end of
 * this page's transfer, then unmap. */
942 ptr = cfs_kmap(pga[i]->pg) +
943 (pga[i]->off & ~CFS_PAGE_MASK);
944 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
945 cfs_kunmap(pga[i]->pg);
951 nob_read -= pga[i]->count;
956 /* zero remaining pages */
957 while (page_count-- > 0) {
958 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
959 memset(ptr, 0, pga[i]->count);
960 cfs_kunmap(pga[i]->pg);
/* check_write_rcs() - validate the per-niobuf result vector of a write reply.
 * @requested_nob: bytes the client asked to transfer
 * @niocount:      number of entries expected in the vector
 * A negative entry is returned as the error; a non-zero positive entry is
 * logged as invalid; finally the bulk byte count must equal @requested_nob.
 * @page_count and @pga are not referenced in the visible code. */
965 static int check_write_rcs(struct ptlrpc_request *req,
966 int requested_nob, int niocount,
967 obd_count page_count, struct brw_page **pga)
971 /* return error if any niobuf was in error */
/* No swabber is passed (NULL); the entries are byte-swapped by hand
 * below when the reply message was swabbed. */
972 remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
973 sizeof(*remote_rcs) * niocount, NULL);
974 if (remote_rcs == NULL) {
975 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
978 if (lustre_msg_swabbed(req->rq_repmsg))
979 for (i = 0; i < niocount; i++)
980 __swab32s(&remote_rcs[i]);
982 for (i = 0; i < niocount; i++) {
983 if (remote_rcs[i] < 0)
984 return(remote_rcs[i]);
986 if (remote_rcs[i] != 0) {
987 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
988 i, remote_rcs[i], req);
993 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
994 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
995 req->rq_bulk->bd_nob_transferred, requested_nob);
/* can_merge_pages() - may @p2 share a remote niobuf with @p1?
 * The visible return path answers yes exactly when @p2 starts at the byte
 * where @p1 ends. When the flags differ, a mismatch outside FROM_GRANT,
 * NOCACHE and SYNC is logged as an error. */
1002 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1004 if (p1->flag != p2->flag) {
/* Flags cleared by this mask are ignored for the warning below. */
1005 unsigned mask = ~(OBD_BRW_FROM_GRANT|
1006 OBD_BRW_NOCACHE|OBD_BRW_SYNC);
1008 /* warn if we try to combine flags that we don't know to be
1009 * safe to combine */
1010 if ((p1->flag & mask) != (p2->flag & mask))
1011 CERROR("is it ok to have flags 0x%x and 0x%x in the "
1012 "same brw?\n", p1->flag, p2->flag);
1016 return (p1->off + p1->count == p2->off);
/* osc_checksum_bulk() - checksum the first @nob bytes held in @pga.
 * @pg_count:   number of pages in @pga; must be greater than zero
 * @opc:        OST_READ or OST_WRITE; selects which fault-injection hook
 *              applies
 * @cksum_type: algorithm passed to init_checksum()/compute_checksum()
 * Each page is mapped, its in-page range is folded into the running
 * checksum, and the page is unmapped again. */
1019 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1020 struct brw_page **pga, int opc,
1021 cksum_type_t cksum_type)
1026 LASSERT (pg_count > 0);
1027 cksum = init_checksum(cksum_type);
1028 while (nob > 0 && pg_count > 0) {
1029 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1030 int off = pga[i]->off & ~CFS_PAGE_MASK;
/* Never checksum more than the bytes still outstanding. */
1031 int count = pga[i]->count > nob ? nob : pga[i]->count;
1033 /* corrupt the data before we compute the checksum, to
1034 * simulate an OST->client data error */
1035 if (i == 0 && opc == OST_READ &&
1036 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1037 memcpy(ptr + off, "bad1", min(4, nob));
1038 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1039 cfs_kunmap(pga[i]->pg);
1040 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
/* The full page count is subtracted here, not the clamped count, so
 * nob can drop below zero on the last page; the loop then ends. */
1043 nob -= pga[i]->count;
1047 /* For sending we only compute the wrong checksum instead
1048 * of corrupting the data so it is still correct on a redo */
1049 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/* osc_brw_prep_request() - build a bulk read or write request for @pga.
 * @cmd:        OBD_BRW_WRITE set means a write; write requests are allocated
 *              from the import's request pool
 * @oa:         object attributes; for writes the checksum and its flags are
 *              also saved here for later checking
 * @page_count: number of pages in @pga; must be greater than zero
 * @pga:        pages in strictly increasing offset order (asserted below)
 * @reqp:       result parameter (NOTE(review): its assignment is not
 *              visible here)
 * @ocapa:      capability; a reference is taken when @reserve is non-zero
 * Contiguous pages are merged into a single remote niobuf, so the request
 * carries niocount <= page_count niobufs while the bulk descriptor carries
 * every page. */
1055 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1056 struct lov_stripe_md *lsm, obd_count page_count,
1057 struct brw_page **pga,
1058 struct ptlrpc_request **reqp,
1059 struct obd_capa *ocapa, int reserve)
1061 struct ptlrpc_request *req;
1062 struct ptlrpc_bulk_desc *desc;
1063 struct ost_body *body;
1064 struct obd_ioobj *ioobj;
1065 struct niobuf_remote *niobuf;
1066 int niocount, i, requested_nob, opc, rc;
1067 struct osc_brw_async_args *aa;
1068 struct req_capsule *pill;
1069 struct brw_page *pg_prev;
/* Fault-injection hooks for testing the two failure classes. */
1072 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1073 RETURN(-ENOMEM); /* Recoverable */
1074 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1075 RETURN(-EINVAL); /* Fatal */
1077 if ((cmd & OBD_BRW_WRITE) != 0) {
1079 req = ptlrpc_request_alloc_pool(cli->cl_import,
1080 cli->cl_import->imp_rq_pool,
1084 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
/* Count the remote niobufs needed, starting from one; the loop visits
 * each adjacent pair of pages and tests whether it can be merged. */
1089 for (niocount = i = 1; i < page_count; i++) {
1090 if (!can_merge_pages(pga[i - 1], pga[i]))
1094 pill = &req->rq_pill;
1095 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1096 niocount * sizeof(*niobuf));
1097 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1099 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1101 ptlrpc_request_free(req);
1104 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1105 ptlrpc_at_set_req_timeout(req);
/* Writes are a bulk GET source, reads a bulk PUT sink. */
1107 if (opc == OST_WRITE)
1108 desc = ptlrpc_prep_bulk_imp(req, page_count,
1109 BULK_GET_SOURCE, OST_BULK_PORTAL);
1111 desc = ptlrpc_prep_bulk_imp(req, page_count,
1112 BULK_PUT_SINK, OST_BULK_PORTAL);
1115 GOTO(out, rc = -ENOMEM);
1116 /* NB request now owns desc and will free it when it gets freed */
1118 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1119 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1120 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1121 LASSERT(body && ioobj && niobuf);
1125 obdo_to_ioobj(oa, ioobj);
1126 ioobj->ioo_bufcnt = niocount;
1127 osc_pack_capa(req, body, ocapa);
1128 LASSERT (page_count > 0);
/* Add every page to the bulk descriptor and fill the niobuf array;
 * requested_nob accumulates the total byte count. */
1130 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1131 struct brw_page *pg = pga[i];
1133 LASSERT(pg->count > 0);
/* The transfer must not run past the end of its page. */
1134 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1135 "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1136 pg->off, pg->count);
/* Two variants of the same ordering assertion follow.
 * NOTE(review): presumably selected by preprocessor conditionals
 * that are not visible here. */
1138 LASSERTF(i == 0 || pg->off > pg_prev->off,
1139 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1140 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1142 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1143 pg_prev->pg, page_private(pg_prev->pg),
1144 pg_prev->pg->index, pg_prev->off);
1146 LASSERTF(i == 0 || pg->off > pg_prev->off,
1147 "i %d p_c %u\n", i, page_count);
/* All pages must agree with the first one on OBD_BRW_SRVLOCK. */
1149 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1150 (pg->flag & OBD_BRW_SRVLOCK));
1152 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1154 requested_nob += pg->count;
/* A mergeable page extends a niobuf's length; otherwise a niobuf is
 * filled in with this page's offset, length and flags. */
1156 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1158 niobuf->len += pg->count;
1160 niobuf->offset = pg->off;
1161 niobuf->len = pg->count;
1162 niobuf->flags = pg->flag;
/* Cross-check: stepping back niocount entries from the cursor must
 * give the start of the niobuf buffer in the request message. */
1167 LASSERTF((void *)(niobuf - niocount) ==
1168 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1169 niocount * sizeof(*niobuf)),
1170 "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
1171 REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
1172 (void *)(niobuf - niocount));
1174 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1176 /* size[REQ_REC_OFF] still sizeof (*body) */
1177 if (opc == OST_WRITE) {
/* Checksum at this layer only when the client has checksums enabled
 * and sptlrpc_flavor_has_bulk() is false for the request's flavor. */
1178 if (unlikely(cli->cl_checksum) &&
1179 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1180 /* store cl_cksum_type in a local variable since
1181 * it can be changed via lprocfs */
1182 cksum_type_t cksum_type = cli->cl_cksum_type;
1184 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1185 oa->o_flags = body->oa.o_flags = 0;
1186 body->oa.o_flags |= cksum_type_pack(cksum_type);
1187 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1188 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1192 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1194 /* save this in 'oa', too, for later checking */
1195 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1196 oa->o_flags |= cksum_type_pack(cksum_type);
1198 /* clear out the checksum flag, in case this is a
1199 * resend but cl_checksum is no longer set. b=11238 */
1200 oa->o_valid &= ~OBD_MD_FLCKSUM;
1202 oa->o_cksum = body->oa.o_cksum;
1203 /* 1 RC per niobuf */
1204 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
1205 sizeof(__u32) * niocount);
/* Read path: only request a checksum of the chosen type from the
 * server; nothing is computed locally at this point. */
1207 if (unlikely(cli->cl_checksum) &&
1208 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1209 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1210 body->oa.o_flags = 0;
1211 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1212 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1214 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
1215 /* 1 RC for the whole I/O */
1217 ptlrpc_request_set_replen(req);
/* Compile-time check that the args fit in the request's inline storage. */
1219 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1220 aa = ptlrpc_req_async_args(req);
1222 aa->aa_requested_nob = requested_nob;
1223 aa->aa_nio_count = niocount;
1224 aa->aa_page_count = page_count;
1228 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1229 if (ocapa && reserve)
1230 aa->aa_ocapa = capa_get(ocapa);
1236 ptlrpc_req_finished(req);
/* check_write_checksum() - compare client and server checksums of a write.
 * @oa:                obdo from the reply; supplies the checksum type the
 *                     server used and the ids printed in the error
 * @peer:              server peer, printed in the error
 * @client_cksum:      checksum computed when the request was built
 * @server_cksum:      checksum reported by the server
 * @client_cksum_type: checksum type the client originally requested
 * Equal checksums are logged as confirmed. Otherwise the data is
 * checksummed again and a console error states where the corruption most
 * likely happened. */
1240 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1241 __u32 client_cksum, __u32 server_cksum, int nob,
1242 obd_count page_count, struct brw_page **pga,
1243 cksum_type_t client_cksum_type)
1247 cksum_type_t cksum_type;
1249 if (server_cksum == client_cksum) {
1250 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* Use the type carried in the reply flags, or CRC32 when the reply
 * has no valid flags. */
1254 if (oa->o_valid & OBD_MD_FLFLAGS)
1255 cksum_type = cksum_type_unpack(oa->o_flags);
1257 cksum_type = OBD_CKSUM_CRC32;
/* Checksum the pages as they are now, to compare with both the
 * server's value and the client's original value. */
1259 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1262 if (cksum_type != client_cksum_type)
1263 msg = "the server did not use the checksum type specified in "
1264 "the original request - likely a protocol problem";
1265 else if (new_cksum == server_cksum)
1266 msg = "changed on the client after we checksummed it - "
1267 "likely false positive due to mmap IO (bug 11742)";
1268 else if (new_cksum == client_cksum)
1269 msg = "changed in transit before arrival at OST";
1271 msg = "changed in transit AND doesn't match the original - "
1272 "likely false positive due to mmap IO (bug 11742)";
1274 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1275 LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1276 "["LPU64"-"LPU64"]\n",
1277 msg, libcfs_nid2str(peer->nid),
1278 oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1279 oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1282 oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1284 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1285 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1286 "client csum now %x\n", client_cksum, client_cksum_type,
1287 server_cksum, cksum_type, new_cksum);
1291 /* Note rc enters this function as number of bytes transferred */
/*
 * Post-process the reply of a bulk read/write request.
 *
 * Visible steps: unpack the ost_body from the reply; for an OST_WRITE
 * reply carrying user/group quota bits, pass them to lquota_setdq();
 * update grant with osc_update_grant(); then branch on the opcode.
 *
 * OST_WRITE: the bulk is unwrapped, the write checksum is verified with
 * check_write_checksum() when the request carried a non-zero checksum,
 * and the per-niobuf results are checked with check_write_rcs().
 *
 * OST_READ: the bulk is unwrapped, the byte count in rc is validated
 * against the requested and the transferred sizes, short reads are passed
 * to handle_short_read(), and when the reply has OBD_MD_FLCKSUM the
 * server checksum is compared with one computed locally over the pages.
 *
 * Near the end *aa->aa_oa is overwritten with the reply obdo.
 *
 * NOTE(review): the error codes returned on the failure paths, and any
 * condition guarding the final obdo copy, are not visible here.
 */
1292 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1294 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1295 const lnet_process_id_t *peer =
1296 &req->rq_import->imp_connection->c_peer;
1297 struct client_obd *cli = aa->aa_cli;
1298 struct ost_body *body;
1299 __u32 client_cksum = 0;
1302 if (rc < 0 && rc != -EDQUOT)
1305 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1306 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1307 lustre_swab_ost_body);
1309 CDEBUG(D_INFO, "Can't unpack body\n");
1313 /* set/clear over quota flag for a uid/gid */
1314 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1315 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1316 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1317 body->oa.o_gid, body->oa.o_valid,
1323 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1324 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1326 osc_update_grant(cli, body);
1328 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1330 CERROR("Unexpected +ve rc %d\n", rc);
1333 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1335 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1338 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1339 check_write_checksum(&body->oa, peer, client_cksum,
1340 body->oa.o_cksum, aa->aa_requested_nob,
1341 aa->aa_page_count, aa->aa_ppga,
1342 cksum_type_unpack(aa->aa_oa->o_flags)))
1345 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1346 aa->aa_page_count, aa->aa_ppga);
1350 /* The rest of this function executes only for OST_READs */
1352 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1356 if (rc > aa->aa_requested_nob) {
1357 CERROR("Unexpected rc %d (%d requested)\n", rc,
1358 aa->aa_requested_nob);
1362 if (rc != req->rq_bulk->bd_nob_transferred) {
1363 CERROR ("Unexpected rc %d (%d transferred)\n",
1364 rc, req->rq_bulk->bd_nob_transferred);
1368 if (rc < aa->aa_requested_nob)
1369 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1371 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1372 static int cksum_counter;
1373 __u32 server_cksum = body->oa.o_cksum;
1376 cksum_type_t cksum_type;
1378 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1379 cksum_type = cksum_type_unpack(body->oa.o_flags);
1381 cksum_type = OBD_CKSUM_CRC32;
1382 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1383 aa->aa_ppga, OST_READ,
1386 if (peer->nid == req->rq_bulk->bd_sender) {
1390 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1393 if (server_cksum == ~0 && rc > 0) {
1394 CERROR("Protocol error: server %s set the 'checksum' "
1395 "bit, but didn't send a checksum. Not fatal, "
1396 "but please notify on http://bugzilla.lustre.org/\n",
1397 libcfs_nid2str(peer->nid));
1398 } else if (server_cksum != client_cksum) {
1399 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1400 "%s%s%s inum "LPU64"/"LPU64" object "
1401 LPU64"/"LPU64" extent "
1402 "["LPU64"-"LPU64"]\n",
1403 req->rq_import->imp_obd->obd_name,
1404 libcfs_nid2str(peer->nid),
1406 body->oa.o_valid & OBD_MD_FLFID ?
1407 body->oa.o_fid : (__u64)0,
1408 body->oa.o_valid & OBD_MD_FLFID ?
1409 body->oa.o_generation :(__u64)0,
1411 body->oa.o_valid & OBD_MD_FLGROUP ?
1412 body->oa.o_gr : (__u64)0,
1413 aa->aa_ppga[0]->off,
1414 aa->aa_ppga[aa->aa_page_count-1]->off +
1415 aa->aa_ppga[aa->aa_page_count-1]->count -
1417 CERROR("client %x, server %x, cksum_type %x\n",
1418 client_cksum, server_cksum, cksum_type);
1420 aa->aa_oa->o_cksum = client_cksum;
1424 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1427 } else if (unlikely(client_cksum)) {
1428 static int cksum_missed;
/* x & -x == x holds only when x has at most one bit set, so the message
 * below is emitted at power-of-two values of cksum_missed, not every time */
1431 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1432 CERROR("Checksum %u requested from %s but not sent\n",
1433 cksum_missed, libcfs_nid2str(peer->nid));
1439 *aa->aa_oa = body->oa;
/*
 * Synchronous bulk I/O for one RPC's worth of pages: build the request
 * with osc_brw_prep_request(), send it and wait in ptlrpc_queue_wait(),
 * then post-process the reply with osc_brw_fini_request().
 *
 * A -ETIMEDOUT result on a request flagged rq_resend is logged as
 * "BULK TIMEOUT" and the request is released. When the final rc is a
 * recoverable error (osc_recoverable_error()), osc_should_resend() is
 * consulted and the thread sleeps on a private wait queue with a timeout
 * of cfs_time_seconds(resends).
 *
 * NOTE(review): the jumps that restart the request after either case are
 * not visible here - presumably both paths retry; confirm.
 */
1444 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1445 struct lov_stripe_md *lsm,
1446 obd_count page_count, struct brw_page **pga,
1447 struct obd_capa *ocapa)
1449 struct ptlrpc_request *req;
1453 struct l_wait_info lwi;
1457 cfs_waitq_init(&waitq);
1460 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1461 page_count, pga, &req, ocapa, 0);
1465 rc = ptlrpc_queue_wait(req);
1467 if (rc == -ETIMEDOUT && req->rq_resend) {
1468 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1469 ptlrpc_req_finished(req);
1473 rc = osc_brw_fini_request(req, rc);
1475 ptlrpc_req_finished(req);
1476 if (osc_recoverable_error(rc)) {
1478 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1479 CERROR("too many resend retries, returning error\n");
1483 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1484 l_wait_event(waitq, 0, &lwi);
/*
 * Build a replacement for a bulk request that is being redone after a
 * recoverable error, and add it to the same request set as @request.
 *
 * The new request is prepared from the old async args @aa (same obdo,
 * page array, page count and capa). Then, under cl_loi_list_lock:
 *  - if an oap holding a reference on the old request was interrupted,
 *    the lock is dropped and the new request is released;
 *  - the interpret callback and async args are copied to the new request
 *    and rq_sent is set to the current time plus aa_resends;
 *  - the oap list is moved (not copied) from @aa to the new args;
 *  - every oap that referenced the old request is re-pointed at the new
 *    one, dropping one reference and taking another;
 *  - the capa pointer is handed over and cleared in @aa.
 *
 * NOTE(review): the return values are not visible here.
 */
1492 int osc_brw_redo_request(struct ptlrpc_request *request,
1493 struct osc_brw_async_args *aa)
1495 struct ptlrpc_request *new_req;
1496 struct ptlrpc_request_set *set = request->rq_set;
1497 struct osc_brw_async_args *new_aa;
1498 struct osc_async_page *oap;
1502 if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1503 CERROR("too many resend retries, returning error\n");
1507 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1509 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1510 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1511 aa->aa_cli, aa->aa_oa,
1512 NULL /* lsm unused by osc currently */,
1513 aa->aa_page_count, aa->aa_ppga,
1514 &new_req, aa->aa_ocapa, 0);
1518 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1520 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1521 if (oap->oap_request != NULL) {
1522 LASSERTF(request == oap->oap_request,
1523 "request %p != oap_request %p\n",
1524 request, oap->oap_request);
1525 if (oap->oap_interrupted) {
1526 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1527 ptlrpc_req_finished(new_req);
1532 /* New request takes over pga and oaps from old request.
1533 * Note that copying a list_head doesn't work, need to move it... */
1535 new_req->rq_interpret_reply = request->rq_interpret_reply;
1536 new_req->rq_async_args = request->rq_async_args;
1537 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1539 new_aa = ptlrpc_req_async_args(new_req);
/* the struct copy above duplicated the list_head by value; re-initialise
 * it and splice the real entries across */
1541 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1542 list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1543 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1545 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1546 if (oap->oap_request) {
1547 ptlrpc_req_finished(oap->oap_request);
1548 oap->oap_request = ptlrpc_request_addref(new_req);
1552 new_aa->aa_ocapa = aa->aa_ocapa;
1553 aa->aa_ocapa = NULL;
1555 /* use ptlrpc_set_add_req is safe because interpret functions work
1556 * in check_set context. only one way exist with access to request
1557 * from different thread got -EINTR - this way protected with
1558 * cl_loi_list_lock */
1559 ptlrpc_set_add_req(set, new_req);
1561 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1563 DEBUG_REQ(D_INFO, new_req, "new request");
1568 * ugh, we want disk allocation on the target to happen in offset order. we'll
1569 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1570 * fine for our small page arrays and doesn't require allocation. it's an
1571 * insertion sort that swaps elements that are strides apart, shrinking the
1572 * stride down until it's '1' and the array is sorted.
/*
 * Order the @num entries of @array by ascending ->off, in place.
 * The first loop grows the stride along the 3*s+1 sequence; the inner
 * loops shift entries with a larger ->off up by one stride.
 * NOTE(review): the statements that shrink the stride and store tmp back
 * are not visible here.
 */
1574 static void sort_brw_pages(struct brw_page **array, int num)
1577 struct brw_page *tmp;
1581 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1586 for (i = stride ; i < num ; i++) {
1589 while (j >= stride && array[j - stride]->off > tmp->off) {
1590 array[j] = array[j - stride];
1595 } while (stride > 1);
/*
 * Count how many leading entries of @pg, out of @pages (asserted > 0),
 * form a run without intra-page gaps. Judging by the visible checks, the
 * scan stops at a page that does not end on a page boundary, or before a
 * page that does not start on one; offsets within a page are taken as
 * off & ~CFS_PAGE_MASK.
 * NOTE(review): the loop construct and counter updates are not visible
 * here.
 */
1598 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1604 LASSERT (pages > 0);
1605 offset = pg[i]->off & ~CFS_PAGE_MASK;
1609 if (pages == 0) /* that's all */
1612 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1613 return count; /* doesn't end on page boundary */
1616 offset = pg[i]->off & ~CFS_PAGE_MASK;
1617 if (offset != 0) /* doesn't start on page boundary */
/*
 * Allocate, with OBD_ALLOC(), an array of @count brw_page pointers and
 * iterate over its @count slots.
 * NOTE(review): the loop body and the return are not visible here;
 * presumably slot i is pointed at &pga[i] and the array is returned -
 * confirm. osc_brw() releases the array with osc_release_ppga().
 */
1624 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1626 struct brw_page **ppga;
1629 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1633 for (i = 0; i < count; i++)
/*
 * Free a page pointer array of @count entries with OBD_FREE().
 * @ppga must not be NULL (asserted). The size expression is the same one
 * osc_build_ppga() passes to OBD_ALLOC(), so @count must match the count
 * used at allocation time.
 */
1638 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1640 LASSERT(ppga != NULL);
1641 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * Synchronous bulk read/write of @page_count pages described by @pga.
 *
 * With OBD_BRW_CHECK set in @cmd the import is examined (NULL or
 * imp_invalid). NOTE(review): the values returned on that path are not
 * visible here.
 *
 * Otherwise a pointer array over @pga is built and sorted by offset, then
 * consumed in chunks of at most cl_max_pages_per_rpc pages, further
 * limited by max_unfragmented_pages(); each chunk goes through
 * osc_brw_internal(). Because the brw overwrites oinfo->oi_oa, a copy is
 * saved when more than one chunk is needed and restored before each later
 * chunk. The pointer array (original base and count) and the saved obdo
 * are released at the end.
 */
1644 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1645 obd_count page_count, struct brw_page *pga,
1646 struct obd_trans_info *oti)
1648 struct obdo *saved_oa = NULL;
1649 struct brw_page **ppga, **orig;
1650 struct obd_import *imp = class_exp2cliimp(exp);
1651 struct client_obd *cli = &imp->imp_obd->u.cli;
1652 int rc, page_count_orig;
1655 if (cmd & OBD_BRW_CHECK) {
1656 /* The caller just wants to know if there's a chance that this
1657 * I/O can succeed */
1659 if (imp == NULL || imp->imp_invalid)
1664 /* test_brw with a failed create can trip this, maybe others. */
1665 LASSERT(cli->cl_max_pages_per_rpc);
/* keep the base pointer and count: ppga and page_count are advanced by
 * the loop below but the release needs the original values */
1669 orig = ppga = osc_build_ppga(pga, page_count);
1672 page_count_orig = page_count;
1674 sort_brw_pages(ppga, page_count);
1675 while (page_count) {
1676 obd_count pages_per_brw;
1678 if (page_count > cli->cl_max_pages_per_rpc)
1679 pages_per_brw = cli->cl_max_pages_per_rpc;
1681 pages_per_brw = page_count;
1683 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1685 if (saved_oa != NULL) {
1686 /* restore previously saved oa */
1687 *oinfo->oi_oa = *saved_oa;
1688 } else if (page_count > pages_per_brw) {
1689 /* save a copy of oa (brw will clobber it) */
1690 OBDO_ALLOC(saved_oa);
1691 if (saved_oa == NULL)
1692 GOTO(out, rc = -ENOMEM);
1693 *saved_oa = *oinfo->oi_oa;
1696 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1697 pages_per_brw, ppga, oinfo->oi_capa);
1702 page_count -= pages_per_brw;
1703 ppga += pages_per_brw;
1707 osc_release_ppga(orig, page_count_orig);
1709 if (saved_oa != NULL)
1710 OBDO_FREE(saved_oa);
1715 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1716 * the dirty accounting. Writeback completes or truncate happens before
1717 * writing starts. Must be called with the loi lock held. */
1718 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
/* calls osc_release_write_grant() for the oap's brw_page, forwarding the
 * sent argument unchanged */
1721 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1725 /* This maintains the lists of pending pages to read/write for a given object
1726 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1727 * to quickly find objects that are ready to send an RPC. */
/*
 * Visible checks, in order: no pending pages; a missing or invalid
 * import; a non-empty urgent list; for writes, waiters on
 * cl_cache_waiters; and finally lop_num_pending reaching the "optimal"
 * count, which starts at cl_max_pages_per_rpc.
 * NOTE(review): the value returned by each branch, and the "+16"
 * adjustment the last comment refers to, are not visible here.
 */
1728 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1734 if (lop->lop_num_pending == 0)
1737 /* if we have an invalid import we want to drain the queued pages
1738 * by forcing them through rpcs that immediately fail and complete
1739 * the pages. recovery relies on this to empty the queued pages
1740 * before canceling the locks and evicting down the llite pages */
1741 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1744 /* stream rpcs in queue order as long as there is an urgent page
1745 * queued. this is our cheap solution for good batching in the case
1746 * where writepage marks some random page in the middle of the file
1747 * as urgent because of, say, memory pressure */
1748 if (!list_empty(&lop->lop_urgent)) {
1749 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1752 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1753 optimal = cli->cl_max_pages_per_rpc;
1754 if (cmd & OBD_BRW_WRITE) {
1755 /* trigger a write rpc stream as long as there are dirtiers
1756 * waiting for space. as they're waiting, they're not going to
1757 * create more pages to coalesce with what's waiting.. */
1758 if (!list_empty(&cli->cl_cache_waiters)) {
1759 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1762 /* +16 to avoid triggering rpcs that would want to include pages
1763 * that are being queued but which can't be made ready until
1764 * the queuer finishes with the page. this is a wart for
1765 * llite::commit_write() */
1768 if (lop->lop_num_pending >= optimal)
/*
 * Make @item's list membership match should_be_on: append it to the tail
 * of @list when it is currently unlinked and should be on; unlink and
 * re-initialise it when it is linked and should not be. The other two
 * combinations leave @item untouched.
 */
1774 static void on_list(struct list_head *item, struct list_head *list,
1777 if (list_empty(item) && should_be_on)
1778 list_add_tail(item, list);
1779 else if (!list_empty(item) && !should_be_on)
1780 list_del_init(item);
1783 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1784 * can find pages to build into rpcs quickly */
1785 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
/* ready list: on it when either direction would make an rpc right now */
1787 on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1788 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1789 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
/* write/read lists: on them whenever that direction has any pending
 * pages (a non-zero lop_num_pending) */
1791 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1792 loi->loi_write_lop.lop_num_pending);
1794 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1795 loi->loi_read_lop.lop_num_pending);
/*
 * Adjust the pending-page count of @lop by @delta (may be negative) and
 * mirror the change in the client-wide counters: cl_pending_w_pages when
 * @cmd has OBD_BRW_WRITE, cl_pending_r_pages for reads.
 * NOTE(review): the "else" separating the two client-wide updates is not
 * visible here - presumably present; confirm.
 */
1798 static void lop_update_pending(struct client_obd *cli,
1799 struct loi_oap_pages *lop, int cmd, int delta)
1801 lop->lop_num_pending += delta;
1802 if (cmd & OBD_BRW_WRITE)
1803 cli->cl_pending_w_pages += delta;
1805 cli->cl_pending_r_pages += delta;
1809 * this is called when a sync waiter receives an interruption. Its job is to
1810 * get the caller woken as soon as possible. If its page hasn't been put in an
1811 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
1812 * desiring interruption which will forcefully complete the rpc once the rpc
/*
 * Mark @oap as interrupted; it must not already be marked (asserted).
 *
 * Two cases are visible:
 *  - the oap holds a request reference: the request is flagged with
 *    ptlrpc_mark_interrupted(), ptlrpcd is woken, and the reference is
 *    dropped and cleared;
 *  - the oap is still on a pending list: it is unlinked from the pending
 *    and urgent lists, the pending counters are decremented, the loi list
 *    membership is recomputed and the caller's ap_completion() is invoked
 *    with -EINTR.
 *
 * NOTE(review): how control flows between the two cases, the locking, and
 * the returned value are not visible here.
 */
1815 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
1817 struct loi_oap_pages *lop;
1818 struct lov_oinfo *loi;
1822 LASSERT(!oap->oap_interrupted);
1823 oap->oap_interrupted = 1;
1825 /* ok, it's been put in an rpc. only one oap gets a request reference */
1826 if (oap->oap_request != NULL) {
1827 ptlrpc_mark_interrupted(oap->oap_request);
1828 ptlrpcd_wake(oap->oap_request);
1829 ptlrpc_req_finished(oap->oap_request);
1830 oap->oap_request = NULL;
1834 * page completion may be called only if ->cpo_prep() method was
1835 * executed by osc_io_submit(), that also adds page the to pending list
1837 if (!list_empty(&oap->oap_pending_item)) {
1838 list_del_init(&oap->oap_pending_item);
1839 list_del_init(&oap->oap_urgent_item);
1842 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1843 &loi->loi_write_lop : &loi->loi_read_lop;
1844 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1845 loi_list_maint(oap->oap_cli, oap->oap_loi);
1846 rc = oap->oap_caller_ops->ap_completion(env,
1847 oap->oap_caller_data,
1848 oap->oap_cmd, NULL, -EINTR);
1854 /* this is trying to propagate async writeback errors back up to the
1855 * application. As an async write fails we record the error code for later if
1856 * the app does an fsync. As long as errors persist we force future rpcs to be
1857 * sync so that the app can get a sync error and break the cycle of queueing
1858 * pages for which writeback will fail. */
1859 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
/* when forcing sync, remember the next xid to be handed out; NOTE(review):
 * the condition guarding these two assignments is not visible here */
1866 ar->ar_force_sync = 1;
1867 ar->ar_min_xid = ptlrpc_sample_next_xid();
/* clear the force-sync flag once @xid has reached the xid sampled when
 * the flag was set */
1872 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1873 ar->ar_force_sync = 0;
/*
 * Queue @oap on the pending list of its object's write or read lop
 * (selected by OBD_BRW_WRITE in oap_cmd) and add one to the pending
 * counters. Pages flagged ASYNC_URGENT are also put at the head of the
 * lop's urgent list; the pending list itself is appended to at the tail.
 */
1876 void osc_oap_to_pending(struct osc_async_page *oap)
1878 struct loi_oap_pages *lop;
1880 if (oap->oap_cmd & OBD_BRW_WRITE)
1881 lop = &oap->oap_loi->loi_write_lop;
1883 lop = &oap->oap_loi->loi_read_lop;
1885 if (oap->oap_async_flags & ASYNC_URGENT)
1886 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1887 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1888 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1891 /* this must be called holding the loi list lock to give coverage to exit_cache,
1892 * async_flag maintenance, and oap_request */
/*
 * Complete one async page:
 *  - drop the oap's request reference, remembering the request's xid;
 *  - reset oap_async_flags and oap_interrupted;
 *  - for writes, feed (xid, rc) to osc_process_ar() for both the client
 *    and the object;
 *  - when rc == 0 and @oa is given, copy blocks/mtime/atime/ctime from
 *    @oa into the object's lvb for each field flagged valid;
 *  - call the caller's ap_completion().
 * Afterwards the page is either re-queued with osc_oap_to_pending() or
 * leaves the cache through osc_exit_cache().
 * NOTE(review): the condition selecting between those two outcomes is
 * not visible here.
 */
1893 static void osc_ap_completion(const struct lu_env *env,
1894 struct client_obd *cli, struct obdo *oa,
1895 struct osc_async_page *oap, int sent, int rc)
1900 if (oap->oap_request != NULL) {
1901 xid = ptlrpc_req_xid(oap->oap_request);
1902 ptlrpc_req_finished(oap->oap_request);
1903 oap->oap_request = NULL;
1906 oap->oap_async_flags = 0;
1907 oap->oap_interrupted = 0;
1909 if (oap->oap_cmd & OBD_BRW_WRITE) {
1910 osc_process_ar(&cli->cl_ar, xid, rc);
1911 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
1914 if (rc == 0 && oa != NULL) {
1915 if (oa->o_valid & OBD_MD_FLBLOCKS)
1916 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
1917 if (oa->o_valid & OBD_MD_FLMTIME)
1918 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
1919 if (oa->o_valid & OBD_MD_FLATIME)
1920 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
1921 if (oa->o_valid & OBD_MD_FLCTIME)
1922 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
1925 rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
1926 oap->oap_cmd, oa, rc);
1928 /* ll_ap_completion (from llite) drops PG_locked. so, a new
1929 * I/O on the page could start, but OSC calls it under lock
1930 * and thus we can add oap back to pending safely */
1932 /* upper layer wants to leave the page on pending queue */
1933 osc_oap_to_pending(oap);
1935 osc_exit_cache(cli, oap, sent);
/*
 * Reply interpreter for async bulk requests; osc_send_oap_rpc() installs
 * it as rq_interpret_reply.
 *
 * Runs osc_brw_fini_request() and, on a recoverable error, attempts
 * osc_brw_redo_request(). Then the capa reference is dropped and, under
 * cl_loi_list_lock:
 *  - the in-flight counter for the request's direction is decremented;
 *  - if aa_oaps is non-empty every oap on it is unlinked and completed
 *    with osc_ap_completion(), and aa_oa is freed;
 *  - otherwise write grant is released for each page in aa_ppga;
 *  - cache waiters are woken and osc_check_rpcs() is called.
 * After unlocking, the cl_req is completed and the page pointer array is
 * freed.
 *
 * NOTE(review): the early return after a successful redo and the final
 * return value are not visible here.
 */
1939 static int brw_interpret(const struct lu_env *env,
1940 struct ptlrpc_request *req, void *data, int rc)
1942 struct osc_brw_async_args *aa = data;
1943 struct client_obd *cli;
1947 rc = osc_brw_fini_request(req, rc);
1948 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1949 if (osc_recoverable_error(rc)) {
1950 rc = osc_brw_redo_request(req, aa);
1956 capa_put(aa->aa_ocapa);
1957 aa->aa_ocapa = NULL;
1962 client_obd_list_lock(&cli->cl_loi_list_lock);
1964 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1965 * is called so we know whether to go to sync BRWs or wait for more
1966 * RPCs to complete */
1967 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1968 cli->cl_w_in_flight--;
1970 cli->cl_r_in_flight--;
/* note: "async" is set when the oap list is EMPTY, so the !async branch
 * below is the one that handles queued oaps */
1972 async = list_empty(&aa->aa_oaps);
1973 if (!async) { /* from osc_send_oap_rpc() */
1974 struct osc_async_page *oap, *tmp;
1975 /* the caller may re-use the oap after the completion call so
1976 * we need to clean it up a little */
1977 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
1978 list_del_init(&oap->oap_rpc_item);
1979 osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
1981 OBDO_FREE(aa->aa_oa);
1982 } else { /* from async_internal() */
1984 for (i = 0; i < aa->aa_page_count; i++)
1985 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1987 osc_wake_cache_waiters(cli);
1988 osc_check_rpcs(env, cli);
1989 client_obd_list_unlock(&cli->cl_loi_list_lock);
1991 cl_req_completion(env, aa->aa_clerq, rc);
1992 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
/*
 * Build one bulk request from the oaps on @rpc_list, which must not be
 * empty (asserted).
 *
 * A pointer array of @page_count entries is allocated and filled from the
 * oaps (each entry's offset is oap_obj_off + oap_page_off); a cl_req is
 * allocated and every page is added to it; obdo attributes are set with
 * cl_req_attr_set(); the pages are sorted by offset and the request is
 * prepared with osc_brw_prep_request(). After the request is built the
 * mtime/ctime/atime attributes are set a second time (see the bug 10150
 * comment below). On success the oaps are spliced onto aa->aa_oaps,
 * @rpc_list is left empty and the cl_req is stored in aa->aa_clerq.
 *
 * The cleanup code visible near the end frees the pointer array, unlinks
 * and completes every oap still on @rpc_list under cl_loi_list_lock, and
 * completes the cl_req with PTR_ERR(req).
 *
 * NOTE(review): presumably that cleanup runs only when req is an error
 * pointer; the guarding condition and the return are not visible here.
 */
1996 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
1997 struct client_obd *cli,
1998 struct list_head *rpc_list,
1999 int page_count, int cmd)
2001 struct ptlrpc_request *req;
2002 struct brw_page **pga = NULL;
2003 struct osc_brw_async_args *aa;
2004 struct obdo *oa = NULL;
2005 const struct obd_async_page_ops *ops = NULL;
2006 void *caller_data = NULL;
2007 struct osc_async_page *oap;
2008 struct osc_async_page *tmp;
2009 struct ost_body *body;
2010 struct cl_req *clerq = NULL;
2011 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2012 struct ldlm_lock *lock = NULL;
2013 struct cl_req_attr crattr;
2017 LASSERT(!list_empty(rpc_list));
2019 memset(&crattr, 0, sizeof crattr);
2020 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2022 GOTO(out, req = ERR_PTR(-ENOMEM));
2026 GOTO(out, req = ERR_PTR(-ENOMEM));
2029 list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2030 struct cl_page *page = osc_oap2cl_page(oap);
2032 ops = oap->oap_caller_ops;
2033 caller_data = oap->oap_caller_data;
2035 clerq = cl_req_alloc(env, page, crt,
2036 1 /* only 1-object rpcs for
2039 GOTO(out, req = (void *)clerq);
2040 lock = oap->oap_ldlm_lock;
2042 pga[i] = &oap->oap_brw_page;
2043 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2044 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2045 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2047 cl_req_page_add(env, clerq, page);
2050 /* always get the data for the obdo for the rpc */
2051 LASSERT(ops != NULL);
2053 crattr.cra_capa = NULL;
2054 cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2056 oa->o_handle = lock->l_remote_handle;
2057 oa->o_valid |= OBD_MD_FLHANDLE;
2060 rc = cl_req_prep(env, clerq);
2062 CERROR("cl_req_prep failed: %d\n", rc);
2063 GOTO(out, req = ERR_PTR(rc));
2066 sort_brw_pages(pga, page_count);
2067 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2068 pga, &req, crattr.cra_capa, 1);
2070 CERROR("prep_req failed: %d\n", rc);
2071 GOTO(out, req = ERR_PTR(rc));
2074 /* Need to update the timestamps after the request is built in case
2075 * we race with setattr (locally or in queue at OST). If OST gets
2076 * later setattr before earlier BRW (as determined by the request xid),
2077 * the OST will not use BRW timestamps. Sadly, there is no obvious
2078 * way to do this in a single call. bug 10150 */
2079 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2080 cl_req_attr_set(env, clerq, &crattr,
2081 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2083 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2084 aa = ptlrpc_req_async_args(req);
2085 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2086 list_splice(rpc_list, &aa->aa_oaps);
2087 CFS_INIT_LIST_HEAD(rpc_list);
2088 aa->aa_clerq = clerq;
2090 capa_put(crattr.cra_capa);
2095 OBD_FREE(pga, sizeof(*pga) * page_count);
2096 /* this should happen rarely and is pretty bad, it makes the
2097 * pending list not follow the dirty order */
2098 client_obd_list_lock(&cli->cl_loi_list_lock);
2099 list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2100 list_del_init(&oap->oap_rpc_item);
2102 /* queued sync pages can be torn down while the pages
2103 * were between the pending list and the rpc */
2104 if (oap->oap_interrupted) {
2105 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2106 osc_ap_completion(env, cli, NULL, oap, 0,
2110 osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2112 if (clerq && !IS_ERR(clerq))
2113 cl_req_completion(env, clerq, PTR_ERR(req));
2119 * Prepare pages for ASYNC I/O and put them in the send queue.
2123 * \param cmd - OBD_BRW_* macros
2124 * \param lop - pending pages
2126 * \return zero if the pages were successfully added to the send queue.
2127 * \return non-zero if an error occurred.
/*
 * Gather pending pages from @lop into one RPC and hand it to ptlrpcd.
 *
 * Phase 1 walks lop_pending. Pages not yet ASYNC_READY are offered to
 * ap_make_ready(); pages are unlinked from the pending/urgent lists, have
 * their byte count refreshed through ap_refresh_count() unless it is
 * already stable, and are moved to a local rpc_list. Pages whose count is
 * <= 0 are completed immediately instead. Per the comments below,
 * collection ends at cl_max_pages_per_rpc pages, at a PTLRPC_MAX_BRW_SIZE
 * boundary, on an SRVLOCK flag mismatch, or where a page would leave a
 * gap.
 *
 * Phase 2 drops cl_loi_list_lock, builds the request with osc_build_req(),
 * tallies the read or write histograms, retakes the lock, increments the
 * in-flight counter, gives one oap a reference on the request, installs
 * brw_interpret as the reply interpreter and queues the request with
 * ptlrpcd_add_req().
 *
 * NOTE(review): the loop exits (break/continue) and the return values
 * are not visible here.
 */
2130 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2131 struct lov_oinfo *loi,
2132 int cmd, struct loi_oap_pages *lop)
2134 struct ptlrpc_request *req;
2135 obd_count page_count = 0;
2136 struct osc_async_page *oap = NULL, *tmp;
2137 struct osc_brw_async_args *aa;
2138 const struct obd_async_page_ops *ops;
2139 CFS_LIST_HEAD(rpc_list);
2140 unsigned int ending_offset;
2141 unsigned starting_offset = 0;
2143 struct cl_object *clob = NULL;
2146 /* first we find the pages we're allowed to work with */
2147 list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2149 ops = oap->oap_caller_ops;
2151 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2152 "magic 0x%x\n", oap, oap->oap_magic);
2155 /* pin object in memory, so that completion call-backs
2156 * can be safely called under client_obd_list lock. */
2157 clob = osc_oap2cl_page(oap)->cp_obj;
2158 cl_object_get(clob);
2161 if (page_count != 0 &&
2162 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2163 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2164 " oap %p, page %p, srvlock %u\n",
2165 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2168 /* in llite being 'ready' equates to the page being locked
2169 * until completion unlocks it. commit_write submits a page
2170 * as not ready because its unlock will happen unconditionally
2171 * as the call returns. if we race with commit_write giving
2172 * us that page we don't want to create a hole in the page
2173 * stream, so we stop and leave the rpc to be fired by
2174 * another dirtier or kupdated interval (the not ready page
2175 * will still be on the dirty list). we could call in
2176 * at the end of ll_file_write to process the queue again. */
2177 if (!(oap->oap_async_flags & ASYNC_READY)) {
2178 int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2181 CDEBUG(D_INODE, "oap %p page %p returned %d "
2182 "instead of ready\n", oap,
2186 /* llite is telling us that the page is still
2187 * in commit_write and that we should try
2188 * and put it in an rpc again later. we
2189 * break out of the loop so we don't create
2190 * a hole in the sequence of pages in the rpc
2195 /* the io isn't needed.. tell the checks
2196 * below to complete the rpc with EINTR */
2197 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2198 oap->oap_count = -EINTR;
2201 oap->oap_async_flags |= ASYNC_READY;
2204 LASSERTF(0, "oap %p page %p returned %d "
2205 "from make_ready\n", oap,
2213 * Page submitted for IO has to be locked. Either by
2214 * ->ap_make_ready() or by higher layers.
2216 #if defined(__KERNEL__) && defined(__linux__)
2218 struct cl_page *page;
2220 page = osc_oap2cl_page(oap);
2222 if (page->cp_type == CPT_CACHEABLE &&
2223 !(PageLocked(oap->oap_page) &&
2224 (CheckWriteback(oap->oap_page, cmd)))) {
2225 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2227 (long)oap->oap_page->flags,
2228 oap->oap_async_flags);
2233 /* If there is a gap at the start of this page, it can't merge
2234 * with any previous page, so we'll hand the network a
2235 * "fragmented" page array that it can't transfer in 1 RDMA */
2236 if (page_count != 0 && oap->oap_page_off != 0)
2239 /* take the page out of our book-keeping */
2240 list_del_init(&oap->oap_pending_item);
2241 lop_update_pending(cli, lop, cmd, -1);
2242 list_del_init(&oap->oap_urgent_item);
2244 if (page_count == 0)
2245 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2246 (PTLRPC_MAX_BRW_SIZE - 1);
2248 /* ask the caller for the size of the io as the rpc leaves. */
2249 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2251 ops->ap_refresh_count(env, oap->oap_caller_data,
2253 LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2255 if (oap->oap_count <= 0) {
2256 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2258 osc_ap_completion(env, cli, NULL,
2259 oap, 0, oap->oap_count);
2263 /* now put the page back in our accounting */
2264 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2265 if (page_count == 0)
2266 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2267 if (++page_count >= cli->cl_max_pages_per_rpc)
2270 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2271 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2272 * have the same alignment as the initial writes that allocated
2273 * extents on the server. */
2274 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2275 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2276 if (ending_offset == 0)
2279 /* If there is a gap at the end of this page, it can't merge
2280 * with any subsequent pages, so we'll hand the network a
2281 * "fragmented" page array that it can't transfer in 1 RDMA */
2282 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2286 osc_wake_cache_waiters(cli);
2288 loi_list_maint(cli, loi);
2290 client_obd_list_unlock(&cli->cl_loi_list_lock);
2293 cl_object_put(env, clob);
2295 if (page_count == 0) {
2296 client_obd_list_lock(&cli->cl_loi_list_lock);
/* the request is built after cl_loi_list_lock was released above */
2300 req = osc_build_req(env, cli, &rpc_list, page_count, cmd);
2302 LASSERT(list_empty(&rpc_list));
2303 loi_list_maint(cli, loi);
2304 RETURN(PTR_ERR(req));
2307 aa = ptlrpc_req_async_args(req);
2309 if (cmd == OBD_BRW_READ) {
2310 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2311 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2312 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2313 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2315 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2316 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2317 cli->cl_w_in_flight);
2318 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2319 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2321 ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2323 client_obd_list_lock(&cli->cl_loi_list_lock);
2325 if (cmd == OBD_BRW_READ)
2326 cli->cl_r_in_flight++;
2328 cli->cl_w_in_flight++;
2330 /* queued sync pages can be torn down while the pages
2331 * were between the pending list and the rpc */
2333 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2334 /* only one oap gets a request reference */
2337 if (oap->oap_interrupted && !req->rq_intr) {
2338 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2340 ptlrpc_mark_interrupted(req);
2344 tmp->oap_request = ptlrpc_request_addref(req);
2346 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2347 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2349 req->rq_interpret_reply = brw_interpret;
2350 ptlrpcd_add_req(req, PSCOPE_BRW);
/*
 * Debug helper: logs at D_INODE whether the loi is on the ready list,
 * followed by the pending count and urgent-list state for writes and then
 * for reads; the caller's format STR is appended to the fixed prefix.
 */
2354 #define LOI_DEBUG(LOI, STR, args...) \
2355 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2356 !list_empty(&(LOI)->loi_cli_item), \
2357 (LOI)->loi_write_lop.lop_num_pending, \
2358 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2359 (LOI)->loi_read_lop.lop_num_pending, \
2360 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2363 /* This is called by osc_check_rpcs() to find which objects have pages that
2364 * we could be sending. These lists are maintained by lop_makes_rpc(). */
/*
 * Returns the first entry of the chosen list without unlinking it.
 * Priority, as visible: the ready list; the write list when there are
 * cache waiters; then, for a missing or invalid import, the write list
 * followed by the read list.
 * NOTE(review): the value returned when nothing qualifies is not visible
 * here; the caller loops until it gets NULL.
 */
2365 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2368 /* first return all objects which we already know to have
2369 * pages ready to be stuffed into rpcs */
2370 if (!list_empty(&cli->cl_loi_ready_list))
2371 RETURN(list_entry(cli->cl_loi_ready_list.next,
2372 struct lov_oinfo, loi_cli_item));
2374 /* then if we have cache waiters, return all objects with queued
2375 * writes. This is especially important when many small files
2376 * have filled up the cache and not been fired into rpcs because
2377 * they don't pass the nr_pending/object threshold */
2378 if (!list_empty(&cli->cl_cache_waiters) &&
2379 !list_empty(&cli->cl_loi_write_list))
2380 RETURN(list_entry(cli->cl_loi_write_list.next,
2381 struct lov_oinfo, loi_write_item));
2383 /* then return all queued objects when we have an invalid import
2384 * so that they get flushed */
2385 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2386 if (!list_empty(&cli->cl_loi_write_list))
2387 RETURN(list_entry(cli->cl_loi_write_list.next,
2388 struct lov_oinfo, loi_write_item));
2389 if (!list_empty(&cli->cl_loi_read_list))
2390 RETURN(list_entry(cli->cl_loi_read_list.next,
2391 struct lov_oinfo, loi_read_item));
2396 /* called with the loi list lock held */
/*
 * Take objects from osc_next_loi() one at a time and, while the number of
 * RPCs in flight is below cl_max_rpcs_in_flight, try a write RPC and then
 * a read RPC for each through osc_send_oap_rpc(). After an object has
 * been served it is unlinked from the ready/write/read lists and its
 * membership is recomputed with loi_list_maint().
 * race_counter is compared against 10, presumably to stop spinning when
 * osc_send_oap_rpc() keeps backing off.
 * NOTE(review): the handling of rc after each send and the loop exits
 * are not visible here.
 */
2397 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2399 struct lov_oinfo *loi;
2400 int rc = 0, race_counter = 0;
2403 while ((loi = osc_next_loi(cli)) != NULL) {
2404 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2406 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2409 /* attempt some read/write balancing by alternating between
2410 * reads and writes in an object. The makes_rpc checks here
2411 * would be redundant if we were getting read/write work items
2412 * instead of objects. we don't want send_oap_rpc to drain a
2413 * partial read pending queue when we're given this object to
2414 * do io on writes while there are cache waiters */
2415 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2416 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2417 &loi->loi_write_lop);
2425 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2426 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2427 &loi->loi_read_lop);
2436 /* attempt some inter-object balancing by issuing rpcs
2437 * for each object in turn */
2438 if (!list_empty(&loi->loi_cli_item))
2439 list_del_init(&loi->loi_cli_item);
2440 if (!list_empty(&loi->loi_write_item))
2441 list_del_init(&loi->loi_write_item);
2442 if (!list_empty(&loi->loi_read_item))
2443 list_del_init(&loi->loi_read_item);
2445 loi_list_maint(cli, loi);
2447 /* send_oap_rpc fails with 0 when make_ready tells it to
2448 * back off. llite's make_ready does this when it tries
2449 * to lock a page queued for write that is already locked.
2450 * we want to try sending rpcs from many objects, but we
2451 * don't want to spin failing with 0. */
2452 if (race_counter == 10)
2458 /* we're trying to queue a page in the osc so we're subject to the
2459 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2460 * If the osc's queued pages are already at that limit, then we want to sleep
2461 * until there is space in the osc's queue for us. We also may be waiting for
2462 * write credits from the OST if there are RPCs in flight that may return some
2463 * before we fall back to sync writes.
2465 * We need this to know our allocation was granted in the presence of signals */
2466 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
/* Wake-up predicate, evaluated under cl_loi_list_lock: true once the
 * waiter is no longer linked through ocw_entry, or when no RPCs are in
 * flight at all. */
2470 client_obd_list_lock(&cli->cl_loi_list_lock);
2471 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2472 client_obd_list_unlock(&cli->cl_loi_list_lock);
/*
 * Try to account one page of dirty cache for @oap without sleeping.
 * has_grant records whether cl_avail_grant covers at least CFS_PAGE_SIZE.
 * The body consumes write grant for the page via osc_consume_write_grant(),
 * and can additionally charge the page to cl_dirty_transit /
 * obd_dirty_transit_pages and mark it OBD_BRW_NOCACHE.
 * NOTE(review): presumably the grant is consumed only when has_grant is
 * set and the transit accounting applies only when @transient is non-zero
 * - confirm against the full function body.
 */
2477 * Non-blocking version of osc_enter_cache() that consumes grant only when it
2480 int osc_enter_cache_try(const struct lu_env *env,
2481 struct client_obd *cli, struct lov_oinfo *loi,
2482 struct osc_async_page *oap, int transient)
2486 has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2488 osc_consume_write_grant(cli, &oap->oap_brw_page);
2490 cli->cl_dirty_transit += CFS_PAGE_SIZE;
2491 atomic_inc(&obd_dirty_transit_pages);
2492 oap->oap_brw_flags |= OBD_BRW_NOCACHE;
/*
 * Reserve dirty-cache space and write grant for @oap, sleeping if needed.
 * Three situations are distinguished below:
 *  - cl_dirty_max is smaller than a page, or sync writes are being forced
 *    for the client or the object (ar_force_sync): the caller is pushed
 *    to sync io;
 *  - per-client and global dirty limits both have room and
 *    osc_enter_cache_try() succeeds: the normal case;
 *  - otherwise, if write RPCs are in flight, an on-stack osc_cache_waiter
 *    is queued on cl_cache_waiters and the thread sleeps until
 *    ocw_granted() reports true.
 */
2498 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2499 * grant or cache space. */
2500 static int osc_enter_cache(const struct lu_env *env,
2501 struct client_obd *cli, struct lov_oinfo *loi,
2502 struct osc_async_page *oap)
2504 struct osc_cache_waiter ocw;
2505 struct l_wait_info lwi = { 0 };
2509 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2510 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2511 cli->cl_dirty_max, obd_max_dirty_pages,
2512 cli->cl_lost_grant, cli->cl_avail_grant);
2514 /* force the caller to try sync io. this can jump the list
2515 * of queued writes and create a discontiguous rpc stream */
2516 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2517 loi->loi_ar.ar_force_sync)
2520 /* Hopefully normal case - cache space and write credits available */
2521 if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2522 atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2523 osc_enter_cache_try(env, cli, loi, oap, 0))
2526 /* Make sure that there are write rpcs in flight to wait for. This
2527 * is a little silly as this object may not have any pending but
2528 * other objects sure might. */
2529 if (cli->cl_w_in_flight) {
/* queue ourselves as a waiter, kick RPC generation, then sleep with the
 * list lock dropped */
2530 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2531 cfs_waitq_init(&ocw.ocw_waitq);
2535 loi_list_maint(cli, loi);
2536 osc_check_rpcs(env, cli);
2537 client_obd_list_unlock(&cli->cl_loi_list_lock);
2539 CDEBUG(D_CACHE, "sleeping for cache space\n");
2540 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2542 client_obd_list_lock(&cli->cl_loi_list_lock);
/* still linked after waking up: take the on-stack waiter off the list
 * ourselves before it goes out of scope */
2543 if (!list_empty(&ocw.ocw_entry)) {
2544 list_del(&ocw.ocw_entry);
/*
 * Initialise the osc_async_page that tracks @page at object offset @offset.
 * One path returns size_round(sizeof(*oap)) without initialising anything
 * (presumably a size query from the caller - confirm the condition).
 * Otherwise the oap is stamped with OAP_MAGIC and bound to the export's
 * client_obd, the caller's @ops / @data, the page and the offset; its list
 * heads and spinlock are initialised.
 */
2554 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2555 struct lov_oinfo *loi, cfs_page_t *page,
2556 obd_off offset, const struct obd_async_page_ops *ops,
2557 void *data, void **res, int nocache,
2558 struct lustre_handle *lockh)
2560 struct osc_async_page *oap;
2565 return size_round(sizeof(*oap));
2568 oap->oap_magic = OAP_MAGIC;
2569 oap->oap_cli = &exp->exp_obd->u.cli;
2572 oap->oap_caller_ops = ops;
2573 oap->oap_caller_data = data;
2575 oap->oap_page = page;
2576 oap->oap_obj_off = offset;
/* local (non-remote) clients holding CFS_CAP_SYS_RESOURCE get
 * OBD_BRW_NOQUOTA set on the page */
2577 if (!client_is_remote(exp) &&
2578 cfs_capable(CFS_CAP_SYS_RESOURCE))
2579 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
/* offset may carry no bits outside CFS_PAGE_MASK, i.e. it is expected
 * to be page aligned */
2581 LASSERT(!(offset & ~CFS_PAGE_MASK));
2583 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2584 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2585 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2586 CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2588 spin_lock_init(&oap->oap_lock);
2589 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
/*
 * Interpret the opaque @cookie as a struct osc_async_page.  The magic field
 * is checked against OAP_MAGIC; on mismatch ERR_PTR(-EINVAL) is returned,
 * so callers must test the result with the ERR_PTR helpers.
 */
2593 struct osc_async_page *oap_from_cookie(void *cookie)
2595 struct osc_async_page *oap = cookie;
2596 if (oap->oap_magic != OAP_MAGIC)
2597 return ERR_PTR(-EINVAL);
/*
 * Queue the page identified by @cookie for asynchronous io.
 *
 * The cookie is validated with oap_from_cookie(), the import is checked
 * for being absent/invalid, and a page that is already on a pending,
 * urgent or rpc list is rejected.  For writes without OBD_BRW_NOQUOTA the
 * owner/group of the top cl_object is checked against quota.  Under
 * cl_loi_list_lock the oap is filled in with @off, @count, @brw_flags and
 * @async_flags; writes must first pass osc_enter_cache().  Finally the
 * oap is moved to the pending list and osc_check_rpcs() is run.
 * NOTE(review): the error codes returned on the rejection paths are not
 * visible here - confirm against the full body.
 */
2601 int osc_queue_async_io(const struct lu_env *env,
2602 struct obd_export *exp, struct lov_stripe_md *lsm,
2603 struct lov_oinfo *loi, void *cookie,
2604 int cmd, obd_off off, int count,
2605 obd_flag brw_flags, enum async_flags async_flags)
2607 struct client_obd *cli = &exp->exp_obd->u.cli;
2608 struct osc_async_page *oap;
2612 oap = oap_from_cookie(cookie);
2614 RETURN(PTR_ERR(oap));
2616 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2619 if (!list_empty(&oap->oap_pending_item) ||
2620 !list_empty(&oap->oap_urgent_item) ||
2621 !list_empty(&oap->oap_rpc_item))
2624 /* check if the file's owner/group is over quota */
2625 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2626 struct cl_object *obj;
2627 struct cl_attr attr; /* XXX put attr into thread info */
2629 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
2631 cl_object_attr_lock(obj);
2632 rc = cl_object_attr_get(env, obj, &attr);
2633 cl_object_attr_unlock(obj);
2635 if (rc == 0 && lquota_chkdq(quota_interface, cli, attr.cat_uid,
2636 attr.cat_gid) == NO_QUOTA)
2643 loi = lsm->lsm_oinfo[0];
2645 client_obd_list_lock(&cli->cl_loi_list_lock);
/* the io must fit inside a single page */
2647 LASSERT(off + count <= CFS_PAGE_SIZE);
2649 oap->oap_page_off = off;
2650 oap->oap_count = count;
2651 oap->oap_brw_flags = brw_flags;
2652 oap->oap_async_flags = async_flags;
2654 if (cmd & OBD_BRW_WRITE) {
2655 rc = osc_enter_cache(env, cli, loi, oap);
2657 client_obd_list_unlock(&cli->cl_loi_list_lock);
2662 osc_oap_to_pending(oap);
2663 loi_list_maint(cli, loi);
2665 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2668 osc_check_rpcs(env, cli);
2669 client_obd_list_unlock(&cli->cl_loi_list_lock);
/*
 * SETTING(was, now, flag): non-zero when 'flag' is clear in 'was' but set
 * in 'now', i.e. the flag is being newly turned on.  For a single-bit flag
 * this is the same as (~was & now & flag), just spelled more clearly.
 * Every argument is parenthesised so that compound expressions such as
 * (A | B) expand with the intended precedence.  'flag' is evaluated twice,
 * so it must be free of side effects.
 */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
/*
 * Turn on additional async flags for an oap that is already queued.
 * The import is checked for being absent/invalid, the write or read
 * loi_oap_pages is selected from oap_cmd, and the oap is checked for
 * being on a pending list and for already carrying every requested flag.
 * Only the transitions detected by SETTING() are applied: ASYNC_READY is
 * or-ed in, and for ASYNC_URGENT an oap that is not on an rpc list is
 * linked onto lop_urgent, followed by loi_list_maint().
 */
2677 int osc_set_async_flags_base(struct client_obd *cli,
2678 struct lov_oinfo *loi, struct osc_async_page *oap,
2679 obd_flag async_flags)
2681 struct loi_oap_pages *lop;
2684 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2687 if (oap->oap_cmd & OBD_BRW_WRITE) {
2688 lop = &loi->loi_write_lop;
2690 lop = &loi->loi_read_lop;
2693 if (list_empty(&oap->oap_pending_item))
/* every requested flag is already set on the oap */
2696 if ((oap->oap_async_flags & async_flags) == async_flags)
2699 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2700 oap->oap_async_flags |= ASYNC_READY;
2702 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2703 if (list_empty(&oap->oap_rpc_item)) {
2704 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2705 loi_list_maint(cli, loi);
2709 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2710 oap->oap_async_flags);
/*
 * Detach the page identified by @cookie from the OSC queues.
 * Fails with -EBUSY while the oap is linked on an rpc list.  Otherwise,
 * under cl_loi_list_lock, the page leaves the cache accounting
 * (osc_exit_cache()), cache waiters are woken, and the oap is removed
 * from the urgent and pending lists it may be on, with the pending
 * count adjusted by -1.
 */
2714 int osc_teardown_async_page(struct obd_export *exp,
2715 struct lov_stripe_md *lsm,
2716 struct lov_oinfo *loi, void *cookie)
2718 struct client_obd *cli = &exp->exp_obd->u.cli;
2719 struct loi_oap_pages *lop;
2720 struct osc_async_page *oap;
2724 oap = oap_from_cookie(cookie);
2726 RETURN(PTR_ERR(oap));
2729 loi = lsm->lsm_oinfo[0];
/* pick the write or read queue set according to the oap's command */
2731 if (oap->oap_cmd & OBD_BRW_WRITE) {
2732 lop = &loi->loi_write_lop;
2734 lop = &loi->loi_read_lop;
2737 client_obd_list_lock(&cli->cl_loi_list_lock);
/* a page linked on an rpc list cannot be torn down */
2739 if (!list_empty(&oap->oap_rpc_item))
2740 GOTO(out, rc = -EBUSY);
2742 osc_exit_cache(cli, oap, 0);
2743 osc_wake_cache_waiters(cli);
2745 if (!list_empty(&oap->oap_urgent_item)) {
2746 list_del_init(&oap->oap_urgent_item);
2747 oap->oap_async_flags &= ~ASYNC_URGENT;
2749 if (!list_empty(&oap->oap_pending_item)) {
2750 list_del_init(&oap->oap_pending_item);
2751 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2753 loi_list_maint(cli, loi);
2754 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2756 client_obd_list_unlock(&cli->cl_loi_list_lock);
/*
 * Attach einfo->ei_cbdata to @lock as its l_ast_data.
 * Asserts that the lock's blocking, completion and glimpse callbacks and
 * its resource type match those in @einfo, and that l_ast_data is either
 * unset or already equal to the new value.  The store is done with the
 * resource/lock locked and osc_ast_guard held.
 */
2760 static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
2761 struct ldlm_enqueue_info *einfo,
2764 void *data = einfo->ei_cbdata;
2766 LASSERT(lock != NULL);
2767 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2768 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2769 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2770 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2772 lock_res_and_lock(lock);
2773 spin_lock(&osc_ast_guard);
2774 LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
2775 lock->l_ast_data = data;
2776 spin_unlock(&osc_ast_guard);
2777 unlock_res_and_lock(lock);
/*
 * Handle-based wrapper around osc_set_lock_data_with_check(): the handle is
 * resolved with ldlm_handle2lock() and the lock reference is dropped again
 * with LDLM_LOCK_PUT().  A CERROR ("client evicted?") is logged on the
 * other path, presumably when the handle no longer resolves to a lock -
 * confirm.
 */
2780 static void osc_set_data_with_check(struct lustre_handle *lockh,
2781 struct ldlm_enqueue_info *einfo,
2784 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2787 osc_set_lock_data_with_check(lock, einfo, flags);
2788 LDLM_LOCK_PUT(lock);
2790 CERROR("lockh %p, data %p - client evicted?\n",
2791 lockh, einfo->ei_cbdata);
/*
 * Run the iterator @replace (with argument @data) over the locks of the
 * ldlm resource named after the object id/group of @lsm, in the
 * namespace of the obd behind @exp.
 */
2794 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2795 ldlm_iterator_t replace, void *data)
2797 struct ldlm_res_id res_id;
2798 struct obd_device *obd = class_exp2obd(exp);
2800 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
2801 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
/*
 * OSC-level completion of a lock enqueue.
 * When the enqueue result is ELDLM_LOCK_ABORTED the ldlm reply is fetched
 * and a non-zero lock_policy_res1 replaces @rc.  If the result is success,
 * or an aborted intent enqueue, LDLM_FL_LVB_READY is set in *flags and the
 * LVB contents are logged.  The @upcall is then invoked with @cookie and
 * the resulting rc, and its return value becomes the new rc.
 */
2805 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2806 obd_enqueue_update_f upcall, void *cookie,
2809 int intent = *flags & LDLM_FL_HAS_INTENT;
2813 /* The request was created before ldlm_cli_enqueue call. */
2814 if (rc == ELDLM_LOCK_ABORTED) {
2815 struct ldlm_reply *rep;
2816 rep = req_capsule_server_get(&req->rq_pill,
2819 LASSERT(rep != NULL);
2820 if (rep->lock_policy_res1)
2821 rc = rep->lock_policy_res1;
2825 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2826 *flags |= LDLM_FL_LVB_READY;
2827 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2828 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2831 /* Call the update callback. */
2832 rc = (*upcall)(cookie, rc);
/*
 * Reply interpreter for asynchronous lock enqueues (installed as
 * rq_interpret_reply by osc_enqueue_base()).
 * Works on a private copy of the lock handle and mode, takes an extra lock
 * reference around the completion, runs ldlm_cli_enqueue_fini() followed
 * by osc_enqueue_fini() (which calls the upcall), and finally drops the
 * references again.
 */
2836 static int osc_enqueue_interpret(const struct lu_env *env,
2837 struct ptlrpc_request *req,
2838 struct osc_enqueue_args *aa, int rc)
2840 struct ldlm_lock *lock;
2841 struct lustre_handle handle;
2844 /* Make a local copy of a lock handle and a mode, because aa->oa_*
2845 * might be freed anytime after lock upcall has been called. */
2846 lustre_handle_copy(&handle, aa->oa_lockh);
2847 mode = aa->oa_ei->ei_mode;
2849 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2851 lock = ldlm_handle2lock(&handle);
2853 /* Take an additional reference so that a blocking AST that
2854 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2855 * to arrive after an upcall has been executed by
2856 * osc_enqueue_fini(). */
2857 ldlm_lock_addref(&handle, mode);
2859 /* Complete obtaining the lock procedure. */
2860 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2861 mode, aa->oa_flags, aa->oa_lvb,
2862 sizeof(*aa->oa_lvb), lustre_swab_ost_lvb,
2864 /* Complete osc stuff. */
2865 rc = osc_enqueue_fini(req, aa->oa_lvb,
2866 aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
2867 /* Release the lock for async request. */
2868 if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2870 * Releases a reference taken by ldlm_cli_enqueue(), if it is
2871 * not already released by
2872 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2874 ldlm_lock_decref(&handle, mode);
2876 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2877 aa->oa_lockh, req, aa);
/* drop the additional reference taken with ldlm_lock_addref() above and
 * the lock pointer obtained from ldlm_handle2lock() */
2878 ldlm_lock_decref(&handle, mode);
2879 LDLM_LOCK_PUT(lock);
/*
 * Update the cached LVB and known minimum size (kms) of @loi after an
 * enqueue finished with @rc.
 *  - ELDLM_OK: copy *lvb into loi_lvb, clamp the reported size to one byte
 *    past the end of the granted extent, and raise kms to that value if it
 *    is not smaller than the current kms; then allow the lock to be
 *    matched and drop the lock reference.
 *  - ELDLM_LOCK_ABORTED with LDLM_FL_HAS_INTENT (glimpse): only loi_lvb is
 *    refreshed, kms is left alone.
 */
2883 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2884 struct lov_oinfo *loi, int flags,
2885 struct ost_lvb *lvb, __u32 mode, int rc)
2887 if (rc == ELDLM_OK) {
2888 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2891 LASSERT(lock != NULL);
2892 loi->loi_lvb = *lvb;
2893 tmp = loi->loi_lvb.lvb_size;
2894 /* Extend KMS up to the end of this lock and no further
2895 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2896 if (tmp > lock->l_policy_data.l_extent.end)
2897 tmp = lock->l_policy_data.l_extent.end + 1;
2898 if (tmp >= loi->loi_kms) {
2899 LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2900 ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2901 loi_kms_set(loi, tmp);
2903 LDLM_DEBUG(lock, "lock acquired, setting rss="
2904 LPU64"; leaving kms="LPU64", end="LPU64,
2905 loi->loi_lvb.lvb_size, loi->loi_kms,
2906 lock->l_policy_data.l_extent.end);
2908 ldlm_lock_allow_match(lock);
2909 LDLM_LOCK_PUT(lock);
2910 } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2911 loi->loi_lvb = *lvb;
2912 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2913 " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2917 EXPORT_SYMBOL(osc_update_enqueue);
/* Sentinel request-set value (never dereferenced as a real set here):
 * osc_enqueue_base() compares its rqset argument against it and, on a
 * match, hands the request to ptlrpcd_add_req() instead of adding it to
 * a caller-supplied set. */
2919 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
/*
 * Obtain an extent lock on @res_id, either by matching an already granted
 * lock or by sending an enqueue RPC.
 *
 * The requested extent in @policy is first widened to page boundaries.
 * ldlm_lock_match() is then tried (with LDLM_FL_LVB_READY); a matched lock
 * is only used when its l_ast_data is unset or equals einfo->ei_cbdata, in
 * which case @upcall is called directly with ELDLM_OK.  Otherwise a
 * RQF_LDLM_ENQUEUE_LVB request is built and passed to ldlm_cli_enqueue().
 * With @rqset the reply is handled asynchronously by
 * osc_enqueue_interpret(); without it osc_enqueue_fini() is called inline.
 * NOTE(review): the exact return values on the early-exit paths are not
 * visible in this block - confirm against the full body.
 */
2921 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2922 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2923 * other synchronous requests, however keeping some locks and trying to obtain
2924 * others may take a considerable amount of time in a case of ost failure; and
2925 * when other sync requests do not get released lock from a client, the client
2926 * is excluded from the cluster -- such scenarios make the life difficult, so
2927 * release locks just after they are obtained. */
2928 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2929 int *flags, ldlm_policy_data_t *policy,
2930 struct ost_lvb *lvb, int kms_valid,
2931 obd_enqueue_update_f upcall, void *cookie,
2932 struct ldlm_enqueue_info *einfo,
2933 struct lustre_handle *lockh,
2934 struct ptlrpc_request_set *rqset, int async)
2936 struct obd_device *obd = exp->exp_obd;
2937 struct ptlrpc_request *req = NULL;
2938 int intent = *flags & LDLM_FL_HAS_INTENT;
2943 /* Filesystem lock extents are extended to page boundaries so that
2944 * dealing with the page cache is a little smoother. */
2945 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2946 policy->l_extent.end |= ~CFS_PAGE_MASK;
2949 * kms is not valid when either object is completely fresh (so that no
2950 * locks are cached), or object was evicted. In the latter case cached
2951 * lock cannot be used, because it would prime inode state with
2952 * potentially stale LVB.
2957 /* Next, search for already existing extent locks that will cover us */
2958 /* If we're trying to read, we also search for an existing PW lock. The
2959 * VFS and page cache already protect us locally, so lots of readers/
2960 * writers can share a single PW lock.
2962 * There are problems with conversion deadlocks, so instead of
2963 * converting a read lock to a write lock, we'll just enqueue a new
2966 * At some point we should cancel the read lock instead of making them
2967 * send us a blocking callback, but there are problems with canceling
2968 * locks out from other users right now, too. */
2969 mode = einfo->ei_mode;
2970 if (einfo->ei_mode == LCK_PR)
2972 mode = ldlm_lock_match(obd->obd_namespace,
2973 *flags | LDLM_FL_LVB_READY, res_id,
2974 einfo->ei_type, policy, mode, lockh, 0);
2976 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
/* reuse the matched lock only if nobody else's callback data is
 * already attached to it */
2978 if (matched->l_ast_data == NULL ||
2979 matched->l_ast_data == einfo->ei_cbdata) {
2980 /* addref the lock only if not async requests and PW
2981 * lock is matched whereas we asked for PR. */
2982 if (!rqset && einfo->ei_mode != mode)
2983 ldlm_lock_addref(lockh, LCK_PR);
2984 osc_set_lock_data_with_check(matched, einfo, *flags);
2986 /* I would like to be able to ASSERT here that
2987 * rss <= kms, but I can't, for reasons which
2988 * are explained in lov_enqueue() */
2991 /* We already have a lock, and it's referenced */
2992 (*upcall)(cookie, ELDLM_OK);
2994 /* For async requests, decref the lock. */
2995 if (einfo->ei_mode != mode)
2996 ldlm_lock_decref(lockh, LCK_PW);
2998 ldlm_lock_decref(lockh, einfo->ei_mode);
2999 LDLM_LOCK_PUT(matched);
3002 ldlm_lock_decref(lockh, mode);
3003 LDLM_LOCK_PUT(matched);
3008 CFS_LIST_HEAD(cancels);
3009 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3010 &RQF_LDLM_ENQUEUE_LVB);
3014 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3018 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3020 ptlrpc_request_set_replen(req);
3023 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3024 *flags &= ~LDLM_FL_BLOCK_GRANTED;
3026 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3027 sizeof(*lvb), lustre_swab_ost_lvb, lockh, async);
3030 struct osc_enqueue_args *aa;
/* the interpreter arguments live inside the request's rq_async_args */
3031 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3032 aa = ptlrpc_req_async_args(req);
3035 aa->oa_flags = flags;
3036 aa->oa_upcall = upcall;
3037 aa->oa_cookie = cookie;
3039 aa->oa_lockh = lockh;
3041 req->rq_interpret_reply =
3042 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3043 if (rqset == PTLRPCD_SET)
3044 ptlrpcd_add_req(req, PSCOPE_OTHER);
3046 ptlrpc_set_add_req(rqset, req);
3047 } else if (intent) {
3048 ptlrpc_req_finished(req);
3053 rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3055 ptlrpc_req_finished(req);
/*
 * obd_info-based front end to osc_enqueue_base().
 * Builds the resource name from the object id/group in oinfo->oi_md and
 * forwards flags, policy, the first stripe's LVB and kms_valid, the
 * oi_cb_up upcall (with @oinfo itself as cookie) and the lock handle.
 * The enqueue is asynchronous exactly when @rqset is non-NULL.
 */
3060 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3061 struct ldlm_enqueue_info *einfo,
3062 struct ptlrpc_request_set *rqset)
3064 struct ldlm_res_id res_id;
3068 osc_build_res_name(oinfo->oi_md->lsm_object_id,
3069 oinfo->oi_md->lsm_object_gr, &res_id);
3071 rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3072 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3073 oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3074 oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3075 rqset, rqset != NULL);
/*
 * Look for an already granted lock covering @policy (widened to page
 * boundaries) on @res_id, using ldlm_lock_match() with LDLM_FL_LVB_READY.
 * On a match the callback data is attached through
 * osc_set_data_with_check(); when the matched mode differs from the
 * requested @mode and LDLM_FL_TEST_LOCK is not set, the reference is
 * converted from LCK_PW to LCK_PR.
 * NOTE(review): @data is declared void * here but is passed as the
 * struct ldlm_enqueue_info * argument of osc_set_data_with_check() -
 * verify what callers actually pass.
 */
3079 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3080 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3081 int *flags, void *data, struct lustre_handle *lockh,
3084 struct obd_device *obd = exp->exp_obd;
3085 int lflags = *flags;
/* fault-injection hook */
3089 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3092 /* Filesystem lock extents are extended to page boundaries so that
3093 * dealing with the page cache is a little smoother */
3094 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3095 policy->l_extent.end |= ~CFS_PAGE_MASK;
3097 /* Next, search for already existing extent locks that will cover us */
3098 /* If we're trying to read, we also search for an existing PW lock. The
3099 * VFS and page cache already protect us locally, so lots of readers/
3100 * writers can share a single PW lock. */
3104 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3105 res_id, type, policy, rc, lockh, unref);
3108 osc_set_data_with_check(lockh, data, lflags);
3109 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3110 ldlm_lock_addref(lockh, LCK_PR);
3111 ldlm_lock_decref(lockh, LCK_PW);
/*
 * Drop one reference of @mode on the lock behind @lockh.  Group locks
 * (LCK_GROUP, expected to be the rare case) are cancelled as part of the
 * decref; every other mode only has its reference dropped.
 */
3118 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3122 if (unlikely(mode == LCK_GROUP))
3123 ldlm_lock_decref_and_cancel(lockh, mode);
3125 ldlm_lock_decref(lockh, mode);
/* obd-method wrapper: @exp and @md are not used in the visible body, the
 * call is forwarded to osc_cancel_base(). */
3130 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3131 __u32 mode, struct lustre_handle *lockh)
3134 RETURN(osc_cancel_base(lockh, mode));
/*
 * Cancel unused locks in the namespace of the obd behind @exp via
 * ldlm_cli_cancel_unused().  resp starts out NULL and is pointed at the
 * resource name built from @lsm's object id/group.
 * NOTE(review): presumably the name is only built when @lsm is non-NULL,
 * so that a NULL @lsm means "all resources" - confirm.
 */
3137 static int osc_cancel_unused(struct obd_export *exp,
3138 struct lov_stripe_md *lsm, int flags,
3141 struct obd_device *obd = class_exp2obd(exp);
3142 struct ldlm_res_id res_id, *resp = NULL;
3145 resp = osc_build_res_name(lsm->lsm_object_id,
3146 lsm->lsm_object_gr, &res_id);
3149 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
/*
 * Reply interpreter for osc_statfs_async().  Fetches the obd_statfs reply
 * buffer (a -EPROTO path exists, presumably for a missing buffer), copies
 * it into the caller's oi_osfs and reports the result through the
 * obd_info's oi_cb_up callback, whose return value becomes rc.
 */
3152 static int osc_statfs_interpret(const struct lu_env *env,
3153 struct ptlrpc_request *req,
3154 struct osc_async_args *aa, int rc)
3156 struct obd_statfs *msfs;
3162 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3164 GOTO(out, rc = -EPROTO);
3167 *aa->aa_oi->oi_osfs = *msfs;
3169 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Asynchronous statfs: build an OST_STATFS request on the client import,
 * install osc_statfs_interpret() as the reply interpreter and add the
 * request to @rqset.  With OBD_STATFS_NODELAY in oinfo->oi_flags the
 * request is marked no-resend / no-delay.  @max_age is not used in the
 * visible body (see the comment below).
 */
3173 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3174 __u64 max_age, struct ptlrpc_request_set *rqset)
3176 struct ptlrpc_request *req;
3177 struct osc_async_args *aa;
3181 /* We could possibly pass max_age in the request (as an absolute
3182 * timestamp or a "seconds.usec ago") so the target can avoid doing
3183 * extra calls into the filesystem if that isn't necessary (e.g.
3184 * during mount that would help a bit). Having relative timestamps
3185 * is not so great if request processing is slow, while absolute
3186 * timestamps are not ideal because they need time synchronization. */
3187 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3191 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3193 ptlrpc_request_free(req);
3196 ptlrpc_request_set_replen(req);
3197 req->rq_request_portal = OST_CREATE_PORTAL;
3198 ptlrpc_at_set_req_timeout(req);
3200 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3201 /* procfs requests do not want statfs to wait, to avoid deadlock */
3202 req->rq_no_resend = 1;
3203 req->rq_no_delay = 1;
3206 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3207 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3208 aa = ptlrpc_req_async_args(req);
3211 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous statfs: take a reference on the client import under cl_sem,
 * build an OST_STATFS request on it, wait for the reply with
 * ptlrpc_queue_wait() and read the obd_statfs reply buffer (a -EPROTO path
 * exists, presumably for a missing buffer).  With OBD_STATFS_NODELAY in
 * @flags the request is marked no-resend / no-delay.
 */
3215 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3216 __u64 max_age, __u32 flags)
3218 struct obd_statfs *msfs;
3219 struct ptlrpc_request *req;
3220 struct obd_import *imp = NULL;
3224 /* Since the request might also come from lprocfs, we need to
3225 * synchronize this with client_disconnect_export (Bug15684) */
3226 down_read(&obd->u.cli.cl_sem);
3227 if (obd->u.cli.cl_import)
3228 imp = class_import_get(obd->u.cli.cl_import);
3229 up_read(&obd->u.cli.cl_sem);
3233 /* We could possibly pass max_age in the request (as an absolute
3234 * timestamp or a "seconds.usec ago") so the target can avoid doing
3235 * extra calls into the filesystem if that isn't necessary (e.g.
3236 * during mount that would help a bit). Having relative timestamps
3237 * is not so great if request processing is slow, while absolute
3238 * timestamps are not ideal because they need time synchronization. */
3239 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
/* the import reference taken above is dropped once the request has
 * been allocated */
3241 class_import_put(imp);
3246 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3248 ptlrpc_request_free(req);
3251 ptlrpc_request_set_replen(req);
3252 req->rq_request_portal = OST_CREATE_PORTAL;
3253 ptlrpc_at_set_req_timeout(req);
3255 if (flags & OBD_STATFS_NODELAY) {
3256 /* procfs requests do not want statfs to wait, to avoid deadlock */
3257 req->rq_no_resend = 1;
3258 req->rq_no_delay = 1;
3261 rc = ptlrpc_queue_wait(req);
3265 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3267 GOTO(out, rc = -EPROTO);
3274 ptlrpc_req_finished(req);
/*
 * Copy the striping description of @lsm out to the user buffer @lump.
 * Only the v1-sized header is read from user space; it supplies lmm_magic
 * (V1 or V3 only) and lmm_stripe_count, which select the size of the
 * kernel-side copy.  The reply always reports a stripe count of 1 and, when
 * the caller left room for objects, the object id in the first slot.
 */
3278 /* Retrieve object striping information.
3280 * @lump is a pointer to a user-space struct whose lmm_stripe_count gives
3281 * the number of object slots which will fit in the user buffer.
3282 * lmm_magic must be LOV_USER_MAGIC_V1 or LOV_USER_MAGIC_V3 (we only use 1 slot here).
3284 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3286 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3287 struct lov_user_md_v3 lum, *lumk;
3288 struct lov_user_ost_data_v1 *lmm_objects;
3289 int rc = 0, lum_size;
3295 /* we only need the header part from user space to get lmm_magic and
3296 * lmm_stripe_count, (the header part is common to v1 and v3) */
3297 lum_size = sizeof(struct lov_user_md_v1);
3298 if (copy_from_user(&lum, lump, lum_size))
3301 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3302 (lum.lmm_magic != LOV_USER_MAGIC_V3))
3305 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3306 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3307 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3308 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3310 /* we can use lov_mds_md_size() to compute lum_size
3311 * because lov_user_md_vX and lov_mds_md_vX have the same size */
3312 if (lum.lmm_stripe_count > 0) {
3313 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3314 OBD_ALLOC(lumk, lum_size);
/* the objects array sits at a different offset in the v1 and v3
 * layouts, hence the cast for v1 */
3318 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3319 lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3321 lmm_objects = &(lumk->lmm_objects[0]);
3322 lmm_objects->l_object_id = lsm->lsm_object_id;
3324 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3328 lumk->lmm_object_id = lsm->lsm_object_id;
3329 lumk->lmm_object_gr = lsm->lsm_object_gr;
3330 lumk->lmm_stripe_count = 1;
3332 if (copy_to_user(lump, lumk, lum_size))
3336 OBD_FREE(lumk, lum_size);
/*
 * ioctl dispatcher for the OSC device.  A module reference is held for the
 * duration of the call (try_module_get / module_put).  Unknown commands
 * are logged and fail with -ENOTTY.
 * NOTE(review): the CERROR message on module-get failure has no trailing
 * newline, unlike the other log messages in this file.
 */
3342 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3343 void *karg, void *uarg)
3345 struct obd_device *obd = exp->exp_obd;
3346 struct obd_ioctl_data *data = karg;
3350 if (!try_module_get(THIS_MODULE)) {
3351 CERROR("Can't get module. Is it alive?");
/* Report a synthetic single-target LOV configuration: one target, one
 * active target, default stripe count 1, plus this obd's uuid. */
3355 case OBD_IOC_LOV_GET_CONFIG: {
3357 struct lov_desc *desc;
3358 struct obd_uuid uuid;
3362 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3363 GOTO(out, err = -EINVAL);
3365 data = (struct obd_ioctl_data *)buf;
/* both inline buffers must be large enough for what is written below */
3367 if (sizeof(*desc) > data->ioc_inllen1) {
3368 obd_ioctl_freedata(buf, len);
3369 GOTO(out, err = -EINVAL);
3372 if (data->ioc_inllen2 < sizeof(uuid)) {
3373 obd_ioctl_freedata(buf, len);
3374 GOTO(out, err = -EINVAL);
3377 desc = (struct lov_desc *)data->ioc_inlbuf1;
3378 desc->ld_tgt_count = 1;
3379 desc->ld_active_tgt_count = 1;
3380 desc->ld_default_stripe_count = 1;
3381 desc->ld_default_stripe_size = 0;
3382 desc->ld_default_stripe_offset = 0;
3383 desc->ld_pattern = 0;
3384 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3386 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3388 err = copy_to_user((void *)uarg, buf, len);
3391 obd_ioctl_freedata(buf, len);
3394 case LL_IOC_LOV_SETSTRIPE:
3395 err = obd_alloc_memmd(exp, karg);
3399 case LL_IOC_LOV_GETSTRIPE:
3400 err = osc_getstripe(karg, uarg);
3402 case OBD_IOC_CLIENT_RECOVER:
3403 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3408 case IOC_OSC_SET_ACTIVE:
3409 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3412 case OBD_IOC_POLL_QUOTACHECK:
3413 err = lquota_poll_check(quota_interface, exp,
3414 (struct if_quotacheck *)karg);
3416 case OBD_IOC_PING_TARGET:
3417 err = ptlrpc_obd_ping(obd);
3420 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3421 cmd, cfs_curproc_comm());
3422 GOTO(out, err = -ENOTTY);
3425 module_put(THIS_MODULE);
/*
 * Key/value query interface.  Both @vallen and @val must be non-NULL.
 * Keys handled here:
 *  - KEY_LOCK_TO_STRIPE: answered locally, *vallen is set to the size of
 *    a __u32 stripe number;
 *  - KEY_LAST_ID: an OST_GET_INFO RPC carrying the key; the obd_id from
 *    the reply is stored through @val;
 *  - KEY_FIEMAP: an OST_GET_INFO RPC carrying the key and the caller's
 *    fiemap buffer (*vallen bytes, used for both request and reply); the
 *    reply is copied back into @val.
 * NOTE(review): in the KEY_LAST_ID branch no check of *vallen against
 * sizeof(obd_id) is visible before @val is written - confirm callers
 * always pass a large enough buffer.
 */
3429 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3430 void *key, __u32 *vallen, void *val,
3431 struct lov_stripe_md *lsm)
3434 if (!vallen || !val)
3437 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3438 __u32 *stripe = val;
3439 *vallen = sizeof(*stripe);
3442 } else if (KEY_IS(KEY_LAST_ID)) {
3443 struct ptlrpc_request *req;
3448 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3449 &RQF_OST_GET_INFO_LAST_ID);
3453 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3454 RCL_CLIENT, keylen);
3455 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3457 ptlrpc_request_free(req);
3461 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3462 memcpy(tmp, key, keylen);
3464 ptlrpc_request_set_replen(req);
3465 rc = ptlrpc_queue_wait(req);
3469 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3471 GOTO(out, rc = -EPROTO);
3473 *((obd_id *)val) = *reply;
3475 ptlrpc_req_finished(req);
3477 } else if (KEY_IS(KEY_FIEMAP)) {
3478 struct ptlrpc_request *req;
3479 struct ll_user_fiemap *reply;
3483 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3484 &RQF_OST_GET_INFO_FIEMAP);
/* the fiemap value buffer has the same size in the request and in the
 * reply */
3488 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3489 RCL_CLIENT, keylen);
3490 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3491 RCL_CLIENT, *vallen);
3492 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3493 RCL_SERVER, *vallen);
3495 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3497 ptlrpc_request_free(req);
3501 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3502 memcpy(tmp, key, keylen);
3503 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3504 memcpy(tmp, val, *vallen);
3506 ptlrpc_request_set_replen(req);
3507 rc = ptlrpc_queue_wait(req);
3511 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3513 GOTO(out1, rc = -EPROTO);
3515 memcpy(val, reply, *vallen);
3517 ptlrpc_req_finished(req);
/*
 * Reply interpreter installed by osc_set_info_async() for KEY_MDS_CONN.
 * Connects the LLOG_MDS_OST_ORIG_CTXT llog context of the import's obd
 * (logging an error if that fails), releases the context, and then marks
 * the import as server-timeout and pingable under imp_lock.
 */
3525 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3526 struct ptlrpc_request *req,
3529 struct llog_ctxt *ctxt;
3530 struct obd_import *imp = req->rq_import;
3536 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3539 rc = llog_initiator_connect(ctxt);
3541 CERROR("cannot establish connection for "
3542 "ctxt %p: %d\n", ctxt, rc);
3545 llog_ctxt_put(ctxt);
3546 spin_lock(&imp->imp_lock);
3547 imp->imp_server_timeout = 1;
3548 imp->imp_pingable = 1;
3549 spin_unlock(&imp->imp_lock);
3550 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
/*
 * Key/value set interface.  A number of keys are handled locally
 * (KEY_NEXT_ID, KEY_UNLINKED, KEY_INIT_RECOV, KEY_CHECKSUM,
 * KEY_SPTLRPC_CONF, KEY_FLUSH_CTX); everything else is packed into an
 * OST_SET_INFO request and added to @set.  For KEY_MDS_CONN the value is
 * additionally recorded as the creator's object group and
 * osc_setinfo_mds_conn_interpret() is installed as reply interpreter.
 * NOTE(review): the return statements of the local branches are not
 * visible in this block - confirm their values against the full body.
 */
3555 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3556 void *key, obd_count vallen, void *val,
3557 struct ptlrpc_request_set *set)
3559 struct ptlrpc_request *req;
3560 struct obd_device *obd = exp->exp_obd;
3561 struct obd_import *imp = class_exp2cliimp(exp);
/* fault-injection hook */
3566 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3568 if (KEY_IS(KEY_NEXT_ID)) {
3569 if (vallen != sizeof(obd_id))
/* the stored next id is one past the value supplied by the caller */
3573 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3574 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3575 exp->exp_obd->obd_name,
3576 obd->u.cli.cl_oscc.oscc_next_id);
3581 if (KEY_IS(KEY_UNLINKED)) {
3582 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
/* clear the creator's no-space flag */
3583 spin_lock(&oscc->oscc_lock);
3584 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3585 spin_unlock(&oscc->oscc_lock);
3589 if (KEY_IS(KEY_INIT_RECOV)) {
3590 if (vallen != sizeof(int))
3592 spin_lock(&imp->imp_lock);
3593 imp->imp_initial_recov = *(int *)val;
3594 spin_unlock(&imp->imp_lock);
3595 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3596 exp->exp_obd->obd_name,
3597 imp->imp_initial_recov);
3601 if (KEY_IS(KEY_CHECKSUM)) {
3602 if (vallen != sizeof(int))
/* any non-zero value enables checksums */
3604 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3608 if (KEY_IS(KEY_SPTLRPC_CONF)) {
3609 sptlrpc_conf_client_adapt(obd);
3613 if (KEY_IS(KEY_FLUSH_CTX)) {
3614 sptlrpc_import_flush_my_ctx(imp);
3621 /* We pass all other commands directly to OST. Since nobody calls osc
3622 methods directly and everybody is supposed to go through LOV, we
3623 assume lov checked invalid values for us.
3624 The only recognised values so far are evict_by_nid and mds_conn.
3625 Even if something bad goes through, we'd get a -EINVAL from OST
3629 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
3633 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3634 RCL_CLIENT, keylen);
3635 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3636 RCL_CLIENT, vallen);
3637 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3639 ptlrpc_request_free(req);
3643 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3644 memcpy(tmp, key, keylen);
3645 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
3646 memcpy(tmp, val, vallen);
3648 if (KEY_IS(KEY_MDS_CONN)) {
3649 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3651 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3652 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3653 LASSERT_MDS_GROUP(oscc->oscc_oa.o_gr);
3654 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3657 ptlrpc_request_set_replen(req);
3658 ptlrpc_set_add_req(set, req);
3659 ptlrpc_check_set(NULL, set);
/*
 * llog operation table handed to llog_setup() for LLOG_SIZE_REPL_CTXT in
 * osc_llog_init().  lop_cancel is the only member set in the visible part of
 * the initializer (written with the old GNU "field:" designator syntax).
 */
3665 static struct llog_operations osc_size_repl_logops = {
3666 lop_cancel: llog_obd_repl_cancel
/*
 * llog operation table used for LLOG_MDS_OST_ORIG_CTXT.  It has static
 * storage and no initializer, so it starts out zeroed; osc_llog_init()
 * fills it in the first time it runs.
 */
3669 static struct llog_operations osc_mds_ost_orig_logops;
/*
 * Set up the two llog contexts of an OSC device: LLOG_MDS_OST_ORIG_CTXT
 * (using osc_mds_ost_orig_logops and catid->lci_logid) and
 * LLOG_SIZE_REPL_CTXT (using osc_size_repl_logops and no log id).
 *
 * obd    OSC device being configured
 * olg    must be obd's own llog group (asserted below); the code then uses
 *        &obd->obd_olg directly
 * tgt    passed through to llog_setup()
 * count  passed through to llog_setup()
 * catid  catalog id; its lci_logid is given to the ORIG context and is
 *        printed in the failure diagnostics
 * uuid   not referenced in the visible lines
 *
 * NOTE(review): the declaration of rc, the error checks around each
 * llog_setup() call and the return statement are not visible in this
 * excerpt; presumably the function returns the llog_setup() result -
 * confirm against the full file.
 */
3670 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3671 struct obd_device *tgt, int count,
3672 struct llog_catid *catid, struct obd_uuid *uuid)
3677 LASSERT(olg == &obd->obd_olg);
/*
 * Lazy one-time initialisation of the shared ORIG operation table: copy
 * llog_lvfs_ops, then override setup/cleanup/add/connect.  lop_setup
 * doubles as the "already initialised" marker.
 * NOTE(review): the table is file-static but the lock taken is the
 * per-device obd_dev_lock, so two different devices are not serialised
 * against each other here - confirm this is harmless.
 */
3678 spin_lock(&obd->obd_dev_lock);
3679 if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3680 osc_mds_ost_orig_logops = llog_lvfs_ops;
3681 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3682 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3683 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3684 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3686 spin_unlock(&obd->obd_dev_lock);
/* First context: MDS->OST origin log. */
3688 rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3689 &catid->lci_logid, &osc_mds_ost_orig_logops);
3691 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
/* Second context: size replication log; no log id is supplied. */
3695 rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
3696 NULL, &osc_size_repl_logops);
/*
 * The ORIG context is looked up again here, presumably to undo the first
 * setup when the second one fails (the cleanup call itself is not visible
 * in this excerpt).
 */
3698 struct llog_ctxt *ctxt =
3699 llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3702 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
/* Diagnostics: device/target names, count, catid pointer, rc and log id. */
3707 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3708 obd->obd_name, tgt->obd_name, count, catid, rc);
3709 CERROR("logid "LPX64":0x%x\n",
3710 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
/*
 * Tear down the llog contexts created by osc_llog_init().
 *
 * Each of LLOG_MDS_OST_ORIG_CTXT and LLOG_SIZE_REPL_CTXT is looked up with
 * llog_get_context() and handed to llog_cleanup(); the results are kept in
 * rc and rc2 respectively (both start at 0).
 *
 * obd    OSC device
 * count  not referenced in the visible lines
 *
 * NOTE(review): the NULL checks on ctxt and the way rc and rc2 are combined
 * into the return value are not visible in this excerpt.
 */
3715 static int osc_llog_finish(struct obd_device *obd, int count)
3717 struct llog_ctxt *ctxt;
3718 int rc = 0, rc2 = 0;
3721 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3723 rc = llog_cleanup(ctxt);
3725 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3727 rc2 = llog_cleanup(ctxt);
/*
 * Reconnect hook (wired up as .o_reconnect in osc_obd_ops).
 *
 * When connect data is supplied and it has OBD_CONNECT_GRANT set, this fills
 * in data->ocd_grant with the grant to ask for and resets the client's
 * lost-grant counter, then logs the values.
 *
 * env, exp, cluuid  not referenced in the visible lines
 * obd               client device; its u.cli state is read and updated
 * data              connect data to fill in; may be NULL
 *
 * NOTE(review): the final parameter line, the declaration of lost_grant and
 * the return statement are not visible in this excerpt.
 */
3734 static int osc_reconnect(const struct lu_env *env,
3735 struct obd_export *exp, struct obd_device *obd,
3736 struct obd_uuid *cluuid,
3737 struct obd_connect_data *data,
3740 struct client_obd *cli = &obd->u.cli;
3742 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
/*
 * Under cl_loi_list_lock: request the grant we currently hold, or, if that
 * is zero, (2 * cl_max_pages_per_rpc) << CFS_PAGE_SHIFT (the GNU "?:"
 * operator binds looser than "<<").  cl_lost_grant is snapshotted for the
 * log message and then cleared.
 */
3745 client_obd_list_lock(&cli->cl_loi_list_lock);
3746 data->ocd_grant = cli->cl_avail_grant ?:
3747 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3748 lost_grant = cli->cl_lost_grant;
3749 cli->cl_lost_grant = 0;
3750 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* cl_avail_grant is re-read here after the list lock has been dropped. */
3752 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3753 "cl_lost_grant: %ld\n", data->ocd_grant,
3754 cli->cl_avail_grant, lost_grant);
3755 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3756 " ocd_grant: %d\n", data->ocd_connect_flags,
3757 data->ocd_version, data->ocd_grant);
/*
 * Disconnect hook (wired up as .o_disconnect in osc_obd_ops).
 *
 * Looks up the LLOG_SIZE_REPL_CTXT context of the export's device.  With a
 * context in hand, llog_sync() is called on it when cl_conn_count is 1, and
 * llog_ctxt_put() is then called on the context; a D_HA debug message
 * reports the case where no context was found.  Finally the export is
 * disconnected through client_disconnect_export(), whose result is stored
 * in rc.
 *
 * NOTE(review): the "if (ctxt) ... else" framing, the end of the in-body
 * comment, the declaration of rc and the return statement are not visible
 * in this excerpt.
 */
3763 static int osc_disconnect(struct obd_export *exp)
3765 struct obd_device *obd = class_exp2obd(exp);
3766 struct llog_ctxt *ctxt;
3769 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
/* cl_conn_count == 1 is the condition guarding the llog_sync() call. */
3771 if (obd->u.cli.cl_conn_count == 1) {
3772 /* Flush any remaining cancel messages out to the
3774 llog_sync(ctxt, exp);
3776 llog_ctxt_put(ctxt);
3778 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3782 rc = client_disconnect_export(exp);
/*
 * Import state-change callback (wired up as .o_import_event in osc_obd_ops).
 *
 * obd    device owning the import (asserted: imp->imp_obd == obd)
 * imp    import the event refers to
 * event  one of the IMP_EVENT_* values handled below
 *
 * Per event, as far as the visible lines show:
 *   IMP_EVENT_DISCON      if imp_server_timeout is set, mark the object
 *                         creator OSCC_FLAG_RECOVERING; reset cl_avail_grant
 *                         and cl_lost_grant to 0
 *   IMP_EVENT_INACTIVE    notify the observer with OBD_NOTIFY_INACTIVE
 *   IMP_EVENT_INVALIDATE  obtain a cl environment, run osc_check_rpcs()
 *                         under cl_loi_list_lock, clean up the ldlm
 *                         namespace with LDLM_FL_LOCAL_ONLY, release the
 *                         environment
 *   IMP_EVENT_ACTIVE      if imp_server_timeout is set, clear
 *                         OSCC_FLAG_NOSPC; notify with OBD_NOTIFY_ACTIVE
 *   IMP_EVENT_OCD         apply connect data: osc_init_grant() when
 *                         OBD_CONNECT_GRANT is set, request portal switch
 *                         when OBD_CONNECT_REQPORTAL is set; notify with
 *                         OBD_NOTIFY_OCD
 *   anything else         CERROR "Unknown import event"
 *
 * NOTE(review): the switch statement, closing braces, break statements,
 * several declarations (rc, env, refcheck), the assignment of cli and the
 * return statement are not visible in this excerpt, so exact block nesting
 * should be confirmed against the full file.
 */
3786 static int osc_import_event(struct obd_device *obd,
3787 struct obd_import *imp,
3788 enum obd_import_event event)
3790 struct client_obd *cli;
3794 LASSERT(imp->imp_obd == obd);
3797 case IMP_EVENT_DISCON: {
3798 /* Only do this on the MDS OSC's */
3799 if (imp->imp_server_timeout) {
3800 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3802 spin_lock(&oscc->oscc_lock);
3803 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3804 spin_unlock(&oscc->oscc_lock);
/* Both grant counters are reset to zero under the list lock. */
3807 client_obd_list_lock(&cli->cl_loi_list_lock);
3808 cli->cl_avail_grant = 0;
3809 cli->cl_lost_grant = 0;
3810 client_obd_list_unlock(&cli->cl_loi_list_lock);
3813 case IMP_EVENT_INACTIVE: {
3814 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3817 case IMP_EVENT_INVALIDATE: {
3818 struct ldlm_namespace *ns = obd->obd_namespace;
/* The environment obtained here is the one given to osc_check_rpcs() and
 * handed back with cl_env_put() at the end of this case. */
3822 env = cl_env_get(&refcheck);
3826 client_obd_list_lock(&cli->cl_loi_list_lock);
3827 /* all pages go to failing rpcs due to the invalid
3829 osc_check_rpcs(env, cli);
3830 client_obd_list_unlock(&cli->cl_loi_list_lock);
3832 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3833 cl_env_put(env, &refcheck);
3838 case IMP_EVENT_ACTIVE: {
3839 /* Only do this on the MDS OSC's */
3840 if (imp->imp_server_timeout) {
3841 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3843 spin_lock(&oscc->oscc_lock);
3844 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3845 spin_unlock(&oscc->oscc_lock);
3847 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3850 case IMP_EVENT_OCD: {
3851 struct obd_connect_data *ocd = &imp->imp_connect_data;
/* Grant state is (re)initialised from the connect data when the
 * OBD_CONNECT_GRANT flag is present in it. */
3853 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3854 osc_init_grant(&obd->u.cli, ocd);
/* With OBD_CONNECT_REQPORTAL in the connect data, requests on this import
 * are directed to OST_REQUEST_PORTAL. */
3857 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3858 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3860 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3864 CERROR("Unknown import event %d\n", event);
/*
 * Device setup (wired up as .o_setup in osc_obd_ops).
 *
 * Calls ptlrpcd_addref(), then the generic client_obd_setup(); afterwards it
 * registers the procfs entries for the device and creates the request pool
 * of the client import.
 *
 * obd   device being set up
 * lcfg  configuration record, passed on to client_obd_setup()
 *
 * NOTE(review): the checks on rc after ptlrpcd_addref() and
 * client_obd_setup(), the failure path, one argument line of
 * ptlrpc_init_rq_pool() and the return statement are not visible in this
 * excerpt.
 */
3870 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3876 rc = ptlrpcd_addref();
3880 rc = client_obd_setup(obd, lcfg);
3884 struct lprocfs_static_vars lvars = { 0 };
3885 struct client_obd *cli = &obd->u.cli;
/* procfs: the three attach/register calls below run only when
 * lprocfs_obd_setup() returned 0. */
3887 lprocfs_osc_init_vars(&lvars);
3888 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3889 lproc_osc_attach_seqstat(obd);
3890 sptlrpc_lprocfs_cliobd_attach(obd);
3891 ptlrpc_lprocfs_register_obd(obd);
3895 /* We need to allocate a few requests more, because
3896 brw_interpret tries to create new requests before freeing
3897 previous ones. Ideally we want to have 2x max_rpcs_in_flight
3898 reserved, but I afraid that might be too much wasted RAM
3899 in fact, so 2 is just my guess and still should work. */
/* Pool size is cl_max_rpcs_in_flight + 2, per the reasoning above. */
3900 cli->cl_import->imp_rq_pool =
3901 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3903 ptlrpc_add_rqs_to_pool);
/*
 * Pre-cleanup hook (wired up as .o_precleanup in osc_obd_ops), called with
 * the cleanup stage being entered.
 *
 *   OBD_CLEANUP_EARLY    deactivate the client import and clear its
 *                        imp_pingable flag under imp_lock
 *   OBD_CLEANUP_EXPORTS  if the device still has a client import, invalidate
 *                        it, free its request pool and destroy it; then call
 *                        obd_llog_finish(obd, 0)
 *
 * NOTE(review): the switch statement, break statements, the test guarding
 * the final CERROR, the declaration of rc and the return statement are not
 * visible in this excerpt.
 */
3909 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3915 case OBD_CLEANUP_EARLY: {
3916 struct obd_import *imp;
3917 imp = obd->u.cli.cl_import;
3918 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3919 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3920 ptlrpc_deactivate_import(imp);
3921 spin_lock(&imp->imp_lock);
3922 imp->imp_pingable = 0;
3923 spin_unlock(&imp->imp_lock);
3926 case OBD_CLEANUP_EXPORTS: {
3927 /* If we set up but never connected, the
3928 client import will not have been cleaned. */
3929 if (obd->u.cli.cl_import) {
3930 struct obd_import *imp;
/* The import is torn down with cl_sem held for writing.  Note that the
 * cl_import pointer is tested before the semaphore is taken and set to
 * NULL only after it has been released. */
3931 down_write(&obd->u.cli.cl_sem);
3932 imp = obd->u.cli.cl_import;
3933 CDEBUG(D_CONFIG, "%s: client import never connected\n",
3935 ptlrpc_invalidate_import(imp);
/* Release the request pool that osc_setup() attached to the import. */
3936 if (imp->imp_rq_pool) {
3937 ptlrpc_free_rq_pool(imp->imp_rq_pool);
3938 imp->imp_rq_pool = NULL;
3940 class_destroy_import(imp);
3941 up_write(&obd->u.cli.cl_sem);
3942 obd->u.cli.cl_import = NULL;
3944 rc = obd_llog_finish(obd, 0);
3946 CERROR("failed to cleanup llogging subsystems\n");
/*
 * Device cleanup (wired up as .o_cleanup in osc_obd_ops).
 *
 * Unregisters the procfs entries, updates the object creator's flags
 * (OSCC_FLAG_RECOVERING cleared, OSCC_FLAG_EXITING set, both under
 * oscc_lock), calls lquota_cleanup() for the device and finally runs the
 * generic client_obd_cleanup(), storing its result in rc.
 *
 * NOTE(review): the declaration of rc, anything following
 * client_obd_cleanup() and the return statement are not visible in this
 * excerpt.
 */
3953 int osc_cleanup(struct obd_device *obd)
3955 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3959 ptlrpc_lprocfs_unregister_obd(obd);
3960 lprocfs_obd_cleanup(obd);
3962 spin_lock(&oscc->oscc_lock);
3963 oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3964 oscc->oscc_flags |= OSCC_FLAG_EXITING;
3965 spin_unlock(&oscc->oscc_lock);
3967 /* free memory of osc quota cache */
3968 lquota_cleanup(quota_interface, obd);
3970 rc = client_obd_cleanup(obd);
/*
 * Process one configuration record for an OSC device.
 *
 * Dispatches on lcfg->lcfg_command.  The one branch visible in this excerpt
 * passes the record to class_process_proc_param() with the PARAM_OSC prefix
 * and the OSC lprocfs variable table obtained from lprocfs_osc_init_vars().
 *
 * NOTE(review): the case labels, the remaining arguments of
 * class_process_proc_param(), the declaration of rc and the return
 * statement are not visible in this excerpt.
 */
3976 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3978 struct lprocfs_static_vars lvars = { 0 };
3981 lprocfs_osc_init_vars(&lvars);
3983 switch (lcfg->lcfg_command) {
3985 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
/*
 * obd_ops adapter (wired up as .o_process_config in osc_obd_ops): forwards
 * buf, treated as the struct lustre_cfg, to osc_process_config_base().
 * len is not used.
 */
3995 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3997 return osc_process_config_base(obd, buf);
/*
 * Method table of the OSC obd type.  osc_init() passes it to
 * init_obd_quota_ops() and registers it with class_register_type().
 * Connection add/delete and the initial connect use the generic client_*
 * helpers; every other entry visible here is an osc_* function.
 *
 * NOTE(review): original lines 4021 and 4023 and the closing of the
 * initializer are not visible in this excerpt, so the table shown here is
 * incomplete.
 */
4000 struct obd_ops osc_obd_ops = {
4001 .o_owner = THIS_MODULE,
4002 .o_setup = osc_setup,
4003 .o_precleanup = osc_precleanup,
4004 .o_cleanup = osc_cleanup,
4005 .o_add_conn = client_import_add_conn,
4006 .o_del_conn = client_import_del_conn,
4007 .o_connect = client_connect_import,
4008 .o_reconnect = osc_reconnect,
4009 .o_disconnect = osc_disconnect,
4010 .o_statfs = osc_statfs,
4011 .o_statfs_async = osc_statfs_async,
4012 .o_packmd = osc_packmd,
4013 .o_unpackmd = osc_unpackmd,
4014 .o_precreate = osc_precreate,
4015 .o_create = osc_create,
4016 .o_destroy = osc_destroy,
4017 .o_getattr = osc_getattr,
4018 .o_getattr_async = osc_getattr_async,
4019 .o_setattr = osc_setattr,
4020 .o_setattr_async = osc_setattr_async,
4022 .o_punch = osc_punch,
4024 .o_enqueue = osc_enqueue,
4025 .o_change_cbdata = osc_change_cbdata,
4026 .o_cancel = osc_cancel,
4027 .o_cancel_unused = osc_cancel_unused,
4028 .o_iocontrol = osc_iocontrol,
4029 .o_get_info = osc_get_info,
4030 .o_set_info_async = osc_set_info_async,
4031 .o_import_event = osc_import_event,
4032 .o_llog_init = osc_llog_init,
4033 .o_llog_finish = osc_llog_finish,
4034 .o_process_config = osc_process_config,
/*
 * Objects defined outside this excerpt.  osc_caches is handed to
 * lu_kmem_init() / lu_kmem_fini() in osc_init() and osc_exit(); osc_ast_guard
 * and its lockdep class are initialised in osc_init().
 */
4037 extern struct lu_kmem_descr osc_caches[];
4038 extern spinlock_t osc_ast_guard;
4039 extern struct lock_class_key osc_ast_guard_class;
/*
 * Module initialisation (given to cfs_module() as the init routine).
 *
 * Order of the visible steps:
 *   1. log the address of osc_caches (debugging aid, see comment below)
 *   2. lu_kmem_init(osc_caches)
 *   3. fetch the module-level lprocfs variables
 *   4. request the "lquota" module, take the osc_quota_interface symbol,
 *      and initialise quota support and the quota entries of osc_obd_ops
 *   5. register the OSC obd type with class_register_type()
 *   6. initialise the osc_ast_guard spinlock and set its lockdep class
 *
 * NOTE(review): the checks on rc, the declaration of rc and the return
 * statements are not visible in this excerpt.
 */
4041 int __init osc_init(void)
4043 struct lprocfs_static_vars lvars = { 0 };
4047 /* print an address of _any_ initialized kernel symbol from this
4048 * module, to allow debugging with gdb that doesn't support data
4049 * symbols from modules.*/
4050 CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4052 rc = lu_kmem_init(osc_caches);
4054 lprocfs_osc_init_vars(&lvars);
/* quota_interface is passed to lquota_init() and init_obd_quota_ops()
 * without a NULL test in the visible lines; the only test visible is the
 * one guarding the symbol put below. */
4056 request_module("lquota");
4057 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4058 lquota_init(quota_interface);
4059 init_obd_quota_ops(quota_interface, &osc_obd_ops);
4061 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4062 LUSTRE_OSC_NAME, &osc_device_type);
/* Undo steps 4 and 2: drop the quota symbol (if one was obtained) and
 * release the caches - presumably only when registration failed; the
 * guarding test is not visible here. */
4064 if (quota_interface)
4065 PORTAL_SYMBOL_PUT(osc_quota_interface);
4066 lu_kmem_fini(osc_caches);
4070 spin_lock_init(&osc_ast_guard);
4071 lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
/*
 * Module teardown (given to cfs_module() as the exit routine).
 *
 * Finalises the OSC device type, shuts down quota support and drops the
 * osc_quota_interface symbol if one was obtained in osc_init(), unregisters
 * the obd type and releases the caches set up by lu_kmem_init().
 */
4077 static void /*__exit*/ osc_exit(void)
4079 lu_device_type_fini(&osc_device_type);
4081 lquota_exit(quota_interface);
4082 if (quota_interface)
4083 PORTAL_SYMBOL_PUT(osc_quota_interface);
4085 class_unregister_type(LUSTRE_OSC_NAME);
4086 lu_kmem_fini(osc_caches);
/* Module metadata, and registration of osc_init() / osc_exit() as the
 * module's entry and exit points under the name "osc". */
4089 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4090 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4091 MODULE_LICENSE("GPL");
4093 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);