1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_OSC
42 #include <libcfs/libcfs.h>
45 # include <liblustre.h>
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include "osc_internal.h"
/* Quota hooks used by this file: quota_interface starts out NULL, and
 * osc_quota_interface is only declared here (extern). */
66 static quota_interface_t *quota_interface = NULL;
67 extern quota_interface_t osc_quota_interface;
/* Forward declarations. */
69 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
70 static int brw_interpret(const struct lu_env *env,
71 struct ptlrpc_request *req, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
74 /* Pack OSC object metadata for disk storage (LE byte order).
 *
 * The packed buffer has size sizeof(**lmmp).  The function contains both an
 * OBD_FREE and an OBD_ALLOC of *lmmp, and stores the object id and group of
 * @lsm into the buffer after converting them with cpu_to_le64().
 * NOTE(review): the conditions that select the free / allocate / fill paths
 * and the return values are not visible here — confirm against the complete
 * function before relying on ownership of *lmmp. */
75 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
76 struct lov_stripe_md *lsm)
81 lmm_size = sizeof(**lmmp);
86 OBD_FREE(*lmmp, lmm_size);
92 OBD_ALLOC(*lmmp, lmm_size);
/* an object id of zero is never packed; the group must pass the MDS check */
98 LASSERT(lsm->lsm_object_id);
99 LASSERT_MDS_GROUP(lsm->lsm_object_gr);
/* on-disk fields are little-endian */
100 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
101 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
107 /* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * Checks @lmm (at least sizeof(*lmm) bytes according to @lmm_bytes, non-zero
 * object id), manages the single-stripe lov_stripe_md at *lsmp together with
 * its lsm_oinfo[0], and copies object id and group out of @lmm with
 * le64_to_cpu().  lsm_maxbytes is set to LUSTRE_STRIPE_MAXBYTES.
 * NOTE(review): the return values of the individual paths are not visible
 * here — confirm against the complete function. */
108 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
109 struct lov_mds_md *lmm, int lmm_bytes)
/* reject a buffer too short to hold a struct lov_mds_md */
115 if (lmm_bytes < sizeof (*lmm)) {
116 CERROR("lov_mds_md too small: %d, need %d\n",
117 lmm_bytes, (int)sizeof(*lmm));
120 /* XXX LOV_MAGIC etc check? */
122 if (lmm->lmm_object_id == 0) {
123 CERROR("lov_mds_md: zero lmm_object_id\n");
/* size of a stripe md describing exactly one stripe */
128 lsm_size = lov_stripe_md_size(1);
/* an md exists but there is nothing to unpack: release it */
132 if (*lsmp != NULL && lmm == NULL) {
133 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
134 OBD_FREE(*lsmp, lsm_size);
140 OBD_ALLOC(*lsmp, lsm_size);
143 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
144 if ((*lsmp)->lsm_oinfo[0] == NULL) {
/* undo the outer allocation when the oinfo allocation failed */
145 OBD_FREE(*lsmp, lsm_size);
148 loi_init((*lsmp)->lsm_oinfo[0]);
152 /* XXX zero *lsmp? */
153 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
154 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
155 LASSERT((*lsmp)->lsm_object_id);
156 LASSERT_MDS_GROUP((*lsmp)->lsm_object_gr);
159 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/* Pack capability @capa (treated as a struct obd_capa) for request @req.
 *
 * Fetches the client-side RMF_CAPA1 buffer of the request capsule and marks
 * the capability as present by setting OBD_MD_FLOSSCAPA in body->oa.o_valid.
 * NOTE(review): the statement copying @capa into the buffer and any early
 * exit for a NULL @capa are not visible here — confirm. */
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165 struct ost_body *body, void *capa)
167 struct obd_capa *oc = (struct obd_capa *)capa;
168 struct lustre_capa *c;
173 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
176 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177 DEBUG_CAPA(D_SEC, c, "pack");
/* Fill the RMF_OST_BODY field of @req from @oinfo: the obdo is copied by
 * value from oinfo->oi_oa, then the capability oinfo->oi_capa is packed with
 * osc_pack_capa(). */
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181 struct obd_info *oinfo)
183 struct ost_body *body;
185 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
/* structure copy: later changes to *oinfo->oi_oa do not reach the request */
188 body->oa = *oinfo->oi_oa;
189 osc_pack_capa(req, body, oinfo->oi_capa);
/* Adjust the client-side (RCL_CLIENT) size of capability field @field in the
 * capsule of @req.  The visible path sets the size to 0; otherwise the size
 * already computed for the field is kept (see the comment below).
 * NOTE(review): presumably the zero size applies when no capability is
 * supplied — the condition and the last parameter are not visible here,
 * confirm. */
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193 const struct req_msg_field *field,
197 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
199 /* it is already calculated as sizeof struct obd_capa */
/* Reply handler for asynchronous OST_GETATTR requests.
 *
 * Unpacks (and swabs if needed) the ost_body from the reply buffer, copies
 * the returned attributes into aa->aa_oi->oi_oa and forces o_blksize to
 * PTLRPC_MAX_BRW_SIZE.  The failure path logs "can't unpack ost_body" and
 * clears o_valid.  Finally the caller's oi_cb_up() callback is invoked and
 * its result becomes rc. */
203 static int osc_getattr_interpret(const struct lu_env *env,
204 struct ptlrpc_request *req,
205 struct osc_async_args *aa, int rc)
207 struct ost_body *body;
213 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
214 lustre_swab_ost_body);
216 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
217 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
219 /* This should really be sent by the OST */
220 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
221 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
223 CDEBUG(D_INFO, "can't unpack ost_body\n");
/* nothing in the caller's obdo can be trusted on this path */
225 aa->aa_oi->oi_oa->o_valid = 0;
228 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an asynchronous OST_GETATTR for the object described by @oinfo.
 *
 * Allocates an RQF_OST_GETATTR request on the import of @exp, sizes the
 * capability field, packs the request and its body, installs
 * osc_getattr_interpret() as reply handler and adds the request to @set.
 * ptlrpc_request_free() is the cleanup on the pack-failure path. */
232 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
233 struct ptlrpc_request_set *set)
235 struct ptlrpc_request *req;
236 struct osc_async_args *aa;
240 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
244 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
245 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
247 ptlrpc_request_free(req);
251 osc_pack_req_body(req, oinfo);
253 ptlrpc_request_set_replen(req);
254 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
/* the async args must fit into the space embedded in the request */
256 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
257 aa = ptlrpc_req_async_args(req);
260 ptlrpc_set_add_req(set, req);
/* Synchronous OST_GETATTR.
 *
 * Builds the request from @oinfo, waits for the reply with
 * ptlrpc_queue_wait() and copies the returned obdo back into oinfo->oi_oa,
 * forcing o_blksize to PTLRPC_MAX_BRW_SIZE.  A reply without an ost_body
 * yields -EPROTO.  The request is released with ptlrpc_req_finished(). */
264 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
266 struct ptlrpc_request *req;
267 struct ost_body *body;
271 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
275 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
276 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
278 ptlrpc_request_free(req);
282 osc_pack_req_body(req, oinfo);
284 ptlrpc_request_set_replen(req);
286 rc = ptlrpc_queue_wait(req);
290 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
292 GOTO(out, rc = -EPROTO);
294 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
295 *oinfo->oi_oa = body->oa;
297 /* This should really be sent by the OST */
298 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
299 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
303 ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR.
 *
 * Asserts that, when OBD_MD_FLGROUP is set in o_valid, o_gr passes
 * CHECK_MDS_GROUP().  Then builds the request from @oinfo, waits for the
 * reply and copies the returned obdo back into oinfo->oi_oa.  A reply
 * without an ost_body yields -EPROTO.  @oti is not referenced in the lines
 * visible here. */
307 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
308 struct obd_trans_info *oti)
310 struct ptlrpc_request *req;
311 struct ost_body *body;
315 LASSERTF(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
316 CHECK_MDS_GROUP(oinfo->oi_oa->o_gr),
317 "oinfo->oi_oa->o_valid="LPU64" oinfo->oi_oa->o_gr="LPU64"\n",
318 oinfo->oi_oa->o_valid, oinfo->oi_oa->o_gr);
320 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
324 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
325 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
327 ptlrpc_request_free(req);
331 osc_pack_req_body(req, oinfo);
333 ptlrpc_request_set_replen(req);
335 rc = ptlrpc_queue_wait(req);
339 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
341 GOTO(out, rc = -EPROTO);
343 *oinfo->oi_oa = body->oa;
347 ptlrpc_req_finished(req);
/* Reply handler for asynchronous OST_SETATTR requests: copies the obdo from
 * the reply's ost_body into aa->aa_oi->oi_oa (-EPROTO when the body is
 * missing) and then invokes the caller's oi_cb_up() callback, whose result
 * becomes rc. */
351 static int osc_setattr_interpret(const struct lu_env *env,
352 struct ptlrpc_request *req,
353 struct osc_async_args *aa, int rc)
355 struct ost_body *body;
361 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
363 GOTO(out, rc = -EPROTO);
365 *aa->aa_oi->oi_oa = body->oa;
367 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Asynchronous OST_SETATTR.
 *
 * Builds the request from @oinfo.  Two submission paths exist: hand the
 * request to ptlrpcd (PSCOPE_OTHER) without waiting for a response, or
 * install osc_setattr_interpret() as reply handler and add the request to
 * @rqset.
 * NOTE(review): the condition selecting between the two paths is not visible
 * here — confirm.
 * NOTE(review): when OBD_MD_FLCOOKIE is set, the llog cookie from @oti is
 * stored into oinfo->oi_oa->o_lcookie only after osc_pack_req_body() has
 * already copied the obdo into the request, so the packed body does not
 * carry it — confirm this ordering is intended. */
371 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
372 struct obd_trans_info *oti,
373 struct ptlrpc_request_set *rqset)
375 struct ptlrpc_request *req;
376 struct osc_async_args *aa;
380 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
384 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
385 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
387 ptlrpc_request_free(req);
391 osc_pack_req_body(req, oinfo);
393 ptlrpc_request_set_replen(req);
395 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
397 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
400 /* do mds to ost setattr asynchronously */
402 /* Do not wait for response. */
403 ptlrpcd_add_req(req, PSCOPE_OTHER);
405 req->rq_interpret_reply =
406 (ptlrpc_interpterer_t)osc_setattr_interpret;
408 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
409 aa = ptlrpc_req_async_args(req);
412 ptlrpc_set_add_req(rqset, req);
/* Create an object on the OST with a synchronous OST_CREATE request.
 *
 * A stripe md is obtained with obd_alloc_memmd(); the request is allocated
 * (-ENOMEM on failure), packed and sent with ptlrpc_queue_wait().  When
 * o_flags equals OBD_FL_DELORPHAN (with OBD_MD_FLFLAGS valid) the request
 * is marked no-resend / no-delay.  From the reply (-EPROTO when the
 * ost_body is missing) the object id and group are stored into the stripe
 * md, o_blksize is forced to PTLRPC_MAX_BRW_SIZE, and the transaction
 * number and, with OBD_MD_FLCOOKIE, the llog cookie are saved into @oti.
 * The request is released with ptlrpc_req_finished(); obd_free_memmd() is
 * the cleanup for the stripe md.
 * NOTE(review): how @ea is consumed/returned and which paths reach the
 * final obd_free_memmd() are not visible here — confirm. */
418 int osc_real_create(struct obd_export *exp, struct obdo *oa,
419 struct lov_stripe_md **ea, struct obd_trans_info *oti)
421 struct ptlrpc_request *req;
422 struct ost_body *body;
423 struct lov_stripe_md *lsm;
432 rc = obd_alloc_memmd(exp, &lsm);
437 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
439 GOTO(out, rc = -ENOMEM);
441 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
443 ptlrpc_request_free(req);
447 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
451 ptlrpc_request_set_replen(req);
453 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
454 oa->o_flags == OBD_FL_DELORPHAN) {
456 "delorphan from OST integration");
457 /* Don't resend the delorphan req */
458 req->rq_no_resend = req->rq_no_delay = 1;
461 rc = ptlrpc_queue_wait(req);
465 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
467 GOTO(out_req, rc = -EPROTO);
471 /* This should really be sent by the OST */
472 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
473 oa->o_valid |= OBD_MD_FLBLKSZ;
475 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
476 * have valid lsm_oinfo data structs, so don't go touching that.
477 * This needs to be fixed in a big way.
479 lsm->lsm_object_id = oa->o_id;
480 lsm->lsm_object_gr = oa->o_gr;
484 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
486 if (oa->o_valid & OBD_MD_FLCOOKIE) {
487 if (!oti->oti_logcookies)
488 oti_alloc_cookies(oti, 1);
489 *oti->oti_logcookies = oa->o_lcookie;
493 CDEBUG(D_HA, "transno: "LPD64"\n",
494 lustre_msg_get_transno(req->rq_repmsg));
496 ptlrpc_req_finished(req);
499 obd_free_memmd(exp, &lsm);
/* Reply handler for OST_PUNCH requests: copies the obdo from the reply's
 * ost_body into *aa->pa_oa (-EPROTO when the body is missing) and then calls
 * the saved upcall aa->pa_upcall(aa->pa_cookie, rc), whose result becomes
 * rc. */
503 static int osc_punch_interpret(const struct lu_env *env,
504 struct ptlrpc_request *req,
505 struct osc_punch_args *aa, int rc)
507 struct ost_body *body;
513 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
515 GOTO(out, rc = -EPROTO);
517 *aa->pa_oa = body->oa;
519 rc = aa->pa_upcall(aa->pa_cookie, rc);
/* Build and submit an asynchronous OST_PUNCH request for @oa.
 *
 * The request is directed to OST_IO_PORTAL, gets its timeout from
 * ptlrpc_at_set_req_timeout(), carries capability @capa and uses
 * osc_punch_interpret() as reply handler.  @upcall and @cookie are saved in
 * the request's async args for that handler.  When @rqset is PTLRPCD_SET
 * the request is handed to ptlrpcd, otherwise it is added to @rqset. */
523 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
524 struct obd_capa *capa,
525 obd_enqueue_update_f upcall, void *cookie,
526 struct ptlrpc_request_set *rqset)
528 struct ptlrpc_request *req;
529 struct osc_punch_args *aa;
530 struct ost_body *body;
534 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
538 osc_set_capa_size(req, &RMF_CAPA1, capa);
539 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
541 ptlrpc_request_free(req);
544 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
545 ptlrpc_at_set_req_timeout(req);
547 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
550 osc_pack_capa(req, body, capa);
552 ptlrpc_request_set_replen(req);
555 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
556 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
557 aa = ptlrpc_req_async_args(req);
559 aa->pa_upcall = upcall;
560 aa->pa_cookie = cookie;
561 if (rqset == PTLRPCD_SET)
562 ptlrpcd_add_req(req, PSCOPE_OTHER);
564 ptlrpc_set_add_req(rqset, req);
/* Punch (truncate) the extent given in oinfo->oi_policy.
 *
 * The extent is passed in the obdo by overloading two fields: o_size carries
 * the extent start and o_blocks the extent end.  The actual work is done by
 * osc_punch_base(), with oinfo->oi_cb_up as upcall and @oinfo itself as the
 * upcall cookie.  @oti is not used. */
569 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
570 struct obd_trans_info *oti,
571 struct ptlrpc_request_set *rqset)
573 oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
574 oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
575 oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
576 return osc_punch_base(exp, oinfo->oi_oa, oinfo->oi_capa,
577 oinfo->oi_cb_up, oinfo, rqset);
/* Synchronous OST_SYNC for the byte range [@start, @end] of object @oa.
 *
 * The range is passed by overloading o_size (start) and o_blocks (end) in
 * the request body.  The function waits for the reply with
 * ptlrpc_queue_wait(); a reply without an ost_body yields -EPROTO.  A debug
 * message "oa NULL" is logged on the missing-obdo path.
 * NOTE(review): the capability parameter used below (capa) is declared on a
 * signature line not visible here. */
580 static int osc_sync(struct obd_export *exp, struct obdo *oa,
581 struct lov_stripe_md *md, obd_size start, obd_size end,
584 struct ptlrpc_request *req;
585 struct ost_body *body;
590 CDEBUG(D_INFO, "oa NULL\n");
594 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
598 osc_set_capa_size(req, &RMF_CAPA1, capa);
599 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
601 ptlrpc_request_free(req);
605 /* overload the size and blocks fields in the oa with start/end */
606 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
609 body->oa.o_size = start;
610 body->oa.o_blocks = end;
611 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
612 osc_pack_capa(req, body, capa);
614 ptlrpc_request_set_replen(req);
616 rc = ptlrpc_queue_wait(req);
620 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
622 GOTO(out, rc = -EPROTO);
628 ptlrpc_req_finished(req);
632 /* Find and locally cancel the locks matching @mode in the resource named by
633 * the object id and group of @oa. Found locks are added to the @cancels
634 * list. Returns the number of locks added to the @cancels list.
 *
 * The resource is looked up (not created) in the namespace of the export's
 * obd device; the reference taken by the lookup is dropped again with
 * ldlm_resource_putref() before returning. */
635 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
636 struct list_head *cancels, ldlm_mode_t mode,
639 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
640 struct ldlm_res_id res_id;
641 struct ldlm_resource *res;
645 osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
646 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
650 LDLM_RESOURCE_ADDREF(res);
651 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
652 lock_flags, 0, NULL);
653 LDLM_RESOURCE_DELREF(res);
654 ldlm_resource_putref(res);
/* Reply handler for throttled OST_DESTROY requests: drops the in-flight
 * destroy counter of the client obd behind the request's import and wakes a
 * waiter on cl_destroy_waitq. */
658 static int osc_destroy_interpret(const struct lu_env *env,
659 struct ptlrpc_request *req, void *data,
662 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
664 atomic_dec(&cli->cl_destroy_in_flight);
665 cfs_waitq_signal(&cli->cl_destroy_waitq);
/* Try to reserve a slot for one more in-flight destroy RPC.
 *
 * The counter cl_destroy_in_flight is incremented optimistically; the slot
 * is granted while the new value does not exceed cl_max_rpcs_in_flight.
 * Otherwise the increment is undone, and if the counter is then below the
 * limit (it was changed concurrently) a waiter on cl_destroy_waitq is woken
 * so that it can retry.
 * NOTE(review): the return statements are not visible here — presumably
 * non-zero means "may send"; confirm. */
669 static int osc_can_send_destroy(struct client_obd *cli)
671 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
672 cli->cl_max_rpcs_in_flight) {
673 /* The destroy request can be sent */
676 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
677 cli->cl_max_rpcs_in_flight) {
679 * The counter has been modified between the two atomic
682 cfs_waitq_signal(&cli->cl_destroy_waitq);
687 /* Destroy requests can be async always on the client, and we don't even really
688 * care about the return code since the client cannot do anything at all about
 * it.
690 * When the MDS is unlinking a filename, it saves the file objects into a
691 * recovery llog, and these object records are cancelled when the OST reports
692 * they were destroyed and sync'd to disk (i.e. transaction committed).
693 * If the client dies, or the OST is down when the object should be destroyed,
694 * the records are not cancelled, and when the OST reconnects to the MDS next,
695 * it will retrieve the llog unlink logs and then send the log cancellation
696 * cookies to the MDS after committing destroy transactions.
 *
 * Implementation: unused PW locks on the object are first cancelled locally
 * (LDLM_FL_DISCARD_DATA) and collected so they can be sent along with the
 * OST_DESTROY request prepared by ldlm_prep_elc_req().  Unless the import
 * was connected with OBD_CONNECT_MDS, the number of destroy RPCs in flight
 * is throttled through osc_can_send_destroy() / cl_destroy_waitq.  The
 * request is handed to ptlrpcd and no response is awaited. */
697 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
698 struct lov_stripe_md *ea, struct obd_trans_info *oti,
699 struct obd_export *md_export, void *capa)
701 struct client_obd *cli = &exp->exp_obd->u.cli;
702 struct ptlrpc_request *req;
703 struct ost_body *body;
704 CFS_LIST_HEAD(cancels);
709 CDEBUG(D_INFO, "oa NULL\n");
713 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
714 LDLM_FL_DISCARD_DATA);
716 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* give the collected locks back when there is no request to carry them */
718 ldlm_lock_list_put(&cancels, l_bl_ast, count);
722 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
723 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
726 ptlrpc_request_free(req);
730 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
731 ptlrpc_at_set_req_timeout(req);
733 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
734 oa->o_lcookie = *oti->oti_logcookies;
735 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
739 osc_pack_capa(req, body, (struct obd_capa *)capa);
740 ptlrpc_request_set_replen(req);
742 /* don't throttle destroy RPCs for the MDT */
743 if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
744 req->rq_interpret_reply = osc_destroy_interpret;
745 if (!osc_can_send_destroy(cli)) {
746 struct l_wait_info lwi = { 0 };
749 * Wait until the number of on-going destroy RPCs drops
750 * under max_rpc_in_flight
752 l_wait_event_exclusive(cli->cl_destroy_waitq,
753 osc_can_send_destroy(cli), &lwi);
757 /* Do not wait for response */
758 ptlrpcd_add_req(req, PSCOPE_OTHER);
/* Fill the cache/grant accounting fields of @oa from the state of @cli so
 * they can be announced to the OST.
 *
 * Under cl_loi_list_lock: o_dirty is the client's dirty byte count,
 * o_grant the available grant and o_dropped the lost grant (which is then
 * reset to 0).  o_undirty is computed as the larger of cl_dirty_max and the
 * bytes needed for (cl_max_rpcs_in_flight + 1) full RPCs, unless one of the
 * sanity checks on the dirty counters fails, in which case an error is
 * logged.  @oa must not already have OBD_MD_FLBLOCKS or OBD_MD_FLGRANT set.
 * NOTE(review): the value assigned to o_undirty on the error paths, the
 * update of o_valid and the last parameter are not visible here. */
762 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
765 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
767 LASSERT(!(oa->o_valid & bits));
770 client_obd_list_lock(&cli->cl_loi_list_lock);
771 oa->o_dirty = cli->cl_dirty;
772 if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
773 CERROR("dirty %lu - %lu > dirty_max %lu\n",
774 cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
776 } else if (atomic_read(&obd_dirty_pages) -
777 atomic_read(&obd_dirty_transit_pages) > obd_max_dirty_pages){
778 CERROR("dirty %d - %d > system dirty_max %d\n",
779 atomic_read(&obd_dirty_pages),
780 atomic_read(&obd_dirty_transit_pages),
781 obd_max_dirty_pages);
783 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
784 CERROR("dirty %lu - dirty_max %lu too big???\n",
785 cli->cl_dirty, cli->cl_dirty_max);
/* bytes needed to keep every RPC slot, plus one, filled with full RPCs */
788 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
789 (cli->cl_max_rpcs_in_flight + 1);
790 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
792 oa->o_grant = cli->cl_avail_grant;
793 oa->o_dropped = cli->cl_lost_grant;
/* the lost grant has now been reported; start counting afresh */
794 cli->cl_lost_grant = 0;
795 client_obd_list_unlock(&cli->cl_loi_list_lock);
796 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
797 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Re-arm the grant-shrink deadline of @cli: cl_next_shrink_grant becomes the
 * current time shifted by cl_grant_shrink_interval. */
801 static void osc_update_next_shrink(struct client_obd *cli)
803 cli->cl_next_shrink_grant =
804 cfs_time_shift(cli->cl_grant_shrink_interval);
805 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
806 cli->cl_next_shrink_grant);
809 /* caller must hold loi_list_lock
 *
 * Charge one page of write grant to @pga: the global and per-client dirty
 * counters grow by one page, cl_avail_grant shrinks by CFS_PAGE_SIZE and the
 * page is flagged OBD_BRW_FROM_GRANT (it must not carry that flag yet).
 * Using grant also pushes back the next grant-shrink deadline. */
810 static void osc_consume_write_grant(struct client_obd *cli,
811 struct brw_page *pga)
813 LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock);
814 LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
815 atomic_inc(&obd_dirty_pages);
816 cli->cl_dirty += CFS_PAGE_SIZE;
817 cli->cl_avail_grant -= CFS_PAGE_SIZE;
818 pga->flag |= OBD_BRW_FROM_GRANT;
819 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
820 CFS_PAGE_SIZE, pga, pga->pg);
821 LASSERT(cli->cl_avail_grant >= 0);
822 osc_update_next_shrink(cli);
825 /* the companion to osc_consume_write_grant, called when a brw has completed.
826 * must be called with the loi lock held.
 *
 * For a page flagged OBD_BRW_FROM_GRANT the flag is cleared and the dirty
 * counters are reduced by one page (the transit counters too when the page
 * is flagged OBD_BRW_NOCACHE).  Grant that cannot be reused is added to
 * cl_lost_grant: a whole page on one path, or the part of the page not
 * covered by the blocksize-rounded write when the page size differs from
 * the OST block size and the write was shorter than a page.  The block size
 * comes from the cached obd_osfs, falling back to 4096 when it is 0.
 * NOTE(review): the condition on @sent that selects the whole-page path is
 * not visible here — confirm. */
827 static void osc_release_write_grant(struct client_obd *cli,
828 struct brw_page *pga, int sent)
830 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
833 LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock);
834 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
839 pga->flag &= ~OBD_BRW_FROM_GRANT;
840 atomic_dec(&obd_dirty_pages);
841 cli->cl_dirty -= CFS_PAGE_SIZE;
842 if (pga->flag & OBD_BRW_NOCACHE) {
843 pga->flag &= ~OBD_BRW_NOCACHE;
844 atomic_dec(&obd_dirty_transit_pages);
845 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
848 cli->cl_lost_grant += CFS_PAGE_SIZE;
849 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
850 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
851 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
852 /* For short writes we shouldn't count parts of pages that
853 * span a whole block on the OST side, or our accounting goes
854 * wrong. Should match the code in filter_grant_check. */
/* offset of the write within its page */
855 int offset = pga->off & ~CFS_PAGE_MASK;
/* extend the byte count down to the start of the first block ... */
856 int count = pga->count + (offset & (blocksize - 1));
/* ... and find how far into its last block the write ends */
857 int end = (offset + pga->count) & (blocksize - 1);
859 count += blocksize - end;
861 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
862 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
863 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
864 cli->cl_avail_grant, cli->cl_dirty);
/* Total number of BRW RPCs in flight for @cli: reads plus writes. */
870 static unsigned long rpcs_in_flight(struct client_obd *cli)
872 return cli->cl_r_in_flight + cli->cl_w_in_flight;
875 /* caller must hold loi_list_lock
 *
 * Walk the list of threads waiting for dirty-cache room and wake those that
 * can proceed.  The walk checks for room under both the per-client limit
 * (cl_dirty_max) and the global limit (obd_max_dirty_pages), and for the
 * case where writes are still in flight while less than one page of grant
 * is available.  A waiter that is woken is removed from the list; with less
 * than a page of grant left it is woken with -EDQUOT so that it falls back
 * to sync IO, otherwise one page of grant is consumed on its behalf.
 * NOTE(review): the statements ending the walk after the two "cannot
 * proceed" checks are not visible here — confirm. */
876 void osc_wake_cache_waiters(struct client_obd *cli)
878 struct list_head *l, *tmp;
879 struct osc_cache_waiter *ocw;
882 list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
883 /* if we can't dirty more, we must wait until some is written */
884 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
885 (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
886 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
887 "osc max %ld, sys max %d\n", cli->cl_dirty,
888 cli->cl_dirty_max, obd_max_dirty_pages);
892 /* if still dirty cache but no grant wait for pending RPCs that
893 * may yet return us some grant before doing sync writes */
894 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
895 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
896 cli->cl_w_in_flight);
900 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
901 list_del_init(&ocw->ocw_entry);
902 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
903 /* no more RPCs in flight to return grant, do sync IO */
904 ocw->ocw_rc = -EDQUOT;
905 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
907 osc_consume_write_grant(cli,
908 &ocw->ocw_oap->oap_brw_page);
911 cfs_waitq_signal(&ocw->ocw_waitq);
/* Add @grant to the available grant of @cli, taking cl_loi_list_lock for
 * the update. */
917 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
919 client_obd_list_lock(&cli->cl_loi_list_lock);
920 cli->cl_avail_grant += grant;
921 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Credit @cli with the grant carried in reply @body, if the reply marks
 * o_grant as valid (OBD_MD_FLGRANT). */
924 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
926 if (body->oa.o_valid & OBD_MD_FLGRANT) {
927 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
928 __osc_update_grant(cli, body->oa.o_grant);
/* Forward declaration: osc_shrink_grant_to_target() below sends the
 * KEY_GRANT_SHRINK request through this function. */
932 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
933 void *key, obd_count vallen, void *val,
934 struct ptlrpc_request_set *set);
/* Reply handler for grant-shrink requests.
 *
 * One path gives the grant recorded in the request's obdo (aa_oa->o_grant)
 * back to the client with __osc_update_grant(); the other reads the
 * ost_body of the reply and applies any grant it carries with
 * osc_update_grant().
 * NOTE(review): presumably the first path is taken when the request failed
 * (the grant was not released on the server) — the condition and the
 * remaining parameters are not visible here, confirm. */
936 static int osc_shrink_grant_interpret(const struct lu_env *env,
937 struct ptlrpc_request *req,
940 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
941 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
942 struct ost_body *body;
945 __osc_update_grant(cli, oa->o_grant);
949 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
951 osc_update_grant(cli, body);
/* Give back a quarter of the currently available grant by piggy-backing on
 * an outgoing obdo: the amount is moved from cl_avail_grant into
 * oa->o_grant under cl_loi_list_lock, @oa is flagged OBD_FL_SHRINK_GRANT
 * and the next grant-shrink deadline is re-armed. */
957 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
959 client_obd_list_lock(&cli->cl_loi_list_lock);
960 oa->o_grant = cli->cl_avail_grant / 4;
961 cli->cl_avail_grant -= oa->o_grant;
962 client_obd_list_unlock(&cli->cl_loi_list_lock);
963 oa->o_flags |= OBD_FL_SHRINK_GRANT;
964 osc_update_next_shrink(cli);
967 /* Shrink the current grant, either from some large amount to enough for a
968 * full set of in-flight RPCs, or if we have already shrunk to that limit
969 * then to enough for a single RPC. This avoids keeping more grant than
970 * needed, and avoids shrinking the grant piecemeal.
 *
 * NOTE(review): target is computed here as a number of pages
 * ((cl_max_rpcs_in_flight + 1) * cl_max_pages_per_rpc) but is compared with
 * cl_avail_grant, which the rest of this file adjusts in CFS_PAGE_SIZE byte
 * units.  This looks like a pages-vs-bytes mismatch (a shift by
 * CFS_PAGE_SHIFT may be missing) — confirm before relying on the target. */
971 static int osc_shrink_grant(struct client_obd *cli)
973 long target = (cli->cl_max_rpcs_in_flight + 1) *
974 cli->cl_max_pages_per_rpc;
976 client_obd_list_lock(&cli->cl_loi_list_lock);
/* already at or below the "full set of RPCs" level: aim for a single RPC */
977 if (cli->cl_avail_grant <= target)
978 target = cli->cl_max_pages_per_rpc;
979 client_obd_list_unlock(&cli->cl_loi_list_lock);
981 return osc_shrink_grant_to_target(cli, target);
/* Shrink the available grant of @cli down to @target and tell the OST.
 *
 * @target is raised to at least cl_max_pages_per_rpc.  If the available
 * grant is already at or below the target nothing is shrunk.  Otherwise an
 * ost_body is allocated, filled with the cache state by
 * osc_announce_cached(), the surplus (cl_avail_grant - target) is moved
 * into its o_grant with OBD_FL_SHRINK_GRANT set, and the body is sent as a
 * KEY_GRANT_SHRINK set_info request on the obd's self export.
 * __osc_update_grant() gives the surplus back to the client.
 * NOTE(review): presumably the surplus is given back only when sending
 * failed — the condition, the release of the allocated body and the return
 * values are not visible here, confirm.
 * NOTE(review): @target is compared with cl_max_pages_per_rpc (pages) and
 * with cl_avail_grant (adjusted in bytes elsewhere in this file) — confirm
 * the intended unit. */
984 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
987 struct ost_body *body;
990 client_obd_list_lock(&cli->cl_loi_list_lock);
991 /* Don't shrink if the available grant is already at or below the target.
992 * We don't want to shrink below a single RPC, as that will negatively
993 * impact block allocation and long-term performance. */
994 if (target < cli->cl_max_pages_per_rpc)
995 target = cli->cl_max_pages_per_rpc;
997 if (target >= cli->cl_avail_grant) {
998 client_obd_list_unlock(&cli->cl_loi_list_lock);
1001 client_obd_list_unlock(&cli->cl_loi_list_lock);
1003 OBD_ALLOC_PTR(body);
1007 osc_announce_cached(cli, &body->oa, 0);
1009 client_obd_list_lock(&cli->cl_loi_list_lock);
/* move everything above the target out of the available grant */
1010 body->oa.o_grant = cli->cl_avail_grant - target;
1011 cli->cl_avail_grant = target;
1012 client_obd_list_unlock(&cli->cl_loi_list_lock);
1013 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
1014 osc_update_next_shrink(cli);
1016 rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
1017 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
1018 sizeof(*body), body, NULL);
1020 __osc_update_grant(cli, body->oa.o_grant);
/* Grant is only shrunk while more than this amount is available. */
1025 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
/* Decide whether it is time to shrink the grant of @client.
 *
 * The shrink deadline counts as reached from 5 ticks before
 * cl_next_shrink_grant.  Once reached, shrinking is indicated when the
 * import is in state LUSTRE_IMP_FULL and more than GRANT_SHRINK_LIMIT of
 * grant is available; otherwise the deadline is re-armed.
 * NOTE(review): the return statements are not visible here — presumably
 * non-zero means "shrink now"; confirm. */
1026 static int osc_should_shrink_grant(struct client_obd *client)
1028 cfs_time_t time = cfs_time_current();
1029 cfs_time_t next_shrink = client->cl_next_shrink_grant;
1030 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1031 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1032 client->cl_avail_grant > GRANT_SHRINK_LIMIT)
1035 osc_update_next_shrink(client);
/* Timeout callback: walk every client obd linked on item->ti_obd_list and
 * shrink the grant of those for which osc_should_shrink_grant() says so.
 * @data is not used in the lines visible here. */
1040 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1042 struct client_obd *client;
1044 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
1045 if (osc_should_shrink_grant(client))
1046 osc_shrink_grant(client);
/* Register @client for periodic grant shrinking: a timeout client with
 * period cl_grant_shrink_interval and callback osc_grant_shrink_grant_cb()
 * is added, linking the client through cl_grant_shrink_list.  A failure is
 * logged with the obd name; on the success path the first shrink deadline
 * is armed.
 * NOTE(review): one argument of ptlrpc_add_timeout_client() and the return
 * statements are not visible here. */
1051 static int osc_add_shrink_grant(struct client_obd *client)
1055 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1057 osc_grant_shrink_grant_cb, NULL,
1058 &client->cl_grant_shrink_list);
1060 CERROR("add grant client %s error %d\n",
1061 client->cl_import->imp_obd->obd_name, rc);
1064 CDEBUG(D_CACHE, "add grant client %s \n",
1065 client->cl_import->imp_obd->obd_name);
1066 osc_update_next_shrink(client);
/* Unregister @client from periodic grant shrinking; returns the result of
 * ptlrpc_del_timeout_client() for its cl_grant_shrink_list entry.
 * NOTE(review): the second argument of the call is not visible here. */
1070 static int osc_del_shrink_grant(struct client_obd *client)
1072 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/* Initialise the grant state of @cli from connect data @ocd.
 *
 * cl_avail_grant is set to ocd->ocd_grant under cl_loi_list_lock.  If the
 * connection negotiated OBD_CONNECT_GRANT_SHRINK and the client is not yet
 * on a grant-shrink list, it is registered for periodic grant shrinking. */
1076 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1078 client_obd_list_lock(&cli->cl_loi_list_lock);
1079 cli->cl_avail_grant = ocd->ocd_grant;
1080 client_obd_list_unlock(&cli->cl_loi_list_lock);
1082 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1083 list_empty(&cli->cl_grant_shrink_list))
1084 osc_add_shrink_grant(cli);
1086 CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1087 cli->cl_avail_grant, cli->cl_lost_grant);
1088 LASSERT(cli->cl_avail_grant >= 0);
1091 /* We assume that the reason this OSC got a short read is because it read
1092 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1093 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1094 * this stripe never got written at or beyond this stripe offset yet.
 *
 * Zero-fill everything in @pga that lies beyond the @nob_read bytes that
 * were actually read: pages fully covered by the read are skipped, the page
 * containing the end of the data has its tail cleared, and all remaining
 * pages are cleared over their whole pga[i]->count range.
 * NOTE(review): the statements advancing the page index and page_count are
 * not visible here. */
1095 static void handle_short_read(int nob_read, obd_count page_count,
1096 struct brw_page **pga)
1101 /* skip bytes read OK */
1102 while (nob_read > 0) {
1103 LASSERT (page_count > 0);
1105 if (pga[i]->count > nob_read) {
1106 /* EOF inside this page */
1107 ptr = cfs_kmap(pga[i]->pg) +
1108 (pga[i]->off & ~CFS_PAGE_MASK);
1109 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1110 cfs_kunmap(pga[i]->pg);
1116 nob_read -= pga[i]->count;
1121 /* zero remaining pages */
1122 while (page_count-- > 0) {
1123 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1124 memset(ptr, 0, pga[i]->count);
1125 cfs_kunmap(pga[i]->pg);
/* Validate the reply to a bulk write.
 *
 * The reply must carry a vector of @niocount per-niobuf return codes (byte
 * swapped here when the reply message was swabbed).  The first negative
 * code is returned as the error; a non-zero, non-negative code is logged as
 * invalid.  Finally the number of bytes moved by the bulk transfer must
 * equal @requested_nob, otherwise an error is logged.
 * NOTE(review): the return values of the missing-vector, invalid-code and
 * byte-count-mismatch paths are not visible here.  @page_count and @pga are
 * not referenced in the visible lines. */
1130 static int check_write_rcs(struct ptlrpc_request *req,
1131 int requested_nob, int niocount,
1132 obd_count page_count, struct brw_page **pga)
1136 /* return error if any niobuf was in error */
1137 remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
1138 sizeof(*remote_rcs) * niocount, NULL);
1139 if (remote_rcs == NULL) {
1140 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
/* no swab callback was passed above, so swab each code by hand */
1143 if (lustre_msg_swabbed(req->rq_repmsg))
1144 for (i = 0; i < niocount; i++)
1145 __swab32s(&remote_rcs[i]);
1147 for (i = 0; i < niocount; i++) {
1148 if (remote_rcs[i] < 0)
1149 return(remote_rcs[i]);
1151 if (remote_rcs[i] != 0) {
1152 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1153 i, remote_rcs[i], req);
1158 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1159 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1160 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Tell whether @p2 can share a remote niobuf with the preceding page @p1.
 *
 * Pages whose flags are identical merge when @p2 starts exactly where @p1
 * ends (p1->off + p1->count == p2->off).  When the flags differ, an error
 * is logged if they differ in anything other than OBD_BRW_FROM_GRANT,
 * OBD_BRW_NOCACHE and OBD_BRW_SYNC.
 * NOTE(review): the return value of the differing-flags path is not visible
 * here — presumably 0 (no merge); confirm. */
1167 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1169 if (p1->flag != p2->flag) {
1170 unsigned mask = ~(OBD_BRW_FROM_GRANT|
1171 OBD_BRW_NOCACHE|OBD_BRW_SYNC);
1173 /* warn if we try to combine flags that we don't know to be
1174 * safe to combine */
1175 if ((p1->flag & mask) != (p2->flag & mask))
1176 CERROR("is it ok to have flags 0x%x and 0x%x in the "
1177 "same brw?\n", p1->flag, p2->flag);
1181 return (p1->off + p1->count == p2->off);
/* Compute the checksum of type @cksum_type over the first @nob bytes of the
 * pages in @pga (@pg_count pages, at least one).
 *
 * Each page is mapped, the bytes belonging to the transfer (at most the
 * remaining @nob) are folded into the running checksum, and the page is
 * unmapped again.  Two fault-injection hooks exist: for OST_READ the data
 * of the first page can be overwritten with "bad1" before checksumming, and
 * for OST_WRITE the resulting checksum can be made wrong while the data is
 * left intact.
 * NOTE(review): the statements advancing the page index / pg_count, the one
 * altering the checksum for the OST_WRITE hook and the return statement are
 * not visible here. */
1184 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1185 struct brw_page **pga, int opc,
1186 cksum_type_t cksum_type)
1191 LASSERT (pg_count > 0);
1192 cksum = init_checksum(cksum_type);
1193 while (nob > 0 && pg_count > 0) {
1194 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1195 int off = pga[i]->off & ~CFS_PAGE_MASK;
/* never checksum past the end of the transfer */
1196 int count = pga[i]->count > nob ? nob : pga[i]->count;
1198 /* corrupt the data before we compute the checksum, to
1199 * simulate an OST->client data error */
1200 if (i == 0 && opc == OST_READ &&
1201 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1202 memcpy(ptr + off, "bad1", min(4, nob));
1203 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1204 cfs_kunmap(pga[i]->pg);
1205 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1208 nob -= pga[i]->count;
1212 /* For sending we only compute the wrong checksum instead
1213 * of corrupting the data so it is still correct on a redo */
1214 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1220 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1221 struct lov_stripe_md *lsm, obd_count page_count,
1222 struct brw_page **pga,
1223 struct ptlrpc_request **reqp,
1224 struct obd_capa *ocapa, int reserve)
1226 struct ptlrpc_request *req;
1227 struct ptlrpc_bulk_desc *desc;
1228 struct ost_body *body;
1229 struct obd_ioobj *ioobj;
1230 struct niobuf_remote *niobuf;
1231 int niocount, i, requested_nob, opc, rc;
1232 struct osc_brw_async_args *aa;
1233 struct req_capsule *pill;
1234 struct brw_page *pg_prev;
1237 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1238 RETURN(-ENOMEM); /* Recoverable */
1239 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1240 RETURN(-EINVAL); /* Fatal */
1242 if ((cmd & OBD_BRW_WRITE) != 0) {
1244 req = ptlrpc_request_alloc_pool(cli->cl_import,
1245 cli->cl_import->imp_rq_pool,
1249 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
1254 for (niocount = i = 1; i < page_count; i++) {
1255 if (!can_merge_pages(pga[i - 1], pga[i]))
1259 pill = &req->rq_pill;
1260 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1261 niocount * sizeof(*niobuf));
1262 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1264 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1266 ptlrpc_request_free(req);
1269 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1270 ptlrpc_at_set_req_timeout(req);
1272 if (opc == OST_WRITE)
1273 desc = ptlrpc_prep_bulk_imp(req, page_count,
1274 BULK_GET_SOURCE, OST_BULK_PORTAL);
1276 desc = ptlrpc_prep_bulk_imp(req, page_count,
1277 BULK_PUT_SINK, OST_BULK_PORTAL);
1280 GOTO(out, rc = -ENOMEM);
1281 /* NB request now owns desc and will free it when it gets freed */
1283 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1284 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1285 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1286 LASSERT(body && ioobj && niobuf);
1290 obdo_to_ioobj(oa, ioobj);
1291 ioobj->ioo_bufcnt = niocount;
1292 osc_pack_capa(req, body, ocapa);
1293 LASSERT (page_count > 0);
1295 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1296 struct brw_page *pg = pga[i];
1298 LASSERT(pg->count > 0);
1299 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1300 "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1301 pg->off, pg->count);
1303 LASSERTF(i == 0 || pg->off > pg_prev->off,
1304 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1305 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1307 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1308 pg_prev->pg, page_private(pg_prev->pg),
1309 pg_prev->pg->index, pg_prev->off);
1311 LASSERTF(i == 0 || pg->off > pg_prev->off,
1312 "i %d p_c %u\n", i, page_count);
1314 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1315 (pg->flag & OBD_BRW_SRVLOCK));
1317 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1319 requested_nob += pg->count;
1321 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1323 niobuf->len += pg->count;
1325 niobuf->offset = pg->off;
1326 niobuf->len = pg->count;
1327 niobuf->flags = pg->flag;
1332 LASSERTF((void *)(niobuf - niocount) ==
1333 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1334 niocount * sizeof(*niobuf)),
1335 "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
1336 REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
1337 (void *)(niobuf - niocount));
1339 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1340 if (osc_should_shrink_grant(cli))
1341 osc_shrink_grant_local(cli, &body->oa);
1343 /* size[REQ_REC_OFF] still sizeof (*body) */
1344 if (opc == OST_WRITE) {
1345 if (unlikely(cli->cl_checksum) &&
1346 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1347 /* store cl_cksum_type in a local variable since
1348 * it can be changed via lprocfs */
1349 cksum_type_t cksum_type = cli->cl_cksum_type;
1351 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1352 oa->o_flags = body->oa.o_flags = 0;
1353 body->oa.o_flags |= cksum_type_pack(cksum_type);
1354 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1355 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1359 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1361 /* save this in 'oa', too, for later checking */
1362 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1363 oa->o_flags |= cksum_type_pack(cksum_type);
1365 /* clear out the checksum flag, in case this is a
1366 * resend but cl_checksum is no longer set. b=11238 */
1367 oa->o_valid &= ~OBD_MD_FLCKSUM;
1369 oa->o_cksum = body->oa.o_cksum;
1370 /* 1 RC per niobuf */
1371 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
1372 sizeof(__u32) * niocount);
1374 if (unlikely(cli->cl_checksum) &&
1375 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1376 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1377 body->oa.o_flags = 0;
1378 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1379 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1381 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
1382 /* 1 RC for the whole I/O */
1384 ptlrpc_request_set_replen(req);
1386 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1387 aa = ptlrpc_req_async_args(req);
1389 aa->aa_requested_nob = requested_nob;
1390 aa->aa_nio_count = niocount;
1391 aa->aa_page_count = page_count;
1395 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1396 if (ocapa && reserve)
1397 aa->aa_ocapa = capa_get(ocapa);
1403 ptlrpc_req_finished(req);
/*
 * Diagnose the checksum of a bulk write.
 *
 * When @server_cksum equals @client_cksum the write is confirmed.
 * Otherwise the checksum is recomputed over @pga (@page_count pages,
 * @nob bytes) and a console error is emitted whose wording depends on how
 * the recomputed value relates to the client and server values and on
 * whether the checksum type found in @oa matches @client_cksum_type.
 *
 * NOTE(review): the return statements are not visible in this excerpt; the
 * caller in this file treats a non-zero result as a failed check.
 */
1407 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1408 __u32 client_cksum, __u32 server_cksum, int nob,
1409 obd_count page_count, struct brw_page **pga,
1410 cksum_type_t client_cksum_type)
1414 cksum_type_t cksum_type;
/* Fast path: both sides computed the same checksum. */
1416 if (server_cksum == client_cksum) {
1417 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* Checksum type as encoded in oa->o_flags; CRC32 when no flags are valid. */
1421 if (oa->o_valid & OBD_MD_FLFLAGS)
1422 cksum_type = cksum_type_unpack(oa->o_flags);
1424 cksum_type = OBD_CKSUM_CRC32;
/* Recompute the checksum over the pages as they are now. */
1426 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
/* Pick the diagnostic text from how the three checksums relate. */
1429 if (cksum_type != client_cksum_type)
1430 msg = "the server did not use the checksum type specified in "
1431 "the original request - likely a protocol problem";
1432 else if (new_cksum == server_cksum)
1433 msg = "changed on the client after we checksummed it - "
1434 "likely false positive due to mmap IO (bug 11742)";
1435 else if (new_cksum == client_cksum)
1436 msg = "changed in transit before arrival at OST";
1438 msg = "changed in transit AND doesn't match the original - "
1439 "likely false positive due to mmap IO (bug 11742)";
/* Report peer, object identity and the byte extent covered by the pages. */
1441 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1442 LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1443 "["LPU64"-"LPU64"]\n",
1444 msg, libcfs_nid2str(peer->nid),
1445 oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1446 oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1449 oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1451 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1452 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1453 "client csum now %x\n", client_cksum, client_cksum_type,
1454 server_cksum, cksum_type, new_cksum);
/*
 * Post-process the reply of a bulk read/write request.
 *
 * Unpacks the reply body, updates quota flags (writes only) and grant, then
 * validates the transfer: for OST_WRITE the bulk is unwrapped, the write
 * checksum is checked and the per-niobuf return codes are examined; for
 * reads the byte count is checked against what was requested/transferred,
 * short reads are handled and the read checksum is verified when the server
 * supplied one. The reply obdo is copied into *aa->aa_oa near the end.
 *
 * NOTE(review): several return/goto statements and the `out` label are not
 * visible in this excerpt, so the exact exit conditions must be confirmed
 * against the full file.
 */
1458 /* Note rc enters this function as number of bytes transferred */
1459 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1461 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1462 const lnet_process_id_t *peer =
1463 &req->rq_import->imp_connection->c_peer;
1464 struct client_obd *cli = aa->aa_cli;
1465 struct ost_body *body;
1466 __u32 client_cksum = 0;
/* Errors other than -EDQUOT are not processed further (presumably returned
 * as-is; the statement is not visible here). */
1469 if (rc < 0 && rc != -EDQUOT)
1472 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
/* Fetch the reply body, byte-swapping it if needed. */
1473 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1474 lustre_swab_ost_body);
1476 CDEBUG(D_INFO, "Can't unpack body\n");
1480 /* set/clear over quota flag for a uid/gid */
1481 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1482 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1483 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1485 lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
/* Remember the checksum sent with the request before aa_oa is overwritten. */
1492 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1493 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1495 osc_update_grant(cli, body);
/* ---- write path ---- */
1497 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1499 CERROR("Unexpected +ve rc %d\n", rc);
1502 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1504 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
/* Only verify when a checksum was actually sent with the request. */
1507 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1508 check_write_checksum(&body->oa, peer, client_cksum,
1509 body->oa.o_cksum, aa->aa_requested_nob,
1510 aa->aa_page_count, aa->aa_ppga,
1511 cksum_type_unpack(aa->aa_oa->o_flags)))
1514 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1515 aa->aa_page_count, aa->aa_ppga);
1519 /* The rest of this function executes only for OST_READs */
1521 /* if unwrap_bulk failed, return -EAGAIN to retry */
1522 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1524 GOTO(out, rc = -EAGAIN);
/* Sanity-check the byte count reported for the read. */
1526 if (rc > aa->aa_requested_nob) {
1527 CERROR("Unexpected rc %d (%d requested)\n", rc,
1528 aa->aa_requested_nob);
1532 if (rc != req->rq_bulk->bd_nob_transferred) {
1533 CERROR ("Unexpected rc %d (%d transferred)\n",
1534 rc, req->rq_bulk->bd_nob_transferred);
1538 if (rc < aa->aa_requested_nob)
1539 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
/* Server supplied a checksum: recompute locally and compare. */
1541 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1542 static int cksum_counter;
1543 __u32 server_cksum = body->oa.o_cksum;
1546 cksum_type_t cksum_type;
1548 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1549 cksum_type = cksum_type_unpack(body->oa.o_flags);
1551 cksum_type = OBD_CKSUM_CRC32;
1552 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1553 aa->aa_ppga, OST_READ,
/* When the bulk sender differs from the peer, its NID is recorded in `router`. */
1556 if (peer->nid == req->rq_bulk->bd_sender) {
1560 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1563 if (server_cksum == ~0 && rc > 0) {
1564 CERROR("Protocol error: server %s set the 'checksum' "
1565 "bit, but didn't send a checksum. Not fatal, "
1566 "but please notify on http://bugzilla.lustre.org/\n",
1567 libcfs_nid2str(peer->nid));
1568 } else if (server_cksum != client_cksum) {
1569 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1570 "%s%s%s inum "LPU64"/"LPU64" object "
1571 LPU64"/"LPU64" extent "
1572 "["LPU64"-"LPU64"]\n",
1573 req->rq_import->imp_obd->obd_name,
1574 libcfs_nid2str(peer->nid),
1576 body->oa.o_valid & OBD_MD_FLFID ?
1577 body->oa.o_fid : (__u64)0,
1578 body->oa.o_valid & OBD_MD_FLFID ?
1579 body->oa.o_generation :(__u64)0,
1581 body->oa.o_valid & OBD_MD_FLGROUP ?
1582 body->oa.o_gr : (__u64)0,
1583 aa->aa_ppga[0]->off,
1584 aa->aa_ppga[aa->aa_page_count-1]->off +
1585 aa->aa_ppga[aa->aa_page_count-1]->count -
1587 CERROR("client %x, server %x, cksum_type %x\n",
1588 client_cksum, server_cksum, cksum_type);
1590 aa->aa_oa->o_cksum = client_cksum;
1594 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* A checksum was sent with the request but the reply carries none. */
1597 } else if (unlikely(client_cksum)) {
1598 static int cksum_missed;
/* x & -x == x holds only for 0 and powers of two, so the message is
 * rate-limited as the counter grows. */
1601 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1602 CERROR("Checksum %u requested from %s but not sent\n",
1603 cksum_missed, libcfs_nid2str(peer->nid));
/* Hand the server's view of the object attributes back to the caller. */
1609 *aa->aa_oa = body->oa;
/*
 * Perform one synchronous bulk read/write RPC for @page_count pages of @pga.
 *
 * Builds the request with osc_brw_prep_request(), waits for it with
 * ptlrpc_queue_wait() and post-processes the reply with
 * osc_brw_fini_request(). When the result is a recoverable error and
 * osc_should_resend() still allows it, the thread sleeps for
 * cfs_time_seconds(resends) before another attempt.
 *
 * NOTE(review): the retry label/goto, the resend counter update and the
 * return statements are not visible in this excerpt.
 */
1614 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1615 struct lov_stripe_md *lsm,
1616 obd_count page_count, struct brw_page **pga,
1617 struct obd_capa *ocapa)
1619 struct ptlrpc_request *req;
1623 struct l_wait_info lwi;
/* The wait queue is only used as something to sleep on between retries. */
1627 cfs_waitq_init(&waitq);
1630 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1631 page_count, pga, &req, ocapa, 0);
1635 rc = ptlrpc_queue_wait(req);
/* A timed-out bulk that ptlrpc marked for resend: drop this request. */
1637 if (rc == -ETIMEDOUT && req->rq_resend) {
1638 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1639 ptlrpc_req_finished(req);
1643 rc = osc_brw_fini_request(req, rc);
1645 ptlrpc_req_finished(req);
1646 if (osc_recoverable_error(rc)) {
1648 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1649 CERROR("too many resend retries, returning error\n");
/* Back off for `resends` seconds; the wait condition is constant 0, so only
 * the timeout (or an interrupt) ends the sleep. */
1653 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1654 l_wait_event(waitq, 0, &lwi);
/*
 * Rebuild and requeue an asynchronous BRW request that failed with a
 * recoverable error.
 *
 * A new request is prepared from the page array, obdo and capability in
 * @aa, and it inherits the interpret callback and async args of @request.
 * The oap list and capability are moved to the new request, every oap that
 * referenced @request is re-pointed at it, and it is added to the request
 * set @request belonged to. The list manipulation is done under
 * cl_loi_list_lock.
 *
 * NOTE(review): return statements are not visible in this excerpt.
 */
1662 int osc_brw_redo_request(struct ptlrpc_request *request,
1663 struct osc_brw_async_args *aa)
1665 struct ptlrpc_request *new_req;
1666 struct ptlrpc_request_set *set = request->rq_set;
1667 struct osc_brw_async_args *new_aa;
1668 struct osc_async_page *oap;
/* Stop once the resend budget for this I/O is used up. */
1672 if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1673 CERROR("too many resend retries, returning error\n");
1677 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
/* Same direction as the failed request; the last argument (reserve) is 0,
 * and the capability already held in aa->aa_ocapa is moved to the new
 * request further down. */
1679 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1680 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1681 aa->aa_cli, aa->aa_oa,
1682 NULL /* lsm unused by osc currently */,
1683 aa->aa_page_count, aa->aa_ppga,
1684 &new_req, aa->aa_ocapa, 0);
1688 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
/* If an oap holding the old request was interrupted in the meantime, the
 * freshly built request is released instead of being sent. */
1690 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1691 if (oap->oap_request != NULL) {
1692 LASSERTF(request == oap->oap_request,
1693 "request %p != oap_request %p\n",
1694 request, oap->oap_request);
1695 if (oap->oap_interrupted) {
1696 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1697 ptlrpc_req_finished(new_req);
1702 /* New request takes over pga and oaps from old request.
1703 * Note that copying a list_head doesn't work, need to move it... */
1705 new_req->rq_interpret_reply = request->rq_interpret_reply;
1706 new_req->rq_async_args = request->rq_async_args;
/* Delay sending by aa_resends seconds from now. */
1707 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1709 new_aa = ptlrpc_req_async_args(new_req);
1711 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1712 list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1713 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
/* Swap each oap's request reference from the old request to the new one. */
1715 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1716 if (oap->oap_request) {
1717 ptlrpc_req_finished(oap->oap_request);
1718 oap->oap_request = ptlrpc_request_addref(new_req);
/* Ownership of the capability reference moves to the new request. */
1722 new_aa->aa_ocapa = aa->aa_ocapa;
1723 aa->aa_ocapa = NULL;
1725 /* use ptlrpc_set_add_req is safe because interpret functions work
1726 * in check_set context. only one way exist with access to request
1727 * from different thread got -EINTR - this way protected with
1728 * cl_loi_list_lock */
1729 ptlrpc_set_add_req(set, new_req);
1731 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1733 DEBUG_REQ(D_INFO, new_req, "new request");
1738 * Ugh, we want disk allocation on the target to happen in offset order. We'll
1739 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1740 * fine for our small page arrays and doesn't require allocation. It's an
1741 * insertion sort that swaps elements that are strides apart, shrinking the
1742 * stride down until it's '1' and the array is sorted.
/*
 * Sort @array (@num brw_page pointers) in place by ascending ->off.
 *
 * NOTE(review): the do-loop header, the stride reduction and a few
 * assignments are not visible in this excerpt.
 */
1744 static void sort_brw_pages(struct brw_page **array, int num)
1747 struct brw_page *tmp;
/* Grow the stride through 1, 4, 13, ... until it reaches or exceeds num. */
1751 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
/* One pass: insertion-sort the elements that are `stride` apart. */
1756 for (i = stride ; i < num ; i++) {
/* Shift larger elements up by one stride to open the slot for tmp. */
1759 while (j >= stride && array[j - stride]->off > tmp->off) {
1760 array[j] = array[j - stride];
1765 } while (stride > 1);
/*
 * Count how many pages from the start of @pg, looking at no more than
 * @pages (which must be > 0), form a run without gaps: the visible checks
 * stop at a page that does not end on a page boundary, or whose successor
 * does not start on one.
 *
 * NOTE(review): the loop construct, counter updates and some return
 * statements are not visible in this excerpt.
 */
1768 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1774 LASSERT (pages > 0);
/* Offset of the page's data within its page. */
1775 offset = pg[i]->off & ~CFS_PAGE_MASK;
1779 if (pages == 0) /* that's all */
1782 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1783 return count; /* doesn't end on page boundary */
1786 offset = pg[i]->off & ~CFS_PAGE_MASK;
1787 if (offset != 0) /* doesn't start on page boundary */
/*
 * Allocate an array of @count brw_page pointers for the pages in @pga.
 * The array is sized sizeof(*ppga) * count, matching what
 * osc_release_ppga() frees.
 *
 * NOTE(review): the allocation-failure check, the loop body (presumably
 * ppga[i] = pga + i) and the return are not visible in this excerpt.
 */
1794 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1796 struct brw_page **ppga;
1799 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1803 for (i = 0; i < count; i++)
/*
 * Free a brw_page pointer array of @count entries.
 * @ppga must not be NULL (asserted). Only the pointer array is freed here,
 * not the pages it refers to.
 */
1808 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1810 LASSERT(ppga != NULL);
1811 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * Synchronous bulk read/write entry point.
 *
 * With OBD_BRW_CHECK set in @cmd the function only looks at whether the
 * import is invalid. Otherwise it builds a pointer array over @pga, sorts
 * it by offset and issues the pages in chunks through osc_brw_internal().
 * Each chunk holds at most cl_max_pages_per_rpc pages and is trimmed by
 * max_unfragmented_pages(). Because each RPC overwrites oinfo->oi_oa, a
 * copy is saved when more than one chunk is needed and restored before
 * every later chunk.
 *
 * NOTE(review): return statements, the `out` label and the error check
 * after osc_brw_internal() are not visible in this excerpt.
 */
1814 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1815 obd_count page_count, struct brw_page *pga,
1816 struct obd_trans_info *oti)
1818 struct obdo *saved_oa = NULL;
1819 struct brw_page **ppga, **orig;
1820 struct obd_import *imp = class_exp2cliimp(exp);
1821 struct client_obd *cli;
1822 int rc, page_count_orig;
1825 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1826 cli = &imp->imp_obd->u.cli;
1828 if (cmd & OBD_BRW_CHECK) {
1829 /* The caller just wants to know if there's a chance that this
1830 * I/O can succeed */
1832 if (imp->imp_invalid)
1837 /* test_brw with a failed create can trip this, maybe others. */
1838 LASSERT(cli->cl_max_pages_per_rpc);
/* `orig` keeps the start of the array for freeing; `ppga` advances. */
1842 orig = ppga = osc_build_ppga(pga, page_count);
1845 page_count_orig = page_count;
1847 sort_brw_pages(ppga, page_count);
1848 while (page_count) {
1849 obd_count pages_per_brw;
/* Cap the chunk at the per-RPC page limit, then at the first fragment. */
1851 if (page_count > cli->cl_max_pages_per_rpc)
1852 pages_per_brw = cli->cl_max_pages_per_rpc;
1854 pages_per_brw = page_count;
1856 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1858 if (saved_oa != NULL) {
1859 /* restore previously saved oa */
1860 *oinfo->oi_oa = *saved_oa;
1861 } else if (page_count > pages_per_brw) {
1862 /* save a copy of oa (brw will clobber it) */
1863 OBDO_ALLOC(saved_oa);
1864 if (saved_oa == NULL)
1865 GOTO(out, rc = -ENOMEM);
1866 *saved_oa = *oinfo->oi_oa;
1869 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1870 pages_per_brw, ppga, oinfo->oi_capa);
/* Advance past the pages just transferred. */
1875 page_count -= pages_per_brw;
1876 ppga += pages_per_brw;
/* Free with the original base pointer and element count. */
1880 osc_release_ppga(orig, page_count_orig);
1882 if (saved_oa != NULL)
1883 OBDO_FREE(saved_oa);
1888 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1889 * the dirty accounting. Writeback completes or truncate happens before
1890 * writing starts. Must be called with the loi lock held. */
1891 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
/* Give back the write grant held for this oap's brw_page; @sent is passed
 * through unchanged to osc_release_write_grant(). */
1894 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1898 /* This maintains the lists of pending pages to read/write for a given object
1899 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1900 * to quickly find objects that are ready to send an RPC. */
/* Decide whether @lop currently warrants building an RPC. The visible checks
 * are, in order: no pending pages; a missing or invalid import; any urgent
 * page; for writes, waiters on cl_cache_waiters; and finally the number of
 * pending pages compared with an `optimal` threshold derived from
 * cl_max_pages_per_rpc.
 * NOTE(review): the return statements and the "+16" adjustment mentioned in
 * the comment below are not visible in this excerpt. */
1901 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1907 if (lop->lop_num_pending == 0)
1910 /* if we have an invalid import we want to drain the queued pages
1911 * by forcing them through rpcs that immediately fail and complete
1912 * the pages. recovery relies on this to empty the queued pages
1913 * before canceling the locks and evicting down the llite pages */
1914 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1917 /* stream rpcs in queue order as long as there is an urgent page
1918 * queued. this is our cheap solution for good batching in the case
1919 * where writepage marks some random page in the middle of the file
1920 * as urgent because of, say, memory pressure */
1921 if (!list_empty(&lop->lop_urgent)) {
1922 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1925 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1926 optimal = cli->cl_max_pages_per_rpc;
1927 if (cmd & OBD_BRW_WRITE) {
1928 /* trigger a write rpc stream as long as there are dirtiers
1929 * waiting for space. as they're waiting, they're not going to
1930 * create more pages to coalesce with what's waiting.. */
1931 if (!list_empty(&cli->cl_cache_waiters)) {
1932 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1935 /* +16 to avoid triggering rpcs that would want to include pages
1936 * that are being queued but which can't be made ready until
1937 * the queuer finishes with the page. this is a wart for
1938 * llite::commit_write() */
1941 if (lop->lop_num_pending >= optimal)
/*
 * Check whether @lop should trigger a high-priority RPC: only the first
 * entry of lop_urgent is examined for the ASYNC_HP flag.
 *
 * NOTE(review): return statements are not visible in this excerpt.
 */
1947 static int lop_makes_hprpc(struct loi_oap_pages *lop)
1949 struct osc_async_page *oap;
1952 if (list_empty(&lop->lop_urgent))
/* Look at the head of the urgent list only. */
1955 oap = list_entry(lop->lop_urgent.next,
1956 struct osc_async_page, oap_urgent_item);
1958 if (oap->oap_async_flags & ASYNC_HP) {
1959 CDEBUG(D_CACHE, "hp request forcing RPC\n");
/*
 * Make @item's membership of @list match `should_be_on`: append it when it
 * is currently unlinked and should be listed, unlink it when it is linked
 * and should not be. Nothing happens when the state already matches.
 * An empty list_head on @item is what marks it as "not on a list".
 */
1966 static void on_list(struct list_head *item, struct list_head *list,
1969 if (list_empty(item) && should_be_on)
1970 list_add_tail(item, list);
1971 else if (!list_empty(item) && !should_be_on)
1972 list_del_init(item);
1975 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1976 * can find pages to build into rpcs quickly */
1977 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
/* An object with a high-priority page goes on the hp-ready list and comes
 * off the normal ready list. */
1979 if (lop_makes_hprpc(&loi->loi_write_lop) ||
1980 lop_makes_hprpc(&loi->loi_read_lop)) {
1982 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
1983 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
/* Otherwise it is on the normal ready list exactly when either direction
 * would make an RPC. */
1985 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
1986 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
1987 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
1988 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
/* Write/read list membership simply tracks whether anything is pending. */
1991 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1992 loi->loi_write_lop.lop_num_pending);
1994 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1995 loi->loi_read_lop.lop_num_pending);
/*
 * Adjust pending-page accounting by @delta (which may be negative): the
 * per-object counter in @lop always, plus the client-wide write counter
 * when @cmd has OBD_BRW_WRITE set, and the read counter on the other
 * visible assignment.
 */
1998 static void lop_update_pending(struct client_obd *cli,
1999 struct loi_oap_pages *lop, int cmd, int delta)
2001 lop->lop_num_pending += delta;
2002 if (cmd & OBD_BRW_WRITE)
2003 cli->cl_pending_w_pages += delta;
2005 cli->cl_pending_r_pages += delta;
2009 * this is called when a sync waiter receives an interruption. Its job is to
2010 * get the caller woken as soon as possible. If its page hasn't been put in an
2011 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
2012 * desiring interruption which will forcefully complete the rpc once the rpc
/*
 * Handle interruption of a waiter on @oap. The oap is flagged as
 * interrupted (it must not already be). If it is attached to a request,
 * that request is marked interrupted, ptlrpcd is woken and the oap's
 * request reference is dropped. If the oap is still queued, it is removed
 * from the pending and urgent lists, accounting is updated and the
 * caller's completion callback is invoked with -EINTR.
 *
 * NOTE(review): the `else`/goto structure between the two cases and the
 * return statement are not visible in this excerpt.
 */
2015 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2017 struct loi_oap_pages *lop;
2018 struct lov_oinfo *loi;
2022 LASSERT(!oap->oap_interrupted);
2023 oap->oap_interrupted = 1;
2025 /* ok, it's been put in an rpc. only one oap gets a request reference */
2026 if (oap->oap_request != NULL) {
2027 ptlrpc_mark_interrupted(oap->oap_request);
2028 ptlrpcd_wake(oap->oap_request);
2029 ptlrpc_req_finished(oap->oap_request);
2030 oap->oap_request = NULL;
2034 * page completion may be called only if ->cpo_prep() method was
2035 * executed by osc_io_submit(), that also adds page the to pending list
2037 if (!list_empty(&oap->oap_pending_item)) {
2038 list_del_init(&oap->oap_pending_item);
2039 list_del_init(&oap->oap_urgent_item);
/* Pick the per-direction queue the page was counted in and uncount it. */
2042 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2043 &loi->loi_write_lop : &loi->loi_read_lop;
2044 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2045 loi_list_maint(oap->oap_cli, oap->oap_loi);
2046 rc = oap->oap_caller_ops->ap_completion(env,
2047 oap->oap_caller_data,
2048 oap->oap_cmd, NULL, -EINTR);
2054 /* this is trying to propagate async writeback errors back up to the
2055 * application. As an async write fails we record the error code for later if
2056 * the app does an fsync. As long as errors persist we force future rpcs to be
2057 * sync so that the app can get a sync error and break the cycle of queueing
2058 * pages for which writeback will fail. */
2059 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
/* NOTE(review): the condition guarding the next two assignments is not
 * visible in this excerpt (presumably a non-zero rc). */
2066 ar->ar_force_sync = 1;
2067 ar->ar_min_xid = ptlrpc_sample_next_xid();
/* Forced-sync mode ends once a request with xid at or beyond the recorded
 * minimum is processed here. */
2072 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2073 ar->ar_force_sync = 0;
/*
 * Queue @oap on its object's pending list for the direction given by
 * oap_cmd, and bump the pending counters. ASYNC_HP pages go to the head of
 * the urgent list, ASYNC_URGENT pages to its tail; other pages are not put
 * on the urgent list.
 */
2076 void osc_oap_to_pending(struct osc_async_page *oap)
2078 struct loi_oap_pages *lop;
2080 if (oap->oap_cmd & OBD_BRW_WRITE)
2081 lop = &oap->oap_loi->loi_write_lop;
2083 lop = &oap->oap_loi->loi_read_lop;
/* list_add() puts high-priority pages ahead of everything already urgent. */
2085 if (oap->oap_async_flags & ASYNC_HP)
2086 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2087 else if (oap->oap_async_flags & ASYNC_URGENT)
2088 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2089 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2090 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2093 /* this must be called holding the loi list lock to give coverage to exit_cache,
2094 * async_flag maintenance, and oap_request */
/* Complete one async page: drop its request reference, clear its async
 * flags and interrupted state, record the result for write error tracking,
 * copy block count and timestamps from @oa into the object's lvb on
 * success, and invoke the caller's ap_completion callback. Afterwards the
 * page is either put back on the pending queue or leaves the cache
 * accounting.
 * NOTE(review): the condition choosing between those last two outcomes is
 * not visible in this excerpt. */
2095 static void osc_ap_completion(const struct lu_env *env,
2096 struct client_obd *cli, struct obdo *oa,
2097 struct osc_async_page *oap, int sent, int rc)
/* Capture the xid before the request reference is dropped. */
2102 if (oap->oap_request != NULL) {
2103 xid = ptlrpc_req_xid(oap->oap_request);
2104 ptlrpc_req_finished(oap->oap_request);
2105 oap->oap_request = NULL;
2108 oap->oap_async_flags = 0;
2109 oap->oap_interrupted = 0;
/* Track the outcome both client-wide and per object (writes only). */
2111 if (oap->oap_cmd & OBD_BRW_WRITE) {
2112 osc_process_ar(&cli->cl_ar, xid, rc);
2113 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
/* Refresh cached attributes only from fields the obdo marks valid. */
2116 if (rc == 0 && oa != NULL) {
2117 if (oa->o_valid & OBD_MD_FLBLOCKS)
2118 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2119 if (oa->o_valid & OBD_MD_FLMTIME)
2120 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2121 if (oa->o_valid & OBD_MD_FLATIME)
2122 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2123 if (oa->o_valid & OBD_MD_FLCTIME)
2124 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2127 rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2128 oap->oap_cmd, oa, rc);
2130 /* ll_ap_completion (from llite) drops PG_locked. so, a new
2131 * I/O on the page could start, but OSC calls it under lock
2132 * and thus we can add oap back to pending safely */
2134 /* upper layer wants to leave the page on pending queue */
2135 osc_oap_to_pending(oap);
2137 osc_exit_cache(cli, oap, sent);
/*
 * Reply callback for asynchronous BRW requests; @data is the request's
 * osc_brw_async_args.
 *
 * Finishes the request with osc_brw_fini_request() and, on a recoverable
 * error, tries osc_brw_redo_request(). Otherwise the capability reference
 * is dropped and, under cl_loi_list_lock, the in-flight counter for the
 * request's direction is decremented and the pages are completed: each oap
 * on aa->aa_oaps via osc_ap_completion(), or, when that list is empty, the
 * write grant of each page in aa->aa_ppga is released. Cache waiters are
 * then woken and osc_check_rpcs() is run. Finally the cl_req is completed
 * and the page-pointer array freed.
 *
 * NOTE(review): the handling of the redo result and the return statement
 * are not visible in this excerpt.
 */
2141 static int brw_interpret(const struct lu_env *env,
2142 struct ptlrpc_request *req, void *data, int rc)
2144 struct osc_brw_async_args *aa = data;
2145 struct client_obd *cli;
2149 rc = osc_brw_fini_request(req, rc);
2150 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2151 if (osc_recoverable_error(rc)) {
2152 rc = osc_brw_redo_request(req, aa);
2158 capa_put(aa->aa_ocapa);
2159 aa->aa_ocapa = NULL;
2164 client_obd_list_lock(&cli->cl_loi_list_lock);
2166 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2167 * is called so we know whether to go to sync BRWs or wait for more
2168 * RPCs to complete */
2169 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2170 cli->cl_w_in_flight--;
2172 cli->cl_r_in_flight--;
/* Note: `async` is set when the oap list is EMPTY, i.e. the request did not
 * come from osc_send_oap_rpc(). */
2174 async = list_empty(&aa->aa_oaps);
2175 if (!async) { /* from osc_send_oap_rpc() */
2176 struct osc_async_page *oap, *tmp;
2177 /* the caller may re-use the oap after the completion call so
2178 * we need to clean it up a little */
2179 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2180 list_del_init(&oap->oap_rpc_item);
2181 osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
/* The obdo is freed here once all pages have been completed. */
2183 OBDO_FREE(aa->aa_oa);
2184 } else { /* from async_internal() */
2186 for (i = 0; i < aa->aa_page_count; i++)
2187 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2189 osc_wake_cache_waiters(cli);
2190 osc_check_rpcs(env, cli);
2191 client_obd_list_unlock(&cli->cl_loi_list_lock);
2193 cl_req_completion(env, aa->aa_clerq, rc);
2194 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
/*
 * Build a BRW request for the async pages queued on @rpc_list
 * (@page_count entries, which must be non-empty).
 *
 * The pages' brw_page structures are gathered into a pointer array, a
 * cl_req is allocated and the pages added to it, attributes are filled in
 * through cl_req_attr_set(), the array is sorted by offset and the RPC is
 * prepared by osc_brw_prep_request(). On success the oaps are spliced onto
 * aa->aa_oaps (leaving @rpc_list empty) and the cl_req is stored in
 * aa->aa_clerq. Failures are reported as ERR_PTR values through `req`; the
 * cleanup code then frees the page array, completes every oap remaining on
 * @rpc_list under cl_loi_list_lock, and completes the cl_req if one was
 * successfully allocated.
 *
 * NOTE(review): the obdo allocation, several condition lines, the `out`
 * label and the return statement are not visible in this excerpt.
 */
2198 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2199 struct client_obd *cli,
2200 struct list_head *rpc_list,
2201 int page_count, int cmd)
2203 struct ptlrpc_request *req;
2204 struct brw_page **pga = NULL;
2205 struct osc_brw_async_args *aa;
2206 struct obdo *oa = NULL;
2207 const struct obd_async_page_ops *ops = NULL;
2208 void *caller_data = NULL;
2209 struct osc_async_page *oap;
2210 struct osc_async_page *tmp;
2211 struct ost_body *body;
2212 struct cl_req *clerq = NULL;
2213 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2214 struct ldlm_lock *lock = NULL;
2215 struct cl_req_attr crattr;
2219 LASSERT(!list_empty(rpc_list));
2221 memset(&crattr, 0, sizeof crattr);
2222 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2224 GOTO(out, req = ERR_PTR(-ENOMEM));
2228 GOTO(out, req = ERR_PTR(-ENOMEM));
/* Walk the oaps: remember caller ops/data, set up the cl_req and fill pga. */
2231 list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2232 struct cl_page *page = osc_oap2cl_page(oap);
2234 ops = oap->oap_caller_ops;
2235 caller_data = oap->oap_caller_data;
2237 clerq = cl_req_alloc(env, page, crt,
2238 1 /* only 1-object rpcs for
2241 GOTO(out, req = (void *)clerq);
2242 lock = oap->oap_ldlm_lock;
2244 pga[i] = &oap->oap_brw_page;
2245 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2246 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2247 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2249 cl_req_page_add(env, clerq, page);
2252 /* always get the data for the obdo for the rpc */
2253 LASSERT(ops != NULL);
2255 crattr.cra_capa = NULL;
2256 cl_req_attr_set(env, clerq, &crattr, ~0ULL);
/* Pass the remote lock handle to the server along with the I/O. */
2258 oa->o_handle = lock->l_remote_handle;
2259 oa->o_valid |= OBD_MD_FLHANDLE;
2262 rc = cl_req_prep(env, clerq);
2264 CERROR("cl_req_prep failed: %d\n", rc);
2265 GOTO(out, req = ERR_PTR(rc));
2268 sort_brw_pages(pga, page_count);
2269 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2270 pga, &req, crattr.cra_capa, 1);
2272 CERROR("prep_req failed: %d\n", rc);
2273 GOTO(out, req = ERR_PTR(rc));
2276 /* Need to update the timestamps after the request is built in case
2277 * we race with setattr (locally or in queue at OST). If OST gets
2278 * later setattr before earlier BRW (as determined by the request xid),
2279 * the OST will not use BRW timestamps. Sadly, there is no obvious
2280 * way to do this in a single call. bug 10150 */
2281 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2282 cl_req_attr_set(env, clerq, &crattr,
2283 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
/* Hand the oaps and the cl_req over to the request's async args. */
2285 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2286 aa = ptlrpc_req_async_args(req);
2287 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2288 list_splice(rpc_list, &aa->aa_oaps);
2289 CFS_INIT_LIST_HEAD(rpc_list);
2290 aa->aa_clerq = clerq;
2292 capa_put(crattr.cra_capa);
2297 OBD_FREE(pga, sizeof(*pga) * page_count);
2298 /* this should happen rarely and is pretty bad, it makes the
2299 * pending list not follow the dirty order */
2300 client_obd_list_lock(&cli->cl_loi_list_lock);
2301 list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2302 list_del_init(&oap->oap_rpc_item);
2304 /* queued sync pages can be torn down while the pages
2305 * were between the pending list and the rpc */
2306 if (oap->oap_interrupted) {
2307 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2308 osc_ap_completion(env, cli, NULL, oap, 0,
2312 osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
/* clerq may itself be an ERR_PTR when cl_req_alloc() failed. */
2314 if (clerq && !IS_ERR(clerq))
2315 cl_req_completion(env, clerq, PTR_ERR(req));
2321 * prepare pages for ASYNC io and put pages in send queue.
2325 * \param cmd - OBD_BRW_* macros
2326 * \param lop - pending pages
2328 * \return zero if the pages were successfully added to the send queue.
2329 * \return non-zero if an error occurred.
2332 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2333 struct lov_oinfo *loi,
2334 int cmd, struct loi_oap_pages *lop)
2336 struct ptlrpc_request *req;
2337 obd_count page_count = 0;
2338 struct osc_async_page *oap = NULL, *tmp;
2339 struct osc_brw_async_args *aa;
2340 const struct obd_async_page_ops *ops;
2341 CFS_LIST_HEAD(rpc_list);
2342 unsigned int ending_offset;
2343 unsigned starting_offset = 0;
2345 struct cl_object *clob = NULL;
2348 /* If there are HP OAPs we need to handle at least 1 of them,
2349 * move it the beginning of the pending list for that. */
2350 if (!list_empty(&lop->lop_urgent)) {
2351 oap = list_entry(lop->lop_urgent.next,
2352 struct osc_async_page, oap_urgent_item);
2353 if (oap->oap_async_flags & ASYNC_HP)
2354 list_move(&oap->oap_pending_item, &lop->lop_pending);
2357 /* first we find the pages we're allowed to work with */
2358 list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2360 ops = oap->oap_caller_ops;
2362 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2363 "magic 0x%x\n", oap, oap->oap_magic);
2366 /* pin object in memory, so that completion call-backs
2367 * can be safely called under client_obd_list lock. */
2368 clob = osc_oap2cl_page(oap)->cp_obj;
2369 cl_object_get(clob);
2372 if (page_count != 0 &&
2373 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2374 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2375 " oap %p, page %p, srvlock %u\n",
2376 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2379 /* in llite being 'ready' equates to the page being locked
2380 * until completion unlocks it. commit_write submits a page
2381 * as not ready because its unlock will happen unconditionally
2382 * as the call returns. if we race with commit_write giving
2383 * us that page we dont' want to create a hole in the page
2384 * stream, so we stop and leave the rpc to be fired by
2385 * another dirtier or kupdated interval (the not ready page
2386 * will still be on the dirty list). we could call in
2387 * at the end of ll_file_write to process the queue again. */
2388 if (!(oap->oap_async_flags & ASYNC_READY)) {
2389 int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2392 CDEBUG(D_INODE, "oap %p page %p returned %d "
2393 "instead of ready\n", oap,
2397 /* llite is telling us that the page is still
2398 * in commit_write and that we should try
2399 * and put it in an rpc again later. we
2400 * break out of the loop so we don't create
2401 * a hole in the sequence of pages in the rpc
2406 /* the io isn't needed.. tell the checks
2407 * below to complete the rpc with EINTR */
2408 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2409 oap->oap_count = -EINTR;
2412 oap->oap_async_flags |= ASYNC_READY;
2415 LASSERTF(0, "oap %p page %p returned %d "
2416 "from make_ready\n", oap,
2424 * Page submitted for IO has to be locked. Either by
2425 * ->ap_make_ready() or by higher layers.
2427 #if defined(__KERNEL__) && defined(__linux__)
2429 struct cl_page *page;
2431 page = osc_oap2cl_page(oap);
2433 if (page->cp_type == CPT_CACHEABLE &&
2434 !(PageLocked(oap->oap_page) &&
2435 (CheckWriteback(oap->oap_page, cmd)))) {
2436 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2438 (long)oap->oap_page->flags,
2439 oap->oap_async_flags);
2444 /* If there is a gap at the start of this page, it can't merge
2445 * with any previous page, so we'll hand the network a
2446 * "fragmented" page array that it can't transfer in 1 RDMA */
2447 if (page_count != 0 && oap->oap_page_off != 0)
2450 /* take the page out of our book-keeping */
2451 list_del_init(&oap->oap_pending_item);
2452 lop_update_pending(cli, lop, cmd, -1);
2453 list_del_init(&oap->oap_urgent_item);
2455 if (page_count == 0)
2456 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2457 (PTLRPC_MAX_BRW_SIZE - 1);
2459 /* ask the caller for the size of the io as the rpc leaves. */
2460 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2462 ops->ap_refresh_count(env, oap->oap_caller_data,
2464 LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2466 if (oap->oap_count <= 0) {
2467 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2469 osc_ap_completion(env, cli, NULL,
2470 oap, 0, oap->oap_count);
2474 /* now put the page back in our accounting */
2475 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2476 if (page_count == 0)
2477 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2478 if (++page_count >= cli->cl_max_pages_per_rpc)
2481 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2482 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2483 * have the same alignment as the initial writes that allocated
2484 * extents on the server. */
2485 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2486 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2487 if (ending_offset == 0)
2490 /* If there is a gap at the end of this page, it can't merge
2491 * with any subsequent pages, so we'll hand the network a
2492 * "fragmented" page array that it can't transfer in 1 RDMA */
2493 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2497 osc_wake_cache_waiters(cli);
2499 loi_list_maint(cli, loi);
2501 client_obd_list_unlock(&cli->cl_loi_list_lock);
2504 cl_object_put(env, clob);
2506 if (page_count == 0) {
2507 client_obd_list_lock(&cli->cl_loi_list_lock);
2511 req = osc_build_req(env, cli, &rpc_list, page_count, cmd);
2513 LASSERT(list_empty(&rpc_list));
2514 loi_list_maint(cli, loi);
2515 RETURN(PTR_ERR(req));
2518 aa = ptlrpc_req_async_args(req);
2520 if (cmd == OBD_BRW_READ) {
2521 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2522 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2523 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2524 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2526 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2527 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2528 cli->cl_w_in_flight);
2529 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2530 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2532 ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2534 client_obd_list_lock(&cli->cl_loi_list_lock);
2536 if (cmd == OBD_BRW_READ)
2537 cli->cl_r_in_flight++;
2539 cli->cl_w_in_flight++;
2541 /* queued sync pages can be torn down while the pages
2542 * were between the pending list and the rpc */
2544 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2545 /* only one oap gets a request reference */
2548 if (oap->oap_interrupted && !req->rq_intr) {
2549 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2551 ptlrpc_mark_interrupted(req);
2555 tmp->oap_request = ptlrpc_request_addref(req);
2557 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2558 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2560 req->rq_interpret_reply = brw_interpret;
2561 ptlrpcd_add_req(req, PSCOPE_BRW);
/* Debug trace of one object's queue state, emitted at D_INODE level: whether
 * the object is linked on either ready list, then the pending count and an
 * "urgent list is non-empty" flag for writes and for reads.  STR is pasted
 * onto the fixed format prefix, so it has to be a string literal. */
2565 #define LOI_DEBUG(LOI, STR, args...) \
2566 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2567 !list_empty(&(LOI)->loi_ready_item) || \
2568 !list_empty(&(LOI)->loi_hp_ready_item), \
2569 (LOI)->loi_write_lop.lop_num_pending, \
2570 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2571 (LOI)->loi_read_lop.lop_num_pending, \
2572 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2575 /* This is called by osc_check_rpcs() to find which objects have pages that
2576 * we could be sending. These lists are maintained by lop_makes_rpc(). */
/* Candidate lists are consulted in a fixed order and the head of the first
 * non-empty one is returned:
 *   1. cl_loi_hp_ready_list
 *   2. cl_loi_ready_list
 *   3. cl_loi_write_list, only while cl_cache_waiters is non-empty
 *   4. cl_loi_write_list, then cl_loi_read_list, only while the import is
 *      missing or marked invalid
 * osc_check_rpcs() treats a NULL result as "nothing left to send". */
2577 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2581 /* First return objects that have blocked locks so that they
2582 * will be flushed quickly and other clients can get the lock,
2583 * then objects which have pages ready to be stuffed into RPCs */
2584 if (!list_empty(&cli->cl_loi_hp_ready_list))
2585 RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
2586 struct lov_oinfo, loi_hp_ready_item));
2587 if (!list_empty(&cli->cl_loi_ready_list))
2588 RETURN(list_entry(cli->cl_loi_ready_list.next,
2589 struct lov_oinfo, loi_ready_item));
2591 /* then if we have cache waiters, return all objects with queued
2592 * writes. This is especially important when many small files
2593 * have filled up the cache and not been fired into rpcs because
2594 * they don't pass the nr_pending/object threshold */
2595 if (!list_empty(&cli->cl_cache_waiters) &&
2596 !list_empty(&cli->cl_loi_write_list))
2597 RETURN(list_entry(cli->cl_loi_write_list.next,
2598 struct lov_oinfo, loi_write_item));
2600 /* then return all queued objects when we have an invalid import
2601 * so that they get flushed */
2602 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2603 if (!list_empty(&cli->cl_loi_write_list))
2604 RETURN(list_entry(cli->cl_loi_write_list.next,
2605 struct lov_oinfo, loi_write_item));
2606 if (!list_empty(&cli->cl_loi_read_list))
2607 RETURN(list_entry(cli->cl_loi_read_list.next,
2608 struct lov_oinfo, loi_read_item));
2613 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2615 struct osc_async_page *oap;
2618 if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
2619 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2620 struct osc_async_page, oap_urgent_item);
2621 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2624 if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
2625 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2626 struct osc_async_page, oap_urgent_item);
2627 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2630 return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2633 /* called with the loi list lock held */
/* Walks the objects handed out by osc_next_loi() and tries to turn their
 * queued pages into RPCs.  For each object it consults
 * osc_max_rpc_in_flight(), then attempts a write RPC and a read RPC through
 * osc_send_oap_rpc() whenever lop_makes_rpc() approves.  Afterwards the
 * object is unlinked from the hp-ready, ready, write and read lists and
 * re-filed by loi_list_maint(), so the next iteration sees a different
 * object first.  race_counter is compared against 10 to bound the number of
 * fruitless passes. */
2634 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2636 struct lov_oinfo *loi;
2637 int rc = 0, race_counter = 0;
2640 while ((loi = osc_next_loi(cli)) != NULL) {
2641 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2643 if (osc_max_rpc_in_flight(cli, loi))
2646 /* attempt some read/write balancing by alternating between
2647 * reads and writes in an object. The makes_rpc checks here
2648 * would be redundant if we were getting read/write work items
2649 * instead of objects. we don't want send_oap_rpc to drain a
2650 * partial read pending queue when we're given this object to
2651 * do io on writes while there are cache waiters */
2652 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2653 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2654 &loi->loi_write_lop);
2662 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2663 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2664 &loi->loi_read_lop);
2673 /* attempt some inter-object balancing by issuing rpcs
2674 * for each object in turn */
2675 if (!list_empty(&loi->loi_hp_ready_item))
2676 list_del_init(&loi->loi_hp_ready_item);
2677 if (!list_empty(&loi->loi_ready_item))
2678 list_del_init(&loi->loi_ready_item);
2679 if (!list_empty(&loi->loi_write_item))
2680 list_del_init(&loi->loi_write_item);
2681 if (!list_empty(&loi->loi_read_item))
2682 list_del_init(&loi->loi_read_item);
2684 loi_list_maint(cli, loi);
2686 /* send_oap_rpc fails with 0 when make_ready tells it to
2687 * back off. llite's make_ready does this when it tries
2688 * to lock a page queued for write that is already locked.
2689 * we want to try sending rpcs from many objects, but we
2690 * don't want to spin failing with 0. */
2691 if (race_counter == 10)
2697 /* we're trying to queue a page in the osc so we're subject to the
2698 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2699 * If the osc's queued pages are already at that limit, then we want to sleep
2700 * until there is space in the osc's queue for us. We also may be waiting for
2701 * write credits from the OST if there are RPCs in flight that may return some
2702 * before we fall back to sync writes.
2704 * We need this to know our allocation was granted in the presence of signals */
/* Wait condition used by osc_enter_cache().  Evaluated under
 * cl_loi_list_lock; true once the waiter is no longer linked on a list
 * (ocw_entry is empty) or no RPCs remain in flight for this client. */
2705 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2709 client_obd_list_lock(&cli->cl_loi_list_lock);
2710 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2711 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Grant is considered available when cl_avail_grant holds at least one page
 * (CFS_PAGE_SIZE).  The body consumes write grant for the page through
 * osc_consume_write_grant() and can also account the page as "dirty in
 * transit" (cl_dirty_transit, obd_dirty_transit_pages) and tag it
 * OBD_BRW_NOCACHE -- presumably only when @transient is set; confirm against
 * the full body. */
2716 * Non-blocking version of osc_enter_cache() that consumes grant only when it
2719 int osc_enter_cache_try(const struct lu_env *env,
2720 struct client_obd *cli, struct lov_oinfo *loi,
2721 struct osc_async_page *oap, int transient)
2725 has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2727 osc_consume_write_grant(cli, &oap->oap_brw_page);
2729 cli->cl_dirty_transit += CFS_PAGE_SIZE;
2730 atomic_inc(&obd_dirty_transit_pages);
2731 oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2737 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2738 * grant or cache space. */
/* Admission control for one dirty page:
 *  - sync I/O is forced when cl_dirty_max is below one page or either the
 *    client or the object has ar_force_sync set;
 *  - the fast path succeeds when both the per-client and the global dirty
 *    limits leave room for one more page and osc_enter_cache_try() obtains
 *    grant;
 *  - otherwise, if write RPCs are in flight, a stack-allocated waiter is
 *    queued on cl_cache_waiters, osc_check_rpcs() is kicked, the list lock is
 *    dropped and the caller sleeps until ocw_granted() is true.  After the
 *    lock is retaken the waiter is unlinked if it is still queued. */
2739 static int osc_enter_cache(const struct lu_env *env,
2740 struct client_obd *cli, struct lov_oinfo *loi,
2741 struct osc_async_page *oap)
2743 struct osc_cache_waiter ocw;
2744 struct l_wait_info lwi = { 0 };
2748 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2749 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2750 cli->cl_dirty_max, obd_max_dirty_pages,
2751 cli->cl_lost_grant, cli->cl_avail_grant);
2753 /* force the caller to try sync io. this can jump the list
2754 * of queued writes and create a discontiguous rpc stream */
2755 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2756 loi->loi_ar.ar_force_sync)
2759 /* Hopefully normal case - cache space and write credits available */
2760 if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2761 atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2762 osc_enter_cache_try(env, cli, loi, oap, 0))
2765 /* Make sure that there are write rpcs in flight to wait for. This
2766 * is a little silly as this object may not have any pending but
2767 * other objects sure might. */
2768 if (cli->cl_w_in_flight) {
2769 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2770 cfs_waitq_init(&ocw.ocw_waitq);
2774 loi_list_maint(cli, loi);
2775 osc_check_rpcs(env, cli);
2776 client_obd_list_unlock(&cli->cl_loi_list_lock);
2778 CDEBUG(D_CACHE, "sleeping for cache space\n");
2779 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2781 client_obd_list_lock(&cli->cl_loi_list_lock);
2782 if (!list_empty(&ocw.ocw_entry)) {
2783 list_del(&ocw.ocw_entry);
/* Initialise the osc_async_page that tracks one cached page.
 *
 * One path returns only the rounded size of struct osc_async_page
 * (presumably a size query made before the caller allocates storage --
 * confirm against the full body).  Otherwise the oap gets its magic, owning
 * client, caller ops/data, page and object offset; OBD_BRW_NOQUOTA is set
 * when the export is not remote and the caller has CFS_CAP_SYS_RESOURCE;
 * the list links and spinlock are initialised.  @offset must be
 * page-aligned (asserted). */
2793 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2794 struct lov_oinfo *loi, cfs_page_t *page,
2795 obd_off offset, const struct obd_async_page_ops *ops,
2796 void *data, void **res, int nocache,
2797 struct lustre_handle *lockh)
2799 struct osc_async_page *oap;
2804 return size_round(sizeof(*oap));
2807 oap->oap_magic = OAP_MAGIC;
2808 oap->oap_cli = &exp->exp_obd->u.cli;
2811 oap->oap_caller_ops = ops;
2812 oap->oap_caller_data = data;
2814 oap->oap_page = page;
2815 oap->oap_obj_off = offset;
2816 if (!client_is_remote(exp) &&
2817 cfs_capable(CFS_CAP_SYS_RESOURCE))
2818 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2820 LASSERT(!(offset & ~CFS_PAGE_MASK));
2822 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2823 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2824 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2825 CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2827 spin_lock_init(&oap->oap_lock);
2828 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
/* Convert an opaque cookie back into its osc_async_page.  The magic field is
 * checked first; a mismatch yields ERR_PTR(-EINVAL), so callers must test
 * the result with IS_ERR()/PTR_ERR() rather than against NULL. */
2832 struct osc_async_page *oap_from_cookie(void *cookie)
2834 struct osc_async_page *oap = cookie;
2835 if (oap->oap_magic != OAP_MAGIC)
2836 return ERR_PTR(-EINVAL);
/* Queue a prepared async page (@cookie) for I/O of type @cmd.
 *
 * Checks made before queueing: the cookie must map to a valid oap, the
 * import must exist and be valid, and the page must not already be linked
 * on a pending, urgent or rpc list.  For writes without OBD_BRW_NOQUOTA in
 * @cmd, the owner uid/gid are read through cl_object_attr_get() (under the
 * attr lock) and passed to lquota_chkdq().
 *
 * Under cl_loi_list_lock the page offset, byte count, brw flags and async
 * flags are recorded; writes must additionally pass osc_enter_cache().  The
 * page is then moved to the pending list, the object's list membership is
 * refreshed and osc_check_rpcs() is given a chance to send.
 *
 * @off + @count must not exceed CFS_PAGE_SIZE (asserted). */
2840 int osc_queue_async_io(const struct lu_env *env,
2841 struct obd_export *exp, struct lov_stripe_md *lsm,
2842 struct lov_oinfo *loi, void *cookie,
2843 int cmd, obd_off off, int count,
2844 obd_flag brw_flags, enum async_flags async_flags)
2846 struct client_obd *cli = &exp->exp_obd->u.cli;
2847 struct osc_async_page *oap;
2851 oap = oap_from_cookie(cookie);
2853 RETURN(PTR_ERR(oap));
2855 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2858 if (!list_empty(&oap->oap_pending_item) ||
2859 !list_empty(&oap->oap_urgent_item) ||
2860 !list_empty(&oap->oap_rpc_item))
2863 /* check if the file's owner/group is over quota */
2864 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2865 struct cl_object *obj;
2866 struct cl_attr attr; /* XXX put attr into thread info */
2867 unsigned int qid[MAXQUOTAS];
2869 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
2871 cl_object_attr_lock(obj);
2872 rc = cl_object_attr_get(env, obj, &attr);
2873 cl_object_attr_unlock(obj);
2875 qid[USRQUOTA] = attr.cat_uid;
2876 qid[GRPQUOTA] = attr.cat_gid;
2878 lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
2885 loi = lsm->lsm_oinfo[0];
2887 client_obd_list_lock(&cli->cl_loi_list_lock);
2889 LASSERT(off + count <= CFS_PAGE_SIZE);
2891 oap->oap_page_off = off;
2892 oap->oap_count = count;
2893 oap->oap_brw_flags = brw_flags;
2894 oap->oap_async_flags = async_flags;
2896 if (cmd & OBD_BRW_WRITE) {
2897 rc = osc_enter_cache(env, cli, loi, oap);
2899 client_obd_list_unlock(&cli->cl_loi_list_lock);
2904 osc_oap_to_pending(oap);
2905 loi_list_maint(cli, loi);
2907 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2910 osc_check_rpcs(env, cli);
2911 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* aka (~was & now & flag), but this is more clear :)
 *
 * Evaluates to 1 when @flag is absent from @was but present in @now, i.e.
 * the flag is being newly set, and to 0 otherwise.  Every argument is
 * parenthesised so that compound expressions such as (A | B) can be passed
 * safely; @flag is evaluated twice, so it must be free of side effects. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
/* Raise ASYNC_READY and/or ASYNC_URGENT on a page that is already pending
 * (asserted).  Nothing further is done when every requested flag is already
 * set.  A page that newly becomes urgent and is not part of an RPC is linked
 * on the urgent list of its read or write queue -- at the head when it also
 * carries ASYNC_HP, at the tail otherwise -- and loi_list_maint() is called
 * to refresh the object's list membership. */
2919 int osc_set_async_flags_base(struct client_obd *cli,
2920 struct lov_oinfo *loi, struct osc_async_page *oap,
2921 obd_flag async_flags)
2923 struct loi_oap_pages *lop;
2926 LASSERT(!list_empty(&oap->oap_pending_item));
2928 if (oap->oap_cmd & OBD_BRW_WRITE) {
2929 lop = &loi->loi_write_lop;
2931 lop = &loi->loi_read_lop;
2934 if ((oap->oap_async_flags & async_flags) == async_flags)
2937 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2938 oap->oap_async_flags |= ASYNC_READY;
2940 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
2941 list_empty(&oap->oap_rpc_item)) {
2942 if (oap->oap_async_flags & ASYNC_HP)
2943 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2945 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2946 oap->oap_async_flags |= ASYNC_URGENT;
2947 loi_list_maint(cli, loi);
2950 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2951 oap->oap_async_flags);
/* Detach an async page from all OSC book-keeping.
 *
 * Fails with -EBUSY when the page is currently part of an RPC.  Otherwise,
 * under cl_loi_list_lock, the page leaves the cache accounting
 * (osc_exit_cache), cache waiters are woken, and the page is unlinked from
 * the urgent list (clearing ASYNC_URGENT and ASYNC_HP) and from the pending
 * list (decrementing the pending count) if it was on them.
 *
 * An invalid @cookie yields the error encoded by oap_from_cookie(). */
2955 int osc_teardown_async_page(struct obd_export *exp,
2956 struct lov_stripe_md *lsm,
2957 struct lov_oinfo *loi, void *cookie)
2959 struct client_obd *cli = &exp->exp_obd->u.cli;
2960 struct loi_oap_pages *lop;
2961 struct osc_async_page *oap;
2965 oap = oap_from_cookie(cookie);
2967 RETURN(PTR_ERR(oap));
2970 loi = lsm->lsm_oinfo[0];
2972 if (oap->oap_cmd & OBD_BRW_WRITE) {
2973 lop = &loi->loi_write_lop;
2975 lop = &loi->loi_read_lop;
2978 client_obd_list_lock(&cli->cl_loi_list_lock);
2980 if (!list_empty(&oap->oap_rpc_item))
2981 GOTO(out, rc = -EBUSY);
2983 osc_exit_cache(cli, oap, 0);
2984 osc_wake_cache_waiters(cli);
2986 if (!list_empty(&oap->oap_urgent_item)) {
2987 list_del_init(&oap->oap_urgent_item);
2988 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
2990 if (!list_empty(&oap->oap_pending_item)) {
2991 list_del_init(&oap->oap_pending_item);
2992 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2994 loi_list_maint(cli, loi);
2995 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2997 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Attach einfo->ei_cbdata to @lock as its l_ast_data.
 *
 * The lock must be non-NULL and must already agree with @einfo on blocking,
 * completion and glimpse callbacks and on resource type (all asserted).  The
 * store is done with both the resource/lock lock and osc_ast_guard held, and
 * asserts that any data already attached is the same pointer. */
3001 static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
3002 struct ldlm_enqueue_info *einfo,
3005 void *data = einfo->ei_cbdata;
3007 LASSERT(lock != NULL);
3008 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3009 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3010 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3011 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3013 lock_res_and_lock(lock);
3014 spin_lock(&osc_ast_guard);
3015 LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
3016 lock->l_ast_data = data;
3017 spin_unlock(&osc_ast_guard);
3018 unlock_res_and_lock(lock);
/* Handle-based wrapper around osc_set_lock_data_with_check(): resolves
 * @lockh to a lock, attaches the callback data, and drops the reference
 * taken by the lookup.  The other path only logs an error mentioning a
 * possible eviction (presumably taken when the handle no longer resolves). */
3021 static void osc_set_data_with_check(struct lustre_handle *lockh,
3022 struct ldlm_enqueue_info *einfo,
3025 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3028 osc_set_lock_data_with_check(lock, einfo, flags);
3029 LDLM_LOCK_PUT(lock);
3031 CERROR("lockh %p, data %p - client evicted?\n",
3032 lockh, einfo->ei_cbdata);
/* Run the @replace iterator, with @data as its argument, over the locks of
 * the LDLM resource named after the object id/group of @lsm in this
 * device's namespace. */
3035 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3036 ldlm_iterator_t replace, void *data)
3038 struct ldlm_res_id res_id;
3039 struct obd_device *obd = class_exp2obd(exp);
3041 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3042 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
/* OSC-level completion of a lock enqueue.
 *
 * When the enqueue was aborted, the reply's lock_policy_res1 (if non-zero)
 * replaces @rc.  On success, or on an aborted intent enqueue,
 * LDLM_FL_LVB_READY is set in *@flags and the LVB contents are logged.
 * Finally @upcall is invoked with @cookie and the resulting code, and its
 * return value becomes the new rc. */
3046 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3047 obd_enqueue_update_f upcall, void *cookie,
3050 int intent = *flags & LDLM_FL_HAS_INTENT;
3054 /* The request was created before ldlm_cli_enqueue call. */
3055 if (rc == ELDLM_LOCK_ABORTED) {
3056 struct ldlm_reply *rep;
3057 rep = req_capsule_server_get(&req->rq_pill,
3060 LASSERT(rep != NULL);
3061 if (rep->lock_policy_res1)
3062 rc = rep->lock_policy_res1;
3066 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3067 *flags |= LDLM_FL_LVB_READY;
3068 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3069 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3072 /* Call the update callback. */
3073 rc = (*upcall)(cookie, rc);
/* Reply interpreter for asynchronous lock enqueues.
 *
 * The lock handle and mode are copied out of @aa up front because @aa may
 * be freed once the upcall has run.  An extra lock reference is held across
 * ldlm_cli_enqueue_fini() and osc_enqueue_fini(); afterwards the reference
 * taken by ldlm_cli_enqueue() is dropped when the handle is still in use and
 * the result is ELDLM_OK, then the extra reference and the lookup reference
 * are released.  A failed handle lookup trips an assertion. */
3077 static int osc_enqueue_interpret(const struct lu_env *env,
3078 struct ptlrpc_request *req,
3079 struct osc_enqueue_args *aa, int rc)
3081 struct ldlm_lock *lock;
3082 struct lustre_handle handle;
3085 /* Make a local copy of a lock handle and a mode, because aa->oa_*
3086 * might be freed anytime after lock upcall has been called. */
3087 lustre_handle_copy(&handle, aa->oa_lockh);
3088 mode = aa->oa_ei->ei_mode;
3090 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3092 lock = ldlm_handle2lock(&handle);
3094 /* Take an additional reference so that a blocking AST that
3095 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3096 * to arrive after an upcall has been executed by
3097 * osc_enqueue_fini(). */
3098 ldlm_lock_addref(&handle, mode);
3100 /* Complete obtaining the lock procedure. */
3101 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3102 mode, aa->oa_flags, aa->oa_lvb,
3103 sizeof(*aa->oa_lvb), lustre_swab_ost_lvb,
3105 /* Complete osc stuff. */
3106 rc = osc_enqueue_fini(req, aa->oa_lvb,
3107 aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
3109 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3111 /* Release the lock for async request. */
3112 if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3114 * Releases a reference taken by ldlm_cli_enqueue(), if it is
3115 * not already released by
3116 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3118 ldlm_lock_decref(&handle, mode);
3120 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3121 aa->oa_lockh, req, aa);
3122 ldlm_lock_decref(&handle, mode);
3123 LDLM_LOCK_PUT(lock);
/* Fold the result of an enqueue into the object's cached attributes.
 *
 * On ELDLM_OK the LVB is copied into loi->loi_lvb and a candidate KMS is
 * computed: the LVB size, capped at one byte past the end of the granted
 * extent.  KMS is only ever raised (loi_kms_set() is called when the
 * candidate is >= the current value).  The lock is then made matchable via
 * ldlm_lock_allow_match() and the lookup reference is dropped.
 *
 * On an aborted intent enqueue (a glimpse) only the LVB is copied; KMS is
 * left alone. */
3127 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3128 struct lov_oinfo *loi, int flags,
3129 struct ost_lvb *lvb, __u32 mode, int rc)
3131 if (rc == ELDLM_OK) {
3132 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3135 LASSERT(lock != NULL);
3136 loi->loi_lvb = *lvb;
3137 tmp = loi->loi_lvb.lvb_size;
3138 /* Extend KMS up to the end of this lock and no further
3139 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3140 if (tmp > lock->l_policy_data.l_extent.end)
3141 tmp = lock->l_policy_data.l_extent.end + 1;
3142 if (tmp >= loi->loi_kms) {
3143 LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3144 ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3145 loi_kms_set(loi, tmp);
3147 LDLM_DEBUG(lock, "lock acquired, setting rss="
3148 LPU64"; leaving kms="LPU64", end="LPU64,
3149 loi->loi_lvb.lvb_size, loi->loi_kms,
3150 lock->l_policy_data.l_extent.end);
3152 ldlm_lock_allow_match(lock);
3153 LDLM_LOCK_PUT(lock);
3154 } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3155 loi->loi_lvb = *lvb;
3156 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3157 " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3161 EXPORT_SYMBOL(osc_update_enqueue);
/* Sentinel request-set pointer, never dereferenced: osc_enqueue_base()
 * compares its rqset argument against it and, on a match, hands the request
 * to ptlrpcd_add_req() instead of adding it to a real set. */
3163 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3165 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3166 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3167 * other synchronous requests, however keeping some locks and trying to obtain
3168 * others may take a considerable amount of time in a case of ost failure; and
3169 * when other sync requests do not get released lock from a client, the client
3170 * is excluded from the cluster -- such scenarios make the life difficult, so
3171 * release locks just after they are obtained. */
/* Outline of the visible flow:
 *  1. The requested extent is widened to page boundaries.
 *  2. ldlm_lock_match() looks for an existing lock with a ready LVB.  A
 *     match is used only if its l_ast_data is unset or equals
 *     einfo->ei_cbdata: the callback data is attached, @upcall is called
 *     with ELDLM_OK, and the match reference is adjusted or dropped
 *     depending on whether the request is asynchronous and whether the
 *     matched mode differs from the requested one.  An unusable match is
 *     simply released.
 *  3. Otherwise a new enqueue is issued through ldlm_cli_enqueue(), using a
 *     pre-built RQF_LDLM_ENQUEUE_LVB request in the intent case.
 *     Asynchronous requests store their context in the request's async args
 *     and go either to ptlrpcd (rqset == PTLRPCD_SET) or onto @rqset, with
 *     osc_enqueue_interpret() as reply handler; synchronous ones are
 *     completed inline via osc_enqueue_fini(). */
3172 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3173 int *flags, ldlm_policy_data_t *policy,
3174 struct ost_lvb *lvb, int kms_valid,
3175 obd_enqueue_update_f upcall, void *cookie,
3176 struct ldlm_enqueue_info *einfo,
3177 struct lustre_handle *lockh,
3178 struct ptlrpc_request_set *rqset, int async)
3180 struct obd_device *obd = exp->exp_obd;
3181 struct ptlrpc_request *req = NULL;
3182 int intent = *flags & LDLM_FL_HAS_INTENT;
3187 /* Filesystem lock extents are extended to page boundaries so that
3188 * dealing with the page cache is a little smoother. */
3189 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3190 policy->l_extent.end |= ~CFS_PAGE_MASK;
3193 * kms is not valid when either object is completely fresh (so that no
3194 * locks are cached), or object was evicted. In the latter case cached
3195 * lock cannot be used, because it would prime inode state with
3196 * potentially stale LVB.
3201 /* Next, search for already existing extent locks that will cover us */
3202 /* If we're trying to read, we also search for an existing PW lock. The
3203 * VFS and page cache already protect us locally, so lots of readers/
3204 * writers can share a single PW lock.
3206 * There are problems with conversion deadlocks, so instead of
3207 * converting a read lock to a write lock, we'll just enqueue a new
3210 * At some point we should cancel the read lock instead of making them
3211 * send us a blocking callback, but there are problems with canceling
3212 * locks out from other users right now, too. */
3213 mode = einfo->ei_mode;
3214 if (einfo->ei_mode == LCK_PR)
3216 mode = ldlm_lock_match(obd->obd_namespace,
3217 *flags | LDLM_FL_LVB_READY, res_id,
3218 einfo->ei_type, policy, mode, lockh, 0);
3220 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3222 if (matched->l_ast_data == NULL ||
3223 matched->l_ast_data == einfo->ei_cbdata) {
3224 /* addref the lock only if not async requests and PW
3225 * lock is matched whereas we asked for PR. */
3226 if (!rqset && einfo->ei_mode != mode)
3227 ldlm_lock_addref(lockh, LCK_PR);
3228 osc_set_lock_data_with_check(matched, einfo, *flags);
3230 /* I would like to be able to ASSERT here that
3231 * rss <= kms, but I can't, for reasons which
3232 * are explained in lov_enqueue() */
3235 /* We already have a lock, and it's referenced */
3236 (*upcall)(cookie, ELDLM_OK);
3238 /* For async requests, decref the lock. */
3239 if (einfo->ei_mode != mode)
3240 ldlm_lock_decref(lockh, LCK_PW);
3242 ldlm_lock_decref(lockh, einfo->ei_mode);
3243 LDLM_LOCK_PUT(matched);
3246 ldlm_lock_decref(lockh, mode);
3247 LDLM_LOCK_PUT(matched);
3252 CFS_LIST_HEAD(cancels);
3253 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3254 &RQF_LDLM_ENQUEUE_LVB);
3258 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3262 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3264 ptlrpc_request_set_replen(req);
3267 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3268 *flags &= ~LDLM_FL_BLOCK_GRANTED;
3270 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3271 sizeof(*lvb), lustre_swab_ost_lvb, lockh, async);
3274 struct osc_enqueue_args *aa;
3275 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3276 aa = ptlrpc_req_async_args(req);
3279 aa->oa_flags = flags;
3280 aa->oa_upcall = upcall;
3281 aa->oa_cookie = cookie;
3283 aa->oa_lockh = lockh;
3285 req->rq_interpret_reply =
3286 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3287 if (rqset == PTLRPCD_SET)
3288 ptlrpcd_add_req(req, PSCOPE_OTHER);
3290 ptlrpc_set_add_req(rqset, req);
3291 } else if (intent) {
3292 ptlrpc_req_finished(req);
3297 rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3299 ptlrpc_req_finished(req);
/* obd_info-based front end for osc_enqueue_base().  The resource name is
 * built from the object id/group of oinfo->oi_md; flags, policy, upcall and
 * lock handle come from @oinfo, the LVB and kms_valid from the first
 * lov_oinfo of the stripe md, and @oinfo itself is the upcall cookie.  The
 * enqueue is asynchronous exactly when @rqset is non-NULL. */
3304 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3305 struct ldlm_enqueue_info *einfo,
3306 struct ptlrpc_request_set *rqset)
3308 struct ldlm_res_id res_id;
3312 osc_build_res_name(oinfo->oi_md->lsm_object_id,
3313 oinfo->oi_md->lsm_object_gr, &res_id);
3315 rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3316 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3317 oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3318 oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3319 rqset, rqset != NULL);
/* Look for an already-granted lock covering @policy, without enqueuing.
 *
 * The extent is first widened to page boundaries, then ldlm_lock_match() is
 * asked for a lock with a ready LVB.  On a match the callback data is
 * attached via osc_set_data_with_check(), and when the matched mode differs
 * from the requested @mode (and this is not an LDLM_FL_TEST_LOCK probe) the
 * PW reference obtained by the match is exchanged for a PR reference.
 *
 * The OBD_FAIL_OSC_MATCH fail-point check precedes all of this. */
3323 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3324 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3325 int *flags, void *data, struct lustre_handle *lockh,
3328 struct obd_device *obd = exp->exp_obd;
3329 int lflags = *flags;
3333 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3336 /* Filesystem lock extents are extended to page boundaries so that
3337 * dealing with the page cache is a little smoother */
3338 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3339 policy->l_extent.end |= ~CFS_PAGE_MASK;
3341 /* Next, search for already existing extent locks that will cover us */
3342 /* If we're trying to read, we also search for an existing PW lock. The
3343 * VFS and page cache already protect us locally, so lots of readers/
3344 * writers can share a single PW lock. */
3348 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3349 res_id, type, policy, rc, lockh, unref);
3352 osc_set_data_with_check(lockh, data, lflags);
3353 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3354 ldlm_lock_addref(lockh, LCK_PR);
3355 ldlm_lock_decref(lockh, LCK_PW);
/* Drop one reference of @mode on the lock behind @lockh.  Group locks
 * (LCK_GROUP) are cancelled together with the reference drop; every other
 * mode is only dereferenced. */
3362 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3366 if (unlikely(mode == LCK_GROUP))
3367 ldlm_lock_decref_and_cancel(lockh, mode);
3369 ldlm_lock_decref(lockh, mode);
/* obd-method wrapper: forwards to osc_cancel_base(); @exp and @md are not
 * used by the visible code. */
3374 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3375 __u32 mode, struct lustre_handle *lockh)
3378 RETURN(osc_cancel_base(lockh, mode));
/* Cancel unused locks in this device's namespace through
 * ldlm_cli_cancel_unused().  The resource id passed on starts out NULL and
 * is built from the object id/group of @lsm (presumably only when @lsm is
 * given -- the guarding condition should be confirmed against the full
 * body). */
3381 static int osc_cancel_unused(struct obd_export *exp,
3382 struct lov_stripe_md *lsm, int flags,
3385 struct obd_device *obd = class_exp2obd(exp);
3386 struct ldlm_res_id res_id, *resp = NULL;
3389 resp = osc_build_res_name(lsm->lsm_object_id,
3390 lsm->lsm_object_gr, &res_id);
3393 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
/* Reply interpreter for osc_statfs_async().
 *
 * -ENOTCONN and -EAGAIN get special handling when the caller asked for
 * OBD_STATFS_NODELAY.  Otherwise the obd_statfs reply buffer is fetched
 * (-EPROTO if it is missing) and copied into the caller's oi_osfs.  The
 * caller's oi_cb_up callback is invoked with the final rc and its return
 * value is propagated. */
3396 static int osc_statfs_interpret(const struct lu_env *env,
3397 struct ptlrpc_request *req,
3398 struct osc_async_args *aa, int rc)
3400 struct obd_statfs *msfs;
3403 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3404 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3410 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3412 GOTO(out, rc = -EPROTO);
3415 *aa->aa_oi->oi_osfs = *msfs;
3417 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Asynchronous OST_STATFS: allocates and packs the request on the client
 * import, directs it to OST_CREATE_PORTAL, installs
 * osc_statfs_interpret() as reply handler and adds the request to @rqset.
 * With OBD_STATFS_NODELAY in oinfo->oi_flags the request is marked
 * no-resend and no-delay.  A request that fails to pack is freed.
 * @max_age is not used by the visible code. */
3421 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3422 __u64 max_age, struct ptlrpc_request_set *rqset)
3424 struct ptlrpc_request *req;
3425 struct osc_async_args *aa;
3429 /* We could possibly pass max_age in the request (as an absolute
3430 * timestamp or a "seconds.usec ago") so the target can avoid doing
3431 * extra calls into the filesystem if that isn't necessary (e.g.
3432 * during mount that would help a bit). Having relative timestamps
3433 * is not so great if request processing is slow, while absolute
3434 * timestamps are not ideal because they need time synchronization. */
3435 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3439 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3441 ptlrpc_request_free(req);
3444 ptlrpc_request_set_replen(req);
3445 req->rq_request_portal = OST_CREATE_PORTAL;
3446 ptlrpc_at_set_req_timeout(req);
3448 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3449 /* procfs requests must not block waiting on statfs, to avoid deadlock */
3450 req->rq_no_resend = 1;
3451 req->rq_no_delay = 1;
3454 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3455 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3456 aa = ptlrpc_req_async_args(req);
3459 ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_STATFS.
 *
 * A reference on the client import is taken under the cl_sem read lock and
 * dropped again once the request has been allocated from it.  The request
 * is packed, directed to OST_CREATE_PORTAL and, with OBD_STATFS_NODELAY in
 * @flags, marked no-resend and no-delay.  After ptlrpc_queue_wait() the
 * obd_statfs reply buffer is fetched (-EPROTO if it is missing) and the
 * request is released.  @max_age is not used by the visible code. */
3463 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3464 __u64 max_age, __u32 flags)
3466 struct obd_statfs *msfs;
3467 struct ptlrpc_request *req;
3468 struct obd_import *imp = NULL;
3472 /* The request might also come from lprocfs, so we need to
3473 * sync this with client_disconnect_export (Bug 15684) */
3474 down_read(&obd->u.cli.cl_sem);
3475 if (obd->u.cli.cl_import)
3476 imp = class_import_get(obd->u.cli.cl_import);
3477 up_read(&obd->u.cli.cl_sem);
3481 /* We could possibly pass max_age in the request (as an absolute
3482 * timestamp or a "seconds.usec ago") so the target can avoid doing
3483 * extra calls into the filesystem if that isn't necessary (e.g.
3484 * during mount that would help a bit). Having relative timestamps
3485 * is not so great if request processing is slow, while absolute
3486 * timestamps are not ideal because they need time synchronization. */
3487 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3489 class_import_put(imp);
3494 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3496 ptlrpc_request_free(req);
3499 ptlrpc_request_set_replen(req);
3500 req->rq_request_portal = OST_CREATE_PORTAL;
3501 ptlrpc_at_set_req_timeout(req);
3503 if (flags & OBD_STATFS_NODELAY) {
3504 /* procfs requests must not block waiting on statfs, to avoid deadlock */
3505 req->rq_no_resend = 1;
3506 req->rq_no_delay = 1;
3509 rc = ptlrpc_queue_wait(req);
3513 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3515 GOTO(out, rc = -EPROTO);
3522 ptlrpc_req_finished(req);
/* Flow of osc_getstripe(), as far as the code shows:
 *  - only the v1-sized header is copied in from @lump, enough to read
 *    lmm_magic and lmm_stripe_count; the magic must be LOV_USER_MAGIC_V1 or
 *    LOV_USER_MAGIC_V3;
 *  - when the user's lmm_stripe_count is positive, a kernel buffer sized by
 *    lov_mds_md_size() for that count is allocated and the first object
 *    slot (whose position depends on v1 vs v3 layout) receives the object
 *    id; otherwise the size for zero stripes is used;
 *  - object id, object group and a stripe count of 1 are filled in and
 *    lum_size bytes are copied back to @lump;
 *  - the buffer is released with OBD_FREE.
 * NOTE(review): the allocation size is derived from a user-supplied stripe
 * count; confirm that callers or lov_mds_md_size() bound it. */
3526 /* Retrieve object striping information.
3528 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3529 * the maximum number of OST indices which will fit in the user buffer.
3530 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3532 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3534 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3535 struct lov_user_md_v3 lum, *lumk;
3536 struct lov_user_ost_data_v1 *lmm_objects;
3537 int rc = 0, lum_size;
3543 /* we only need the header part from user space to get lmm_magic and
3544 * lmm_stripe_count, (the header part is common to v1 and v3) */
3545 lum_size = sizeof(struct lov_user_md_v1);
3546 if (copy_from_user(&lum, lump, lum_size))
3549 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3550 (lum.lmm_magic != LOV_USER_MAGIC_V3))
3553 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3554 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3555 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3556 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3558 /* we can use lov_mds_md_size() to compute lum_size
3559 * because lov_user_md_vX and lov_mds_md_vX have the same size */
3560 if (lum.lmm_stripe_count > 0) {
3561 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3562 OBD_ALLOC(lumk, lum_size);
3566 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3567 lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3569 lmm_objects = &(lumk->lmm_objects[0]);
3570 lmm_objects->l_object_id = lsm->lsm_object_id;
3572 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3576 lumk->lmm_object_id = lsm->lsm_object_id;
3577 lumk->lmm_object_gr = lsm->lsm_object_gr;
3578 lumk->lmm_stripe_count = 1;
3580 if (copy_to_user(lump, lumk, lum_size))
3584 OBD_FREE(lumk, lum_size);
// Dispatch an ioctl issued against an OSC export.
// cmd selects the operation, obd is taken from exp->exp_obd, karg is
// reinterpreted per command, and uarg is used as the user-space buffer
// (passed to obd_ioctl_getdata, copy_to_user and osc_getstripe).
3590 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3591 void *karg, void *uarg)
3593 struct obd_device *obd = exp->exp_obd;
3594 struct obd_ioctl_data *data = karg;
// Hold a module reference for the duration of the call; it is released
// by module_put at the end of this function.
// NOTE(review): the CERROR text below has no trailing newline, while the
// other CERROR calls visible in this file end with one.
3598 if (!try_module_get(THIS_MODULE)) {
3599 CERROR("Can't get module. Is it alive?");
// Report a LOV-style configuration describing this OSC as one target.
3603 case OBD_IOC_LOV_GET_CONFIG: {
3605 struct lov_desc *desc;
3606 struct obd_uuid uuid;
// Pull the ioctl payload in from uarg; buf and len describe the copy.
3610 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3611 GOTO(out, err = -EINVAL);
3613 data = (struct obd_ioctl_data *)buf;
// Inline buffer 1 must hold a lov_desc and inline buffer 2 a uuid.
3615 if (sizeof(*desc) > data->ioc_inllen1) {
3616 obd_ioctl_freedata(buf, len);
3617 GOTO(out, err = -EINVAL);
3620 if (data->ioc_inllen2 < sizeof(uuid)) {
3621 obd_ioctl_freedata(buf, len);
3622 GOTO(out, err = -EINVAL);
// Single target, single active target, default stripe count of 1.
3625 desc = (struct lov_desc *)data->ioc_inlbuf1;
3626 desc->ld_tgt_count = 1;
3627 desc->ld_active_tgt_count = 1;
3628 desc->ld_default_stripe_count = 1;
3629 desc->ld_default_stripe_size = 0;
3630 desc->ld_default_stripe_offset = 0;
3631 desc->ld_pattern = 0;
3632 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3634 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
// Hand the filled payload back to user space, then release the copy.
3636 err = copy_to_user((void *)uarg, buf, len);
3639 obd_ioctl_freedata(buf, len);
3642 case LL_IOC_LOV_SETSTRIPE:
3643 err = obd_alloc_memmd(exp, karg);
// Here karg is passed as the stripe metadata and uarg as the user buffer.
3647 case LL_IOC_LOV_GETSTRIPE:
3648 err = osc_getstripe(karg, uarg);
3650 case OBD_IOC_CLIENT_RECOVER:
3651 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3656 case IOC_OSC_SET_ACTIVE:
3657 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3660 case OBD_IOC_POLL_QUOTACHECK:
3661 err = lquota_poll_check(quota_interface, exp,
3662 (struct if_quotacheck *)karg);
3664 case OBD_IOC_PING_TARGET:
3665 err = ptlrpc_obd_ping(obd);
// Unknown command: log it with the calling process name, fail with ENOTTY.
3668 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3669 cmd, cfs_curproc_comm());
3670 GOTO(out, err = -ENOTTY);
// Drop the module reference taken at the top.
3673 module_put(THIS_MODULE);
// Look up a value by key for this export.
// key and keylen name the item; val is the result buffer and vallen its
// size (both must be non-NULL). Three keys are visible here:
// KEY_LOCK_TO_STRIPE, KEY_LAST_ID and KEY_FIEMAP. The last two issue a
// synchronous OST_GET_INFO request to the import behind exp.
3677 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3678 void *key, __u32 *vallen, void *val,
3679 struct lov_stripe_md *lsm)
3682 if (!vallen || !val)
// Answered locally: the reported length is that of one __u32.
3685 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3686 __u32 *stripe = val;
3687 *vallen = sizeof(*stripe);
// Ask the OST; the reply carries a single obd_id.
3690 } else if (KEY_IS(KEY_LAST_ID)) {
3691 struct ptlrpc_request *req;
3696 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3697 &RQF_OST_GET_INFO_LAST_ID);
// Size the key field before packing, then copy the key into it.
3701 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3702 RCL_CLIENT, keylen);
3703 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3705 ptlrpc_request_free(req);
3709 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3710 memcpy(tmp, key, keylen);
// This request is marked no-delay and no-resend before being sent.
3712 req->rq_no_delay = req->rq_no_resend = 1;
3713 ptlrpc_request_set_replen(req);
3714 rc = ptlrpc_queue_wait(req);
3718 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3720 GOTO(out, rc = -EPROTO);
// Store the returned id into the caller buffer as an obd_id.
3722 *((obd_id *)val) = *reply;
3724 ptlrpc_req_finished(req);
// The caller buffer is sent as the request value and then overwritten
// with the reply, using the same length in both directions.
3726 } else if (KEY_IS(KEY_FIEMAP)) {
3727 struct ptlrpc_request *req;
3728 struct ll_user_fiemap *reply;
3732 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3733 &RQF_OST_GET_INFO_FIEMAP);
// The value field is sized to vallen on both client and server sides.
3737 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3738 RCL_CLIENT, keylen);
3739 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3740 RCL_CLIENT, *vallen);
3741 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3742 RCL_SERVER, *vallen);
3744 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3746 ptlrpc_request_free(req);
3750 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3751 memcpy(tmp, key, keylen);
3752 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3753 memcpy(tmp, val, *vallen);
3755 ptlrpc_request_set_replen(req);
3756 rc = ptlrpc_queue_wait(req);
3760 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3762 GOTO(out1, rc = -EPROTO);
3764 memcpy(val, reply, *vallen);
3766 ptlrpc_req_finished(req);
// Connect the LLOG_MDS_OST_ORIG_CTXT llog context of the device behind
// imp, then flag the import as server-timeout and pingable.
3774 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
3776 struct llog_ctxt *ctxt;
// The context reference from llog_get_context is released by
// llog_ctxt_put once the connect call returns.
3780 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3782 rc = llog_initiator_connect(ctxt);
3783 llog_ctxt_put(ctxt);
3785 /* XXX return an error? skip setting below flags? */
// Both flags are updated under imp_lock.
3788 spin_lock(&imp->imp_lock);
3789 imp->imp_server_timeout = 1;
3790 imp->imp_pingable = 1;
3791 spin_unlock(&imp->imp_lock);
3792 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
// Reply-interpret callback installed by osc_set_info_async for
// KEY_MDS_CONN requests. The visible tail runs
// osc_setinfo_mds_connect_import on the import the request was sent on.
// NOTE(review): remaining parameters and any early exit on rc are not
// visible in this view - confirm before relying on this summary.
3797 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3798 struct ptlrpc_request *req,
3805 RETURN(osc_setinfo_mds_connect_import(req->rq_import));
// Set a value by key on this export.
// Six keys are visibly handled on the client side (KEY_NEXT_ID,
// KEY_UNLINKED, KEY_INIT_RECOV, KEY_CHECKSUM, KEY_SPTLRPC_CONF,
// KEY_FLUSH_CTX). Any other key is packed into an OST_SET_INFO request.
// That request is queued on set, except for KEY_GRANT_SHRINK which is
// handed to ptlrpcd instead.
3808 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3809 void *key, obd_count vallen, void *val,
3810 struct ptlrpc_request_set *set)
3812 struct ptlrpc_request *req;
3813 struct obd_device *obd = exp->exp_obd;
3814 struct obd_import *imp = class_exp2cliimp(exp);
3819 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
// val must be exactly one obd_id; the stored next id is that value plus 1.
3821 if (KEY_IS(KEY_NEXT_ID)) {
3822 if (vallen != sizeof(obd_id))
3826 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3827 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3828 exp->exp_obd->obd_name,
3829 obd->u.cli.cl_oscc.oscc_next_id);
// Clear the creator NOSPC flag under oscc_lock.
3834 if (KEY_IS(KEY_UNLINKED)) {
3835 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3836 spin_lock(&oscc->oscc_lock);
3837 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3838 spin_unlock(&oscc->oscc_lock);
// val must be exactly one int; it is stored under imp_lock.
3842 if (KEY_IS(KEY_INIT_RECOV)) {
3843 if (vallen != sizeof(int))
3845 spin_lock(&imp->imp_lock);
3846 imp->imp_initial_recov = *(int *)val;
3847 spin_unlock(&imp->imp_lock);
3848 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3849 exp->exp_obd->obd_name,
3850 imp->imp_initial_recov);
// val must be exactly one int; any non-zero value is stored as 1.
3854 if (KEY_IS(KEY_CHECKSUM)) {
3855 if (vallen != sizeof(int))
3857 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3861 if (KEY_IS(KEY_SPTLRPC_CONF)) {
3862 sptlrpc_conf_client_adapt(obd);
3866 if (KEY_IS(KEY_FLUSH_CTX)) {
3867 sptlrpc_import_flush_my_ctx(imp);
// Only KEY_GRANT_SHRINK may proceed without a request set.
3871 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3874 /* We pass all other commands directly to OST. Since nobody calls osc
3875 methods directly and everybody is supposed to go through LOV, we
3876 assume lov checked invalid values for us.
3877 The only recognised values so far are evict_by_nid and mds_conn.
3878 Even if something bad goes through, we'd get a -EINVAL from OST
// Grant shrink uses its own request format; all other keys share one.
3881 if (KEY_IS(KEY_GRANT_SHRINK))
3882 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
3884 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
// Size the key and value fields, pack, then copy both into the request.
3889 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3890 RCL_CLIENT, keylen);
3891 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3892 RCL_CLIENT, vallen);
3893 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3895 ptlrpc_request_free(req);
3899 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3900 memcpy(tmp, key, keylen);
3901 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
3902 memcpy(tmp, val, vallen);
// For KEY_MDS_CONN, val is read as a __u32 group and recorded in the
// creator object attributes. The request is marked no-delay and
// no-resend, and gets osc_setinfo_mds_conn_interpret as reply callback.
3904 if (KEY_IS(KEY_MDS_CONN)) {
3905 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3907 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3908 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3909 LASSERT_MDS_GROUP(oscc->oscc_oa.o_gr);
3910 req->rq_no_delay = req->rq_no_resend = 1;
3911 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
// For KEY_GRANT_SHRINK, val is read as an ost_body and its oa is copied
// for use by the reply callback.
// NOTE(review): the declaration and allocation of oa are not visible in
// this view - confirm ownership and the failure path.
3912 } else if (KEY_IS(KEY_GRANT_SHRINK)) {
3913 struct osc_grant_args *aa;
3916 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3917 aa = ptlrpc_req_async_args(req);
3920 ptlrpc_req_finished(req);
3923 *oa = ((struct ost_body *)val)->oa;
3925 req->rq_interpret_reply = osc_shrink_grant_interpret;
// Dispatch: grant shrink goes to ptlrpcd, everything else onto set.
3928 ptlrpc_request_set_replen(req);
3929 if (!KEY_IS(KEY_GRANT_SHRINK)) {
3930 LASSERT(set != NULL);
3931 ptlrpc_set_add_req(set, req);
3932 ptlrpc_check_set(NULL, set);
3934 ptlrpcd_add_req(req, PSCOPE_OTHER);
// llog operations passed to llog_setup for LLOG_SIZE_REPL_CTXT in
// osc_llog_init; only the cancel operation is set.
3940 static struct llog_operations osc_size_repl_logops = {
3941 lop_cancel: llog_obd_repl_cancel
// llog operations for LLOG_MDS_OST_ORIG_CTXT; left empty here and filled
// in at run time by osc_llog_init.
3944 static struct llog_operations osc_mds_ost_orig_logops;
// Set up the two llog contexts of an OSC device: LLOG_MDS_OST_ORIG_CTXT
// (using the log id from catid) and LLOG_SIZE_REPL_CTXT (no log id).
// olg must be the device own llog group (asserted below).
3945 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3946 struct obd_device *tgt, int count,
3947 struct llog_catid *catid, struct obd_uuid *uuid)
3952 LASSERT(olg == &obd->obd_olg);
// One-time fill of the shared ops table, done under obd_dev_lock: start
// from llog_lvfs_ops and override setup, cleanup, add and connect.
// lop_setup doubles as the already-initialised marker.
3953 spin_lock(&obd->obd_dev_lock);
3954 if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3955 osc_mds_ost_orig_logops = llog_lvfs_ops;
3956 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3957 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3958 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3959 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3961 spin_unlock(&obd->obd_dev_lock);
3963 rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3964 &catid->lci_logid, &osc_mds_ost_orig_logops);
3966 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3970 rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
3971 NULL, &osc_size_repl_logops);
// NOTE(review): the first context is fetched again here, presumably to
// undo it when the second setup fails; the cleanup call itself is not
// visible in this view - confirm.
3973 struct llog_ctxt *ctxt =
3974 llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3977 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
// Diagnostic dump of the arguments and the catalog log id.
3982 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3983 obd->obd_name, tgt->obd_name, count, catid, rc);
3984 CERROR("logid "LPX64":0x%x\n",
3985 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
// Tear down both llog contexts created by osc_llog_init. Each cleanup
// result is kept separately (rc for the origin context, rc2 for the
// size-replication context).
// NOTE(review): how rc and rc2 are combined into the return value is not
// visible in this view.
3990 static int osc_llog_finish(struct obd_device *obd, int count)
3992 struct llog_ctxt *ctxt;
3993 int rc = 0, rc2 = 0;
3996 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3998 rc = llog_cleanup(ctxt);
4000 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4002 rc2 = llog_cleanup(ctxt);
// Reconnect hook: when the connect data asks for grant
// (OBD_CONNECT_GRANT), fill in the amount of grant to request and reset
// the lost-grant counter.
4009 static int osc_reconnect(const struct lu_env *env,
4010 struct obd_export *exp, struct obd_device *obd,
4011 struct obd_uuid *cluuid,
4012 struct obd_connect_data *data,
4015 struct client_obd *cli = &obd->u.cli;
4017 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
// Under cl_loi_list_lock: request the currently available grant, or if
// that is zero (GNU omitted-middle-operand conditional) the byte size of
// two maximum-sized RPCs, i.e. (2 * cl_max_pages_per_rpc) shifted by
// CFS_PAGE_SHIFT. The old lost-grant value is saved for the debug
// message and the counter is zeroed.
4020 client_obd_list_lock(&cli->cl_loi_list_lock);
4021 data->ocd_grant = cli->cl_avail_grant ?:
4022 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4023 lost_grant = cli->cl_lost_grant;
4024 cli->cl_lost_grant = 0;
4025 client_obd_list_unlock(&cli->cl_loi_list_lock);
4027 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4028 "cl_lost_grant: %ld\n", data->ocd_grant,
4029 cli->cl_avail_grant, lost_grant);
4030 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4031 " ocd_grant: %d\n", data->ocd_connect_flags,
4032 data->ocd_version, data->ocd_grant);
// Disconnect an export: sync the size-replication llog when
// cl_conn_count is 1, disconnect via client_disconnect_export, and
// remove the client from the grant-shrink list once the import is gone.
4038 static int osc_disconnect(struct obd_export *exp)
4040 struct obd_device *obd = class_exp2obd(exp);
4041 struct llog_ctxt *ctxt;
4044 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
// The sync runs only when cl_conn_count equals 1; the context reference
// is dropped afterwards with llog_ctxt_put.
4046 if (obd->u.cli.cl_conn_count == 1) {
4047 /* Flush any remaining cancel messages out to the
4049 llog_sync(ctxt, exp);
4051 llog_ctxt_put(ctxt);
4053 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4057 rc = client_disconnect_export(exp);
4059 * Initially we put del_shrink_grant before disconnect_export, but it
4060 * causes the following problem if setup (connect) and cleanup
4061 * (disconnect) are tangled together.
4062 * connect p1 disconnect p2
4063 * ptlrpc_connect_import
4064 * ............... class_manual_cleanup
4067 * ptlrpc_connect_interrupt
4069 * add this client to shrink list
4071 * Bang! pinger trigger the shrink.
4072 * So the osc should be disconnected from the shrink list, after we
4073 * are sure the import has been destroyed. BUG18662
// Runs after client_disconnect_export and only once cl_import is NULL.
4075 if (obd->u.cli.cl_import == NULL)
4076 osc_del_shrink_grant(&obd->u.cli);
// React to a state change of the import imp belonging to obd.
// Visible events: DISCON, INACTIVE, INVALIDATE, ACTIVE and OCD; any other
// value is logged as unknown. Most branches forward a matching
// notification through obd_notify_observer.
// NOTE(review): the assignment of cli is not visible in this view;
// presumably it points at obd->u.cli - confirm.
4080 static int osc_import_event(struct obd_device *obd,
4081 struct obd_import *imp,
4082 enum obd_import_event event)
4084 struct client_obd *cli;
4088 LASSERT(imp->imp_obd == obd);
// Disconnected: mark the object creator as recovering (only when
// imp_server_timeout is set), then zero both grant counters under
// cl_loi_list_lock.
4091 case IMP_EVENT_DISCON: {
4092 /* Only do this on the MDS OSC's */
4093 if (imp->imp_server_timeout) {
4094 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4096 spin_lock(&oscc->oscc_lock);
4097 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4098 spin_unlock(&oscc->oscc_lock);
4101 client_obd_list_lock(&cli->cl_loi_list_lock);
4102 cli->cl_avail_grant = 0;
4103 cli->cl_lost_grant = 0;
4104 client_obd_list_unlock(&cli->cl_loi_list_lock);
4107 case IMP_EVENT_INACTIVE: {
4108 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
// Invalidated: with a cl environment held, run osc_check_rpcs under the
// list lock, then clean the lock namespace with LDLM_FL_LOCAL_ONLY.
4111 case IMP_EVENT_INVALIDATE: {
4112 struct ldlm_namespace *ns = obd->obd_namespace;
4116 env = cl_env_get(&refcheck);
4120 client_obd_list_lock(&cli->cl_loi_list_lock);
4121 /* all pages go to failing rpcs due to the invalid
4123 osc_check_rpcs(env, cli);
4124 client_obd_list_unlock(&cli->cl_loi_list_lock);
4126 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4127 cl_env_put(env, &refcheck);
// Active again: clear the creator NOSPC flag (only when
// imp_server_timeout is set) and notify the observer.
4132 case IMP_EVENT_ACTIVE: {
4133 /* Only do this on the MDS OSC's */
4134 if (imp->imp_server_timeout) {
4135 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4137 spin_lock(&oscc->oscc_lock);
4138 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
4139 spin_unlock(&oscc->oscc_lock);
4141 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
// Connect data arrived: initialise grant if OBD_CONNECT_GRANT is set,
// switch the request portal if OBD_CONNECT_REQPORTAL is set, notify.
4144 case IMP_EVENT_OCD: {
4145 struct obd_connect_data *ocd = &imp->imp_connect_data;
4147 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4148 osc_init_grant(&obd->u.cli, ocd);
4151 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4152 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
4154 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4158 CERROR("Unknown import event %d\n", event);
// Device setup for an OSC: take a ptlrpcd reference, run the generic
// client setup, register procfs entries, create the request pool on the
// import and initialise the grant-shrink list and semaphore.
// NOTE(review): the error checks between these steps are not visible in
// this view.
4164 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4170 rc = ptlrpcd_addref();
4174 rc = client_obd_setup(obd, lcfg);
4178 struct lprocfs_static_vars lvars = { 0 };
4179 struct client_obd *cli = &obd->u.cli;
4181 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
// The three extra procfs registrations run only when lprocfs_obd_setup
// returns 0.
4182 lprocfs_osc_init_vars(&lvars);
4183 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4184 lproc_osc_attach_seqstat(obd);
4185 sptlrpc_lprocfs_cliobd_attach(obd);
4186 ptlrpc_lprocfs_register_obd(obd);
4190 /* We need to allocate a few requests more, because
4191 brw_interpret tries to create new requests before freeing
4192 previous ones. Ideally we want to have 2x max_rpcs_in_flight
4193 reserved, but I afraid that might be too much wasted RAM
4194 in fact, so 2 is just my guess and still should work. */
// Pool size is cl_max_rpcs_in_flight plus the 2 spare requests
// explained above.
4195 cli->cl_import->imp_rq_pool =
4196 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4198 ptlrpc_add_rqs_to_pool);
4200 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4201 sema_init(&cli->cl_grant_sem, 1);
// Staged pre-cleanup of an OSC device. Two stages are visible:
// OBD_CLEANUP_EARLY deactivates the import and stops pinging it;
// OBD_CLEANUP_EXPORTS destroys an import that is still attached and then
// finishes the llog contexts.
4207 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4213 case OBD_CLEANUP_EARLY: {
4214 struct obd_import *imp;
4215 imp = obd->u.cli.cl_import;
4216 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4217 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4218 ptlrpc_deactivate_import(imp);
// imp_pingable is cleared under imp_lock.
4219 spin_lock(&imp->imp_lock);
4220 imp->imp_pingable = 0;
4221 spin_unlock(&imp->imp_lock);
4224 case OBD_CLEANUP_EXPORTS: {
4225 /* If we set up but never connected, the
4226 client import will not have been cleaned. */
// With cl_sem held for writing: invalidate the import, free its request
// pool if one exists, and destroy the import.
// NOTE(review): cl_import is set to NULL after up_write, i.e. outside
// cl_sem - confirm this ordering is intended.
4227 if (obd->u.cli.cl_import) {
4228 struct obd_import *imp;
4229 down_write(&obd->u.cli.cl_sem);
4230 imp = obd->u.cli.cl_import;
4231 CDEBUG(D_CONFIG, "%s: client import never connected\n",
4233 ptlrpc_invalidate_import(imp);
4234 if (imp->imp_rq_pool) {
4235 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4236 imp->imp_rq_pool = NULL;
4238 class_destroy_import(imp);
4239 up_write(&obd->u.cli.cl_sem);
4240 obd->u.cli.cl_import = NULL;
4242 rc = obd_llog_finish(obd, 0);
4244 CERROR("failed to cleanup llogging subsystems\n");
// Final cleanup of an OSC device: remove procfs entries, mark the object
// creator as exiting, release the quota cache and run the generic client
// cleanup.
4251 int osc_cleanup(struct obd_device *obd)
4253 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4257 ptlrpc_lprocfs_unregister_obd(obd);
4258 lprocfs_obd_cleanup(obd);
// Under oscc_lock: clear RECOVERING and set EXITING on the creator.
4260 spin_lock(&oscc->oscc_lock);
4261 oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
4262 oscc->oscc_flags |= OSCC_FLAG_EXITING;
4263 spin_unlock(&oscc->oscc_lock);
4265 /* free memory of osc quota cache */
4266 lquota_cleanup(quota_interface, obd);
4268 rc = client_obd_cleanup(obd);
// Apply a configuration record to an OSC device. The visible branch
// forwards the record to class_process_proc_param with the PARAM_OSC
// prefix and the procfs variable table from lprocfs_osc_init_vars.
// NOTE(review): the case labels of the switch and the remaining call
// arguments are not visible in this view.
4274 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4276 struct lprocfs_static_vars lvars = { 0 };
4279 lprocfs_osc_init_vars(&lvars);
4281 switch (lcfg->lcfg_command) {
4283 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
// obd_ops entry point for configuration: passes buf through to
// osc_process_config_base as the configuration record; len is unused in
// the visible code.
4293 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4295 return osc_process_config_base(obd, buf);
// Operation table for the OSC device type; it is handed to
// class_register_type and init_obd_quota_ops in osc_init. Connection
// add, delete and connect use the shared client_* helpers; the other
// visible slots point at osc_* functions.
4298 struct obd_ops osc_obd_ops = {
4299 .o_owner = THIS_MODULE,
4300 .o_setup = osc_setup,
4301 .o_precleanup = osc_precleanup,
4302 .o_cleanup = osc_cleanup,
4303 .o_add_conn = client_import_add_conn,
4304 .o_del_conn = client_import_del_conn,
4305 .o_connect = client_connect_import,
4306 .o_reconnect = osc_reconnect,
4307 .o_disconnect = osc_disconnect,
4308 .o_statfs = osc_statfs,
4309 .o_statfs_async = osc_statfs_async,
4310 .o_packmd = osc_packmd,
4311 .o_unpackmd = osc_unpackmd,
4312 .o_precreate = osc_precreate,
4313 .o_create = osc_create,
4314 .o_destroy = osc_destroy,
4315 .o_getattr = osc_getattr,
4316 .o_getattr_async = osc_getattr_async,
4317 .o_setattr = osc_setattr,
4318 .o_setattr_async = osc_setattr_async,
4320 .o_punch = osc_punch,
4322 .o_enqueue = osc_enqueue,
4323 .o_change_cbdata = osc_change_cbdata,
4324 .o_cancel = osc_cancel,
4325 .o_cancel_unused = osc_cancel_unused,
4326 .o_iocontrol = osc_iocontrol,
4327 .o_get_info = osc_get_info,
4328 .o_set_info_async = osc_set_info_async,
4329 .o_import_event = osc_import_event,
4330 .o_llog_init = osc_llog_init,
4331 .o_llog_finish = osc_llog_finish,
4332 .o_process_config = osc_process_config,
// Symbols defined elsewhere and used by osc_init below.
4335 extern struct lu_kmem_descr osc_caches[];
4336 extern spinlock_t osc_ast_guard;
4337 extern struct lock_class_key osc_ast_guard_class;
// Module initialisation: create the kmem caches, hook up the quota
// interface, register the OSC device type and initialise osc_ast_guard.
4339 int __init osc_init(void)
4341 struct lprocfs_static_vars lvars = { 0 };
4345 /* print an address of _any_ initialized kernel symbol from this
4346 * module, to allow debugging with gdb that doesn't support data
4347 * symbols from modules.*/
4348 CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4350 rc = lu_kmem_init(osc_caches);
4352 lprocfs_osc_init_vars(&lvars);
// Try to load the lquota module and look up its interface symbol; the
// result is stored in the file-level quota_interface pointer.
4354 request_module("lquota");
4355 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4356 lquota_init(quota_interface);
4357 init_obd_quota_ops(quota_interface, &osc_obd_ops);
4359 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4360 LUSTRE_OSC_NAME, &osc_device_type);
// NOTE(review): presumably the failure path of class_register_type (the
// guarding condition is not visible): drop the quota symbol reference if
// one was obtained and release the kmem caches.
4362 if (quota_interface)
4363 PORTAL_SYMBOL_PUT(osc_quota_interface);
4364 lu_kmem_fini(osc_caches);
4368 spin_lock_init(&osc_ast_guard);
4369 lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
// Module teardown: finalise the lu device type, shut down the quota
// interface and drop its symbol reference if held, unregister the OSC
// obd type and release the kmem caches.
4375 static void /*__exit*/ osc_exit(void)
4377 lu_device_type_fini(&osc_device_type);
4379 lquota_exit(quota_interface);
4380 if (quota_interface)
4381 PORTAL_SYMBOL_PUT(osc_quota_interface);
4383 class_unregister_type(LUSTRE_OSC_NAME);
4384 lu_kmem_fini(osc_caches);
// Module metadata (author, description, licence) followed by the
// cfs_module declaration naming osc_init and osc_exit; presumably this
// registers them as the module entry and exit points - confirm against
// the cfs_module macro definition.
4387 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4388 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4389 MODULE_LICENSE("GPL");
4391 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);