1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_OSC
42 #include <libcfs/libcfs.h>
45 # include <liblustre.h>
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include "osc_internal.h"
/* Pointer to the quota hooks used by this file; initialised to NULL. */
66 static quota_interface_t *quota_interface = NULL;
/* Declared extern here; its definition is not in the visible listing. */
67 extern quota_interface_t osc_quota_interface;
/* Forward declarations for functions referenced before they are defined. */
69 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
70 static int brw_interpret(const struct lu_env *env,
71 struct ptlrpc_request *req, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
74 /* Pack OSC object metadata for disk storage (LE byte order). */
/* The packed form is exactly one struct lov_mds_md (lmm_size below).
 * The visible statements free an existing *lmmp, allocate a new one, and
 * store the object id and group of @lsm in little-endian order.
 * NOTE(review): the conditions selecting between free and allocate, and
 * the return statements, are on lines missing from this listing. */
75 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
76 struct lov_stripe_md *lsm)
81 lmm_size = sizeof(**lmmp);
86 OBD_FREE(*lmmp, lmm_size);
92 OBD_ALLOC(*lmmp, lmm_size);
/* A zero object id is treated as a programming error. */
98 LASSERT(lsm->lsm_object_id);
99 LASSERT_MDS_GROUP(lsm->lsm_object_gr);
100 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
101 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
107 /* Unpack OSC object metadata from disk storage (LE byte order). */
/* Converts the on-disk lov_mds_md @lmm (@lmm_bytes long) into an in-memory
 * single-stripe lov_stripe_md handed back through @lsmp.
 * NOTE(review): declarations, braces and return statements are missing
 * from this listing, so the exact guards around each step and the return
 * values cannot be confirmed here. */
108 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
109 struct lov_mds_md *lmm, int lmm_bytes)
/* Reject a buffer shorter than one lov_mds_md.
 * NOTE(review): lmm_bytes is a signed int compared with an unsigned sizeof
 * result; a negative value would convert to a large unsigned number and
 * pass this check - confirm callers never pass a negative size. */
115 if (lmm_bytes < sizeof (*lmm)) {
116 CERROR("lov_mds_md too small: %d, need %d\n",
117 lmm_bytes, (int)sizeof(*lmm));
120 /* XXX LOV_MAGIC etc check? */
122 if (lmm->lmm_object_id == 0) {
123 CERROR("lov_mds_md: zero lmm_object_id\n");
/* Size of a stripe md describing exactly one stripe. */
128 lsm_size = lov_stripe_md_size(1);
/* An existing *lsmp with no @lmm means "free": release the per-stripe
 * lov_oinfo first, then the stripe md itself. */
132 if (*lsmp != NULL && lmm == NULL) {
133 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
134 OBD_FREE(*lsmp, lsm_size);
140 OBD_ALLOC(*lsmp, lsm_size);
143 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
/* Undo the first allocation if the second one failed. */
144 if ((*lsmp)->lsm_oinfo[0] == NULL) {
145 OBD_FREE(*lsmp, lsm_size);
148 loi_init((*lsmp)->lsm_oinfo[0]);
152 /* XXX zero *lsmp? */
/* Object id and group are stored little-endian on disk. */
153 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
154 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
155 LASSERT((*lsmp)->lsm_object_id);
156 LASSERT_MDS_GROUP((*lsmp)->lsm_object_gr);
159 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/* Pack the capability @capa (treated as a struct obd_capa) into @req.
 * Fetches the RMF_CAPA1 client buffer of the request capsule and marks the
 * body as carrying a capability via OBD_MD_FLOSSCAPA.
 * NOTE(review): the early-exit test and the copy of the capability into
 * the buffer are presumably on lines missing from this listing. */
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165 struct ost_body *body, void *capa)
167 struct obd_capa *oc = (struct obd_capa *)capa;
168 struct lustre_capa *c;
173 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
176 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177 DEBUG_CAPA(D_SEC, c, "pack");
/* Fill the OST body of @req from @oinfo: the obdo is copied by value into
 * the RMF_OST_BODY client buffer, then the capability in oinfo->oi_capa is
 * packed with osc_pack_capa(). Because the obdo is copied, later changes
 * to *oinfo->oi_oa do not alter the packed request. */
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181 struct obd_info *oinfo)
183 struct ost_body *body;
185 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
188 body->oa = *oinfo->oi_oa;
189 osc_pack_capa(req, body, oinfo->oi_capa);
/* Adjust the client-side size of capsule field @field in @req.
 * The visible call shrinks the field to 0 bytes.
 * NOTE(review): the last parameter and the condition guarding the call are
 * missing from this listing; presumably the field is emptied only when no
 * capability is supplied - confirm against the full source. */
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193 const struct req_msg_field *field,
197 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
199 /* it is already calculated as sizeof struct obd_capa */
/* Reply interpreter for an asynchronous OST_GETATTR (installed by
 * osc_getattr_async). Unpacks the ost_body from the reply, copies its obdo
 * into the obdo referenced by aa->aa_oi, and finally reports @rc through
 * the oi_cb_up callback.
 * NOTE(review): the branches separating the success path from the
 * "can't unpack" path are on lines missing from this listing. */
203 static int osc_getattr_interpret(const struct lu_env *env,
204 struct ptlrpc_request *req,
205 struct osc_async_args *aa, int rc)
207 struct ost_body *body;
/* Fetch the reply body, byte-swapping it with lustre_swab_ost_body. */
213 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
214 lustre_swab_ost_body);
216 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
217 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
219 /* This should really be sent by the OST */
220 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
221 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
223 CDEBUG(D_INFO, "can't unpack ost_body\n");
/* No attribute in the obdo is marked valid on this path. */
225 aa->aa_oi->oi_oa->o_valid = 0;
228 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an asynchronous OST_GETATTR for the object described by @oinfo.
 * The request is allocated on the import of @exp, packed, given
 * osc_getattr_interpret as its reply handler and added to @set.
 * NOTE(review): the NULL/rc checks and the store of @oinfo into the async
 * args are on lines missing from this listing. */
232 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
233 struct ptlrpc_request_set *set)
235 struct ptlrpc_request *req;
236 struct osc_async_args *aa;
240 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
/* The capability field must be sized before the request is packed. */
244 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
245 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
247 ptlrpc_request_free(req);
251 osc_pack_req_body(req, oinfo);
253 ptlrpc_request_set_replen(req);
254 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
/* Compile-time check that the async args fit in the request. */
256 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
257 aa = ptlrpc_req_async_args(req);
260 ptlrpc_set_add_req(set, req);
/* Synchronous OST_GETATTR: build the request from @oinfo, wait for the
 * reply with ptlrpc_queue_wait(), and copy the returned obdo into
 * *oinfo->oi_oa. -EPROTO is used when the reply body cannot be used
 * (the test itself is on a line missing from this listing). */
264 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
266 struct ptlrpc_request *req;
267 struct ost_body *body;
271 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
275 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
276 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
278 ptlrpc_request_free(req);
282 osc_pack_req_body(req, oinfo);
284 ptlrpc_request_set_replen(req);
286 rc = ptlrpc_queue_wait(req);
290 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
292 GOTO(out, rc = -EPROTO);
294 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
295 *oinfo->oi_oa = body->oa;
297 /* This should really be sent by the OST */
298 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
299 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
/* Drop the reference on the request. */
303 ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR: send the attributes in *oinfo->oi_oa and
 * overwrite them with the obdo returned in the reply.
 * NOTE(review): @oti is not used on any line visible in this listing. */
307 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
308 struct obd_trans_info *oti)
310 struct ptlrpc_request *req;
311 struct ost_body *body;
/* When a group is supplied (OBD_MD_FLGROUP) it must pass
 * CHECK_MDS_GROUP; otherwise the assertion fires with both values. */
315 LASSERTF(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
316 CHECK_MDS_GROUP(oinfo->oi_oa->o_gr),
317 "oinfo->oi_oa->o_valid="LPU64" oinfo->oi_oa->o_gr="LPU64"\n",
318 oinfo->oi_oa->o_valid, oinfo->oi_oa->o_gr);
320 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
324 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
325 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
327 ptlrpc_request_free(req);
331 osc_pack_req_body(req, oinfo);
333 ptlrpc_request_set_replen(req);
335 rc = ptlrpc_queue_wait(req);
339 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
341 GOTO(out, rc = -EPROTO);
343 *oinfo->oi_oa = body->oa;
347 ptlrpc_req_finished(req);
/* Reply interpreter for an asynchronous OST_SETATTR: copies the obdo from
 * the reply body into the obdo referenced by aa->aa_oi and reports @rc
 * through the oi_cb_up callback. -EPROTO is used when the reply body
 * cannot be used (the test is on a line missing from this listing). */
351 static int osc_setattr_interpret(const struct lu_env *env,
352 struct ptlrpc_request *req,
353 struct osc_async_args *aa, int rc)
355 struct ost_body *body;
361 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
363 GOTO(out, rc = -EPROTO);
365 *aa->aa_oi->oi_oa = body->oa;
367 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Asynchronous OST_SETATTR. Two submission paths are visible: hand the
 * request to ptlrpcd without waiting for a reply, or install
 * osc_setattr_interpret and add the request to @rqset.
 * NOTE(review): the condition choosing between the two paths is on a line
 * missing from this listing (presumably whether @rqset is NULL). */
371 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
372 struct obd_trans_info *oti,
373 struct ptlrpc_request_set *rqset)
375 struct ptlrpc_request *req;
376 struct osc_async_args *aa;
380 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
384 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
385 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
387 ptlrpc_request_free(req);
391 osc_pack_req_body(req, oinfo);
393 ptlrpc_request_set_replen(req);
/* NOTE(review): osc_pack_req_body() above has already copied
 * *oinfo->oi_oa into the request body, so a cookie stored in the obdo
 * at this point does not reach the packed request unless a line not
 * shown here copies it again - confirm the intended ordering.
 * @oti is dereferenced here, so it must be non-NULL when
 * OBD_MD_FLCOOKIE is set. */
395 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
397 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
400 /* do mds to ost setattr asynchronously */
402 /* Do not wait for response. */
403 ptlrpcd_add_req(req, PSCOPE_OTHER);
405 req->rq_interpret_reply =
406 (ptlrpc_interpterer_t)osc_setattr_interpret;
408 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
409 aa = ptlrpc_req_async_args(req);
412 ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_CREATE. Allocates a stripe md with obd_alloc_memmd(),
 * sends the create request, records the object id and group from @oa in
 * the stripe md, and stores the reply transno and llog cookie in @oti.
 * The request is released with ptlrpc_req_finished() and the stripe md is
 * freed with obd_free_memmd() on a cleanup path.
 * NOTE(review): many guards (use of *ea, NULL checks, the labels that the
 * GOTO statements target, the check of @oti before it is dereferenced)
 * are on lines missing from this listing. */
418 int osc_real_create(struct obd_export *exp, struct obdo *oa,
419 struct lov_stripe_md **ea, struct obd_trans_info *oti)
421 struct ptlrpc_request *req;
422 struct ost_body *body;
423 struct lov_stripe_md *lsm;
432 rc = obd_alloc_memmd(exp, &lsm);
437 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
439 GOTO(out, rc = -ENOMEM);
441 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
443 ptlrpc_request_free(req);
447 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
451 ptlrpc_request_set_replen(req);
/* A request whose flags are exactly OBD_FL_DELORPHAN is marked so that
 * it is neither resent nor delayed. */
453 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
454 oa->o_flags == OBD_FL_DELORPHAN) {
456 "delorphan from OST integration");
457 /* Don't resend the delorphan req */
458 req->rq_no_resend = req->rq_no_delay = 1;
461 rc = ptlrpc_queue_wait(req);
465 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
467 GOTO(out_req, rc = -EPROTO);
471 /* This should really be sent by the OST */
472 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
473 oa->o_valid |= OBD_MD_FLBLKSZ;
475 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
476 * have valid lsm_oinfo data structs, so don't go touching that.
477 * This needs to be fixed in a big way.
479 lsm->lsm_object_id = oa->o_id;
480 lsm->lsm_object_gr = oa->o_gr;
484 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
486 if (oa->o_valid & OBD_MD_FLCOOKIE) {
487 if (!oti->oti_logcookies)
488 oti_alloc_cookies(oti, 1);
489 *oti->oti_logcookies = oa->o_lcookie;
493 CDEBUG(D_HA, "transno: "LPD64"\n",
494 lustre_msg_get_transno(req->rq_repmsg));
496 ptlrpc_req_finished(req);
499 obd_free_memmd(exp, &lsm);
/* Reply interpreter for OST_PUNCH (installed by osc_punch_base): copies
 * the obdo from the reply body into *aa->pa_oa and passes @rc on to the
 * upcall stored in the async args, together with its cookie. */
503 static int osc_punch_interpret(const struct lu_env *env,
504 struct ptlrpc_request *req,
505 struct osc_punch_args *aa, int rc)
507 struct ost_body *body;
513 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
515 GOTO(out, rc = -EPROTO);
517 *aa->pa_oa = body->oa;
519 rc = aa->pa_upcall(aa->pa_cookie, rc);
/* Build and submit an asynchronous OST_PUNCH request for @oa.
 * @upcall and @cookie are saved in the async args and invoked by
 * osc_punch_interpret when the reply arrives. When @rqset is the special
 * value PTLRPCD_SET the request is handed to ptlrpcd, otherwise it is
 * added to @rqset.
 * NOTE(review): the copy of @oa into the request body and into the async
 * args is on lines missing from this listing. */
523 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
524 struct obd_capa *capa,
525 obd_enqueue_update_f upcall, void *cookie,
526 struct ptlrpc_request_set *rqset)
528 struct ptlrpc_request *req;
529 struct osc_punch_args *aa;
530 struct ost_body *body;
534 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
538 osc_set_capa_size(req, &RMF_CAPA1, capa);
539 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
541 ptlrpc_request_free(req);
/* Punch is sent to the I/O portal; the request timeout is set again
 * after the portal is changed. */
544 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
545 ptlrpc_at_set_req_timeout(req);
547 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
550 osc_pack_capa(req, body, capa);
552 ptlrpc_request_set_replen(req);
555 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
556 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
557 aa = ptlrpc_req_async_args(req);
559 aa->pa_upcall = upcall;
560 aa->pa_cookie = cookie;
561 if (rqset == PTLRPCD_SET)
562 ptlrpcd_add_req(req, PSCOPE_OTHER);
564 ptlrpc_set_add_req(rqset, req);
/* Punch (truncate) the extent given in oinfo->oi_policy.
 * The extent is carried in the obdo itself: o_size holds the start and
 * o_blocks holds the end, and both are flagged valid. The completion
 * callback is oinfo->oi_cb_up with @oinfo as its cookie. @oti is not used
 * on the visible lines. */
569 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
570 struct obd_trans_info *oti,
571 struct ptlrpc_request_set *rqset)
573 oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
574 oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
575 oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
576 return osc_punch_base(exp, oinfo->oi_oa, oinfo->oi_capa,
577 oinfo->oi_cb_up, oinfo, rqset);
/* Synchronous OST_SYNC for the byte range [@start, @end] of the object
 * described by @oa. The range travels in the o_size and o_blocks fields of
 * the request obdo.
 * NOTE(review): the last parameter (the capability, used below as capa),
 * the NULL test on @oa and the copy of the reply obdo are on lines missing
 * from this listing. */
580 static int osc_sync(struct obd_export *exp, struct obdo *oa,
581 struct lov_stripe_md *md, obd_size start, obd_size end,
584 struct ptlrpc_request *req;
585 struct ost_body *body;
590 CDEBUG(D_INFO, "oa NULL\n");
594 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
598 osc_set_capa_size(req, &RMF_CAPA1, capa);
599 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
601 ptlrpc_request_free(req);
605 /* overload the size and blocks fields in the oa with start/end */
606 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
609 body->oa.o_size = start;
610 body->oa.o_blocks = end;
611 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
612 osc_pack_capa(req, body, capa);
614 ptlrpc_request_set_replen(req);
616 rc = ptlrpc_queue_wait(req);
620 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
622 GOTO(out, rc = -EPROTO);
628 ptlrpc_req_finished(req);
632 /* Find and cancel locally locks matched by @mode in the resource found by
633 * @objid. Found locks are added into @cancels list. Returns the number of
634 * locks added to @cancels list. */
/* NOTE(review): no parameter named objid exists; the resource name is
 * built from oa->o_id and oa->o_gr. The last parameter (used below as
 * lock_flags) and the handling of a failed resource lookup are on lines
 * missing from this listing. */
635 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
636 struct list_head *cancels, ldlm_mode_t mode,
639 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
640 struct ldlm_res_id res_id;
641 struct ldlm_resource *res;
645 osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
646 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
/* ADDREF/DELREF bracket the use of the resource; the reference taken by
 * ldlm_resource_get() is dropped with ldlm_resource_putref(). */
650 LDLM_RESOURCE_ADDREF(res);
651 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
652 lock_flags, 0, NULL);
653 LDLM_RESOURCE_DELREF(res);
654 ldlm_resource_putref(res);
/* Reply interpreter for throttled OST_DESTROY requests: one destroy is no
 * longer in flight, so decrement the counter and wake a sender waiting on
 * cl_destroy_waitq (see osc_destroy). */
658 static int osc_destroy_interpret(const struct lu_env *env,
659 struct ptlrpc_request *req, void *data,
662 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
664 atomic_dec(&cli->cl_destroy_in_flight);
665 cfs_waitq_signal(&cli->cl_destroy_waitq);
/* Try to reserve a slot for one more in-flight destroy RPC.
 * The counter is incremented first; if the new value is within
 * cl_max_rpcs_in_flight the slot is kept. Otherwise the increment is
 * undone, and if the counter is then below the limit another waiter is
 * signalled because a slot was freed in between.
 * NOTE(review): the return statements are on lines missing from this
 * listing; osc_destroy uses the result as a boolean "can send". */
669 static int osc_can_send_destroy(struct client_obd *cli)
671 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
672 cli->cl_max_rpcs_in_flight) {
673 /* The destroy request can be sent */
676 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
677 cli->cl_max_rpcs_in_flight) {
679 * The counter has been modified between the two atomic
682 cfs_waitq_signal(&cli->cl_destroy_waitq);
687 /* Destroy requests can be async always on the client, and we don't even really
688 * care about the return code since the client cannot do anything at all about
690 * When the MDS is unlinking a filename, it saves the file objects into a
691 * recovery llog, and these object records are cancelled when the OST reports
692 * they were destroyed and sync'd to disk (i.e. transaction committed).
693 * If the client dies, or the OST is down when the object should be destroyed,
694 * the records are not cancelled, and when the OST reconnects to the MDS next,
695 * it will retrieve the llog unlink logs and then send the log cancellation
696 * cookies to the MDS after committing destroy transactions. */
/* Steps visible below: collect unused PW locks on the object for early
 * cancel (discarding their data), build the OST_DESTROY request with
 * ldlm_prep_elc_req(), optionally throttle, and hand the request to
 * ptlrpcd without waiting for the reply.
 * NOTE(review): the NULL checks, the copy of @oa into the body and the
 * return statements are on lines missing from this listing. @ea and
 * @md_export are not used on the visible lines. */
697 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
698 struct lov_stripe_md *ea, struct obd_trans_info *oti,
699 struct obd_export *md_export, void *capa)
701 struct client_obd *cli = &exp->exp_obd->u.cli;
702 struct ptlrpc_request *req;
703 struct ost_body *body;
704 CFS_LIST_HEAD(cancels);
709 CDEBUG(D_INFO, "oa NULL\n");
713 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
714 LDLM_FL_DISCARD_DATA);
716 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* Release the collected locks again (request allocation failure path,
 * judging by its position - the guard is not shown). */
718 ldlm_lock_list_put(&cancels, l_bl_ast, count);
722 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
723 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
726 ptlrpc_request_free(req);
730 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
731 ptlrpc_at_set_req_timeout(req);
/* Pass the llog cookie from @oti along in the obdo when requested. */
733 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
734 oa->o_lcookie = *oti->oti_logcookies;
735 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
739 osc_pack_capa(req, body, (struct obd_capa *)capa);
740 ptlrpc_request_set_replen(req);
742 /* don't throttle destroy RPCs for the MDT */
743 if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
/* The interpreter releases the slot reserved by
 * osc_can_send_destroy() when the reply arrives. */
744 req->rq_interpret_reply = osc_destroy_interpret;
745 if (!osc_can_send_destroy(cli)) {
746 struct l_wait_info lwi = { 0 };
749 * Wait until the number of on-going destroy RPCs drops
750 * under max_rpc_in_flight
752 l_wait_event_exclusive(cli->cl_destroy_waitq,
753 osc_can_send_destroy(cli), &lwi);
757 /* Do not wait for response */
758 ptlrpcd_add_req(req, PSCOPE_OTHER);
/* Report this client's cache and grant state to the server in @oa.
 * Under cl_loi_list_lock the following obdo fields are filled in:
 * o_dirty (current dirty bytes), o_undirty, o_grant (available grant) and
 * o_dropped (grant lost since the last report, which is then reset to 0).
 * The three CERROR branches catch inconsistent dirty accounting.
 * NOTE(review): the last parameter, the values stored in o_undirty on the
 * error branches and the update of o_valid are on lines missing from this
 * listing. */
762 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
765 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
/* The caller must not have set these validity bits already. */
767 LASSERT(!(oa->o_valid & bits));
770 client_obd_list_lock(&cli->cl_loi_list_lock);
771 oa->o_dirty = cli->cl_dirty;
772 if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
773 CERROR("dirty %lu - %lu > dirty_max %lu\n",
774 cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
776 } else if (atomic_read(&obd_dirty_pages) -
777 atomic_read(&obd_dirty_transit_pages) > obd_max_dirty_pages){
778 CERROR("dirty %d - %d > system dirty_max %d\n",
779 atomic_read(&obd_dirty_pages),
780 atomic_read(&obd_dirty_transit_pages),
781 obd_max_dirty_pages);
783 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
784 CERROR("dirty %lu - dirty_max %lu too big???\n",
785 cli->cl_dirty, cli->cl_dirty_max);
/* Bytes needed to keep every RPC slot, plus one, full of pages. */
788 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
789 (cli->cl_max_rpcs_in_flight + 1);
790 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
792 oa->o_grant = cli->cl_avail_grant;
793 oa->o_dropped = cli->cl_lost_grant;
794 cli->cl_lost_grant = 0;
795 client_obd_list_unlock(&cli->cl_loi_list_lock);
796 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
797 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Push the next grant-shrink deadline of @cli GRANT_SHRINK_INTERVAL into
 * the future (via cfs_time_shift) and log the new value. */
801 static void osc_update_next_shrink(struct client_obd *cli)
803 int time = GRANT_SHRINK_INTERVAL;
804 cli->cl_next_shrink_grant = cfs_time_shift(time);
805 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
806 cli->cl_next_shrink_grant);
809 /* caller must hold loi_list_lock */
/* Charge one page of write grant to @pga: the global and per-client dirty
 * counters grow by one page, the available grant shrinks by one page, and
 * the page is tagged OBD_BRW_FROM_GRANT so the charge can be undone by
 * osc_release_write_grant. The page must not already carry that tag. */
810 static void osc_consume_write_grant(struct client_obd *cli,
811 struct brw_page *pga)
813 LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
814 atomic_inc(&obd_dirty_pages);
815 cli->cl_dirty += CFS_PAGE_SIZE;
816 cli->cl_avail_grant -= CFS_PAGE_SIZE;
817 pga->flag |= OBD_BRW_FROM_GRANT;
818 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
819 CFS_PAGE_SIZE, pga, pga->pg);
/* Checked after the subtraction: the grant must not go negative. */
820 LASSERT(cli->cl_avail_grant >= 0);
/* Using grant postpones the next shrink. */
821 osc_update_next_shrink(cli);
824 /* the companion to osc_consume_write_grant, called when a brw has completed.
825 * must be called with the loi lock held. */
/* Undoes the dirty accounting for @pga and records grant that was lost.
 * NOTE(review): the early return for pages without OBD_BRW_FROM_GRANT and
 * the test on @sent that selects the first lost-grant branch are on lines
 * missing from this listing. */
826 static void osc_release_write_grant(struct client_obd *cli,
827 struct brw_page *pga, int sent)
/* OST block size, falling back to 4096 when os_bsize is 0. */
829 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
832 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
837 pga->flag &= ~OBD_BRW_FROM_GRANT;
838 atomic_dec(&obd_dirty_pages);
839 cli->cl_dirty -= CFS_PAGE_SIZE;
/* NOCACHE pages were also counted as dirty-in-transit. */
840 if (pga->flag & OBD_BRW_NOCACHE) {
841 pga->flag &= ~OBD_BRW_NOCACHE;
842 atomic_dec(&obd_dirty_transit_pages);
843 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
846 cli->cl_lost_grant += CFS_PAGE_SIZE;
847 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
848 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
849 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
850 /* For short writes we shouldn't count parts of pages that
851 * span a whole block on the OST side, or our accounting goes
852 * wrong. Should match the code in filter_grant_check. */
/* offset: position of the write inside its page.
 * count: bytes written, extended back to the start of the first block.
 * end: position of the end of the write inside its block. */
853 int offset = pga->off & ~CFS_PAGE_MASK;
854 int count = pga->count + (offset & (blocksize - 1));
855 int end = (offset + pga->count) & (blocksize - 1);
857 count += blocksize - end;
859 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
860 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
861 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
862 cli->cl_avail_grant, cli->cl_dirty);
/* Total number of read and write RPCs currently in flight for @cli. */
868 static unsigned long rpcs_in_flight(struct client_obd *cli)
870 return cli->cl_r_in_flight + cli->cl_w_in_flight;
873 /* caller must hold loi_list_lock */
/* Walk the list of threads waiting for cache space and wake those that can
 * proceed. A waiter is woken either with grant consumed for its page, or
 * with ocw_rc set to -EDQUOT when no grant is available and no write RPC
 * is in flight that could return some.
 * NOTE(review): the statements that leave the loop in the two "must wait"
 * cases are on lines missing from this listing. */
874 void osc_wake_cache_waiters(struct client_obd *cli)
876 struct list_head *l, *tmp;
877 struct osc_cache_waiter *ocw;
/* The _safe iterator is needed because entries are unlinked below. */
880 list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
881 /* if we can't dirty more, we must wait until some is written */
882 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
883 (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
884 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
885 "osc max %ld, sys max %d\n", cli->cl_dirty,
886 cli->cl_dirty_max, obd_max_dirty_pages);
890 /* if still dirty cache but no grant wait for pending RPCs that
891 * may yet return us some grant before doing sync writes */
892 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
893 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
894 cli->cl_w_in_flight);
898 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
899 list_del_init(&ocw->ocw_entry);
900 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
901 /* no more RPCs in flight to return grant, do sync IO */
902 ocw->ocw_rc = -EDQUOT;
903 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
905 osc_consume_write_grant(cli,
906 &ocw->ocw_oap->oap_brw_page);
909 cfs_waitq_signal(&ocw->ocw_waitq);
/* Add the grant carried in a reply @body to the available grant of @cli.
 * The amount is applied only when the body marks it valid with
 * OBD_MD_FLGRANT; the update is done under cl_loi_list_lock. */
915 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
917 client_obd_list_lock(&cli->cl_loi_list_lock);
918 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
919 if (body->oa.o_valid & OBD_MD_FLGRANT)
920 cli->cl_avail_grant += body->oa.o_grant;
921 /* waiters are woken in brw_interpret */
922 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Forward declaration: osc_shrink_grant calls this before the definition,
 * which is not part of the visible listing. */
925 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
926 void *key, obd_count vallen, void *val,
927 struct ptlrpc_request_set *set);
/* Reply interpreter for a grant-shrink request.
 * Two actions are visible: giving the grant recorded in the saved obdo
 * back to cl_avail_grant, and applying the grant from the reply body with
 * osc_update_grant().
 * NOTE(review): the remaining parameters and the condition separating the
 * two actions are on lines missing from this listing; presumably the
 * first is the failure path and the second the success path - confirm. */
929 static int osc_shrink_grant_interpret(const struct lu_env *env,
930 struct ptlrpc_request *req,
933 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
934 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
935 struct ost_body *body;
938 client_obd_list_lock(&cli->cl_loi_list_lock);
939 cli->cl_avail_grant += oa->o_grant;
940 client_obd_list_unlock(&cli->cl_loi_list_lock);
944 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
946 osc_update_grant(cli, body);
/* Take a quarter of the available grant away from @cli and record it in
 * @oa so it can be returned to the server: o_grant holds the amount and
 * OBD_FL_SHRINK_GRANT is set in o_flags. The next shrink deadline is
 * rescheduled afterwards. */
952 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
954 client_obd_list_lock(&cli->cl_loi_list_lock);
955 oa->o_grant = cli->cl_avail_grant / 4;
956 cli->cl_avail_grant -= oa->o_grant;
957 client_obd_list_unlock(&cli->cl_loi_list_lock);
958 oa->o_flags |= OBD_FL_SHRINK_GRANT;
959 osc_update_next_shrink(cli);
/* Return part of the grant of @cli to the server.
 * An ost_body is filled in by osc_announce_cached() and
 * osc_shrink_grant_local(), then sent through osc_set_info_async() under
 * the key KEY_GRANT_SHRINK.
 * NOTE(review): the allocation and release of the body, and the condition
 * under which the grant is added back to cl_avail_grant (presumably a
 * non-zero rc), are on lines missing from this listing. */
962 static int osc_shrink_grant(struct client_obd *cli)
965 struct ost_body *body;
972 osc_announce_cached(cli, &body->oa, 0);
973 osc_shrink_grant_local(cli, &body->oa);
974 rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
975 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
976 sizeof(*body), body, NULL);
978 client_obd_list_lock(&cli->cl_loi_list_lock);
979 cli->cl_avail_grant += body->oa.o_grant;
980 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Grant is only worth shrinking while more than this much is available. */
987 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
/* Decide whether @client should give grant back now.
 * The deadline counts as reached from 5 ticks before cl_next_shrink_grant.
 * Shrinking additionally requires a fully connected import
 * (LUSTRE_IMP_FULL) and more than GRANT_SHRINK_LIMIT of available grant;
 * otherwise the deadline is pushed out again.
 * NOTE(review): the return statements are on lines missing from this
 * listing; callers use the result as a boolean. */
988 static int osc_should_shrink_grant(struct client_obd *client)
990 cfs_time_t time = cfs_time_current();
991 cfs_time_t next_shrink = client->cl_next_shrink_grant;
992 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
993 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
994 client->cl_avail_grant > GRANT_SHRINK_LIMIT)
997 osc_update_next_shrink(client);
/* Timeout callback registered by osc_add_shrink_grant: visit every client
 * linked on item->ti_obd_list and shrink the grant of those for which
 * osc_should_shrink_grant() says so. @data is not used on the visible
 * lines. */
1002 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1004 struct client_obd *client;
1006 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
1007 if (osc_should_shrink_grant(client))
1008 osc_shrink_grant(client);
/* Register @client for periodic grant shrinking: a timeout client with
 * interval GRANT_SHRINK_INTERVAL and callback osc_grant_shrink_grant_cb is
 * added, linking the client through cl_grant_shrink_list. An error is
 * logged on failure; on success the first shrink deadline is scheduled.
 * NOTE(review): one argument line of ptlrpc_add_timeout_client() and the
 * rc test are missing from this listing. */
1013 static int osc_add_shrink_grant(struct client_obd *client)
1017 rc = ptlrpc_add_timeout_client(GRANT_SHRINK_INTERVAL,
1019 osc_grant_shrink_grant_cb, NULL,
1020 &client->cl_grant_shrink_list);
1022 CERROR("add grant client %s error %d\n",
1023 client->cl_import->imp_obd->obd_name, rc);
1026 CDEBUG(D_CACHE, "add grant client %s \n",
1027 client->cl_import->imp_obd->obd_name);
1028 osc_update_next_shrink(client);
/* Unregister @client from periodic grant shrinking; returns the result of
 * ptlrpc_del_timeout_client(). */
1032 static int osc_del_shrink_grant(struct client_obd *client)
1034 CDEBUG(D_CACHE, "del grant client %s \n",
1035 client->cl_import->imp_obd->obd_name);
1036 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list);
/* Initialise the available grant of @cli from the connect data @ocd.
 * When the server advertises OBD_CONNECT_GRANT_SHRINK and the client is
 * not yet on a shrink list, periodic grant shrinking is registered. */
1039 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1041 client_obd_list_lock(&cli->cl_loi_list_lock);
1042 cli->cl_avail_grant = ocd->ocd_grant;
1043 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* list_empty() guards against registering the same client twice. */
1045 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1046 list_empty(&cli->cl_grant_shrink_list))
1047 osc_add_shrink_grant(cli);
1049 CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1050 cli->cl_avail_grant, cli->cl_lost_grant);
1051 LASSERT(cli->cl_avail_grant >= 0);
1054 /* We assume that the reason this OSC got a short read is because it read
1055 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1056 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1057 * this stripe never got written at or beyond this stripe offset yet. */
/* Zero-fill everything in @pga that lies beyond the @nob_read bytes that
 * were actually read: first the tail of the page containing the end of
 * the data, then every remaining page.
 * NOTE(review): the declarations and the index and page_count updates
 * inside the first loop are on lines missing from this listing. */
1058 static void handle_short_read(int nob_read, obd_count page_count,
1059 struct brw_page **pga)
1064 /* skip bytes read OK */
1065 while (nob_read > 0) {
1066 LASSERT (page_count > 0);
1068 if (pga[i]->count > nob_read) {
1069 /* EOF inside this page */
/* Map the page and point at the start of its data. */
1070 ptr = cfs_kmap(pga[i]->pg) +
1071 (pga[i]->off & ~CFS_PAGE_MASK);
1072 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1073 cfs_kunmap(pga[i]->pg);
1079 nob_read -= pga[i]->count;
1084 /* zero remaining pages */
1085 while (page_count-- > 0) {
1086 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1087 memset(ptr, 0, pga[i]->count);
1088 cfs_kunmap(pga[i]->pg);
/* Validate the reply to a bulk write: the reply must carry one return
 * code per remote niobuf (@niocount of them). A negative code is returned
 * to the caller as is, any other non-zero code is logged as invalid, and
 * the number of bytes transferred must equal @requested_nob.
 * NOTE(review): the declarations and the return values of the other
 * failure paths are on lines missing from this listing. @page_count and
 * @pga are not used on the visible lines. */
1093 static int check_write_rcs(struct ptlrpc_request *req,
1094 int requested_nob, int niocount,
1095 obd_count page_count, struct brw_page **pga)
1099 /* return error if any niobuf was in error */
1100 remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
1101 sizeof(*remote_rcs) * niocount, NULL);
1102 if (remote_rcs == NULL) {
1103 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
/* No swab routine was passed above, so the vector is byte-swapped here
 * when the reply message was swabbed. */
1106 if (lustre_msg_swabbed(req->rq_repmsg))
1107 for (i = 0; i < niocount; i++)
1108 __swab32s(&remote_rcs[i]);
1110 for (i = 0; i < niocount; i++) {
1111 if (remote_rcs[i] < 0)
1112 return(remote_rcs[i]);
1114 if (remote_rcs[i] != 0) {
1115 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1116 i, remote_rcs[i], req);
1121 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1122 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1123 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Tell whether @p2 can share a remote niobuf with @p1. The final test
 * requires @p2 to start exactly where @p1 ends.
 * When the flags differ, a difference outside FROM_GRANT, NOCACHE and
 * SYNC triggers a warning.
 * NOTE(review): the return inside the flag-mismatch branch is on a line
 * missing from this listing. */
1130 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1132 if (p1->flag != p2->flag) {
/* Flags listed here are ignored when comparing the two pages. */
1133 unsigned mask = ~(OBD_BRW_FROM_GRANT|
1134 OBD_BRW_NOCACHE|OBD_BRW_SYNC);
1136 /* warn if we try to combine flags that we don't know to be
1137 * safe to combine */
1138 if ((p1->flag & mask) != (p2->flag & mask))
1139 CERROR("is it ok to have flags 0x%x and 0x%x in the "
1140 "same brw?\n", p1->flag, p2->flag);
1144 return (p1->off + p1->count == p2->off);
/* Compute a checksum of type @cksum_type over the first @nob bytes of the
 * pages in @pga. Each page is mapped, its data region (offset within the
 * page, length capped at the bytes still to cover) is folded into the
 * running checksum, and the page is unmapped again.
 * Two fault-injection hooks exist: corrupting received data before the
 * checksum (reads) and producing a wrong checksum without touching the
 * data (writes).
 * NOTE(review): the declarations, the index and page-count updates and
 * the final adjustment and return of the checksum are on lines missing
 * from this listing. */
1147 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1148 struct brw_page **pga, int opc,
1149 cksum_type_t cksum_type)
1154 LASSERT (pg_count > 0);
1155 cksum = init_checksum(cksum_type);
1156 while (nob > 0 && pg_count > 0) {
1157 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1158 int off = pga[i]->off & ~CFS_PAGE_MASK;
1159 int count = pga[i]->count > nob ? nob : pga[i]->count;
1161 /* corrupt the data before we compute the checksum, to
1162 * simulate an OST->client data error */
1163 if (i == 0 && opc == OST_READ &&
1164 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1165 memcpy(ptr + off, "bad1", min(4, nob));
1166 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1167 cfs_kunmap(pga[i]->pg);
1168 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1171 nob -= pga[i]->count;
1175 /* For sending we only compute the wrong checksum instead
1176 * of corrupting the data so it is still correct on a redo */
1177 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/* Build a bulk read or write request for @page_count pages in @pga.
 * Steps visible below:
 *  - allocate the request (writes use the import request pool);
 *  - count the remote niobufs needed and size the capsule;
 *  - attach a bulk descriptor and add every page to it;
 *  - fill body, ioobj and the niobuf array, merging adjacent pages;
 *  - announce cache and grant state and optionally shrink grant;
 *  - set up checksum flags and the reply size;
 *  - store bookkeeping in the async args of the request.
 * NOTE(review): many lines are missing from this listing (selection of
 * opc, NULL checks, else branches, the store into *reqp, labels and
 * returns), so the comments below describe only what is visible. @lsm is
 * not used on the visible lines. */
1183 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1184 struct lov_stripe_md *lsm, obd_count page_count,
1185 struct brw_page **pga,
1186 struct ptlrpc_request **reqp,
1187 struct obd_capa *ocapa, int reserve)
1189 struct ptlrpc_request *req;
1190 struct ptlrpc_bulk_desc *desc;
1191 struct ost_body *body;
1192 struct obd_ioobj *ioobj;
1193 struct niobuf_remote *niobuf;
1194 int niocount, i, requested_nob, opc, rc;
1195 struct osc_brw_async_args *aa;
1196 struct req_capsule *pill;
1197 struct brw_page *pg_prev;
/* Fault-injection points for testing both error classes. */
1200 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1201 RETURN(-ENOMEM); /* Recoverable */
1202 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1203 RETURN(-EINVAL); /* Fatal */
1205 if ((cmd & OBD_BRW_WRITE) != 0) {
1207 req = ptlrpc_request_alloc_pool(cli->cl_import,
1208 cli->cl_import->imp_rq_pool,
1212 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
/* Count remote niobufs: start at one and walk adjacent page pairs.
 * The increment for non-mergeable pairs is on a line not shown. */
1217 for (niocount = i = 1; i < page_count; i++) {
1218 if (!can_merge_pages(pga[i - 1], pga[i]))
1222 pill = &req->rq_pill;
1223 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1224 niocount * sizeof(*niobuf));
1225 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1227 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1229 ptlrpc_request_free(req);
1232 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1233 ptlrpc_at_set_req_timeout(req);
/* For a write the client is the source of a bulk GET; otherwise it is
 * the sink of a bulk PUT. */
1235 if (opc == OST_WRITE)
1236 desc = ptlrpc_prep_bulk_imp(req, page_count,
1237 BULK_GET_SOURCE, OST_BULK_PORTAL);
1239 desc = ptlrpc_prep_bulk_imp(req, page_count,
1240 BULK_PUT_SINK, OST_BULK_PORTAL);
1243 GOTO(out, rc = -ENOMEM);
1244 /* NB request now owns desc and will free it when it gets freed */
1246 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1247 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1248 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1249 LASSERT(body && ioobj && niobuf);
1253 obdo_to_ioobj(oa, ioobj);
1254 ioobj->ioo_bufcnt = niocount;
1255 osc_pack_capa(req, body, ocapa);
1256 LASSERT (page_count > 0);
/* Add each page to the bulk descriptor and describe it in the niobuf
 * array, accumulating the total byte count in requested_nob. */
1258 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1259 struct brw_page *pg = pga[i];
1261 LASSERT(pg->count > 0);
/* The data of a page must not extend past the end of that page. */
1262 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1263 "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1264 pg->off, pg->count);
/* Pages must be sorted by strictly increasing offset. Two variants of
 * the same assertion follow; presumably they sit in alternative
 * preprocessor branches that are not shown - confirm. */
1266 LASSERTF(i == 0 || pg->off > pg_prev->off,
1267 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1268 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1270 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1271 pg_prev->pg, page_private(pg_prev->pg),
1272 pg_prev->pg->index, pg_prev->off);
1274 LASSERTF(i == 0 || pg->off > pg_prev->off,
1275 "i %d p_c %u\n", i, page_count);
/* All pages of one request must agree on OBD_BRW_SRVLOCK. */
1277 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1278 (pg->flag & OBD_BRW_SRVLOCK));
1280 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1282 requested_nob += pg->count;
/* A mergeable page extends a niobuf instead of starting a new one. */
1284 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1286 niobuf->len += pg->count;
1288 niobuf->offset = pg->off;
1289 niobuf->len = pg->count;
1290 niobuf->flags = pg->flag;
/* Sanity check: after the loop the niobuf cursor must sit exactly
 * niocount entries past the start of the niobuf buffer. */
1295 LASSERTF((void *)(niobuf - niocount) ==
1296 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1297 niocount * sizeof(*niobuf)),
1298 "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
1299 REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
1300 (void *)(niobuf - niocount));
1302 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1303 if (osc_should_shrink_grant(cli))
1304 osc_shrink_grant_local(cli, &body->oa);
1306 /* size[REQ_REC_OFF] still sizeof (*body) */
1307 if (opc == OST_WRITE) {
/* Checksum the data here only when checksums are enabled and the
 * security flavor does not already cover the bulk data. */
1308 if (unlikely(cli->cl_checksum) &&
1309 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1310 /* store cl_cksum_type in a local variable since
1311 * it can be changed via lprocfs */
1312 cksum_type_t cksum_type = cli->cl_cksum_type;
1314 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1315 oa->o_flags = body->oa.o_flags = 0;
1316 body->oa.o_flags |= cksum_type_pack(cksum_type);
1317 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1318 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1322 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1324 /* save this in 'oa', too, for later checking */
1325 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1326 oa->o_flags |= cksum_type_pack(cksum_type);
1328 /* clear out the checksum flag, in case this is a
1329 * resend but cl_checksum is no longer set. b=11238 */
1330 oa->o_valid &= ~OBD_MD_FLCKSUM;
1332 oa->o_cksum = body->oa.o_cksum;
1333 /* 1 RC per niobuf */
1334 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
1335 sizeof(__u32) * niocount);
/* Read path: only request a checksum of the chosen type; nothing is
 * computed on the client at this point. */
1337 if (unlikely(cli->cl_checksum) &&
1338 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1339 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1340 body->oa.o_flags = 0;
1341 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1342 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1344 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
1345 /* 1 RC for the whole I/O */
1347 ptlrpc_request_set_replen(req);
/* Record what was requested so the reply can be checked against it. */
1349 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1350 aa = ptlrpc_req_async_args(req);
1352 aa->aa_requested_nob = requested_nob;
1353 aa->aa_nio_count = niocount;
1354 aa->aa_page_count = page_count;
1358 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
/* Keep a reference on the capability only when asked to via @reserve. */
1359 if (ocapa && reserve)
1360 aa->aa_ocapa = capa_get(ocapa);
1366 ptlrpc_req_finished(req);
/*
 * Diagnose the checksum the server returned for a bulk write.
 *
 * \param oa                obdo whose flags and object ids are examined; the
 *                          caller in this file passes the obdo from the reply
 * \param peer              peer whose NID is printed in the error message
 * \param client_cksum      checksum the client sent with the request
 * \param server_cksum      checksum reported back by the server
 * \param nob               number of bytes covered by the checksum
 * \param page_count        number of entries in \a pga
 * \param pga               pages that were written
 * \param client_cksum_type checksum type the client used for the request
 *
 * If the two checksums differ, the checksum is recomputed over \a pga and the
 * three values are compared to pick a console message describing where the
 * data most likely changed.
 *
 * NOTE(review): the return statements are not visible in this block; the
 * caller uses the result as a boolean condition -- confirm which value means
 * "mismatch".
 */
1370 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1371 __u32 client_cksum, __u32 server_cksum, int nob,
1372 obd_count page_count, struct brw_page **pga,
1373 cksum_type_t client_cksum_type)
1377 cksum_type_t cksum_type;
1379 if (server_cksum == client_cksum) {
1380 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* checksum type taken from o_flags when OBD_MD_FLFLAGS is valid;
 * OBD_CKSUM_CRC32 is presumably the fallback when it is not */
1384 if (oa->o_valid & OBD_MD_FLFLAGS)
1385 cksum_type = cksum_type_unpack(oa->o_flags);
1387 cksum_type = OBD_CKSUM_CRC32;
/* recompute over the pages as they are now, to classify the mismatch */
1389 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1392 if (cksum_type != client_cksum_type)
1393 msg = "the server did not use the checksum type specified in "
1394 "the original request - likely a protocol problem";
1395 else if (new_cksum == server_cksum)
1396 msg = "changed on the client after we checksummed it - "
1397 "likely false positive due to mmap IO (bug 11742)";
1398 else if (new_cksum == client_cksum)
1399 msg = "changed in transit before arrival at OST";
1401 msg = "changed in transit AND doesn't match the original - "
1402 "likely false positive due to mmap IO (bug 11742)";
1404 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1405 LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1406 "["LPU64"-"LPU64"]\n",
1407 msg, libcfs_nid2str(peer->nid),
1408 oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1409 oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1412 oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1414 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1415 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1416 "client csum now %x\n", client_cksum, client_cksum_type,
1417 server_cksum, cksum_type, new_cksum);
/*
 * Post-process the reply of a bulk read or write RPC.
 *
 * Visible steps: unpack the ost_body from the reply, update the over-quota
 * flags (writes only) and the grant, then
 *  - for OST_WRITE: unwrap the bulk, verify the write checksum if one was
 *    sent, and check the per-niobuf return codes via check_write_rcs();
 *  - for OST_READ: unwrap the bulk, sanity-check the byte count against what
 *    was requested and transferred, handle a short read, and verify the read
 *    checksum when the server supplied one.
 * Finally the reply obdo is copied into *aa->aa_oa.
 *
 * NOTE(review): the error-return statements are not visible in this block.
 */
1421 /* Note rc enters this function as number of bytes transferred */
1422 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1424 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1425 const lnet_process_id_t *peer =
1426 &req->rq_import->imp_connection->c_peer;
1427 struct client_obd *cli = aa->aa_cli;
1428 struct ost_body *body;
1429 __u32 client_cksum = 0;
/* -EDQUOT is excluded from the error test; presumably so that the quota
 * flags in the reply are still processed below */
1432 if (rc < 0 && rc != -EDQUOT)
1435 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1436 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1437 lustre_swab_ost_body);
1439 CDEBUG(D_INFO, "Can't unpack body\n");
1443 /* set/clear over quota flag for a uid/gid */
1444 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1445 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1446 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1447 body->oa.o_gid, body->oa.o_valid,
1453 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1454 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1456 osc_update_grant(cli, body);
1458 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1460 CERROR("Unexpected +ve rc %d\n", rc);
1463 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1465 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
/* only verify when a non-zero checksum was sent with the request */
1468 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1469 check_write_checksum(&body->oa, peer, client_cksum,
1470 body->oa.o_cksum, aa->aa_requested_nob,
1471 aa->aa_page_count, aa->aa_ppga,
1472 cksum_type_unpack(aa->aa_oa->o_flags)))
1475 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1476 aa->aa_page_count, aa->aa_ppga);
1480 /* The rest of this function executes only for OST_READs */
1482 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1486 if (rc > aa->aa_requested_nob) {
1487 CERROR("Unexpected rc %d (%d requested)\n", rc,
1488 aa->aa_requested_nob);
1492 if (rc != req->rq_bulk->bd_nob_transferred) {
1493 CERROR ("Unexpected rc %d (%d transferred)\n",
1494 rc, req->rq_bulk->bd_nob_transferred);
1498 if (rc < aa->aa_requested_nob)
1499 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
/* the server sent a checksum: recompute it locally over the rc bytes
 * received and compare */
1501 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1502 static int cksum_counter;
1503 __u32 server_cksum = body->oa.o_cksum;
1506 cksum_type_t cksum_type;
1508 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1509 cksum_type = cksum_type_unpack(body->oa.o_flags);
1511 cksum_type = OBD_CKSUM_CRC32;
1512 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1513 aa->aa_ppga, OST_READ,
1516 if (peer->nid == req->rq_bulk->bd_sender) {
1520 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1523 if (server_cksum == ~0 && rc > 0) {
1524 CERROR("Protocol error: server %s set the 'checksum' "
1525 "bit, but didn't send a checksum. Not fatal, "
1526 "but please notify on http://bugzilla.lustre.org/\n",
1527 libcfs_nid2str(peer->nid));
1528 } else if (server_cksum != client_cksum) {
1529 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1530 "%s%s%s inum "LPU64"/"LPU64" object "
1531 LPU64"/"LPU64" extent "
1532 "["LPU64"-"LPU64"]\n",
1533 req->rq_import->imp_obd->obd_name,
1534 libcfs_nid2str(peer->nid),
1536 body->oa.o_valid & OBD_MD_FLFID ?
1537 body->oa.o_fid : (__u64)0,
1538 body->oa.o_valid & OBD_MD_FLFID ?
1539 body->oa.o_generation :(__u64)0,
1541 body->oa.o_valid & OBD_MD_FLGROUP ?
1542 body->oa.o_gr : (__u64)0,
1543 aa->aa_ppga[0]->off,
1544 aa->aa_ppga[aa->aa_page_count-1]->off +
1545 aa->aa_ppga[aa->aa_page_count-1]->count -
1547 CERROR("client %x, server %x, cksum_type %x\n",
1548 client_cksum, server_cksum, cksum_type);
1550 aa->aa_oa->o_cksum = client_cksum;
1554 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1557 } else if (unlikely(client_cksum)) {
1558 static int cksum_missed;
/* x & -x == x holds only when x is zero or a power of two, so the
 * message is logged at exponentially growing intervals */
1561 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1562 CERROR("Checksum %u requested from %s but not sent\n",
1563 cksum_missed, libcfs_nid2str(peer->nid));
/* hand the server's view of the object attributes back to the caller */
1569 *aa->aa_oa = body->oa;
/*
 * Issue one bulk read/write RPC for \a page_count pages of \a pga and wait
 * for it to complete.
 *
 * The request is built with osc_brw_prep_request(), sent synchronously with
 * ptlrpc_queue_wait() and post-processed by osc_brw_fini_request().  When the
 * result is a recoverable error and osc_should_resend() still allows it, the
 * function sleeps before (presumably) retrying.
 *
 * NOTE(review): the retry jump and the return statements are not visible in
 * this block.
 */
1574 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1575 struct lov_stripe_md *lsm,
1576 obd_count page_count, struct brw_page **pga,
1577 struct obd_capa *ocapa)
1579 struct ptlrpc_request *req;
1583 struct l_wait_info lwi;
1587 cfs_waitq_init(&waitq);
1590 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1591 page_count, pga, &req, ocapa, 0);
1595 rc = ptlrpc_queue_wait(req);
/* a timed-out request that is marked for resend is dropped here without
 * going through osc_brw_fini_request() */
1597 if (rc == -ETIMEDOUT && req->rq_resend) {
1598 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1599 ptlrpc_req_finished(req);
1603 rc = osc_brw_fini_request(req, rc);
1605 ptlrpc_req_finished(req);
1606 if (osc_recoverable_error(rc)) {
1608 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1609 CERROR("too many resend retries, returning error\n");
/* the wait condition is the constant 0, so only the timeout of
 * `resends` seconds (or an interrupt) ends the wait */
1613 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1614 l_wait_event(waitq, 0, &lwi);
/*
 * Replace a bulk request that failed with a recoverable error by a new one.
 *
 * A new request is prepared for the same operation, obdo, pages and capa.
 * Under cl_loi_list_lock the async args, the oap list and the capa reference
 * are moved from \a request to the new request, every oap that held a
 * reference on the old request is switched to the new one, and the new
 * request is added to the request set of the old one.
 *
 * If any oap attached to the old request has been interrupted, the new
 * request is released again instead of being queued.
 *
 * NOTE(review): the return statements are not visible in this block.
 */
1622 int osc_brw_redo_request(struct ptlrpc_request *request,
1623 struct osc_brw_async_args *aa)
1625 struct ptlrpc_request *new_req;
1626 struct ptlrpc_request_set *set = request->rq_set;
1627 struct osc_brw_async_args *new_aa;
1628 struct osc_async_page *oap;
1632 if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1633 CERROR("too many resend retries, returning error\n");
1637 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1639 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1640 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1641 aa->aa_cli, aa->aa_oa,
1642 NULL /* lsm unused by osc currently */,
1643 aa->aa_page_count, aa->aa_ppga,
1644 &new_req, aa->aa_ocapa, 0);
1648 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
/* give up on the resend if a waiter interrupted one of the pages */
1650 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1651 if (oap->oap_request != NULL) {
1652 LASSERTF(request == oap->oap_request,
1653 "request %p != oap_request %p\n",
1654 request, oap->oap_request);
1655 if (oap->oap_interrupted) {
1656 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1657 ptlrpc_req_finished(new_req);
1662 /* New request takes over pga and oaps from old request.
1663 * Note that copying a list_head doesn't work, need to move it... */
1665 new_req->rq_interpret_reply = request->rq_interpret_reply;
1666 new_req->rq_async_args = request->rq_async_args;
/* delay sending by aa_resends seconds */
1667 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1669 new_aa = ptlrpc_req_async_args(new_req);
/* the struct copy above duplicated the list head by value; re-initialise
 * it and move the entries over properly */
1671 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1672 list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1673 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
/* swap each oap's request reference from the old request to the new one */
1675 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1676 if (oap->oap_request) {
1677 ptlrpc_req_finished(oap->oap_request);
1678 oap->oap_request = ptlrpc_request_addref(new_req);
/* the capa reference moves to the new request as well */
1682 new_aa->aa_ocapa = aa->aa_ocapa;
1683 aa->aa_ocapa = NULL;
1685 /* use ptlrpc_set_add_req is safe because interpret functions work
1686 * in check_set context. only one way exist with access to request
1687 * from different thread got -EINTR - this way protected with
1688 * cl_loi_list_lock */
1689 ptlrpc_set_add_req(set, new_req);
1691 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1693 DEBUG_REQ(D_INFO, new_req, "new request");
1698 * Ugh, we want disk allocation on the target to happen in offset order. We'll
1699 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1700 * fine for our small page arrays and doesn't require allocation. It's an
1701 * insertion sort that swaps elements that are strides apart, shrinking the
1702 * stride down until it's '1' and the array is sorted.
/*
 * Sort the \a num page pointers in \a array in place by their ->off field.
 *
 * The initial stride is grown with stride = stride * 3 + 1 until it reaches
 * \a num; each pass then shifts entries with a larger ->off up by one stride
 * to make room for the entry being placed.  No memory is allocated.
 */
1704 static void sort_brw_pages(struct brw_page **array, int num)
1707 struct brw_page *tmp;
1711 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1716 for (i = stride ; i < num ; i++) {
1719 while (j >= stride && array[j - stride]->off > tmp->off) {
1720 array[j] = array[j - stride];
1725 } while (stride > 1);
/*
 * Count how many pages from the start of \a pg can be used before a page
 * boundary gap appears.
 *
 * The scan looks at the in-page offset (off & ~CFS_PAGE_MASK) and the count
 * of each page: it stops once a page does not end on a page boundary, or the
 * following page does not start on one, or \a pages entries were examined.
 * \a pages must be greater than zero.
 */
1728 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1734 LASSERT (pages > 0);
1735 offset = pg[i]->off & ~CFS_PAGE_MASK;
1739 if (pages == 0) /* that's all */
1742 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1743 return count; /* doesn't end on page boundary */
1746 offset = pg[i]->off & ~CFS_PAGE_MASK;
1747 if (offset != 0) /* doesn't start on page boundary */
/*
 * Allocate an array of \a count brw_page pointers for the pages in \a pga.
 *
 * The array is allocated with OBD_ALLOC and is released with
 * osc_release_ppga() using the same \a count.
 *
 * NOTE(review): the loop body and the allocation-failure handling are not
 * visible here; presumably entry i is pointed at the i-th element of \a pga --
 * confirm.
 */
1754 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1756 struct brw_page **ppga;
1759 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1763 for (i = 0; i < count; i++)
/*
 * Free a page-pointer array of \a count entries.
 *
 * \a ppga must not be NULL, and \a count must be the entry count the array
 * was allocated with, because the size passed to OBD_FREE is derived from it.
 * Only the pointer array is freed here, not the pages it refers to.
 */
1768 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1770 LASSERT(ppga != NULL);
1771 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * Synchronous bulk I/O on \a page_count pages of \a pga.
 *
 * With OBD_BRW_CHECK set in \a cmd only the import state is examined.
 * Otherwise a pointer array over \a pga is built and sorted by offset, and
 * the pages are sent in chunks through osc_brw_internal().  Each chunk holds
 * at most cl_max_pages_per_rpc pages and is further limited by
 * max_unfragmented_pages().
 *
 * Because each RPC overwrites oinfo->oi_oa, a copy of the original obdo is
 * saved when more than one chunk is needed and restored before every
 * subsequent chunk.
 *
 * NOTE(review): the return statements and the loop's error exit are not
 * visible in this block.
 */
1774 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1775 obd_count page_count, struct brw_page *pga,
1776 struct obd_trans_info *oti)
1778 struct obdo *saved_oa = NULL;
1779 struct brw_page **ppga, **orig;
1780 struct obd_import *imp = class_exp2cliimp(exp);
1781 struct client_obd *cli;
1782 int rc, page_count_orig;
1785 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1786 cli = &imp->imp_obd->u.cli;
1788 if (cmd & OBD_BRW_CHECK) {
1789 /* The caller just wants to know if there's a chance that this
1790 * I/O can succeed */
1792 if (imp->imp_invalid)
1797 /* test_brw with a failed create can trip this, maybe others. */
1798 LASSERT(cli->cl_max_pages_per_rpc);
/* keep the start of the array and its size: ppga and page_count are
 * advanced by the loop but the release needs the original values */
1802 orig = ppga = osc_build_ppga(pga, page_count);
1805 page_count_orig = page_count;
1807 sort_brw_pages(ppga, page_count);
1808 while (page_count) {
1809 obd_count pages_per_brw;
1811 if (page_count > cli->cl_max_pages_per_rpc)
1812 pages_per_brw = cli->cl_max_pages_per_rpc;
1814 pages_per_brw = page_count;
1816 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1818 if (saved_oa != NULL) {
1819 /* restore previously saved oa */
1820 *oinfo->oi_oa = *saved_oa;
1821 } else if (page_count > pages_per_brw) {
1822 /* save a copy of oa (brw will clobber it) */
1823 OBDO_ALLOC(saved_oa);
1824 if (saved_oa == NULL)
1825 GOTO(out, rc = -ENOMEM);
1826 *saved_oa = *oinfo->oi_oa;
1829 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1830 pages_per_brw, ppga, oinfo->oi_capa);
/* advance past the chunk that was just handled */
1835 page_count -= pages_per_brw;
1836 ppga += pages_per_brw;
1840 osc_release_ppga(orig, page_count_orig);
1842 if (saved_oa != NULL)
1843 OBDO_FREE(saved_oa);
/*
 * Drop \a oap from the dirty-page accounting by releasing the write grant
 * held for its brw_page; \a sent is passed through unchanged to
 * osc_release_write_grant().
 */
1848 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1849 * the dirty accounting. Writeback completes or truncate happens before
1850 * writing starts. Must be called with the loi lock held. */
1851 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1854 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
/*
 * Decide whether the pages pending on \a lop justify sending an RPC now.
 *
 * The visible checks look, in order, at: whether anything is pending at all,
 * whether the import is missing or invalid, whether an urgent page is queued,
 * whether (for writes) there are cache waiters, and whether the number of
 * pending pages has reached the `optimal' RPC size derived from
 * cl_max_pages_per_rpc.
 *
 * NOTE(review): the return statements are not visible in this block;
 * loi_list_maint() uses the result as a boolean.
 */
1858 /* This maintains the lists of pending pages to read/write for a given object
1859 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1860 * to quickly find objects that are ready to send an RPC. */
1861 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1867 if (lop->lop_num_pending == 0)
1870 /* if we have an invalid import we want to drain the queued pages
1871 * by forcing them through rpcs that immediately fail and complete
1872 * the pages. recovery relies on this to empty the queued pages
1873 * before canceling the locks and evicting down the llite pages */
1874 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1877 /* stream rpcs in queue order as long as there is an urgent page
1878 * queued. this is our cheap solution for good batching in the case
1879 * where writepage marks some random page in the middle of the file
1880 * as urgent because of, say, memory pressure */
1881 if (!list_empty(&lop->lop_urgent)) {
1882 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1885 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1886 optimal = cli->cl_max_pages_per_rpc;
1887 if (cmd & OBD_BRW_WRITE) {
1888 /* trigger a write rpc stream as long as there are dirtiers
1889 * waiting for space. as they're waiting, they're not going to
1890 * create more pages to coalesce with what's waiting.. */
1891 if (!list_empty(&cli->cl_cache_waiters)) {
1892 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1895 /* +16 to avoid triggering rpcs that would want to include pages
1896 * that are being queued but which can't be made ready until
1897 * the queuer finishes with the page. this is a wart for
1898 * llite::commit_write() */
1901 if (lop->lop_num_pending >= optimal)
/*
 * Make the list membership of \a item match \a should_be_on.
 *
 * An item that is currently unlinked (list_empty(item)) is appended to
 * \a list when it should be on it; a linked item is removed and
 * re-initialised when it should not.  Otherwise nothing changes.
 */
1907 static void on_list(struct list_head *item, struct list_head *list,
1910 if (list_empty(item) && should_be_on)
1911 list_add_tail(item, list);
1912 else if (!list_empty(item) && !should_be_on)
1913 list_del_init(item);
/*
 * Re-evaluate which of the client's per-object lists \a loi belongs on:
 *  - cl_loi_ready_list when either its write or its read lop makes an RPC,
 *  - cl_loi_write_list when it has pending write pages,
 *  - cl_loi_read_list when it has pending read pages.
 */
1916 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1917 * can find pages to build into rpcs quickly */
1918 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1920 on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1921 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1922 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1924 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1925 loi->loi_write_lop.lop_num_pending);
1927 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1928 loi->loi_read_lop.lop_num_pending);
/*
 * Adjust the pending-page counters by \a delta (which may be negative).
 *
 * The per-object counter lop->lop_num_pending is always updated; the
 * client-wide counter updated is cl_pending_w_pages when \a cmd has
 * OBD_BRW_WRITE set, cl_pending_r_pages otherwise.
 */
1931 static void lop_update_pending(struct client_obd *cli,
1932 struct loi_oap_pages *lop, int cmd, int delta)
1934 lop->lop_num_pending += delta;
1935 if (cmd & OBD_BRW_WRITE)
1936 cli->cl_pending_w_pages += delta;
1938 cli->cl_pending_r_pages += delta;
1942 * this is called when a sync waiter receives an interruption. Its job is to
1943 * get the caller woken as soon as possible. If its page hasn't been put in an
1944 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
1945 * desiring interruption which will forcefully complete the rpc once the rpc
/*
 * Handle interruption of the waiter for \a oap.
 *
 * The oap must not already be marked interrupted.  If it holds a request
 * reference, that request is marked interrupted, ptlrpcd is woken and the
 * reference is dropped.  If the oap is still on the pending list it is taken
 * off the pending and urgent lists, the pending counters and list membership
 * are updated, and the caller's ap_completion() is invoked with -EINTR.
 *
 * NOTE(review): the return statement is not visible in this block.
 */
1948 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
1950 struct loi_oap_pages *lop;
1951 struct lov_oinfo *loi;
1955 LASSERT(!oap->oap_interrupted);
1956 oap->oap_interrupted = 1;
1958 /* ok, it's been put in an rpc. only one oap gets a request reference */
1959 if (oap->oap_request != NULL) {
1960 ptlrpc_mark_interrupted(oap->oap_request);
1961 ptlrpcd_wake(oap->oap_request);
1962 ptlrpc_req_finished(oap->oap_request);
1963 oap->oap_request = NULL;
1967 * page completion may be called only if ->cpo_prep() method was
1968 * executed by osc_io_submit(), that also adds page the to pending list
1970 if (!list_empty(&oap->oap_pending_item)) {
1971 list_del_init(&oap->oap_pending_item);
1972 list_del_init(&oap->oap_urgent_item);
1975 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1976 &loi->loi_write_lop : &loi->loi_read_lop;
1977 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1978 loi_list_maint(oap->oap_cli, oap->oap_loi);
1979 rc = oap->oap_caller_ops->ap_completion(env,
1980 oap->oap_caller_data,
1981 oap->oap_cmd, NULL, -EINTR);
/*
 * Update the async-error state \a ar after completion of the request with
 * transfer id \a xid.
 *
 * One path sets ar_force_sync and records the next xid to be handed out as
 * ar_min_xid; the other clears ar_force_sync once a request whose xid is at
 * or beyond ar_min_xid is seen.
 *
 * NOTE(review): the conditions that select between these two paths are not
 * visible in this block; presumably they depend on the completion status.
 */
1987 /* this is trying to propagate async writeback errors back up to the
1988 * application. As an async write fails we record the error code for later if
1989 * the app does an fsync. As long as errors persist we force future rpcs to be
1990 * sync so that the app can get a sync error and break the cycle of queueing
1991 * pages for which writeback will fail. */
1992 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1999 ar->ar_force_sync = 1;
2000 ar->ar_min_xid = ptlrpc_sample_next_xid();
2005 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2006 ar->ar_force_sync = 0;
/*
 * Queue \a oap on its object's pending list.
 *
 * The write or read lop is chosen from oap_cmd.  The oap is appended to the
 * tail of lop_pending and the pending counters are bumped by one; an oap
 * flagged ASYNC_URGENT is additionally put at the head of lop_urgent.
 */
2009 void osc_oap_to_pending(struct osc_async_page *oap)
2011 struct loi_oap_pages *lop;
2013 if (oap->oap_cmd & OBD_BRW_WRITE)
2014 lop = &oap->oap_loi->loi_write_lop;
2016 lop = &oap->oap_loi->loi_read_lop;
2018 if (oap->oap_async_flags & ASYNC_URGENT)
2019 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2020 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2021 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
/*
 * Complete one async page.
 *
 * Drops the oap's request reference (remembering the request xid), resets
 * its async flags and interrupted state, feeds the result into the
 * client-wide and per-object async-error state for writes, copies
 * blocks/mtime/atime/ctime from \a oa into the object's lvb on success, and
 * calls the upper layer's ap_completion().  Depending on that call's result
 * the page is either put back on the pending queue or removed from the cache
 * accounting.
 *
 * NOTE(review): the condition choosing between re-queueing and
 * osc_exit_cache() is not visible in this block.
 */
2024 /* this must be called holding the loi list lock to give coverage to exit_cache,
2025 * async_flag maintenance, and oap_request */
2026 static void osc_ap_completion(const struct lu_env *env,
2027 struct client_obd *cli, struct obdo *oa,
2028 struct osc_async_page *oap, int sent, int rc)
2033 if (oap->oap_request != NULL) {
2034 xid = ptlrpc_req_xid(oap->oap_request);
2035 ptlrpc_req_finished(oap->oap_request);
2036 oap->oap_request = NULL;
2039 oap->oap_async_flags = 0;
2040 oap->oap_interrupted = 0;
2042 if (oap->oap_cmd & OBD_BRW_WRITE) {
2043 osc_process_ar(&cli->cl_ar, xid, rc);
2044 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
/* refresh the cached lvb only with attributes the obdo marks as valid */
2047 if (rc == 0 && oa != NULL) {
2048 if (oa->o_valid & OBD_MD_FLBLOCKS)
2049 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2050 if (oa->o_valid & OBD_MD_FLMTIME)
2051 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2052 if (oa->o_valid & OBD_MD_FLATIME)
2053 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2054 if (oa->o_valid & OBD_MD_FLCTIME)
2055 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2058 rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2059 oap->oap_cmd, oa, rc);
2061 /* ll_ap_completion (from llite) drops PG_locked. so, a new
2062 * I/O on the page could start, but OSC calls it under lock
2063 * and thus we can add oap back to pending safely */
2065 /* upper layer wants to leave the page on pending queue */
2066 osc_oap_to_pending(oap);
2068 osc_exit_cache(cli, oap, sent);
/*
 * Reply-interpret callback for bulk read/write requests.
 *
 * The reply is post-processed by osc_brw_fini_request(); a recoverable error
 * leads to osc_brw_redo_request().  Afterwards the capa reference is dropped
 * and, under cl_loi_list_lock, the in-flight counter for the operation is
 * decremented and the pages are completed: through osc_ap_completion() when
 * the request carries an oap list, or by releasing the write grant of each
 * page otherwise.  Cache waiters are woken and further RPCs are checked for
 * before the lock is released.  Finally the cl_req is completed and the
 * page-pointer array freed.
 *
 * NOTE(review): the return statements, including the path taken after a
 * successful redo, are not visible in this block.
 */
2072 static int brw_interpret(const struct lu_env *env,
2073 struct ptlrpc_request *req, void *data, int rc)
2075 struct osc_brw_async_args *aa = data;
2076 struct client_obd *cli;
2080 rc = osc_brw_fini_request(req, rc);
2081 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2082 if (osc_recoverable_error(rc)) {
2083 rc = osc_brw_redo_request(req, aa);
2089 capa_put(aa->aa_ocapa);
2090 aa->aa_ocapa = NULL;
2095 client_obd_list_lock(&cli->cl_loi_list_lock);
2097 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2098 * is called so we know whether to go to sync BRWs or wait for more
2099 * RPCs to complete */
2100 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2101 cli->cl_w_in_flight--;
2103 cli->cl_r_in_flight--;
2105 async = list_empty(&aa->aa_oaps);
2106 if (!async) { /* from osc_send_oap_rpc() */
2107 struct osc_async_page *oap, *tmp;
2108 /* the caller may re-use the oap after the completion call so
2109 * we need to clean it up a little */
2110 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2111 list_del_init(&oap->oap_rpc_item);
2112 osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2114 OBDO_FREE(aa->aa_oa);
2115 } else { /* from async_internal() */
2117 for (i = 0; i < aa->aa_page_count; i++)
2118 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2120 osc_wake_cache_waiters(cli);
2121 osc_check_rpcs(env, cli);
2122 client_obd_list_unlock(&cli->cl_loi_list_lock);
2124 cl_req_completion(env, aa->aa_clerq, rc);
2125 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
/*
 * Build a bulk RPC for the oaps queued on \a rpc_list.
 *
 * A page-pointer array of \a page_count entries is allocated and filled from
 * the oaps, a cl_req is allocated and the pages are added to it, the obdo
 * attributes are set, the pages are sorted by offset and the request is
 * prepared with osc_brw_prep_request().  On success the oap list is moved
 * from \a rpc_list into the request's async args, leaving \a rpc_list empty.
 *
 * On failure an ERR_PTR() is produced; the page array is freed and, under
 * cl_loi_list_lock, every oap still on \a rpc_list is completed with an
 * error and the cl_req (if one was allocated) is completed as well.
 *
 * NOTE(review): the return statements and several allocation checks are not
 * visible in this block.
 */
2129 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2130 struct client_obd *cli,
2131 struct list_head *rpc_list,
2132 int page_count, int cmd)
2134 struct ptlrpc_request *req;
2135 struct brw_page **pga = NULL;
2136 struct osc_brw_async_args *aa;
2137 struct obdo *oa = NULL;
2138 const struct obd_async_page_ops *ops = NULL;
2139 void *caller_data = NULL;
2140 struct osc_async_page *oap;
2141 struct osc_async_page *tmp;
2142 struct ost_body *body;
2143 struct cl_req *clerq = NULL;
2144 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2145 struct ldlm_lock *lock = NULL;
2146 struct cl_req_attr crattr;
2150 LASSERT(!list_empty(rpc_list));
2152 memset(&crattr, 0, sizeof crattr);
2153 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2155 GOTO(out, req = ERR_PTR(-ENOMEM));
2159 GOTO(out, req = ERR_PTR(-ENOMEM));
2162 list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2163 struct cl_page *page = osc_oap2cl_page(oap);
2165 ops = oap->oap_caller_ops;
2166 caller_data = oap->oap_caller_data;
2168 clerq = cl_req_alloc(env, page, crt,
2169 1 /* only 1-object rpcs for
2172 GOTO(out, req = (void *)clerq);
2173 lock = oap->oap_ldlm_lock;
2175 pga[i] = &oap->oap_brw_page;
2176 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2177 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2178 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2180 cl_req_page_add(env, clerq, page);
2183 /* always get the data for the obdo for the rpc */
2184 LASSERT(ops != NULL);
2186 crattr.cra_capa = NULL;
2187 cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2189 oa->o_handle = lock->l_remote_handle;
2190 oa->o_valid |= OBD_MD_FLHANDLE;
2193 rc = cl_req_prep(env, clerq);
2195 CERROR("cl_req_prep failed: %d\n", rc);
2196 GOTO(out, req = ERR_PTR(rc));
2199 sort_brw_pages(pga, page_count);
2200 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2201 pga, &req, crattr.cra_capa, 1);
2203 CERROR("prep_req failed: %d\n", rc);
2204 GOTO(out, req = ERR_PTR(rc));
2207 /* Need to update the timestamps after the request is built in case
2208 * we race with setattr (locally or in queue at OST). If OST gets
2209 * later setattr before earlier BRW (as determined by the request xid),
2210 * the OST will not use BRW timestamps. Sadly, there is no obvious
2211 * way to do this in a single call. bug 10150 */
2212 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2213 cl_req_attr_set(env, clerq, &crattr,
2214 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2216 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2217 aa = ptlrpc_req_async_args(req);
/* move the oaps from the caller's list onto the request */
2218 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2219 list_splice(rpc_list, &aa->aa_oaps);
2220 CFS_INIT_LIST_HEAD(rpc_list);
2221 aa->aa_clerq = clerq;
2223 capa_put(crattr.cra_capa);
2228 OBD_FREE(pga, sizeof(*pga) * page_count);
2229 /* this should happen rarely and is pretty bad, it makes the
2230 * pending list not follow the dirty order */
2231 client_obd_list_lock(&cli->cl_loi_list_lock);
2232 list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2233 list_del_init(&oap->oap_rpc_item);
2235 /* queued sync pages can be torn down while the pages
2236 * were between the pending list and the rpc */
2237 if (oap->oap_interrupted) {
2238 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2239 osc_ap_completion(env, cli, NULL, oap, 0,
2243 osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2245 if (clerq && !IS_ERR(clerq))
2246 cl_req_completion(env, clerq, PTR_ERR(req));
2252 * Prepare pages for async I/O and put them on the send queue.
2256 * \param cmd - OBD_BRW_* macros
2257 * \param lop - pending pages
2259 * \return zero if the pages were successfully added to the send queue.
2260 * \return non-zero if an error occurred.
/*
 * Gather pending pages of \a lop into one bulk RPC and queue it on ptlrpcd.
 *
 * Pages are taken from lop->lop_pending in order until cl_max_pages_per_rpc
 * is reached or one of the visible stop conditions is met (SRVLOCK flag
 * mismatch, page not ready, gap at the start or end of a page, ending on a
 * PTLRPC_MAX_BRW_SIZE boundary).  cl_loi_list_lock is released while the
 * request is built and re-taken afterwards, so the caller is expected to
 * hold it on entry.  After building, the read or write in-flight counter is
 * incremented, one oap is given a reference on the request, and the request
 * is handed to ptlrpcd with brw_interpret() as its reply callback.
 *
 * NOTE(review): the loop exits and most return statements are not visible in
 * this block; the only visible return passes back PTR_ERR(req) when
 * osc_build_req() fails.
 */
2263 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2264 struct lov_oinfo *loi,
2265 int cmd, struct loi_oap_pages *lop)
2267 struct ptlrpc_request *req;
2268 obd_count page_count = 0;
2269 struct osc_async_page *oap = NULL, *tmp;
2270 struct osc_brw_async_args *aa;
2271 const struct obd_async_page_ops *ops;
2272 CFS_LIST_HEAD(rpc_list);
2273 unsigned int ending_offset;
2274 unsigned starting_offset = 0;
2276 struct cl_object *clob = NULL;
2279 /* first we find the pages we're allowed to work with */
2280 list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2282 ops = oap->oap_caller_ops;
2284 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2285 "magic 0x%x\n", oap, oap->oap_magic);
2288 /* pin object in memory, so that completion call-backs
2289 * can be safely called under client_obd_list lock. */
2290 clob = osc_oap2cl_page(oap)->cp_obj;
2291 cl_object_get(clob);
/* every page after the first must carry the same OBD_BRW_SRVLOCK setting
 * as the first page collected */
2294 if (page_count != 0 &&
2295 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2296 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2297 " oap %p, page %p, srvlock %u\n",
2298 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2301 /* in llite being 'ready' equates to the page being locked
2302 * until completion unlocks it. commit_write submits a page
2303 * as not ready because its unlock will happen unconditionally
2304 * as the call returns. if we race with commit_write giving
2305 * us that page we don't want to create a hole in the page
2306 * stream, so we stop and leave the rpc to be fired by
2307 * another dirtier or kupdated interval (the not ready page
2308 * will still be on the dirty list). we could call in
2309 * at the end of ll_file_write to process the queue again. */
2310 if (!(oap->oap_async_flags & ASYNC_READY)) {
2311 int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2314 CDEBUG(D_INODE, "oap %p page %p returned %d "
2315 "instead of ready\n", oap,
2319 /* llite is telling us that the page is still
2320 * in commit_write and that we should try
2321 * and put it in an rpc again later. we
2322 * break out of the loop so we don't create
2323 * a hole in the sequence of pages in the rpc
2328 /* the io isn't needed.. tell the checks
2329 * below to complete the rpc with EINTR */
2330 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2331 oap->oap_count = -EINTR;
2334 oap->oap_async_flags |= ASYNC_READY;
2337 LASSERTF(0, "oap %p page %p returned %d "
2338 "from make_ready\n", oap,
2346 * Page submitted for IO has to be locked. Either by
2347 * ->ap_make_ready() or by higher layers.
2349 #if defined(__KERNEL__) && defined(__linux__)
2351 struct cl_page *page;
2353 page = osc_oap2cl_page(oap);
2355 if (page->cp_type == CPT_CACHEABLE &&
2356 !(PageLocked(oap->oap_page) &&
2357 (CheckWriteback(oap->oap_page, cmd)))) {
2358 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2360 (long)oap->oap_page->flags,
2361 oap->oap_async_flags);
2366 /* If there is a gap at the start of this page, it can't merge
2367 * with any previous page, so we'll hand the network a
2368 * "fragmented" page array that it can't transfer in 1 RDMA */
2369 if (page_count != 0 && oap->oap_page_off != 0)
2372 /* take the page out of our book-keeping */
2373 list_del_init(&oap->oap_pending_item);
2374 lop_update_pending(cli, lop, cmd, -1);
2375 list_del_init(&oap->oap_urgent_item);
/* offset of the first page within a PTLRPC_MAX_BRW_SIZE-sized window;
 * used only for the offset histograms below */
2377 if (page_count == 0)
2378 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2379 (PTLRPC_MAX_BRW_SIZE - 1);
2381 /* ask the caller for the size of the io as the rpc leaves. */
2382 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2384 ops->ap_refresh_count(env, oap->oap_caller_data,
2386 LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
/* a zero or negative count means there is nothing to transfer for this
 * page: complete it right away, passing the count as its result */
2388 if (oap->oap_count <= 0) {
2389 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2391 osc_ap_completion(env, cli, NULL,
2392 oap, 0, oap->oap_count);
2396 /* now put the page back in our accounting */
2397 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2398 if (page_count == 0)
2399 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2400 if (++page_count >= cli->cl_max_pages_per_rpc)
2403 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2404 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2405 * have the same alignment as the initial writes that allocated
2406 * extents on the server. */
2407 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2408 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2409 if (ending_offset == 0)
2412 /* If there is a gap at the end of this page, it can't merge
2413 * with any subsequent pages, so we'll hand the network a
2414 * "fragmented" page array that it can't transfer in 1 RDMA */
2415 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2419 osc_wake_cache_waiters(cli);
2421 loi_list_maint(cli, loi);
/* the request is built without holding the list lock */
2423 client_obd_list_unlock(&cli->cl_loi_list_lock);
2426 cl_object_put(env, clob);
2428 if (page_count == 0) {
2429 client_obd_list_lock(&cli->cl_loi_list_lock);
2433 req = osc_build_req(env, cli, &rpc_list, page_count, cmd);
2435 LASSERT(list_empty(&rpc_list));
2436 loi_list_maint(cli, loi);
2437 RETURN(PTR_ERR(req));
2440 aa = ptlrpc_req_async_args(req);
/* per-client statistics: pages per RPC, RPCs in flight, start offset */
2442 if (cmd == OBD_BRW_READ) {
2443 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2444 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2445 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2446 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2448 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2449 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2450 cli->cl_w_in_flight);
2451 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2452 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2454 ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2456 client_obd_list_lock(&cli->cl_loi_list_lock);
2458 if (cmd == OBD_BRW_READ)
2459 cli->cl_r_in_flight++;
2461 cli->cl_w_in_flight++;
2463 /* queued sync pages can be torn down while the pages
2464 * were between the pending list and the rpc */
2466 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2467 /* only one oap gets a request reference */
2470 if (oap->oap_interrupted && !req->rq_intr) {
2471 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2473 ptlrpc_mark_interrupted(req);
2477 tmp->oap_request = ptlrpc_request_addref(req);
2479 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2480 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2482 req->rq_interpret_reply = brw_interpret;
2483 ptlrpcd_add_req(req, PSCOPE_BRW);
/* Debug helper: logs through CDEBUG(D_INODE) whether LOI is linked through
 * loi_cli_item ("ready") and, for the write and the read page sets, the
 * lop_num_pending count and whether lop_urgent is non-empty, followed by the
 * caller-supplied format STR.
 * NOTE(review): the last line of this macro (its trailing arguments) is not
 * visible in this listing. */
2487 #define LOI_DEBUG(LOI, STR, args...) \
2488 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2489 !list_empty(&(LOI)->loi_cli_item), \
2490 (LOI)->loi_write_lop.lop_num_pending, \
2491 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2492 (LOI)->loi_read_lop.lop_num_pending, \
2493 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
/* osc_next_loi() - choose the next object (lov_oinfo) to build RPCs from.
 *
 * Selection order shown by the visible code:
 *   1. the head of cl_loi_ready_list;
 *   2. when cl_cache_waiters is non-empty, the head of cl_loi_write_list;
 *   3. when the import is NULL or marked invalid, the head of
 *      cl_loi_write_list, then the head of cl_loi_read_list.
 *
 * NOTE(review): the braces and the final return are not visible in this
 * listing; presumably NULL is returned when nothing qualifies, since
 * osc_check_rpcs() loops until it gets NULL -- confirm. */
2496 /* This is called by osc_check_rpcs() to find which objects have pages that
2497 * we could be sending. These lists are maintained by lop_makes_rpc(). */
2498 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2501 /* first return all objects which we already know to have
2502 * pages ready to be stuffed into rpcs */
2503 if (!list_empty(&cli->cl_loi_ready_list))
2504 RETURN(list_entry(cli->cl_loi_ready_list.next,
2505 struct lov_oinfo, loi_cli_item));
2507 /* then if we have cache waiters, return all objects with queued
2508 * writes. This is especially important when many small files
2509 * have filled up the cache and not been fired into rpcs because
2510 * they don't pass the nr_pending/object threshold */
2511 if (!list_empty(&cli->cl_cache_waiters) &&
2512 !list_empty(&cli->cl_loi_write_list))
2513 RETURN(list_entry(cli->cl_loi_write_list.next,
2514 struct lov_oinfo, loi_write_item));
2516 /* then return all queued objects when we have an invalid import
2517 * so that they get flushed */
2518 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2519 if (!list_empty(&cli->cl_loi_write_list))
2520 RETURN(list_entry(cli->cl_loi_write_list.next,
2521 struct lov_oinfo, loi_write_item));
2522 if (!list_empty(&cli->cl_loi_read_list))
2523 RETURN(list_entry(cli->cl_loi_read_list.next,
2524 struct lov_oinfo, loi_read_item));
/* osc_check_rpcs() - walk the objects handed out by osc_next_loi() and try
 * to turn their pending pages into RPCs.
 *
 * For each object the in-flight count is compared with
 * cl_max_rpcs_in_flight; the write queue and then the read queue are offered
 * to osc_send_oap_rpc() when lop_makes_rpc() reports that an RPC can be
 * formed. The object is then unlinked from the cli/write/read lists and
 * re-filed by loi_list_maint() so that other objects get a turn.
 *
 * NOTE(review): the statements reacting to the in-flight limit, to the
 * osc_send_oap_rpc() return code and to race_counter reaching 10 are not
 * visible in this listing. */
2529 /* called with the loi list lock held */
2530 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2532 struct lov_oinfo *loi;
2533 int rc = 0, race_counter = 0;
2536 while ((loi = osc_next_loi(cli)) != NULL) {
2537 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2539 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2542 /* attempt some read/write balancing by alternating between
2543 * reads and writes in an object. The makes_rpc checks here
2544 * would be redundant if we were getting read/write work items
2545 * instead of objects. we don't want send_oap_rpc to drain a
2546 * partial read pending queue when we're given this object to
2547 * do io on writes while there are cache waiters */
2548 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2549 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2550 &loi->loi_write_lop);
2558 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2559 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2560 &loi->loi_read_lop);
2569 /* attempt some inter-object balancing by issuing rpcs
2570 * for each object in turn */
2571 if (!list_empty(&loi->loi_cli_item))
2572 list_del_init(&loi->loi_cli_item);
2573 if (!list_empty(&loi->loi_write_item))
2574 list_del_init(&loi->loi_write_item);
2575 if (!list_empty(&loi->loi_read_item))
2576 list_del_init(&loi->loi_read_item);
2578 loi_list_maint(cli, loi);
2580 /* send_oap_rpc fails with 0 when make_ready tells it to
2581 * back off. llite's make_ready does this when it tries
2582 * to lock a page queued for write that is already locked.
2583 * we want to try sending rpcs from many objects, but we
2584 * don't want to spin failing with 0. */
2585 if (race_counter == 10)
/* ocw_granted() - wait condition used by osc_enter_cache().
 * Under cl_loi_list_lock, rc becomes non-zero when the waiter is no longer
 * linked on a list (ocw_entry is empty) or when no RPCs are in flight.
 * NOTE(review): the return statement is not visible in this listing;
 * presumably rc is returned. */
2591 /* we're trying to queue a page in the osc so we're subject to the
2592 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2593 * If the osc's queued pages are already at that limit, then we want to sleep
2594 * until there is space in the osc's queue for us. We also may be waiting for
2595 * write credits from the OST if there are RPCs in flight that may return some
2596 * before we fall back to sync writes.
2598 * We need this to know our allocation was granted in the presence of signals */
2599 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2603 client_obd_list_lock(&cli->cl_loi_list_lock);
2604 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2605 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* osc_enter_cache_try() - try to account write grant for @oap.
 * has_grant is true when cl_avail_grant covers at least CFS_PAGE_SIZE.
 * The visible body calls osc_consume_write_grant() on the oap's brw page,
 * and also adds a page to cl_dirty_transit, increments
 * obd_dirty_transit_pages and sets OBD_BRW_NOCACHE on the oap.
 * NOTE(review): the conditions guarding those statements (presumably
 * has_grant and @transient) and the return value are not visible in this
 * listing; osc_enter_cache() uses the result as a boolean. */
2610 * Non-blocking version of osc_enter_cache() that consumes grant only when it
2613 int osc_enter_cache_try(const struct lu_env *env,
2614 struct client_obd *cli, struct lov_oinfo *loi,
2615 struct osc_async_page *oap, int transient)
2619 has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2621 osc_consume_write_grant(cli, &oap->oap_brw_page);
2623 cli->cl_dirty_transit += CFS_PAGE_SIZE;
2624 atomic_inc(&obd_dirty_transit_pages);
2625 oap->oap_brw_flags |= OBD_BRW_NOCACHE;
/* osc_enter_cache() - reserve cache space and write grant for @oap before it
 * is queued for an asynchronous write.
 *
 * Visible flow:
 *  - if cl_dirty_max is below one page, or the client's or the object's
 *    ar_force_sync is set, the caller is pushed towards sync I/O (the code
 *    returned is not visible in this listing);
 *  - if the per-OSC and global dirty limits leave room and
 *    osc_enter_cache_try() succeeds, the fast path is taken;
 *  - otherwise, when write RPCs are in flight, an on-stack osc_cache_waiter
 *    is queued on cl_cache_waiters, osc_check_rpcs() is kicked, the list
 *    lock is dropped while sleeping on ocw_granted() and re-taken afterwards;
 *    a waiter that is still linked is then removed from the list. */
2631 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2632 * grant or cache space. */
2633 static int osc_enter_cache(const struct lu_env *env,
2634 struct client_obd *cli, struct lov_oinfo *loi,
2635 struct osc_async_page *oap)
2637 struct osc_cache_waiter ocw;
2638 struct l_wait_info lwi = { 0 };
2642 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2643 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2644 cli->cl_dirty_max, obd_max_dirty_pages,
2645 cli->cl_lost_grant, cli->cl_avail_grant);
2647 /* force the caller to try sync io. this can jump the list
2648 * of queued writes and create a discontiguous rpc stream */
2649 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2650 loi->loi_ar.ar_force_sync)
2653 /* Hopefully normal case - cache space and write credits available */
2654 if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2655 atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2656 osc_enter_cache_try(env, cli, loi, oap, 0))
2659 /* Make sure that there are write rpcs in flight to wait for. This
2660 * is a little silly as this object may not have any pending but
2661 * other objects sure might. */
2662 if (cli->cl_w_in_flight) {
2663 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2664 cfs_waitq_init(&ocw.ocw_waitq);
2668 loi_list_maint(cli, loi);
2669 osc_check_rpcs(env, cli);
2670 client_obd_list_unlock(&cli->cl_loi_list_lock);
2672 CDEBUG(D_CACHE, "sleeping for cache space\n");
2673 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2675 client_obd_list_lock(&cli->cl_loi_list_lock);
2676 if (!list_empty(&ocw.ocw_entry)) {
2677 list_del(&ocw.ocw_entry);
/* osc_prep_async_page() - initialise the osc_async_page that tracks @page at
 * object offset @offset.
 * One visible path returns size_round(sizeof(*oap)). NOTE(review): its
 * condition is not visible in this listing; presumably it is a size query
 * made before an oap exists -- confirm.
 * Otherwise the oap is stamped with OAP_MAGIC, bound to the export's
 * client_obd, given the caller's @ops/@data, @page and @offset, and its list
 * heads and spinlock are initialised. OBD_BRW_NOQUOTA is set when the export
 * is not a remote client and the caller has CFS_CAP_SYS_RESOURCE.
 * @offset must be page aligned (asserted). */
2687 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2688 struct lov_oinfo *loi, cfs_page_t *page,
2689 obd_off offset, const struct obd_async_page_ops *ops,
2690 void *data, void **res, int nocache,
2691 struct lustre_handle *lockh)
2693 struct osc_async_page *oap;
2698 return size_round(sizeof(*oap));
2701 oap->oap_magic = OAP_MAGIC;
2702 oap->oap_cli = &exp->exp_obd->u.cli;
2705 oap->oap_caller_ops = ops;
2706 oap->oap_caller_data = data;
2708 oap->oap_page = page;
2709 oap->oap_obj_off = offset;
2710 if (!client_is_remote(exp) &&
2711 cfs_capable(CFS_CAP_SYS_RESOURCE))
2712 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2714 LASSERT(!(offset & ~CFS_PAGE_MASK));
2716 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2717 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2718 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2719 CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2721 spin_lock_init(&oap->oap_lock);
2722 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
/* oap_from_cookie() - interpret an opaque @cookie as an osc_async_page.
 * Returns ERR_PTR(-EINVAL) when oap_magic does not equal OAP_MAGIC.
 * NOTE(review): the success return is not visible in this listing; callers
 * in this file assign the result to their oap pointer. */
2726 struct osc_async_page *oap_from_cookie(void *cookie)
2728 struct osc_async_page *oap = cookie;
2729 if (oap->oap_magic != OAP_MAGIC)
2730 return ERR_PTR(-EINVAL);
/* osc_queue_async_io() - queue the page behind @cookie for asynchronous I/O
 * of @count bytes at in-page offset @off.
 *
 * Visible steps: validate the cookie; check the import (NULL or invalid) and
 * that the oap is not already on a pending, urgent or rpc list; for writes
 * without OBD_BRW_NOQUOTA in @cmd, read the top object's attributes and
 * consult lquota_chkdq() with its uid/gid. Then, under cl_loi_list_lock,
 * off/count/flags are recorded in the oap, writes go through
 * osc_enter_cache(), the oap is passed to osc_oap_to_pending() and
 * osc_check_rpcs() is called. off + count must not exceed CFS_PAGE_SIZE
 * (asserted).
 * NOTE(review): the error codes returned by the early-exit checks are not
 * visible in this listing. */
2734 int osc_queue_async_io(const struct lu_env *env,
2735 struct obd_export *exp, struct lov_stripe_md *lsm,
2736 struct lov_oinfo *loi, void *cookie,
2737 int cmd, obd_off off, int count,
2738 obd_flag brw_flags, enum async_flags async_flags)
2740 struct client_obd *cli = &exp->exp_obd->u.cli;
2741 struct osc_async_page *oap;
2745 oap = oap_from_cookie(cookie);
2747 RETURN(PTR_ERR(oap));
2749 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2752 if (!list_empty(&oap->oap_pending_item) ||
2753 !list_empty(&oap->oap_urgent_item) ||
2754 !list_empty(&oap->oap_rpc_item))
2757 /* check if the file's owner/group is over quota */
2758 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2759 struct cl_object *obj;
2760 struct cl_attr attr; /* XXX put attr into thread info */
2762 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
2764 cl_object_attr_lock(obj);
2765 rc = cl_object_attr_get(env, obj, &attr);
2766 cl_object_attr_unlock(obj);
2768 if (rc == 0 && lquota_chkdq(quota_interface, cli, attr.cat_uid,
2769 attr.cat_gid) == NO_QUOTA)
2776 loi = lsm->lsm_oinfo[0];
2778 client_obd_list_lock(&cli->cl_loi_list_lock);
2780 LASSERT(off + count <= CFS_PAGE_SIZE);
2782 oap->oap_page_off = off;
2783 oap->oap_count = count;
2784 oap->oap_brw_flags = brw_flags;
2785 oap->oap_async_flags = async_flags;
2787 if (cmd & OBD_BRW_WRITE) {
2788 rc = osc_enter_cache(env, cli, loi, oap);
2790 client_obd_list_unlock(&cli->cl_loi_list_lock);
2795 osc_oap_to_pending(oap);
2796 loi_list_maint(cli, loi);
2798 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2801 osc_check_rpcs(env, cli);
2802 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* SETTING(was, now, flag) is true when @flag is clear in @was and set in
 * @now, i.e. the flag is being newly raised. */
2807 /* aka (~was & now & flag), but this is more clear :) */
2808 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
/* osc_set_async_flags_base() - raise ASYNC_READY and/or ASYNC_URGENT on a
 * queued @oap.
 * The write or read page set is picked from oap_cmd. A newly requested
 * ASYNC_READY is or-ed into oap_async_flags. For a newly requested
 * ASYNC_URGENT, an oap that is not on an rpc list is added to lop_urgent and
 * loi_list_maint() is called.
 * NOTE(review): the return values of the early checks (invalid import, oap
 * not pending, flags already set) are not visible in this listing. */
2810 int osc_set_async_flags_base(struct client_obd *cli,
2811 struct lov_oinfo *loi, struct osc_async_page *oap,
2812 obd_flag async_flags)
2814 struct loi_oap_pages *lop;
2817 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2820 if (oap->oap_cmd & OBD_BRW_WRITE) {
2821 lop = &loi->loi_write_lop;
2823 lop = &loi->loi_read_lop;
2826 if (list_empty(&oap->oap_pending_item))
2829 if ((oap->oap_async_flags & async_flags) == async_flags)
2832 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2833 oap->oap_async_flags |= ASYNC_READY;
2835 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2836 if (list_empty(&oap->oap_rpc_item)) {
2837 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2838 loi_list_maint(cli, loi);
2842 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2843 oap->oap_async_flags);
/* osc_teardown_async_page() - detach the oap behind @cookie from the OSC
 * queues.
 * Under cl_loi_list_lock: fails with -EBUSY when the oap is on an rpc list;
 * otherwise calls osc_exit_cache() and wakes cache waiters, removes the oap
 * from the urgent list (clearing ASYNC_URGENT) and from the pending list
 * (adjusting the count through lop_update_pending()), then runs
 * loi_list_maint().
 * NOTE(review): @loi is assigned lsm->lsm_oinfo[0]; the guard around that
 * assignment, if any, is not visible in this listing. */
2847 int osc_teardown_async_page(struct obd_export *exp,
2848 struct lov_stripe_md *lsm,
2849 struct lov_oinfo *loi, void *cookie)
2851 struct client_obd *cli = &exp->exp_obd->u.cli;
2852 struct loi_oap_pages *lop;
2853 struct osc_async_page *oap;
2857 oap = oap_from_cookie(cookie);
2859 RETURN(PTR_ERR(oap));
2862 loi = lsm->lsm_oinfo[0];
2864 if (oap->oap_cmd & OBD_BRW_WRITE) {
2865 lop = &loi->loi_write_lop;
2867 lop = &loi->loi_read_lop;
2870 client_obd_list_lock(&cli->cl_loi_list_lock);
2872 if (!list_empty(&oap->oap_rpc_item))
2873 GOTO(out, rc = -EBUSY);
2875 osc_exit_cache(cli, oap, 0);
2876 osc_wake_cache_waiters(cli);
2878 if (!list_empty(&oap->oap_urgent_item)) {
2879 list_del_init(&oap->oap_urgent_item);
2880 oap->oap_async_flags &= ~ASYNC_URGENT;
2882 if (!list_empty(&oap->oap_pending_item)) {
2883 list_del_init(&oap->oap_pending_item);
2884 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2886 loi_list_maint(cli, loi);
2887 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2889 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* osc_set_lock_data_with_check() - store einfo->ei_cbdata as the l_ast_data
 * of @lock.
 * Asserts that the lock's blocking, completion and glimpse callbacks and its
 * resource type match @einfo, and that l_ast_data is either NULL or already
 * equal to the new value. The store happens between lock_res_and_lock() /
 * unlock_res_and_lock() and with the osc_ast_guard spinlock held. */
2893 static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
2894 struct ldlm_enqueue_info *einfo,
2897 void *data = einfo->ei_cbdata;
2899 LASSERT(lock != NULL);
2900 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2901 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2902 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2903 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2905 lock_res_and_lock(lock);
2906 spin_lock(&osc_ast_guard);
2907 LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
2908 lock->l_ast_data = data;
2909 spin_unlock(&osc_ast_guard);
2910 unlock_res_and_lock(lock);
/* osc_set_data_with_check() - handle-based wrapper around
 * osc_set_lock_data_with_check().
 * Resolves @lockh with ldlm_handle2lock() and releases the lock with
 * LDLM_LOCK_PUT() after the data has been set. The CERROR line reports a
 * handle that could not be used. NOTE(review): the branch condition is not
 * visible in this listing; presumably it is lock == NULL. */
2913 static void osc_set_data_with_check(struct lustre_handle *lockh,
2914 struct ldlm_enqueue_info *einfo,
2917 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2920 osc_set_lock_data_with_check(lock, einfo, flags);
2921 LDLM_LOCK_PUT(lock);
2923 CERROR("lockh %p, data %p - client evicted?\n",
2924 lockh, einfo->ei_cbdata);
/* osc_change_cbdata() - build the resource name from @lsm's object id/group
 * and pass @replace and @data to ldlm_resource_iterate() for that resource
 * in this OSC's namespace.
 * NOTE(review): the return statement is not visible in this listing. */
2927 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2928 ldlm_iterator_t replace, void *data)
2930 struct ldlm_res_id res_id;
2931 struct obd_device *obd = class_exp2obd(exp);
2933 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
2934 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
/* osc_enqueue_fini() - post-process an enqueue result and notify the caller.
 * When rc is ELDLM_LOCK_ABORTED the DLM reply is fetched from the request
 * and a non-zero lock_policy_res1 replaces rc. On success, or on an aborted
 * intent enqueue, LDLM_FL_LVB_READY is set in *flags and the LVB is logged.
 * Finally @upcall is invoked with @cookie and the resulting rc.
 * NOTE(review): any outer condition around the ELDLM_LOCK_ABORTED block is
 * not visible in this listing. */
2938 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2939 obd_enqueue_update_f upcall, void *cookie,
2942 int intent = *flags & LDLM_FL_HAS_INTENT;
2946 /* The request was created before ldlm_cli_enqueue call. */
2947 if (rc == ELDLM_LOCK_ABORTED) {
2948 struct ldlm_reply *rep;
2949 rep = req_capsule_server_get(&req->rq_pill,
2952 LASSERT(rep != NULL);
2953 if (rep->lock_policy_res1)
2954 rc = rep->lock_policy_res1;
2958 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2959 *flags |= LDLM_FL_LVB_READY;
2960 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2961 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2964 /* Call the update callback. */
2965 rc = (*upcall)(cookie, rc);
/* osc_enqueue_interpret() - reply handler installed by osc_enqueue_base()
 * for an asynchronous enqueue.
 * Copies the lock handle and mode out of @aa first, takes an extra lock
 * reference around the upcall, completes the enqueue with
 * ldlm_cli_enqueue_fini(), runs osc_enqueue_fini() (which calls the upcall)
 * and then drops the references it holds. Asserts that the handle resolved
 * to a lock. */
2969 static int osc_enqueue_interpret(const struct lu_env *env,
2970 struct ptlrpc_request *req,
2971 struct osc_enqueue_args *aa, int rc)
2973 struct ldlm_lock *lock;
2974 struct lustre_handle handle;
2977 /* Make a local copy of a lock handle and a mode, because aa->oa_*
2978 * might be freed anytime after lock upcall has been called. */
2979 lustre_handle_copy(&handle, aa->oa_lockh);
2980 mode = aa->oa_ei->ei_mode;
2982 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2984 lock = ldlm_handle2lock(&handle);
2986 /* Take an additional reference so that a blocking AST that
2987 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2988 * to arrive after an upcall has been executed by
2989 * osc_enqueue_fini(). */
2990 ldlm_lock_addref(&handle, mode);
2992 /* Complete obtaining the lock procedure. */
2993 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2994 mode, aa->oa_flags, aa->oa_lvb,
2995 sizeof(*aa->oa_lvb), lustre_swab_ost_lvb,
2997 /* Complete osc stuff. */
2998 rc = osc_enqueue_fini(req, aa->oa_lvb,
2999 aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
3000 /* Release the lock for async request. */
3001 if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3003 * Releases a reference taken by ldlm_cli_enqueue(), if it is
3004 * not already released by
3005 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3007 ldlm_lock_decref(&handle, mode);
3009 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3010 aa->oa_lockh, req, aa);
3011 ldlm_lock_decref(&handle, mode);
3012 LDLM_LOCK_PUT(lock);
/* osc_update_enqueue() - fold the result of an enqueue into @loi.
 * On ELDLM_OK the LVB is copied into the object; a candidate kms is taken as
 * lvb_size, capped at the lock's extent end + 1, and stored with
 * loi_kms_set() when it is not below the current loi_kms. The lock is then
 * passed to ldlm_lock_allow_match() and released.
 * On ELDLM_LOCK_ABORTED with LDLM_FL_HAS_INTENT (a glimpse, per the debug
 * message) only the LVB is copied and kms is left unchanged. */
3016 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3017 struct lov_oinfo *loi, int flags,
3018 struct ost_lvb *lvb, __u32 mode, int rc)
3020 if (rc == ELDLM_OK) {
3021 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3024 LASSERT(lock != NULL);
3025 loi->loi_lvb = *lvb;
3026 tmp = loi->loi_lvb.lvb_size;
3027 /* Extend KMS up to the end of this lock and no further
3028 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3029 if (tmp > lock->l_policy_data.l_extent.end)
3030 tmp = lock->l_policy_data.l_extent.end + 1;
3031 if (tmp >= loi->loi_kms) {
3032 LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3033 ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3034 loi_kms_set(loi, tmp);
3036 LDLM_DEBUG(lock, "lock acquired, setting rss="
3037 LPU64"; leaving kms="LPU64", end="LPU64,
3038 loi->loi_lvb.lvb_size, loi->loi_kms,
3039 lock->l_policy_data.l_extent.end);
3041 ldlm_lock_allow_match(lock);
3042 LDLM_LOCK_PUT(lock);
3043 } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3044 loi->loi_lvb = *lvb;
3045 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3046 " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3050 EXPORT_SYMBOL(osc_update_enqueue);
/* Sentinel request-set pointer (the value 1, not a real set): when
 * osc_enqueue_base() is given this as rqset it hands the request to
 * ptlrpcd_add_req() instead of calling ptlrpc_set_add_req(). */
3052 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
/* osc_enqueue_base() - obtain an extent lock on @res_id, reusing an already
 * granted lock when possible.
 *
 * The requested extent is first widened to page boundaries. A granted lock
 * is then looked up with ldlm_lock_match(); if one is found whose l_ast_data
 * is NULL or equal to einfo->ei_cbdata, it is tagged with the caller's data
 * and @upcall is invoked with ELDLM_OK, otherwise the match reference is
 * dropped. Failing that, a request is prepared and ldlm_cli_enqueue() is
 * called. In the visible asynchronous branch the arguments are saved in the
 * request's async args, osc_enqueue_interpret() is installed as the reply
 * handler and the request is given either to ptlrpcd (rqset ==
 * PTLRPCD_SET) or to @rqset. Otherwise osc_enqueue_fini() is called
 * directly and the request is released.
 * NOTE(review): several guards, including how @kms_valid and @async gate
 * these steps, and the return statements are not visible in this listing. */
3054 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3055 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3056 * other synchronous requests, however keeping some locks and trying to obtain
3057 * others may take a considerable amount of time in a case of ost failure; and
3058 * when other sync requests do not get released lock from a client, the client
3059 * is excluded from the cluster -- such scenarios make the life difficult, so
3060 * release locks just after they are obtained. */
3061 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3062 int *flags, ldlm_policy_data_t *policy,
3063 struct ost_lvb *lvb, int kms_valid,
3064 obd_enqueue_update_f upcall, void *cookie,
3065 struct ldlm_enqueue_info *einfo,
3066 struct lustre_handle *lockh,
3067 struct ptlrpc_request_set *rqset, int async)
3069 struct obd_device *obd = exp->exp_obd;
3070 struct ptlrpc_request *req = NULL;
3071 int intent = *flags & LDLM_FL_HAS_INTENT;
3076 /* Filesystem lock extents are extended to page boundaries so that
3077 * dealing with the page cache is a little smoother. */
3078 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3079 policy->l_extent.end |= ~CFS_PAGE_MASK;
3082 * kms is not valid when either object is completely fresh (so that no
3083 * locks are cached), or object was evicted. In the latter case cached
3084 * lock cannot be used, because it would prime inode state with
3085 * potentially stale LVB.
3090 /* Next, search for already existing extent locks that will cover us */
3091 /* If we're trying to read, we also search for an existing PW lock. The
3092 * VFS and page cache already protect us locally, so lots of readers/
3093 * writers can share a single PW lock.
3095 * There are problems with conversion deadlocks, so instead of
3096 * converting a read lock to a write lock, we'll just enqueue a new
3099 * At some point we should cancel the read lock instead of making them
3100 * send us a blocking callback, but there are problems with canceling
3101 * locks out from other users right now, too. */
3102 mode = einfo->ei_mode;
3103 if (einfo->ei_mode == LCK_PR)
3105 mode = ldlm_lock_match(obd->obd_namespace,
3106 *flags | LDLM_FL_LVB_READY, res_id,
3107 einfo->ei_type, policy, mode, lockh, 0);
3109 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3111 if (matched->l_ast_data == NULL ||
3112 matched->l_ast_data == einfo->ei_cbdata) {
3113 /* addref the lock only if not async requests and PW
3114 * lock is matched whereas we asked for PR. */
3115 if (!rqset && einfo->ei_mode != mode)
3116 ldlm_lock_addref(lockh, LCK_PR);
3117 osc_set_lock_data_with_check(matched, einfo, *flags);
3119 /* I would like to be able to ASSERT here that
3120 * rss <= kms, but I can't, for reasons which
3121 * are explained in lov_enqueue() */
3124 /* We already have a lock, and it's referenced */
3125 (*upcall)(cookie, ELDLM_OK);
3127 /* For async requests, decref the lock. */
3128 if (einfo->ei_mode != mode)
3129 ldlm_lock_decref(lockh, LCK_PW);
3131 ldlm_lock_decref(lockh, einfo->ei_mode);
3132 LDLM_LOCK_PUT(matched);
3135 ldlm_lock_decref(lockh, mode);
3136 LDLM_LOCK_PUT(matched);
3141 CFS_LIST_HEAD(cancels);
3142 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3143 &RQF_LDLM_ENQUEUE_LVB);
3147 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3151 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3153 ptlrpc_request_set_replen(req);
3156 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3157 *flags &= ~LDLM_FL_BLOCK_GRANTED;
3159 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3160 sizeof(*lvb), lustre_swab_ost_lvb, lockh, async);
3163 struct osc_enqueue_args *aa;
3164 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3165 aa = ptlrpc_req_async_args(req);
3168 aa->oa_flags = flags;
3169 aa->oa_upcall = upcall;
3170 aa->oa_cookie = cookie;
3172 aa->oa_lockh = lockh;
3174 req->rq_interpret_reply =
3175 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3176 if (rqset == PTLRPCD_SET)
3177 ptlrpcd_add_req(req, PSCOPE_OTHER);
3179 ptlrpc_set_add_req(rqset, req);
3180 } else if (intent) {
3181 ptlrpc_req_finished(req);
3186 rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3188 ptlrpc_req_finished(req);
/* osc_enqueue() - enqueue wrapper working from an obd_info.
 * Builds the resource name from the stripe md's object id/group and forwards
 * to osc_enqueue_base() with the first stripe's LVB and loi_kms_valid,
 * oi_cb_up as the upcall and @oinfo as its cookie. The async argument is
 * (rqset != NULL). */
3193 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3194 struct ldlm_enqueue_info *einfo,
3195 struct ptlrpc_request_set *rqset)
3197 struct ldlm_res_id res_id;
3201 osc_build_res_name(oinfo->oi_md->lsm_object_id,
3202 oinfo->oi_md->lsm_object_gr, &res_id);
3204 rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3205 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3206 oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3207 oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3208 rqset, rqset != NULL);
/* osc_match_base() - look for an already granted lock covering @policy.
 * The extent is widened to page boundaries and ldlm_lock_match() is called
 * with LDLM_FL_LVB_READY added to the flags. On a match, @data is attached
 * through osc_set_data_with_check(); if the matched mode differs from @mode
 * and LDLM_FL_TEST_LOCK is not set, a PR reference is taken and the PW
 * reference dropped.
 * NOTE(review): the OBD_FAIL_OSC_MATCH result, the computation of the mode
 * set passed to ldlm_lock_match() and the return value are not visible in
 * this listing. */
3212 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3213 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3214 int *flags, void *data, struct lustre_handle *lockh,
3217 struct obd_device *obd = exp->exp_obd;
3218 int lflags = *flags;
3222 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3225 /* Filesystem lock extents are extended to page boundaries so that
3226 * dealing with the page cache is a little smoother */
3227 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3228 policy->l_extent.end |= ~CFS_PAGE_MASK;
3230 /* Next, search for already existing extent locks that will cover us */
3231 /* If we're trying to read, we also search for an existing PW lock. The
3232 * VFS and page cache already protect us locally, so lots of readers/
3233 * writers can share a single PW lock. */
3237 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3238 res_id, type, policy, rc, lockh, unref);
3241 osc_set_data_with_check(lockh, data, lflags);
3242 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3243 ldlm_lock_addref(lockh, LCK_PR);
3244 ldlm_lock_decref(lockh, LCK_PW);
/* osc_cancel_base() - release a reference on the lock behind @lockh.
 * LCK_GROUP locks go through ldlm_lock_decref_and_cancel(); the other
 * visible call is plain ldlm_lock_decref().
 * NOTE(review): the line joining the two calls (presumably an else) and the
 * return statement are not visible in this listing. */
3251 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3255 if (unlikely(mode == LCK_GROUP))
3256 ldlm_lock_decref_and_cancel(lockh, mode);
3258 ldlm_lock_decref(lockh, mode);
/* osc_cancel() - thin wrapper that returns osc_cancel_base(lockh, mode);
 * @exp and @md are not used in the visible code. */
3263 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3264 __u32 mode, struct lustre_handle *lockh)
3267 RETURN(osc_cancel_base(lockh, mode));
/* osc_cancel_unused() - call ldlm_cli_cancel_unused() on this OSC's
 * namespace.
 * resp starts out NULL and is set to the resource name built from @lsm's
 * object id/group. NOTE(review): the condition guarding that assignment
 * (presumably lsm != NULL) is not visible in this listing. */
3270 static int osc_cancel_unused(struct obd_export *exp,
3271 struct lov_stripe_md *lsm, int flags,
3274 struct obd_device *obd = class_exp2obd(exp);
3275 struct ldlm_res_id res_id, *resp = NULL;
3278 resp = osc_build_res_name(lsm->lsm_object_id,
3279 lsm->lsm_object_gr, &res_id);
3282 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
/* osc_statfs_interpret() - reply handler installed by osc_statfs_async().
 * Fetches the obd_statfs buffer from the reply (the visible error path sets
 * rc = -EPROTO), copies it into the caller's oi_osfs and reports the result
 * through oi_cb_up().
 * NOTE(review): the checks guarding the GOTO and the handling of a non-zero
 * incoming rc are not visible in this listing. */
3285 static int osc_statfs_interpret(const struct lu_env *env,
3286 struct ptlrpc_request *req,
3287 struct osc_async_args *aa, int rc)
3289 struct obd_statfs *msfs;
3295 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3297 GOTO(out, rc = -EPROTO);
3300 *aa->aa_oi->oi_osfs = *msfs;
3302 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* osc_statfs_async() - queue an OST_STATFS request on @rqset.
 * The request is allocated on the client import, packed, directed at
 * OST_CREATE_PORTAL and, when OBD_STATFS_NODELAY is set in oi_flags, marked
 * rq_no_resend / rq_no_delay. osc_statfs_interpret() is installed as the
 * reply handler. @max_age is not used in the visible code.
 * NOTE(review): error returns and the async-args initialisation are not
 * visible in this listing. */
3306 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3307 __u64 max_age, struct ptlrpc_request_set *rqset)
3309 struct ptlrpc_request *req;
3310 struct osc_async_args *aa;
3314 /* We could possibly pass max_age in the request (as an absolute
3315 * timestamp or a "seconds.usec ago") so the target can avoid doing
3316 * extra calls into the filesystem if that isn't necessary (e.g.
3317 * during mount that would help a bit). Having relative timestamps
3318 * is not so great if request processing is slow, while absolute
3319 * timestamps are not ideal because they need time synchronization. */
3320 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3324 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3326 ptlrpc_request_free(req);
3329 ptlrpc_request_set_replen(req);
3330 req->rq_request_portal = OST_CREATE_PORTAL;
3331 ptlrpc_at_set_req_timeout(req);
3333 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3334 /* procfs requests must not block waiting on statfs, to avoid deadlock */
3335 req->rq_no_resend = 1;
3336 req->rq_no_delay = 1;
3339 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3340 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3341 aa = ptlrpc_req_async_args(req);
3344 ptlrpc_set_add_req(rqset, req);
/* osc_statfs() - synchronous OST_STATFS.
 * Takes a reference on the client import while holding cl_sem for read,
 * allocates and packs the request (dropping the import reference after
 * allocation), directs it at OST_CREATE_PORTAL, applies rq_no_resend /
 * rq_no_delay for OBD_STATFS_NODELAY, waits with ptlrpc_queue_wait() and
 * fetches the obd_statfs buffer from the reply (the visible error path sets
 * rc = -EPROTO). The request is released with ptlrpc_req_finished().
 * NOTE(review): the copy into @osfs and the error returns are not visible in
 * this listing; @max_age is not used in the visible code. */
3348 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3349 __u64 max_age, __u32 flags)
3351 struct obd_statfs *msfs;
3352 struct ptlrpc_request *req;
3353 struct obd_import *imp = NULL;
3357 /* Since the request might also come from lprocfs, we need to
3358 * sync this with client_disconnect_export (Bug 15684). */
3359 down_read(&obd->u.cli.cl_sem);
3360 if (obd->u.cli.cl_import)
3361 imp = class_import_get(obd->u.cli.cl_import);
3362 up_read(&obd->u.cli.cl_sem);
3366 /* We could possibly pass max_age in the request (as an absolute
3367 * timestamp or a "seconds.usec ago") so the target can avoid doing
3368 * extra calls into the filesystem if that isn't necessary (e.g.
3369 * during mount that would help a bit). Having relative timestamps
3370 * is not so great if request processing is slow, while absolute
3371 * timestamps are not ideal because they need time synchronization. */
3372 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3374 class_import_put(imp);
3379 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3381 ptlrpc_request_free(req);
3384 ptlrpc_request_set_replen(req);
3385 req->rq_request_portal = OST_CREATE_PORTAL;
3386 ptlrpc_at_set_req_timeout(req);
3388 if (flags & OBD_STATFS_NODELAY) {
3389 /* procfs requests must not block waiting on statfs, to avoid deadlock */
3390 req->rq_no_resend = 1;
3391 req->rq_no_delay = 1;
3394 rc = ptlrpc_queue_wait(req);
3398 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3400 GOTO(out, rc = -EPROTO);
3407 ptlrpc_req_finished(req);
/* osc_getstripe() - copy single-stripe layout information for @lsm to the
 * user buffer @lump.
 * Reads a lov_user_md_v1-sized header from user space, accepts only
 * LOV_USER_MAGIC_V1 / LOV_USER_MAGIC_V3, fills in the object id, object
 * group and a stripe count of 1 (plus the first lmm_objects entry's object
 * id when the user asked for at least one stripe) and copies lum_size bytes
 * back with copy_to_user().
 * NOTE(review): the error returns and the branch taken when
 * lmm_stripe_count is 0 are only partly visible in this listing. */
3411 /* Retrieve object striping information.
3413 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3414 * the maximum number of OST indices which will fit in the user buffer.
3415 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3417 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3419 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3420 struct lov_user_md_v3 lum, *lumk;
3421 struct lov_user_ost_data_v1 *lmm_objects;
3422 int rc = 0, lum_size;
3428 /* we only need the header part from user space to get lmm_magic and
3429 * lmm_stripe_count, (the header part is common to v1 and v3) */
3430 lum_size = sizeof(struct lov_user_md_v1);
3431 if (copy_from_user(&lum, lump, lum_size))
3434 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3435 (lum.lmm_magic != LOV_USER_MAGIC_V3))
3438 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3439 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3440 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3441 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3443 /* we can use lov_mds_md_size() to compute lum_size
3444 * because lov_user_md_vX and lov_mds_md_vX have the same size */
3445 if (lum.lmm_stripe_count > 0) {
3446 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3447 OBD_ALLOC(lumk, lum_size);
3451 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3452 lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3454 lmm_objects = &(lumk->lmm_objects[0]);
3455 lmm_objects->l_object_id = lsm->lsm_object_id;
3457 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3461 lumk->lmm_object_id = lsm->lsm_object_id;
3462 lumk->lmm_object_gr = lsm->lsm_object_gr;
3463 lumk->lmm_stripe_count = 1;
3465 if (copy_to_user(lump, lumk, lum_size))
3469 OBD_FREE(lumk, lum_size);
/* osc_iocontrol() - ioctl dispatcher for the OSC device.
 * Holds a module reference for the duration of the call (try_module_get /
 * module_put). Visible commands:
 *   OBD_IOC_LOV_GET_CONFIG  - validates the two inline buffer lengths, fills
 *                             a lov_desc describing a single target with
 *                             this device's uuid, and copies the buffer back
 *                             to user space;
 *   LL_IOC_LOV_SETSTRIPE    - obd_alloc_memmd();
 *   LL_IOC_LOV_GETSTRIPE    - osc_getstripe();
 *   OBD_IOC_CLIENT_RECOVER  - ptlrpc_recover_import();
 *   IOC_OSC_SET_ACTIVE      - ptlrpc_set_import_active();
 *   OBD_IOC_POLL_QUOTACHECK - lquota_poll_check();
 *   OBD_IOC_PING_TARGET     - ptlrpc_obd_ping();
 * any other command is logged and fails with -ENOTTY.
 * NOTE(review): the switch statement, per-case result post-processing and
 * the return are not visible in this listing. */
3475 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3476 void *karg, void *uarg)
3478 struct obd_device *obd = exp->exp_obd;
3479 struct obd_ioctl_data *data = karg;
3483 if (!try_module_get(THIS_MODULE)) {
3484 CERROR("Can't get module. Is it alive?");
3488 case OBD_IOC_LOV_GET_CONFIG: {
3490 struct lov_desc *desc;
3491 struct obd_uuid uuid;
3495 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3496 GOTO(out, err = -EINVAL);
3498 data = (struct obd_ioctl_data *)buf;
3500 if (sizeof(*desc) > data->ioc_inllen1) {
3501 obd_ioctl_freedata(buf, len);
3502 GOTO(out, err = -EINVAL);
3505 if (data->ioc_inllen2 < sizeof(uuid)) {
3506 obd_ioctl_freedata(buf, len);
3507 GOTO(out, err = -EINVAL);
3510 desc = (struct lov_desc *)data->ioc_inlbuf1;
3511 desc->ld_tgt_count = 1;
3512 desc->ld_active_tgt_count = 1;
3513 desc->ld_default_stripe_count = 1;
3514 desc->ld_default_stripe_size = 0;
3515 desc->ld_default_stripe_offset = 0;
3516 desc->ld_pattern = 0;
3517 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3519 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3521 err = copy_to_user((void *)uarg, buf, len);
3524 obd_ioctl_freedata(buf, len);
3527 case LL_IOC_LOV_SETSTRIPE:
3528 err = obd_alloc_memmd(exp, karg);
3532 case LL_IOC_LOV_GETSTRIPE:
3533 err = osc_getstripe(karg, uarg);
3535 case OBD_IOC_CLIENT_RECOVER:
3536 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3541 case IOC_OSC_SET_ACTIVE:
3542 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3545 case OBD_IOC_POLL_QUOTACHECK:
3546 err = lquota_poll_check(quota_interface, exp,
3547 (struct if_quotacheck *)karg);
3549 case OBD_IOC_PING_TARGET:
3550 err = ptlrpc_obd_ping(obd);
3553 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3554 cmd, cfs_curproc_comm());
3555 GOTO(out, err = -ENOTTY);
3558 module_put(THIS_MODULE);
/* osc_get_info() - answer a get_info query identified by @key.
 * @vallen and @val must both be non-NULL (the result of that check is not
 * visible in this listing). Visible keys:
 *   KEY_LOCK_TO_STRIPE - sets *vallen to sizeof(__u32); the value stored is
 *                        not visible in this listing;
 *   KEY_LAST_ID        - sends OST_GET_INFO carrying the key and stores the
 *                        reply into @val as an obd_id;
 *   KEY_FIEMAP         - sends OST_GET_INFO carrying the key and the
 *                        caller's *vallen-byte buffer, then copies *vallen
 *                        bytes of the reply back into @val.
 * Both RPC branches have an error path that sets rc = -EPROTO, and both
 * release the request with ptlrpc_req_finished(). */
3562 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3563 void *key, __u32 *vallen, void *val,
3564 struct lov_stripe_md *lsm)
3567 if (!vallen || !val)
3570 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3571 __u32 *stripe = val;
3572 *vallen = sizeof(*stripe);
3575 } else if (KEY_IS(KEY_LAST_ID)) {
3576 struct ptlrpc_request *req;
3581 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3582 &RQF_OST_GET_INFO_LAST_ID);
3586 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3587 RCL_CLIENT, keylen);
3588 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3590 ptlrpc_request_free(req);
3594 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3595 memcpy(tmp, key, keylen);
3597 ptlrpc_request_set_replen(req);
3598 rc = ptlrpc_queue_wait(req);
3602 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3604 GOTO(out, rc = -EPROTO);
3606 *((obd_id *)val) = *reply;
3608 ptlrpc_req_finished(req);
3610 } else if (KEY_IS(KEY_FIEMAP)) {
3611 struct ptlrpc_request *req;
3612 struct ll_user_fiemap *reply;
3616 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3617 &RQF_OST_GET_INFO_FIEMAP);
3621 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3622 RCL_CLIENT, keylen);
3623 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3624 RCL_CLIENT, *vallen);
3625 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3626 RCL_SERVER, *vallen);
3628 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3630 ptlrpc_request_free(req);
3634 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3635 memcpy(tmp, key, keylen);
3636 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3637 memcpy(tmp, val, *vallen);
3639 ptlrpc_request_set_replen(req);
3640 rc = ptlrpc_queue_wait(req);
3644 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3646 GOTO(out1, rc = -EPROTO);
3648 memcpy(val, reply, *vallen);
3650 ptlrpc_req_finished(req);
/*
 * Reply interpreter installed by osc_set_info_async() on KEY_MDS_CONN
 * requests.
 *
 * Looks up the LLOG_MDS_OST_ORIG_CTXT llog context of the import's obd
 * device, calls llog_initiator_connect() on it (logging an error if that
 * fails), and drops the context reference. It then marks the import as
 * imp_server_timeout and imp_pingable under imp_lock.
 */
3658 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3659 struct ptlrpc_request *req,
3662 struct llog_ctxt *ctxt;
3663 struct obd_import *imp = req->rq_import;
3669 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3672 rc = llog_initiator_connect(ctxt);
3674 CERROR("cannot establish connection for "
3675 "ctxt %p: %d\n", ctxt, rc);
/* Release the reference taken by llog_get_context() above. */
3678 llog_ctxt_put(ctxt);
/* Import flags are updated under imp_lock. */
3679 spin_lock(&imp->imp_lock);
3680 imp->imp_server_timeout = 1;
3681 imp->imp_pingable = 1;
3682 spin_unlock(&imp->imp_lock);
3683 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
/*
 * Set a named piece of information on an OSC export.
 *
 * Keys handled locally, without sending an RPC:
 *  - KEY_NEXT_ID: vallen is compared with sizeof(obd_id); oscc_next_id is
 *    set to the supplied id plus one.
 *  - KEY_UNLINKED: clears OSCC_FLAG_NOSPC under oscc_lock.
 *  - KEY_INIT_RECOV: vallen is compared with sizeof(int);
 *    imp_initial_recov is stored under imp_lock.
 *  - KEY_CHECKSUM: vallen is compared with sizeof(int); cl_checksum is
 *    normalised to 0 or 1.
 *  - KEY_SPTLRPC_CONF: calls sptlrpc_conf_client_adapt().
 *  - KEY_FLUSH_CTX: calls sptlrpc_import_flush_my_ctx().
 *
 * Every other key is packed into an OST_SET_INFO request carrying key and
 * val. A request set is checked for here for all keys except
 * KEY_GRANT_SHRINK:
 *  - KEY_GRANT_SHRINK uses the RQF_OST_SET_GRANT_INFO format, installs
 *    osc_shrink_grant_interpret as the reply callback and is queued with
 *    ptlrpcd_add_req().
 *  - KEY_MDS_CONN records the group number from val in oscc_oa and
 *    installs osc_setinfo_mds_conn_interpret as the reply callback.
 *  - Requests other than KEY_GRANT_SHRINK are added to set, followed by a
 *    ptlrpc_check_set() call.
 */
3688 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3689 void *key, obd_count vallen, void *val,
3690 struct ptlrpc_request_set *set)
3692 struct ptlrpc_request *req;
3693 struct obd_device *obd = exp->exp_obd;
3694 struct obd_import *imp = class_exp2cliimp(exp);
3699 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3701 if (KEY_IS(KEY_NEXT_ID)) {
3702 if (vallen != sizeof(obd_id))
/* The stored next id is one past the id supplied by the caller. */
3706 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3707 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3708 exp->exp_obd->obd_name,
3709 obd->u.cli.cl_oscc.oscc_next_id);
3714 if (KEY_IS(KEY_UNLINKED)) {
3715 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3716 spin_lock(&oscc->oscc_lock);
3717 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3718 spin_unlock(&oscc->oscc_lock);
3722 if (KEY_IS(KEY_INIT_RECOV)) {
3723 if (vallen != sizeof(int))
3725 spin_lock(&imp->imp_lock);
3726 imp->imp_initial_recov = *(int *)val;
3727 spin_unlock(&imp->imp_lock);
3728 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3729 exp->exp_obd->obd_name,
3730 imp->imp_initial_recov);
3734 if (KEY_IS(KEY_CHECKSUM)) {
3735 if (vallen != sizeof(int))
/* Any non-zero value enables checksums. */
3737 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3741 if (KEY_IS(KEY_SPTLRPC_CONF)) {
3742 sptlrpc_conf_client_adapt(obd);
3746 if (KEY_IS(KEY_FLUSH_CTX)) {
3747 sptlrpc_import_flush_my_ctx(imp);
/* Grant shrink is the only remote key for which a NULL set is not
 * tested here. */
3751 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3754 /* We pass all other commands directly to OST. Since nobody calls osc
3755 methods directly and everybody is supposed to go through LOV, we
3756 assume lov checked invalid values for us.
3757 The only recognised values so far are evict_by_nid and mds_conn.
3758 Even if something bad goes through, we'd get a -EINVAL from OST
3761 if (KEY_IS(KEY_GRANT_SHRINK))
3762 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
3764 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
3769 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3770 RCL_CLIENT, keylen);
3771 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3772 RCL_CLIENT, vallen);
3773 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3775 ptlrpc_request_free(req);
3779 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3780 memcpy(tmp, key, keylen);
3781 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
3782 memcpy(tmp, val, vallen);
3784 if (KEY_IS(KEY_MDS_CONN)) {
3785 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3787 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3788 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3789 LASSERT_MDS_GROUP(oscc->oscc_oa.o_gr);
3790 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3791 } else if (KEY_IS(KEY_GRANT_SHRINK)) {
3792 struct osc_grant_args *aa;
3795 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3796 aa = ptlrpc_req_async_args(req);
3799 ptlrpc_req_finished(req);
3802 *oa = ((struct ost_body *)val)->oa;
3804 req->rq_interpret_reply = osc_shrink_grant_interpret;
3807 ptlrpc_request_set_replen(req);
3808 if (!KEY_IS(KEY_GRANT_SHRINK)) {
3809 LASSERT(set != NULL);
3810 ptlrpc_set_add_req(set, req);
3811 ptlrpc_check_set(NULL, set);
3813 ptlrpcd_add_req(req, PSCOPE_OTHER);
/* llog operations passed to llog_setup() for LLOG_SIZE_REPL_CTXT in
 * osc_llog_init(); only the cancel hook is set here. */
3819 static struct llog_operations osc_size_repl_logops = {
3820 lop_cancel: llog_obd_repl_cancel
/* llog operations for LLOG_MDS_OST_ORIG_CTXT; filled in on first use by
 * osc_llog_init(). */
3823 static struct llog_operations osc_mds_ost_orig_logops;
/*
 * Set up the two llog contexts used by an OSC device.
 *
 * On the first call osc_mds_ost_orig_logops is initialised as a copy of
 * llog_lvfs_ops with the setup, cleanup, add and connect hooks replaced.
 * The lop_setup pointer doubles as the "already initialised" marker, and
 * the check-and-fill runs under obd->obd_dev_lock.
 *
 * LLOG_MDS_OST_ORIG_CTXT is then set up using the log id from catid,
 * followed by LLOG_SIZE_REPL_CTXT using osc_size_repl_logops. Failures are
 * reported with CERROR, including the device names, count, and the catalog
 * log id.
 *
 * olg must be the device's own obd_olg (asserted). uuid is not referenced
 * by the statements shown here.
 */
3824 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3825 struct obd_device *tgt, int count,
3826 struct llog_catid *catid, struct obd_uuid *uuid)
3831 LASSERT(olg == &obd->obd_olg);
3832 spin_lock(&obd->obd_dev_lock);
3833 if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3834 osc_mds_ost_orig_logops = llog_lvfs_ops;
3835 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3836 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3837 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3838 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3840 spin_unlock(&obd->obd_dev_lock);
3842 rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3843 &catid->lci_logid, &osc_mds_ost_orig_logops);
3845 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3849 rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
3850 NULL, &osc_size_repl_logops);
/* NOTE(review): the first context is fetched again here, presumably so it
 * can be torn down when the second setup fails; confirm. */
3852 struct llog_ctxt *ctxt =
3853 llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3856 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3861 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3862 obd->obd_name, tgt->obd_name, count, catid, rc);
3863 CERROR("logid "LPX64":0x%x\n",
3864 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
/*
 * Tear down the llog contexts created by osc_llog_init().
 *
 * Looks up LLOG_MDS_OST_ORIG_CTXT and LLOG_SIZE_REPL_CTXT in turn and calls
 * llog_cleanup() on each, keeping the two results in rc and rc2. count is
 * not referenced by the statements shown here.
 */
3869 static int osc_llog_finish(struct obd_device *obd, int count)
3871 struct llog_ctxt *ctxt;
3872 int rc = 0, rc2 = 0;
3875 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3877 rc = llog_cleanup(ctxt);
3879 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3881 rc2 = llog_cleanup(ctxt);
/*
 * Prepare connect data for a reconnect.
 *
 * When data is supplied and requests OBD_CONNECT_GRANT, the grant to ask
 * for is computed under cl_loi_list_lock: the currently available grant,
 * or, if that is zero, two RPCs' worth of pages converted to bytes. The
 * lost-grant counter is captured for logging and reset to zero in the same
 * critical section. Both values are then reported through CDEBUG.
 */
3888 static int osc_reconnect(const struct lu_env *env,
3889 struct obd_export *exp, struct obd_device *obd,
3890 struct obd_uuid *cluuid,
3891 struct obd_connect_data *data,
3894 struct client_obd *cli = &obd->u.cli;
3896 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3899 client_obd_list_lock(&cli->cl_loi_list_lock);
/* GNU "a ?: b" yields a when it is non-zero. The fallback parses as
 * (2 * cl_max_pages_per_rpc) << CFS_PAGE_SHIFT because '*' binds tighter
 * than '<<'. */
3900 data->ocd_grant = cli->cl_avail_grant ?:
3901 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3902 lost_grant = cli->cl_lost_grant;
3903 cli->cl_lost_grant = 0;
3904 client_obd_list_unlock(&cli->cl_loi_list_lock);
3906 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3907 "cl_lost_grant: %ld\n", data->ocd_grant,
3908 cli->cl_avail_grant, lost_grant);
3909 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3910 " ocd_grant: %d\n", data->ocd_connect_flags,
3911 data->ocd_version, data->ocd_grant);
/*
 * Disconnect an OSC export.
 *
 * Looks up the LLOG_SIZE_REPL_CTXT context of the export's obd device;
 * when the connection count is 1, llog_sync() is called on it before the
 * context reference is dropped. A missing context is only reported through
 * CDEBUG. Afterwards the client is removed from grant shrinking via
 * osc_del_shrink_grant() and the export is disconnected with
 * client_disconnect_export(), whose result is stored in rc.
 */
3917 static int osc_disconnect(struct obd_export *exp)
3919 struct obd_device *obd = class_exp2obd(exp);
3920 struct llog_ctxt *ctxt;
3923 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3925 if (obd->u.cli.cl_conn_count == 1) {
3926 /* Flush any remaining cancel messages out to the
3928 llog_sync(ctxt, exp);
3930 llog_ctxt_put(ctxt);
3932 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3936 osc_del_shrink_grant(&obd->u.cli);
3937 rc = client_disconnect_export(exp);
/*
 * React to a state change of the OSC import.
 *
 * imp must belong to obd (asserted). Per event:
 *  - IMP_EVENT_DISCON: for imports flagged imp_server_timeout, sets
 *    OSCC_FLAG_RECOVERING on the object creator; the available and lost
 *    grant counters are reset to zero under cl_loi_list_lock.
 *  - IMP_EVENT_INACTIVE: notifies the observer with OBD_NOTIFY_INACTIVE.
 *  - IMP_EVENT_INVALIDATE: obtains a cl environment, calls
 *    osc_check_rpcs() under cl_loi_list_lock, cleans up the LDLM namespace
 *    with LDLM_FL_LOCAL_ONLY and releases the environment.
 *  - IMP_EVENT_ACTIVE: for imports flagged imp_server_timeout, clears
 *    OSCC_FLAG_NOSPC; notifies the observer with OBD_NOTIFY_ACTIVE.
 *  - IMP_EVENT_OCD: initialises grant from the connect data when
 *    OBD_CONNECT_GRANT is set, switches the request portal to
 *    OST_REQUEST_PORTAL when OBD_CONNECT_REQPORTAL is set, and notifies
 *    the observer with OBD_NOTIFY_OCD.
 *  - anything else: logged with CERROR as an unknown event.
 */
3941 static int osc_import_event(struct obd_device *obd,
3942 struct obd_import *imp,
3943 enum obd_import_event event)
3945 struct client_obd *cli;
3949 LASSERT(imp->imp_obd == obd);
3952 case IMP_EVENT_DISCON: {
3953 /* Only do this on the MDS OSC's */
3954 if (imp->imp_server_timeout) {
3955 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3957 spin_lock(&oscc->oscc_lock);
3958 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3959 spin_unlock(&oscc->oscc_lock);
/* Both grant counters are zeroed here on disconnect. */
3962 client_obd_list_lock(&cli->cl_loi_list_lock);
3963 cli->cl_avail_grant = 0;
3964 cli->cl_lost_grant = 0;
3965 client_obd_list_unlock(&cli->cl_loi_list_lock);
3968 case IMP_EVENT_INACTIVE: {
3969 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3972 case IMP_EVENT_INVALIDATE: {
3973 struct ldlm_namespace *ns = obd->obd_namespace;
3977 env = cl_env_get(&refcheck);
3981 client_obd_list_lock(&cli->cl_loi_list_lock);
3982 /* all pages go to failing rpcs due to the invalid
3984 osc_check_rpcs(env, cli);
3985 client_obd_list_unlock(&cli->cl_loi_list_lock);
3987 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3988 cl_env_put(env, &refcheck);
3993 case IMP_EVENT_ACTIVE: {
3994 /* Only do this on the MDS OSC's */
3995 if (imp->imp_server_timeout) {
3996 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3998 spin_lock(&oscc->oscc_lock);
3999 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
4000 spin_unlock(&oscc->oscc_lock);
4002 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4005 case IMP_EVENT_OCD: {
4006 struct obd_connect_data *ocd = &imp->imp_connect_data;
4008 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4009 osc_init_grant(&obd->u.cli, ocd);
4012 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4013 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
4015 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4019 CERROR("Unknown import event %d\n", event);
/*
 * Set up an OSC obd device from its configuration record.
 *
 * Takes a ptlrpcd reference and runs the generic client_obd_setup(). It
 * then registers the lprocfs entries (the seqstat, sptlrpc and ptlrpc
 * entries are attached only when lprocfs_obd_setup() returns 0), creates
 * the import's request pool, and initialises the grant-shrink list head
 * and the cl_grant_sem semaphore (initial count 1).
 */
4025 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4031 rc = ptlrpcd_addref();
4035 rc = client_obd_setup(obd, lcfg);
4039 struct lprocfs_static_vars lvars = { 0 };
4040 struct client_obd *cli = &obd->u.cli;
4042 lprocfs_osc_init_vars(&lvars);
4043 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4044 lproc_osc_attach_seqstat(obd);
4045 sptlrpc_lprocfs_cliobd_attach(obd);
4046 ptlrpc_lprocfs_register_obd(obd);
4050 /* We need to allocate a few more requests, because
4051 brw_interpret tries to create new requests before freeing
4052 previous ones. Ideally we want to have 2x max_rpcs_in_flight
4053 reserved, but I am afraid that might waste too much RAM,
4054 so 2 is just a guess that should still work. */
4055 cli->cl_import->imp_rq_pool =
4056 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4058 ptlrpc_add_rqs_to_pool);
4060 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4061 sema_init(&cli->cl_grant_sem, 1);
/*
 * Stage-dependent work performed before osc_cleanup().
 *
 *  - OBD_CLEANUP_EARLY: deactivates the import and clears imp_pingable
 *    under imp_lock.
 *  - OBD_CLEANUP_EXPORTS: if the device still owns an import, invalidates
 *    it, frees its request pool, destroys the import and clears
 *    cl_import; the import work before up_write() runs with cl_sem held
 *    for writing. It then finishes the llog contexts through
 *    obd_llog_finish(), logging an error if they could not be cleaned up.
 */
4067 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4073 case OBD_CLEANUP_EARLY: {
4074 struct obd_import *imp;
4075 imp = obd->u.cli.cl_import;
4076 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4077 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4078 ptlrpc_deactivate_import(imp);
4079 spin_lock(&imp->imp_lock);
4080 imp->imp_pingable = 0;
4081 spin_unlock(&imp->imp_lock);
4084 case OBD_CLEANUP_EXPORTS: {
4085 /* If we set up but never connected, the
4086 client import will not have been cleaned. */
4087 if (obd->u.cli.cl_import) {
4088 struct obd_import *imp;
4089 down_write(&obd->u.cli.cl_sem);
4090 imp = obd->u.cli.cl_import;
4091 CDEBUG(D_CONFIG, "%s: client import never connected\n",
4093 ptlrpc_invalidate_import(imp);
/* The pool pointer is cleared after freeing so it is not left dangling. */
4094 if (imp->imp_rq_pool) {
4095 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4096 imp->imp_rq_pool = NULL;
4098 class_destroy_import(imp);
/* NOTE(review): cl_import is cleared after cl_sem is released; confirm
 * that no other path can observe the destroyed import in between. */
4099 up_write(&obd->u.cli.cl_sem);
4100 obd->u.cli.cl_import = NULL;
4102 rc = obd_llog_finish(obd, 0);
4104 CERROR("failed to cleanup llogging subsystems\n");
/*
 * Final cleanup of an OSC obd device.
 *
 * Unregisters the lprocfs entries, flags the object creator as exiting
 * (clearing OSCC_FLAG_RECOVERING and setting OSCC_FLAG_EXITING under
 * oscc_lock), releases the quota cache through lquota_cleanup() and then
 * runs the generic client_obd_cleanup(), whose result is stored in rc.
 */
4111 int osc_cleanup(struct obd_device *obd)
4113 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4117 ptlrpc_lprocfs_unregister_obd(obd);
4118 lprocfs_obd_cleanup(obd);
4120 spin_lock(&oscc->oscc_lock);
4121 oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
4122 oscc->oscc_flags |= OSCC_FLAG_EXITING;
4123 spin_unlock(&oscc->oscc_lock);
4125 /* free memory of osc quota cache */
4126 lquota_cleanup(quota_interface, obd);
4128 rc = client_obd_cleanup(obd);
/*
 * Apply a configuration record to an OSC device.
 *
 * Loads the OSC lprocfs variable table and dispatches on
 * lcfg->lcfg_command; the branch shown here hands the record to
 * class_process_proc_param() with the PARAM_OSC prefix and the per-device
 * variable table.
 */
4134 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4136 struct lprocfs_static_vars lvars = { 0 };
4139 lprocfs_osc_init_vars(&lvars);
4141 switch (lcfg->lcfg_command) {
4143 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
/* o_process_config entry point: forwards buf, used as the lustre_cfg
 * record, to osc_process_config_base(); len is not used. */
4153 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4155 return osc_process_config_base(obd, buf);
/*
 * OBD method table for the OSC device type, registered by osc_init()
 * through class_register_type(). Connection add/delete/connect use the
 * generic client_* helpers; the remaining hooks are OSC-specific.
 */
4158 struct obd_ops osc_obd_ops = {
4159 .o_owner = THIS_MODULE,
4160 .o_setup = osc_setup,
4161 .o_precleanup = osc_precleanup,
4162 .o_cleanup = osc_cleanup,
4163 .o_add_conn = client_import_add_conn,
4164 .o_del_conn = client_import_del_conn,
4165 .o_connect = client_connect_import,
4166 .o_reconnect = osc_reconnect,
4167 .o_disconnect = osc_disconnect,
4168 .o_statfs = osc_statfs,
4169 .o_statfs_async = osc_statfs_async,
4170 .o_packmd = osc_packmd,
4171 .o_unpackmd = osc_unpackmd,
4172 .o_precreate = osc_precreate,
4173 .o_create = osc_create,
4174 .o_destroy = osc_destroy,
4175 .o_getattr = osc_getattr,
4176 .o_getattr_async = osc_getattr_async,
4177 .o_setattr = osc_setattr,
4178 .o_setattr_async = osc_setattr_async,
4180 .o_punch = osc_punch,
4182 .o_enqueue = osc_enqueue,
4183 .o_change_cbdata = osc_change_cbdata,
4184 .o_cancel = osc_cancel,
4185 .o_cancel_unused = osc_cancel_unused,
4186 .o_iocontrol = osc_iocontrol,
4187 .o_get_info = osc_get_info,
4188 .o_set_info_async = osc_set_info_async,
4189 .o_import_event = osc_import_event,
4190 .o_llog_init = osc_llog_init,
4191 .o_llog_finish = osc_llog_finish,
4192 .o_process_config = osc_process_config,
/* Defined elsewhere: the kmem cache descriptors and the AST guard lock
 * (with its lockdep class) that osc_init() initialises. */
4195 extern struct lu_kmem_descr osc_caches[];
4196 extern spinlock_t osc_ast_guard;
4197 extern struct lock_class_key osc_ast_guard_class;
/*
 * Module initialisation.
 *
 * Creates the OSC kmem caches, loads the lprocfs variable tables, requests
 * the "lquota" module and hooks its quota interface into osc_obd_ops, then
 * registers the OSC obd type. The cleanup lines after the registration
 * call drop the quota interface symbol (if held) and destroy the caches.
 * Finally the osc_ast_guard spinlock is initialised and given its lockdep
 * class.
 */
4199 int __init osc_init(void)
4201 struct lprocfs_static_vars lvars = { 0 };
4205 /* print an address of _any_ initialized kernel symbol from this
4206 * module, to allow debugging with gdb that doesn't support data
4207 * symbols from modules.*/
4208 CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4210 rc = lu_kmem_init(osc_caches);
4212 lprocfs_osc_init_vars(&lvars);
/* NOTE(review): the result of request_module() is not checked on this
 * line; quota_interface is tested for NULL further down, so quota support
 * looks optional. Confirm. */
4214 request_module("lquota");
4215 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4216 lquota_init(quota_interface);
4217 init_obd_quota_ops(quota_interface, &osc_obd_ops);
4219 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4220 LUSTRE_OSC_NAME, &osc_device_type);
/* Undo the quota symbol reference and the kmem caches. */
4222 if (quota_interface)
4223 PORTAL_SYMBOL_PUT(osc_quota_interface);
4224 lu_kmem_fini(osc_caches);
4228 spin_lock_init(&osc_ast_guard);
4229 lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
/*
 * Module teardown: finalises the OSC lu device type, shuts down the quota
 * interface and drops its symbol reference when one is held, unregisters
 * the OSC obd type and destroys the kmem caches.
 */
4235 static void /*__exit*/ osc_exit(void)
4237 lu_device_type_fini(&osc_device_type);
4239 lquota_exit(quota_interface);
4240 if (quota_interface)
4241 PORTAL_SYMBOL_PUT(osc_quota_interface);
4243 class_unregister_type(LUSTRE_OSC_NAME);
4244 lu_kmem_fini(osc_caches);
/* Module metadata, and registration of osc_init()/osc_exit() as the module
 * entry and exit points. */
4247 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4248 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4249 MODULE_LICENSE("GPL");
4251 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);