1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_OSC
42 #include <libcfs/libcfs.h>
45 # include <liblustre.h>
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include "osc_internal.h"
/* Quota hooks: filled in at module init from osc_quota_interface (defined
 * elsewhere in this module). */
66 static quota_interface_t *quota_interface = NULL;
67 extern quota_interface_t osc_quota_interface;
/* Forward declarations for BRW helpers and cleanup defined later in file. */
69 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
70 static int brw_interpret(const struct lu_env *env,
71 struct ptlrpc_request *req, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
74 /* Pack OSC object metadata for disk storage (LE byte order). */
/* If lsm is non-NULL, copies object id/group into *lmmp in little-endian;
 * also handles the alloc/free conventions of the packmd API (allocate *lmmp
 * when NULL, free on the teardown path). NOTE(review): interior error
 * handling is not visible in this dump — confirm against full source. */
75 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
76 struct lov_stripe_md *lsm)
81 lmm_size = sizeof(**lmmp);
/* free path: caller asked to release a previously packed lmm */
86 OBD_FREE(*lmmp, lmm_size);
/* alloc path: caller passed *lmmp == NULL */
92 OBD_ALLOC(*lmmp, lmm_size);
/* an OSC object must have a non-zero id and a valid MDS group */
98 LASSERT(lsm->lsm_object_id);
99 LASSERT_MDS_GROUP(lsm->lsm_object_gr);
100 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
101 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
107 /* Unpack OSC object metadata from disk storage (LE byte order). */
/* Validates lmm (size, non-zero object id), then allocates a single-stripe
 * lov_stripe_md (*lsmp) plus its lsm_oinfo[0], converts id/group from LE,
 * and sets the per-stripe max size. Also implements the free path when
 * called with lmm == NULL and an existing *lsmp. */
108 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
109 struct lov_mds_md *lmm, int lmm_bytes)
/* reject truncated on-disk metadata */
115 if (lmm_bytes < sizeof (*lmm)) {
116 CERROR("lov_mds_md too small: %d, need %d\n",
117 lmm_bytes, (int)sizeof(*lmm));
120 /* XXX LOV_MAGIC etc check? */
122 if (lmm->lmm_object_id == 0) {
123 CERROR("lov_mds_md: zero lmm_object_id\n");
/* OSC always uses stripe count 1 */
128 lsm_size = lov_stripe_md_size(1);
/* free path: existing lsm, no new metadata supplied */
132 if (*lsmp != NULL && lmm == NULL) {
133 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
134 OBD_FREE(*lsmp, lsm_size);
140 OBD_ALLOC(*lsmp, lsm_size);
143 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
/* roll back the lsm allocation if oinfo allocation failed */
144 if ((*lsmp)->lsm_oinfo[0] == NULL) {
145 OBD_FREE(*lsmp, lsm_size);
148 loi_init((*lsmp)->lsm_oinfo[0]);
152 /* XXX zero *lsmp? */
153 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
154 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
155 LASSERT((*lsmp)->lsm_object_id);
156 LASSERT_MDS_GROUP((*lsmp)->lsm_object_gr);
159 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/* Copy the (optional) capability @capa into the request's RMF_CAPA1 field
 * and flag its presence in the ost_body's o_valid. NOTE(review): the early
 * return for capa == NULL is in elided lines — confirm in full source. */
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165 struct ost_body *body, void *capa)
167 struct obd_capa *oc = (struct obd_capa *)capa;
168 struct lustre_capa *c;
173 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
176 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177 DEBUG_CAPA(D_SEC, c, "pack");
/* Fill the request's ost_body from oinfo: copy the obdo and pack the
 * capability (if any). Used by getattr/setattr request builders below. */
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181 struct obd_info *oinfo)
183 struct ost_body *body;
185 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
188 body->oa = *oinfo->oi_oa;
189 osc_pack_capa(req, body, oinfo->oi_capa);
/* Size the capability field in the request capsule before packing: zero
 * bytes when no capability is supplied; otherwise the capsule default
 * (sizeof(struct obd_capa)) is kept. */
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193 const struct req_msg_field *field,
197 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
199 /* it is already calculated as sizeof struct obd_capa */
/* Reply interpreter for async getattr: unpack (and byte-swap if needed) the
 * ost_body from the reply, copy the attributes into the caller's obdo, then
 * invoke the caller's oi_cb_up completion callback with the final rc. */
203 static int osc_getattr_interpret(const struct lu_env *env,
204 struct ptlrpc_request *req,
205 struct osc_async_args *aa, int rc)
207 struct ost_body *body;
213 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
214 lustre_swab_ost_body);
216 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
217 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
219 /* This should really be sent by the OST */
220 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
221 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
/* unpack failure path: invalidate the obdo so callers see no attrs */
223 CDEBUG(D_INFO, "can't unpack ost_body\n");
225 aa->aa_oi->oi_oa->o_valid = 0;
228 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Build an OST_GETATTR request for oinfo and queue it on @set without
 * waiting; osc_getattr_interpret handles the reply asynchronously. */
232 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
233 struct ptlrpc_request_set *set)
235 struct ptlrpc_request *req;
236 struct osc_async_args *aa;
240 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
244 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
245 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
/* pack failed: release the request before returning */
247 ptlrpc_request_free(req);
251 osc_pack_req_body(req, oinfo);
253 ptlrpc_request_set_replen(req);
254 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
/* stash async args (oinfo) in the request's embedded scratch space */
256 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
257 aa = ptlrpc_req_async_args(req);
260 ptlrpc_set_add_req(set, req);
/* Synchronous getattr: build an OST_GETATTR request, wait for the reply,
 * and copy the returned attributes into oinfo->oi_oa. */
264 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
266 struct ptlrpc_request *req;
267 struct ost_body *body;
271 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
275 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
276 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
278 ptlrpc_request_free(req);
282 osc_pack_req_body(req, oinfo);
284 ptlrpc_request_set_replen(req);
/* blocking send */
286 rc = ptlrpc_queue_wait(req);
290 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
/* missing/garbled reply body is a protocol error */
292 GOTO(out, rc = -EPROTO);
294 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
295 *oinfo->oi_oa = body->oa;
297 /* This should really be sent by the OST */
298 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
299 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
303 ptlrpc_req_finished(req);
/* Synchronous setattr: send oinfo->oi_oa to the OST via OST_SETATTR and
 * copy the (possibly updated) attributes back from the reply. */
307 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
308 struct obd_trans_info *oti)
310 struct ptlrpc_request *req;
311 struct ost_body *body;
/* if a group is claimed valid it must be an MDS group */
315 LASSERTF(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
316 CHECK_MDS_GROUP(oinfo->oi_oa->o_gr),
317 "oinfo->oi_oa->o_valid="LPU64" oinfo->oi_oa->o_gr="LPU64"\n",
318 oinfo->oi_oa->o_valid, oinfo->oi_oa->o_gr);
320 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
324 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
325 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
327 ptlrpc_request_free(req);
331 osc_pack_req_body(req, oinfo);
333 ptlrpc_request_set_replen(req);
335 rc = ptlrpc_queue_wait(req);
339 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
341 GOTO(out, rc = -EPROTO);
/* reflect server-side attribute state back to the caller */
343 *oinfo->oi_oa = body->oa;
347 ptlrpc_req_finished(req);
/* Reply interpreter for async setattr: copy the returned obdo back to the
 * caller and fire the oi_cb_up completion callback. */
351 static int osc_setattr_interpret(const struct lu_env *env,
352 struct ptlrpc_request *req,
353 struct osc_async_args *aa, int rc)
355 struct ost_body *body;
361 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
363 GOTO(out, rc = -EPROTO);
365 *aa->aa_oi->oi_oa = body->oa;
367 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Asynchronous setattr. Two dispatch modes are visible: fire-and-forget via
 * ptlrpcd (MDS->OST setattr path, no reply processing), or queued on the
 * caller's @rqset with osc_setattr_interpret as the reply handler. */
371 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
372 struct obd_trans_info *oti,
373 struct ptlrpc_request_set *rqset)
375 struct ptlrpc_request *req;
376 struct osc_async_args *aa;
380 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
384 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
385 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
387 ptlrpc_request_free(req);
391 osc_pack_req_body(req, oinfo);
393 ptlrpc_request_set_replen(req);
/* carry the llog cookie along so the OST can cancel the unlink record */
395 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
397 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
400 /* do mds to ost setattr asynchronously */
402 /* Do not wait for response. */
403 ptlrpcd_add_req(req, PSCOPE_OTHER);
405 req->rq_interpret_reply =
406 (ptlrpc_interpterer_t)osc_setattr_interpret;
408 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
409 aa = ptlrpc_req_async_args(req);
412 ptlrpc_set_add_req(rqset, req);
/* Create an object on the OST. Allocates a temporary single-stripe lsm via
 * obd_alloc_memmd, sends OST_CREATE synchronously, then records the new
 * object id/group into the lsm and (optionally) the transno and llog
 * cookie into @oti. On success *ea takes ownership of the lsm. */
418 int osc_real_create(struct obd_export *exp, struct obdo *oa,
419 struct lov_stripe_md **ea, struct obd_trans_info *oti)
421 struct ptlrpc_request *req;
422 struct ost_body *body;
423 struct lov_stripe_md *lsm;
432 rc = obd_alloc_memmd(exp, &lsm);
437 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
439 GOTO(out, rc = -ENOMEM);
441 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
443 ptlrpc_request_free(req);
447 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
451 ptlrpc_request_set_replen(req);
/* DELORPHAN (orphan cleanup) requests must not be resent/delayed, else a
 * replayed create could race with recovery */
453 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
454 oa->o_flags == OBD_FL_DELORPHAN) {
456 "delorphan from OST integration");
457 /* Don't resend the delorphan req */
458 req->rq_no_resend = req->rq_no_delay = 1;
461 rc = ptlrpc_queue_wait(req);
465 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
467 GOTO(out_req, rc = -EPROTO);
471 /* This should really be sent by the OST */
472 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
473 oa->o_valid |= OBD_MD_FLBLKSZ;
475 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
476 * have valid lsm_oinfo data structs, so don't go touching that.
477 * This needs to be fixed in a big way.
479 lsm->lsm_object_id = oa->o_id;
480 lsm->lsm_object_gr = oa->o_gr;
/* export transno so the caller can track commit for recovery */
484 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
486 if (oa->o_valid & OBD_MD_FLCOOKIE) {
487 if (!oti->oti_logcookies)
488 oti_alloc_cookies(oti, 1);
489 *oti->oti_logcookies = oa->o_lcookie;
493 CDEBUG(D_HA, "transno: "LPD64"\n",
494 lustre_msg_get_transno(req->rq_repmsg));
496 ptlrpc_req_finished(req);
/* error path: release the lsm we allocated above */
499 obd_free_memmd(exp, &lsm);
/* Reply interpreter for punch (truncate): copy the returned obdo back and
 * invoke the caller-supplied upcall with its cookie and the final rc. */
503 static int osc_punch_interpret(const struct lu_env *env,
504 struct ptlrpc_request *req,
505 struct osc_punch_args *aa, int rc)
507 struct ost_body *body;
513 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
515 GOTO(out, rc = -EPROTO);
517 *aa->pa_oa = body->oa;
519 rc = aa->pa_upcall(aa->pa_cookie, rc);
/* Build and dispatch an OST_PUNCH (truncate) request. The extent to punch
 * is carried in oa->o_size/o_blocks (set by the caller, see osc_punch).
 * Dispatch is async: via ptlrpcd when rqset == PTLRPCD_SET, otherwise onto
 * the caller's set; @upcall/@cookie are invoked from the interpreter. */
523 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
524 struct obd_capa *capa,
525 obd_enqueue_update_f upcall, void *cookie,
526 struct ptlrpc_request_set *rqset)
528 struct ptlrpc_request *req;
529 struct osc_punch_args *aa;
530 struct ost_body *body;
534 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
538 osc_set_capa_size(req, &RMF_CAPA1, capa);
539 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
541 ptlrpc_request_free(req);
544 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
545 ptlrpc_at_set_req_timeout(req);
547 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
550 osc_pack_capa(req, body, capa);
552 ptlrpc_request_set_replen(req);
555 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
556 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
557 aa = ptlrpc_req_async_args(req);
559 aa->pa_upcall = upcall;
560 aa->pa_cookie = cookie;
561 if (rqset == PTLRPCD_SET)
562 ptlrpcd_add_req(req, PSCOPE_OTHER);
564 ptlrpc_set_add_req(rqset, req);
/* obd_ops punch entry point: encode the truncate extent into the obdo
 * (start -> o_size, end -> o_blocks, per OST_PUNCH wire convention) and
 * delegate to osc_punch_base with oinfo's capa and completion callback. */
569 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
570 struct obd_trans_info *oti,
571 struct ptlrpc_request_set *rqset)
573 oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
574 oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
575 oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
576 return osc_punch_base(exp, oinfo->oi_oa, oinfo->oi_capa,
577 oinfo->oi_cb_up, oinfo, rqset);
/* Synchronous OST_SYNC: ask the OST to flush [start, end) of the object to
 * disk. The range is overloaded onto oa->o_size/o_blocks on the wire. */
580 static int osc_sync(struct obd_export *exp, struct obdo *oa,
581 struct lov_stripe_md *md, obd_size start, obd_size end,
584 struct ptlrpc_request *req;
585 struct ost_body *body;
/* a NULL oa means we have no object to sync */
590 CDEBUG(D_INFO, "oa NULL\n");
594 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
598 osc_set_capa_size(req, &RMF_CAPA1, capa);
599 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
601 ptlrpc_request_free(req);
605 /* overload the size and blocks fields in the oa with start/end */
606 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
609 body->oa.o_size = start;
610 body->oa.o_blocks = end;
611 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
612 osc_pack_capa(req, body, capa);
614 ptlrpc_request_set_replen(req);
616 rc = ptlrpc_queue_wait(req);
620 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
622 GOTO(out, rc = -EPROTO);
628 ptlrpc_req_finished(req);
632 /* Find and cancel locally locks matched by @mode in the resource found by
633 * @objid. Found locks are added into @cancel list. Returns the amount of
634 * locks added to @cancels list. */
635 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
636 struct list_head *cancels, ldlm_mode_t mode,
639 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
640 struct ldlm_res_id res_id;
641 struct ldlm_resource *res;
/* resource name is derived from object id + group */
645 osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
646 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
/* hold a debug ref across the local cancel scan */
650 LDLM_RESOURCE_ADDREF(res);
651 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
652 lock_flags, 0, NULL);
653 LDLM_RESOURCE_DELREF(res);
654 ldlm_resource_putref(res);
/* Destroy-RPC completion: drop the in-flight destroy counter and wake any
 * thread throttled in osc_destroy waiting to send another destroy. */
658 static int osc_destroy_interpret(const struct lu_env *env,
659 struct ptlrpc_request *req, void *data,
662 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
664 atomic_dec(&cli->cl_destroy_in_flight);
665 cfs_waitq_signal(&cli->cl_destroy_waitq);
/* Throttle helper: optimistically reserve a destroy-in-flight slot; if the
 * limit (cl_max_rpcs_in_flight) is exceeded, back the increment out again.
 * The dec-and-recheck wakes waiters to avoid losing a signal when the
 * counter changed between the two atomic operations. */
669 static int osc_can_send_destroy(struct client_obd *cli)
671 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
672 cli->cl_max_rpcs_in_flight) {
673 /* The destroy request can be sent */
676 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
677 cli->cl_max_rpcs_in_flight) {
679 * The counter has been modified between the two atomic
682 cfs_waitq_signal(&cli->cl_destroy_waitq);
687 /* Destroy requests can be async always on the client, and we don't even really
688 * care about the return code since the client cannot do anything at all about
690 * When the MDS is unlinking a filename, it saves the file objects into a
691 * recovery llog, and these object records are cancelled when the OST reports
692 * they were destroyed and sync'd to disk (i.e. transaction committed).
693 * If the client dies, or the OST is down when the object should be destroyed,
694 * the records are not cancelled, and when the OST reconnects to the MDS next,
695 * it will retrieve the llog unlink logs and then sends the log cancellation
696 * cookies to the MDS after committing destroy transactions. */
697 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
698 struct lov_stripe_md *ea, struct obd_trans_info *oti,
699 struct obd_export *md_export, void *capa)
701 struct client_obd *cli = &exp->exp_obd->u.cli;
702 struct ptlrpc_request *req;
703 struct ost_body *body;
704 CFS_LIST_HEAD(cancels);
709 CDEBUG(D_INFO, "oa NULL\n");
/* cancel local PW locks on the object and piggyback the cancels (ELC),
 * discarding cached data since the object is going away */
713 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
714 LDLM_FL_DISCARD_DATA);
716 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* alloc failed: return the gathered cancels to the lock lists */
718 ldlm_lock_list_put(&cancels, l_bl_ast, count);
722 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
723 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
726 ptlrpc_request_free(req);
730 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
731 ptlrpc_at_set_req_timeout(req);
733 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
734 oa->o_lcookie = *oti->oti_logcookies;
735 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
739 osc_pack_capa(req, body, (struct obd_capa *)capa);
740 ptlrpc_request_set_replen(req);
742 /* don't throttle destroy RPCs for the MDT */
743 if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
744 req->rq_interpret_reply = osc_destroy_interpret;
745 if (!osc_can_send_destroy(cli)) {
746 struct l_wait_info lwi = { 0 };
749 * Wait until the number of on-going destroy RPCs drops
750 * under max_rpc_in_flight
752 l_wait_event_exclusive(cli->cl_destroy_waitq,
753 osc_can_send_destroy(cli), &lwi);
757 /* Do not wait for response */
758 ptlrpcd_add_req(req, PSCOPE_OTHER);
/* Report the client's dirty/grant accounting to the OST inside @oa
 * (o_dirty, o_undirty, o_grant, o_dropped), sanity-checking the counters
 * against per-OSC and system-wide dirty limits under cl_loi_list_lock.
 * The CERROR branches flag accounting bugs, not normal conditions. */
762 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
765 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
/* caller must not have set these bits already; we own them here */
767 LASSERT(!(oa->o_valid & bits));
770 client_obd_list_lock(&cli->cl_loi_list_lock);
771 oa->o_dirty = cli->cl_dirty;
772 if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
773 CERROR("dirty %lu - %lu > dirty_max %lu\n",
774 cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
776 } else if (atomic_read(&obd_dirty_pages) -
777 atomic_read(&obd_dirty_transit_pages) > obd_max_dirty_pages){
778 CERROR("dirty %d - %d > system dirty_max %d\n",
779 atomic_read(&obd_dirty_pages),
780 atomic_read(&obd_dirty_transit_pages),
781 obd_max_dirty_pages);
783 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
784 CERROR("dirty %lu - dirty_max %lu too big???\n",
785 cli->cl_dirty, cli->cl_dirty_max);
/* normal case: advertise how much more we could dirty — at least one
 * full RPC window beyond the current max */
788 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
789 (cli->cl_max_rpcs_in_flight + 1);
790 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
792 oa->o_grant = cli->cl_avail_grant;
793 oa->o_dropped = cli->cl_lost_grant;
794 cli->cl_lost_grant = 0;
795 client_obd_list_unlock(&cli->cl_loi_list_lock);
796 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
797 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Push the next grant-shrink deadline GRANT_SHRINK_INTERVAL into the
 * future; called whenever grant is consumed so an active client never
 * shrinks its grant. */
801 static void osc_update_next_shrink(struct client_obd *cli)
803 int time = GRANT_SHRINK_INTERVAL;
804 cli->cl_next_shrink_grant = cfs_time_shift(time);
805 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
806 cli->cl_next_shrink_grant);
809 /* caller must hold loi_list_lock */
/* Account one page of dirty cache against the client's grant: bumps the
 * global and per-client dirty counters, debits cl_avail_grant by one page,
 * and marks the brw_page as grant-backed (OBD_BRW_FROM_GRANT). */
810 static void osc_consume_write_grant(struct client_obd *cli,
811 struct brw_page *pga)
/* double accounting would corrupt the grant bookkeeping */
813 LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
814 atomic_inc(&obd_dirty_pages);
815 cli->cl_dirty += CFS_PAGE_SIZE;
816 cli->cl_avail_grant -= CFS_PAGE_SIZE;
817 pga->flag |= OBD_BRW_FROM_GRANT;
818 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
819 CFS_PAGE_SIZE, pga, pga->pg);
820 LASSERT(cli->cl_avail_grant >= 0);
821 osc_update_next_shrink(cli);
824 /* the companion to osc_consume_write_grant, called when a brw has completed.
825 * must be called with the loi lock held. */
/* Undo osc_consume_write_grant's accounting for one page. Pages that never
 * made it to the OST (or short writes spanning partial server blocks) move
 * their grant into cl_lost_grant, to be reported as o_dropped later. */
826 static void osc_release_write_grant(struct client_obd *cli,
827 struct brw_page *pga, int sent)
/* server block size for the short-write correction; default 4k when the
 * OST has not reported os_bsize */
829 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
/* page was never charged against grant — nothing to release */
832 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
837 pga->flag &= ~OBD_BRW_FROM_GRANT;
838 atomic_dec(&obd_dirty_pages);
839 cli->cl_dirty -= CFS_PAGE_SIZE;
840 if (pga->flag & OBD_BRW_NOCACHE) {
841 pga->flag &= ~OBD_BRW_NOCACHE;
842 atomic_dec(&obd_dirty_transit_pages);
843 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
846 cli->cl_lost_grant += CFS_PAGE_SIZE;
847 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
848 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
849 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
850 /* For short writes we shouldn't count parts of pages that
851 * span a whole block on the OST side, or our accounting goes
852 * wrong. Should match the code in filter_grant_check. */
853 int offset = pga->off & ~CFS_PAGE_MASK;
854 int count = pga->count + (offset & (blocksize - 1));
855 int end = (offset + pga->count) & (blocksize - 1);
/* round the tail up to a full server block */
857 count += blocksize - end;
859 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
860 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
861 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
862 cli->cl_avail_grant, cli->cl_dirty);
/* Total BRW RPCs currently in flight (reads + writes) for this client. */
868 static unsigned long rpcs_in_flight(struct client_obd *cli)
870 return cli->cl_r_in_flight + cli->cl_w_in_flight;
873 /* caller must hold loi_list_lock */
/* Walk cl_cache_waiters and wake waiters that can now proceed. A waiter is
 * given grant (osc_consume_write_grant) when available; if no grant remains
 * and no writes are in flight to replenish it, the waiter is woken with
 * -EDQUOT so it falls back to sync I/O. Stops early if dirty limits are
 * exhausted or grant may still arrive from in-flight writes. */
874 void osc_wake_cache_waiters(struct client_obd *cli)
876 struct list_head *l, *tmp;
877 struct osc_cache_waiter *ocw;
880 list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
881 /* if we can't dirty more, we must wait until some is written */
882 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
883 (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
884 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
885 "osc max %ld, sys max %d\n", cli->cl_dirty,
886 cli->cl_dirty_max, obd_max_dirty_pages);
890 /* if still dirty cache but no grant wait for pending RPCs that
891 * may yet return us some grant before doing sync writes */
892 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
893 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
894 cli->cl_w_in_flight);
898 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
899 list_del_init(&ocw->ocw_entry);
900 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
901 /* no more RPCs in flight to return grant, do sync IO */
902 ocw->ocw_rc = -EDQUOT;
903 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
905 osc_consume_write_grant(cli,
906 &ocw->ocw_oap->oap_brw_page);
909 cfs_waitq_signal(&ocw->ocw_waitq);
/* Fold grant returned by the OST in a reply body into cl_avail_grant,
 * under cl_loi_list_lock. */
915 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
917 client_obd_list_lock(&cli->cl_loi_list_lock);
918 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
919 if (body->oa.o_valid & OBD_MD_FLGRANT)
920 cli->cl_avail_grant += body->oa.o_grant;
921 /* waiters are woken in brw_interpret */
922 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Forward declaration: defined later in this file; used by the grant
 * shrink machinery below to send KEY_GRANT_SHRINK to the OST. */
925 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
926 void *key, obd_count vallen, void *val,
927 struct ptlrpc_request_set *set);
/* Reply handler for a grant-shrink set_info RPC. On failure, the grant we
 * speculatively gave back (oa->o_grant) is restored to cl_avail_grant; on
 * success, any grant the server returned is folded back in via
 * osc_update_grant. */
929 static int osc_shrink_grant_interpret(const struct lu_env *env,
930 struct ptlrpc_request *req,
933 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
934 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
935 struct ost_body *body;
/* error path: undo the local shrink done in osc_shrink_grant_local */
938 client_obd_list_lock(&cli->cl_loi_list_lock);
939 cli->cl_avail_grant += oa->o_grant;
940 client_obd_list_unlock(&cli->cl_loi_list_lock);
944 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
946 osc_update_grant(cli, body);
/* Move a quarter of the available grant into oa->o_grant for return to the
 * OST, flag the obdo with OBD_FL_SHRINK_GRANT, and reset the shrink timer. */
952 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
954 client_obd_list_lock(&cli->cl_loi_list_lock);
955 oa->o_grant = cli->cl_avail_grant / 4;
956 cli->cl_avail_grant -= oa->o_grant;
957 client_obd_list_unlock(&cli->cl_loi_list_lock);
958 oa->o_flags |= OBD_FL_SHRINK_GRANT;
959 osc_update_next_shrink(cli);
/* Actively return unused grant to the OST: build an ost_body describing the
 * shrink, send it via the KEY_GRANT_SHRINK set_info path, and if the send
 * itself fails, restore the grant locally. */
962 static int osc_shrink_grant(struct client_obd *cli)
965 struct ost_body *body;
972 osc_announce_cached(cli, &body->oa, 0);
973 osc_shrink_grant_local(cli, &body->oa);
974 rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
975 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
976 sizeof(*body), body, NULL);
/* send failed: return the shrunk grant to the local pool */
978 client_obd_list_lock(&cli->cl_loi_list_lock);
979 cli->cl_avail_grant += body->oa.o_grant;
980 client_obd_list_unlock(&cli->cl_loi_list_lock);
987 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
/* Decide whether it is time to shrink grant: the shrink deadline has
 * (nearly — within 5 ticks) passed, the import is fully connected, and we
 * hold more grant than one max-sized BRW needs. Otherwise, if the deadline
 * passed but conditions aren't met, just rearm the timer. */
988 static int osc_should_shrink_grant(struct client_obd *client)
990 cfs_time_t time = cfs_time_current();
991 cfs_time_t next_shrink = client->cl_next_shrink_grant;
992 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
993 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
994 client->cl_avail_grant > GRANT_SHRINK_LIMIT)
997 osc_update_next_shrink(client);
/* Periodic timeout callback: walk every client on the timeout item's list
 * and shrink grant for those that qualify. */
1002 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1004 struct client_obd *client;
1006 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
1007 if (osc_should_shrink_grant(client))
1008 osc_shrink_grant(client);
/* Register this client with the ptlrpc timeout machinery so that
 * osc_grant_shrink_grant_cb fires every GRANT_SHRINK_INTERVAL. */
1013 static int osc_add_shrink_grant(struct client_obd *client)
1017 rc = ptlrpc_add_timeout_client(GRANT_SHRINK_INTERVAL,
1019 osc_grant_shrink_grant_cb, NULL,
1020 &client->cl_grant_shrink_list);
1022 CERROR("add grant client %s error %d\n",
1023 client->cl_import->imp_obd->obd_name, rc);
1026 CDEBUG(D_CACHE, "add grant client %s \n",
1027 client->cl_import->imp_obd->obd_name);
1028 osc_update_next_shrink(client);
/* Unregister the client from the grant-shrink timeout list. */
1032 static int osc_del_shrink_grant(struct client_obd *client)
1034 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list);
/* Initialize grant state from the connect reply: seed cl_avail_grant from
 * ocd_grant and, if the server supports GRANT_SHRINK, register the
 * periodic shrink callback (only once — guarded by list_empty). */
1037 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1039 client_obd_list_lock(&cli->cl_loi_list_lock);
1040 cli->cl_avail_grant = ocd->ocd_grant;
1041 client_obd_list_unlock(&cli->cl_loi_list_lock);
1043 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1044 list_empty(&cli->cl_grant_shrink_list))
1045 osc_add_shrink_grant(cli);
1047 CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1048 cli->cl_avail_grant, cli->cl_lost_grant);
1049 LASSERT(cli->cl_avail_grant >= 0);
1052 /* We assume that the reason this OSC got a short read is because it read
1053 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1054 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1055 * this stripe never got written at or beyond this stripe offset yet. */
/* Zero-fill the unread tail: walk pga[] past the nob_read bytes actually
 * returned, zero the remainder of the page where EOF landed, then zero all
 * remaining pages in full. */
1056 static void handle_short_read(int nob_read, obd_count page_count,
1057 struct brw_page **pga)
1062 /* skip bytes read OK */
1063 while (nob_read > 0) {
1064 LASSERT (page_count > 0);
1066 if (pga[i]->count > nob_read) {
1067 /* EOF inside this page */
1068 ptr = cfs_kmap(pga[i]->pg) +
1069 (pga[i]->off & ~CFS_PAGE_MASK);
1070 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1071 cfs_kunmap(pga[i]->pg);
1077 nob_read -= pga[i]->count;
1082 /* zero remaining pages */
1083 while (page_count-- > 0) {
1084 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1085 memset(ptr, 0, pga[i]->count);
1086 cfs_kunmap(pga[i]->pg);
/* Validate a BRW_WRITE reply: the per-niobuf RC vector must be present,
 * byte-swapped if the reply was, contain no negative (error) or unexpected
 * non-zero entries, and the bulk transfer must cover requested_nob bytes. */
1091 static int check_write_rcs(struct ptlrpc_request *req,
1092 int requested_nob, int niocount,
1093 obd_count page_count, struct brw_page **pga)
1097 /* return error if any niobuf was in error */
1098 remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
1099 sizeof(*remote_rcs) * niocount, NULL);
1100 if (remote_rcs == NULL) {
1101 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
/* fix endianness of each rc if the peer has opposite byte order */
1104 if (lustre_msg_swabbed(req->rq_repmsg))
1105 for (i = 0; i < niocount; i++)
1106 __swab32s(&remote_rcs[i]);
1108 for (i = 0; i < niocount; i++) {
1109 if (remote_rcs[i] < 0)
1110 return(remote_rcs[i]);
/* non-zero positive rc is a protocol violation */
1112 if (remote_rcs[i] != 0) {
1113 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1114 i, remote_rcs[i], req);
1119 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1120 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1121 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Two brw_pages can share one niobuf iff they are byte-contiguous and their
 * flags match. Differences only in the grant/cache/sync bookkeeping bits
 * (the masked-out bits) are tolerated; other flag differences are logged as
 * suspicious. */
1128 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1130 if (p1->flag != p2->flag) {
1131 unsigned mask = ~(OBD_BRW_FROM_GRANT|
1132 OBD_BRW_NOCACHE|OBD_BRW_SYNC);
1134 /* warn if we try to combine flags that we don't know to be
1135 * safe to combine */
1136 if ((p1->flag & mask) != (p2->flag & mask))
1137 CERROR("is it ok to have flags 0x%x and 0x%x in the "
1138 "same brw?\n", p1->flag, p2->flag);
1142 return (p1->off + p1->count == p2->off);
/* Compute the bulk checksum over up to @nob bytes spread across @pga pages
 * using @cksum_type. Contains two fault-injection hooks: corrupt the first
 * read page (client-receive checksum test) or return a deliberately wrong
 * checksum on write (server-receive test, data left intact for resend). */
1145 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1146 struct brw_page **pga, int opc,
1147 cksum_type_t cksum_type)
1152 LASSERT (pg_count > 0);
1153 cksum = init_checksum(cksum_type);
1154 while (nob > 0 && pg_count > 0) {
1155 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1156 int off = pga[i]->off & ~CFS_PAGE_MASK;
/* last page may be partial */
1157 int count = pga[i]->count > nob ? nob : pga[i]->count;
1159 /* corrupt the data before we compute the checksum, to
1160 * simulate an OST->client data error */
1161 if (i == 0 && opc == OST_READ &&
1162 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1163 memcpy(ptr + off, "bad1", min(4, nob));
1164 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1165 cfs_kunmap(pga[i]->pg);
1166 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1169 nob -= pga[i]->count;
1173 /* For sending we only compute the wrong checksum instead
1174 * of corrupting the data so it is still correct on a redo */
1175 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/* Build a complete OST_READ/OST_WRITE BRW request for @page_count pages:
 * allocate the request (from the pre-allocated pool for writes, so memory
 * pressure cannot deadlock writeback), count mergeable niobufs, set up the
 * bulk descriptor, pack body/ioobj/niobufs, announce cached/dirty state,
 * optionally compute bulk checksums, and stash async args. On success
 * *reqp owns the request; @ocapa is referenced when @reserve is set. */
1181 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1182 struct lov_stripe_md *lsm, obd_count page_count,
1183 struct brw_page **pga,
1184 struct ptlrpc_request **reqp,
1185 struct obd_capa *ocapa, int reserve)
1187 struct ptlrpc_request *req;
1188 struct ptlrpc_bulk_desc *desc;
1189 struct ost_body *body;
1190 struct obd_ioobj *ioobj;
1191 struct niobuf_remote *niobuf;
1192 int niocount, i, requested_nob, opc, rc;
1193 struct osc_brw_async_args *aa;
1194 struct req_capsule *pill;
1195 struct brw_page *pg_prev;
/* fault injection for BRW request construction */
1198 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1199 RETURN(-ENOMEM); /* Recoverable */
1200 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1201 RETURN(-EINVAL); /* Fatal */
1203 if ((cmd & OBD_BRW_WRITE) != 0) {
/* writes draw from the import's request pool so that dirty page
 * flushing can always make progress under memory pressure */
1205 req = ptlrpc_request_alloc_pool(cli->cl_import,
1206 cli->cl_import->imp_rq_pool,
1210 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
/* one niobuf per run of contiguous, flag-compatible pages */
1215 for (niocount = i = 1; i < page_count; i++) {
1216 if (!can_merge_pages(pga[i - 1], pga[i]))
1220 pill = &req->rq_pill;
1221 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1222 niocount * sizeof(*niobuf));
1223 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1225 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1227 ptlrpc_request_free(req);
1230 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1231 ptlrpc_at_set_req_timeout(req);
/* bulk direction follows the RPC: server pulls on write, pushes on read */
1233 if (opc == OST_WRITE)
1234 desc = ptlrpc_prep_bulk_imp(req, page_count,
1235 BULK_GET_SOURCE, OST_BULK_PORTAL);
1237 desc = ptlrpc_prep_bulk_imp(req, page_count,
1238 BULK_PUT_SINK, OST_BULK_PORTAL);
1241 GOTO(out, rc = -ENOMEM);
1242 /* NB request now owns desc and will free it when it gets freed */
1244 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1245 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1246 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1247 LASSERT(body && ioobj && niobuf);
1251 obdo_to_ioobj(oa, ioobj);
1252 ioobj->ioo_bufcnt = niocount;
1253 osc_pack_capa(req, body, ocapa);
1254 LASSERT (page_count > 0);
1256 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1257 struct brw_page *pg = pga[i];
/* each brw_page must be non-empty and fit within one page */
1259 LASSERT(pg->count > 0);
1260 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1261 "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1262 pg->off, pg->count);
/* pages must be sorted by strictly increasing file offset */
1264 LASSERTF(i == 0 || pg->off > pg_prev->off,
1265 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1266 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1268 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1269 pg_prev->pg, page_private(pg_prev->pg),
1270 pg_prev->pg->index, pg_prev->off);
1272 LASSERTF(i == 0 || pg->off > pg_prev->off,
1273 "i %d p_c %u\n", i, page_count);
/* a BRW is either entirely server-locked or not at all */
1275 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1276 (pg->flag & OBD_BRW_SRVLOCK));
1278 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1280 requested_nob += pg->count;
/* extend the current niobuf or start a new one */
1282 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1284 niobuf->len += pg->count;
1286 niobuf->offset = pg->off;
1287 niobuf->len = pg->count;
1288 niobuf->flags = pg->flag;
/* verify we filled exactly the niobuf array we sized above */
1293 LASSERTF((void *)(niobuf - niocount) ==
1294 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1295 niocount * sizeof(*niobuf)),
1296 "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
1297 REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
1298 (void *)(niobuf - niocount));
1300 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1301 if (osc_should_shrink_grant(cli))
1302 osc_shrink_grant_local(cli, &body->oa);
1304 /* size[REQ_REC_OFF] still sizeof (*body) */
1305 if (opc == OST_WRITE) {
/* checksum only when enabled and the sec flavor doesn't already
 * protect bulk data */
1306 if (unlikely(cli->cl_checksum) &&
1307 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1308 /* store cl_cksum_type in a local variable since
1309 * it can be changed via lprocfs */
1310 cksum_type_t cksum_type = cli->cl_cksum_type;
1312 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1313 oa->o_flags = body->oa.o_flags = 0;
1314 body->oa.o_flags |= cksum_type_pack(cksum_type);
1315 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1316 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1320 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1322 /* save this in 'oa', too, for later checking */
1323 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1324 oa->o_flags |= cksum_type_pack(cksum_type);
1326 /* clear out the checksum flag, in case this is a
1327 * resend but cl_checksum is no longer set. b=11238 */
1328 oa->o_valid &= ~OBD_MD_FLCKSUM;
1330 oa->o_cksum = body->oa.o_cksum;
1331 /* 1 RC per niobuf */
1332 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
1333 sizeof(__u32) * niocount);
/* read path: request a server-side checksum in the reply */
1335 if (unlikely(cli->cl_checksum) &&
1336 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1337 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1338 body->oa.o_flags = 0;
1339 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1340 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1342 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
1343 /* 1 RC for the whole I/O */
1345 ptlrpc_request_set_replen(req);
1347 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1348 aa = ptlrpc_req_async_args(req);
1350 aa->aa_requested_nob = requested_nob;
1351 aa->aa_nio_count = niocount;
1352 aa->aa_page_count = page_count;
1356 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1357 if (ocapa && reserve)
1358 aa->aa_ocapa = capa_get(ocapa);
/* error path: releasing the request also releases the bulk desc */
1364 ptlrpc_req_finished(req);
/* Diagnose a write-checksum mismatch reported by the OST.  Re-computes the
 * bulk checksum over the client's still-mapped pages and compares it with
 * both the checksum originally sent and the one the server computed, then
 * logs a console error describing where the data most likely changed.
 * NOTE(review): this excerpt is elided (return statements not visible);
 * presumably returns non-zero only for a genuine transit corruption --
 * confirm against the full source. */
1368 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1369 __u32 client_cksum, __u32 server_cksum, int nob,
1370 obd_count page_count, struct brw_page **pga,
1371 cksum_type_t client_cksum_type)
1375 cksum_type_t cksum_type;
/* Fast path: server agrees with what we sent -- nothing to diagnose. */
1377 if (server_cksum == client_cksum) {
1378 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* Recompute with the checksum type the server's reply advertises,
 * falling back to CRC32 when the reply carries no flags. */
1382 if (oa->o_valid & OBD_MD_FLFLAGS)
1383 cksum_type = cksum_type_unpack(oa->o_flags);
1385 cksum_type = OBD_CKSUM_CRC32;
1387 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
/* Classify the failure from the three checksums we now hold. */
1390 if (cksum_type != client_cksum_type)
1391 msg = "the server did not use the checksum type specified in "
1392 "the original request - likely a protocol problem";
1393 else if (new_cksum == server_cksum)
1394 msg = "changed on the client after we checksummed it - "
1395 "likely false positive due to mmap IO (bug 11742)";
1396 else if (new_cksum == client_cksum)
1397 msg = "changed in transit before arrival at OST";
1399 msg = "changed in transit AND doesn't match the original - "
1400 "likely false positive due to mmap IO (bug 11742)";
/* Console-level report: identifies peer, object and byte extent so the
 * administrator can correlate with server-side logs. */
1402 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1403 LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1404 "["LPU64"-"LPU64"]\n",
1405 msg, libcfs_nid2str(peer->nid),
1406 oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1407 oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1410 oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1412 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1413 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1414 "client csum now %x\n", client_cksum, client_cksum_type,
1415 server_cksum, cksum_type, new_cksum);
1419 /* Note rc enters this function as number of bytes transferred */
/* Post-process a completed bulk read/write RPC: unpack and swab the reply
 * body, update quota flags and grant, verify checksums (calling
 * check_write_checksum() for writes, recomputing locally for reads), and
 * validate the transferred byte counts.  Copies the reply's oa back into
 * the caller's obdo on success. */
1420 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1422 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1423 const lnet_process_id_t *peer =
1424 &req->rq_import->imp_connection->c_peer;
1425 struct client_obd *cli = aa->aa_cli;
1426 struct ost_body *body;
1427 __u32 client_cksum = 0;
1430 if (rc < 0 && rc != -EDQUOT)
1433 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1434 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1435 lustre_swab_ost_body);
1437 CDEBUG(D_INFO, "Can't unpack body\n");
1441 /* set/clear over quota flag for a uid/gid */
1442 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1443 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1444 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1445 body->oa.o_gid, body->oa.o_valid,
/* Remember the checksum we sent; the reply body may be compared
 * against it below for the write path. */
1451 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1452 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1454 osc_update_grant(cli, body);
/* --- write path: verify server-side checksum and per-niobuf rcs --- */
1456 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1458 CERROR("Unexpected +ve rc %d\n", rc);
1461 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1463 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1466 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1467 check_write_checksum(&body->oa, peer, client_cksum,
1468 body->oa.o_cksum, aa->aa_requested_nob,
1469 aa->aa_page_count, aa->aa_ppga,
1470 cksum_type_unpack(aa->aa_oa->o_flags)))
1473 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1474 aa->aa_page_count, aa->aa_ppga);
1478 /* The rest of this function executes only for OST_READs */
1480 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
/* Sanity: the server can return fewer bytes than requested (short
 * read) but never more, and rc must match what the bulk layer saw. */
1484 if (rc > aa->aa_requested_nob) {
1485 CERROR("Unexpected rc %d (%d requested)\n", rc,
1486 aa->aa_requested_nob);
1490 if (rc != req->rq_bulk->bd_nob_transferred) {
1491 CERROR ("Unexpected rc %d (%d transferred)\n",
1492 rc, req->rq_bulk->bd_nob_transferred);
1496 if (rc < aa->aa_requested_nob)
1497 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
/* Read checksum verification: recompute over received pages and
 * compare with the server's value from the reply body. */
1499 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1500 static int cksum_counter;
1501 __u32 server_cksum = body->oa.o_cksum;
1504 cksum_type_t cksum_type;
1506 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1507 cksum_type = cksum_type_unpack(body->oa.o_flags);
1509 cksum_type = OBD_CKSUM_CRC32;
1510 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1511 aa->aa_ppga, OST_READ,
/* Report the router separately when the bulk came via one. */
1514 if (peer->nid == req->rq_bulk->bd_sender) {
1518 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1521 if (server_cksum == ~0 && rc > 0) {
1522 CERROR("Protocol error: server %s set the 'checksum' "
1523 "bit, but didn't send a checksum. Not fatal, "
1524 "but please notify on http://bugzilla.lustre.org/\n",
1525 libcfs_nid2str(peer->nid));
1526 } else if (server_cksum != client_cksum) {
1527 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1528 "%s%s%s inum "LPU64"/"LPU64" object "
1529 LPU64"/"LPU64" extent "
1530 "["LPU64"-"LPU64"]\n",
1531 req->rq_import->imp_obd->obd_name,
1532 libcfs_nid2str(peer->nid),
1534 body->oa.o_valid & OBD_MD_FLFID ?
1535 body->oa.o_fid : (__u64)0,
1536 body->oa.o_valid & OBD_MD_FLFID ?
1537 body->oa.o_generation :(__u64)0,
1539 body->oa.o_valid & OBD_MD_FLGROUP ?
1540 body->oa.o_gr : (__u64)0,
1541 aa->aa_ppga[0]->off,
1542 aa->aa_ppga[aa->aa_page_count-1]->off +
1543 aa->aa_ppga[aa->aa_page_count-1]->count -
1545 CERROR("client %x, server %x, cksum_type %x\n",
1546 client_cksum, server_cksum, cksum_type);
1548 aa->aa_oa->o_cksum = client_cksum;
1552 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* We asked for a checksum but the server didn't send one; log
 * this only at power-of-two counts to avoid console spam. */
1555 } else if (unlikely(client_cksum)) {
1556 static int cksum_missed;
1559 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1560 CERROR("Checksum %u requested from %s but not sent\n",
1561 cksum_missed, libcfs_nid2str(peer->nid));
/* Propagate server-updated attributes back to the caller's obdo. */
1567 *aa->aa_oa = body->oa;
/* Synchronous bulk read/write: build one brw RPC, wait for it, and retry
 * recoverable errors with an increasing delay until osc_should_resend()
 * says to give up.  Elided lines presumably contain the retry loop label
 * and resend counter -- confirm against full source. */
1572 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1573 struct lov_stripe_md *lsm,
1574 obd_count page_count, struct brw_page **pga,
1575 struct obd_capa *ocapa)
1577 struct ptlrpc_request *req;
1581 struct l_wait_info lwi;
1585 cfs_waitq_init(&waitq);
1588 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1589 page_count, pga, &req, ocapa, 0);
1593 rc = ptlrpc_queue_wait(req);
/* A bulk timeout with resend pending means we should rebuild the
 * request rather than report the failure. */
1595 if (rc == -ETIMEDOUT && req->rq_resend) {
1596 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1597 ptlrpc_req_finished(req);
1601 rc = osc_brw_fini_request(req, rc);
1603 ptlrpc_req_finished(req);
1604 if (osc_recoverable_error(rc)) {
1606 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1607 CERROR("too many resend retries, returning error\n");
/* Back off before retrying; delay grows with the resend count. */
1611 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1612 l_wait_event(waitq, 0, &lwi);
/* Rebuild a brw RPC after a recoverable error and queue the replacement on
 * the original request's set.  The new request inherits the old request's
 * async args, pga and oap list; each oap's request reference is switched
 * to the new request under cl_loi_list_lock. */
1620 int osc_brw_redo_request(struct ptlrpc_request *request,
1621 struct osc_brw_async_args *aa)
1623 struct ptlrpc_request *new_req;
1624 struct ptlrpc_request_set *set = request->rq_set;
1625 struct osc_brw_async_args *new_aa;
1626 struct osc_async_page *oap;
1630 if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1631 CERROR("too many resend retries, returning error\n");
1635 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1637 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1638 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1639 aa->aa_cli, aa->aa_oa,
1640 NULL /* lsm unused by osc currently */,
1641 aa->aa_page_count, aa->aa_ppga,
1642 &new_req, aa->aa_ocapa, 0);
1646 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
/* Verify the oaps still belong to this request; abandon the redo if
 * any of them was interrupted while we were rebuilding. */
1648 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1649 if (oap->oap_request != NULL) {
1650 LASSERTF(request == oap->oap_request,
1651 "request %p != oap_request %p\n",
1652 request, oap->oap_request);
1653 if (oap->oap_interrupted) {
1654 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1655 ptlrpc_req_finished(new_req);
1660 /* New request takes over pga and oaps from old request.
1661 * Note that copying a list_head doesn't work, need to move it... */
1663 new_req->rq_interpret_reply = request->rq_interpret_reply;
1664 new_req->rq_async_args = request->rq_async_args;
/* Delay the resend proportionally to how often we've retried. */
1665 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1667 new_aa = ptlrpc_req_async_args(new_req);
1669 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1670 list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1671 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
/* Re-point every oap's request reference at the replacement. */
1673 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1674 if (oap->oap_request) {
1675 ptlrpc_req_finished(oap->oap_request);
1676 oap->oap_request = ptlrpc_request_addref(new_req);
/* Capa ownership moves to the new request's async args. */
1680 new_aa->aa_ocapa = aa->aa_ocapa;
1681 aa->aa_ocapa = NULL;
1683 /* use ptlrpc_set_add_req is safe because interpret functions work
1684 * in check_set context. only one way exist with access to request
1685 * from different thread got -EINTR - this way protected with
1686 * cl_loi_list_lock */
1687 ptlrpc_set_add_req(set, new_req);
1689 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1691 DEBUG_REQ(D_INFO, new_req, "new request");
1696 * ugh, we want disk allocation on the target to happen in offset order. we'll
1697 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1698 * fine for our small page arrays and doesn't require allocation. its an
1699 * insertion sort that swaps elements that are strides apart, shrinking the
1700 * stride down until its '1' and the array is sorted.
/* In-place shellsort of the brw_page pointer array, keyed on pg->off
 * ascending.  Uses the 3h+1 stride sequence; no extra allocation. */
1702 static void sort_brw_pages(struct brw_page **array, int num)
1705 struct brw_page *tmp;
/* Grow the stride to the largest 3h+1 value below num. */
1709 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
/* Stride-spaced insertion sort; elided lines presumably shrink the
 * stride (stride /= 3) each pass -- confirm against full source. */
1714 for (i = stride ; i < num ; i++) {
1717 while (j >= stride && array[j - stride]->off > tmp->off) {
1718 array[j] = array[j - stride];
1723 } while (stride > 1);
/* Return how many leading pages of @pg form a run the network can move in
 * a single unfragmented RDMA: the run ends at the first page that does not
 * end on a page boundary, or the first subsequent page that does not start
 * on one. */
1726 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1732 LASSERT (pages > 0);
1733 offset = pg[i]->off & ~CFS_PAGE_MASK;
1737 if (pages == 0) /* that's all */
1740 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1741 return count; /* doesn't end on page boundary */
1744 offset = pg[i]->off & ~CFS_PAGE_MASK;
1745 if (offset != 0) /* doesn't start on page boundary */
/* Allocate an array of @count pointers into the flat brw_page array @pga,
 * so callers can sort and split the pages without moving them.  Freed by
 * osc_release_ppga().  Returns NULL on allocation failure (elided). */
1752 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1754 struct brw_page **ppga;
1757 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1761 for (i = 0; i < count; i++)
/* Free a pointer array previously built by osc_build_ppga(). */
1766 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1768 LASSERT(ppga != NULL);
1769 OBD_FREE(ppga, sizeof(*ppga) * count);
/* Entry point for (synchronous) brw: splits @page_count pages into chunks
 * of at most cl_max_pages_per_rpc unfragmented pages, sorts them by offset,
 * and issues one osc_brw_internal() call per chunk.  The caller's oa is
 * saved/restored across chunks because each brw clobbers it. */
1772 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1773 obd_count page_count, struct brw_page *pga,
1774 struct obd_trans_info *oti)
1776 struct obdo *saved_oa = NULL;
1777 struct brw_page **ppga, **orig;
1778 struct obd_import *imp = class_exp2cliimp(exp);
1779 struct client_obd *cli;
1780 int rc, page_count_orig;
1783 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1784 cli = &imp->imp_obd->u.cli;
1786 if (cmd & OBD_BRW_CHECK) {
1787 /* The caller just wants to know if there's a chance that this
1788 * I/O can succeed */
1790 if (imp->imp_invalid)
1795 /* test_brw with a failed create can trip this, maybe others. */
1796 LASSERT(cli->cl_max_pages_per_rpc);
1800 orig = ppga = osc_build_ppga(pga, page_count);
/* Keep the original count: ppga is advanced below, but the release
 * at the end must free the whole array. */
1803 page_count_orig = page_count;
1805 sort_brw_pages(ppga, page_count);
1806 while (page_count) {
1807 obd_count pages_per_brw;
1809 if (page_count > cli->cl_max_pages_per_rpc)
1810 pages_per_brw = cli->cl_max_pages_per_rpc;
1812 pages_per_brw = page_count;
/* Shrink the chunk so the network can send it in one RDMA. */
1814 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1816 if (saved_oa != NULL) {
1817 /* restore previously saved oa */
1818 *oinfo->oi_oa = *saved_oa;
1819 } else if (page_count > pages_per_brw) {
1820 /* save a copy of oa (brw will clobber it) */
1821 OBDO_ALLOC(saved_oa);
1822 if (saved_oa == NULL)
1823 GOTO(out, rc = -ENOMEM);
1824 *saved_oa = *oinfo->oi_oa;
1827 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1828 pages_per_brw, ppga, oinfo->oi_capa);
/* Advance to the next chunk. */
1833 page_count -= pages_per_brw;
1834 ppga += pages_per_brw;
1838 osc_release_ppga(orig, page_count_orig);
1840 if (saved_oa != NULL)
1841 OBDO_FREE(saved_oa);
1846 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1847 * the dirty accounting. Writeback completes or truncate happens before
1848 * writing starts. Must be called with the loi lock held. */
/* Releases the write grant held for this page; @sent indicates whether the
 * page actually went over the wire (elided parameter list -- confirm). */
1849 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1852 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1856 /* This maintains the lists of pending pages to read/write for a given object
1857 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1858 * to quickly find objects that are ready to send an RPC. */
/* Decide whether this object's pending pages justify firing an RPC now:
 * always when the import is invalid (to drain), when an urgent page is
 * queued, when cache waiters need space (writes), or when enough pages
 * have accumulated to fill an optimally-sized RPC. */
1859 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1865 if (lop->lop_num_pending == 0)
1868 /* if we have an invalid import we want to drain the queued pages
1869 * by forcing them through rpcs that immediately fail and complete
1870 * the pages. recovery relies on this to empty the queued pages
1871 * before canceling the locks and evicting down the llite pages */
1872 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1875 /* stream rpcs in queue order as long as as there is an urgent page
1876 * queued. this is our cheap solution for good batching in the case
1877 * where writepage marks some random page in the middle of the file
1878 * as urgent because of, say, memory pressure */
1879 if (!list_empty(&lop->lop_urgent)) {
1880 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1883 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1884 optimal = cli->cl_max_pages_per_rpc;
1885 if (cmd & OBD_BRW_WRITE) {
1886 /* trigger a write rpc stream as long as there are dirtiers
1887 * waiting for space. as they're waiting, they're not going to
1888 * create more pages to coallesce with what's waiting.. */
1889 if (!list_empty(&cli->cl_cache_waiters)) {
1890 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1893 /* +16 to avoid triggering rpcs that would want to include pages
1894 * that are being queued but which can't be made ready until
1895 * the queuer finishes with the page. this is a wart for
1896 * llite::commit_write() */
1899 if (lop->lop_num_pending >= optimal)
/* Return whether the head of the urgent list is a high-priority (ASYNC_HP)
 * page, i.e. whether this object should be serviced on the HP ready list. */
1905 static int lop_makes_hprpc(struct loi_oap_pages *lop)
1907 struct osc_async_page *oap;
1910 if (list_empty(&lop->lop_urgent))
/* HP pages are queued at the head (see osc_oap_to_pending), so only
 * the first entry needs to be checked. */
1913 oap = list_entry(lop->lop_urgent.next,
1914 struct osc_async_page, oap_urgent_item);
1916 if (oap->oap_async_flags & ASYNC_HP) {
1917 CDEBUG(D_CACHE, "hp request forcing RPC\n");
/* Idempotently enforce list membership: add @item to @list when it should
 * be on it and isn't, remove it when it shouldn't be and is. */
1924 static void on_list(struct list_head *item, struct list_head *list,
1927 if (list_empty(item) && should_be_on)
1928 list_add_tail(item, list);
1929 else if (!list_empty(item) && !should_be_on)
1930 list_del_init(item);
1933 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1934 * can find pages to build into rpcs quickly */
/* Recompute which of the client's ready/hp-ready/read/write lists this
 * object belongs on, based on its pending pages.  HP readiness trumps
 * normal readiness: an object is on exactly one of the two ready lists. */
1935 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1937 if (lop_makes_hprpc(&loi->loi_write_lop) ||
1938 lop_makes_hprpc(&loi->loi_read_lop)) {
1940 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
1941 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
1943 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
1944 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
1945 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
1946 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
/* Track which objects have any pending writes/reads at all. */
1949 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1950 loi->loi_write_lop.lop_num_pending);
1952 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1953 loi->loi_read_lop.lop_num_pending);
/* Adjust this lop's pending-page count by @delta and mirror the change in
 * the client-wide pending read/write page counters. */
1956 static void lop_update_pending(struct client_obd *cli,
1957 struct loi_oap_pages *lop, int cmd, int delta)
1959 lop->lop_num_pending += delta;
1960 if (cmd & OBD_BRW_WRITE)
1961 cli->cl_pending_w_pages += delta;
1963 cli->cl_pending_r_pages += delta;
1967 * this is called when a sync waiter receives an interruption. Its job is to
1968 * get the caller woken as soon as possible. If its page hasn't been put in an
1969 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
1970 * desiring interruption which will forcefully complete the rpc once the rpc
/* Marks the oap interrupted; if it is already in an RPC, interrupts that
 * RPC and drops the oap's request reference, otherwise dequeues the page
 * and completes it back to the caller with -EINTR. */
1973 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
1975 struct loi_oap_pages *lop;
1976 struct lov_oinfo *loi;
1980 LASSERT(!oap->oap_interrupted);
1981 oap->oap_interrupted = 1;
1983 /* ok, it's been put in an rpc. only one oap gets a request reference */
1984 if (oap->oap_request != NULL) {
1985 ptlrpc_mark_interrupted(oap->oap_request);
1986 ptlrpcd_wake(oap->oap_request);
1987 ptlrpc_req_finished(oap->oap_request);
1988 oap->oap_request = NULL;
1992 * page completion may be called only if ->cpo_prep() method was
1993 * executed by osc_io_submit(), that also adds page the to pending list
1995 if (!list_empty(&oap->oap_pending_item)) {
1996 list_del_init(&oap->oap_pending_item);
1997 list_del_init(&oap->oap_urgent_item);
/* Fix up accounting and list membership now that the page left
 * the pending queue, then complete it with -EINTR. */
2000 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2001 &loi->loi_write_lop : &loi->loi_read_lop;
2002 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2003 loi_list_maint(oap->oap_cli, oap->oap_loi);
2004 rc = oap->oap_caller_ops->ap_completion(env,
2005 oap->oap_caller_data,
2006 oap->oap_cmd, NULL, -EINTR);
2012 /* this is trying to propogate async writeback errors back up to the
2013 * application. As an async write fails we record the error code for later if
2014 * the app does an fsync. As long as errors persist we force future rpcs to be
2015 * sync so that the app can get a sync error and break the cycle of queueing
2016 * pages for which writeback will fail. */
/* On failure (elided branch) latch force_sync and remember the next xid so
 * we know which in-flight writes predate the error; clear force_sync once
 * a write at or past that xid succeeds. */
2017 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2024 ar->ar_force_sync = 1;
2025 ar->ar_min_xid = ptlrpc_sample_next_xid();
2030 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2031 ar->ar_force_sync = 0;
/* Queue an oap on its object's pending list (read or write side), placing
 * it on the urgent list as well when flagged: ASYNC_HP pages go to the
 * head of the urgent list, ASYNC_URGENT to the tail.  Updates the pending
 * page accounting. */
2034 void osc_oap_to_pending(struct osc_async_page *oap)
2036 struct loi_oap_pages *lop;
2038 if (oap->oap_cmd & OBD_BRW_WRITE)
2039 lop = &oap->oap_loi->loi_write_lop;
2041 lop = &oap->oap_loi->loi_read_lop;
/* Head vs tail ordering is what lop_makes_hprpc() relies on. */
2043 if (oap->oap_async_flags & ASYNC_HP)
2044 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2045 else if (oap->oap_async_flags & ASYNC_URGENT)
2046 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2047 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2048 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2051 /* this must be called holding the loi list lock to give coverage to exit_cache,
2052 * async_flag maintenance, and oap_request */
/* Finish one async page: drop its request reference, update async-error
 * state for writes, copy server-updated size/time attributes into the
 * object's lvb, invoke the caller's completion callback, and either
 * requeue the page or release its cache grant. */
2053 static void osc_ap_completion(const struct lu_env *env,
2054 struct client_obd *cli, struct obdo *oa,
2055 struct osc_async_page *oap, int sent, int rc)
2060 if (oap->oap_request != NULL) {
2061 xid = ptlrpc_req_xid(oap->oap_request);
2062 ptlrpc_req_finished(oap->oap_request);
2063 oap->oap_request = NULL;
2066 oap->oap_async_flags = 0;
2067 oap->oap_interrupted = 0;
/* Record write success/failure both client-wide and per-object so
 * later fsync can report async writeback errors (see osc_process_ar). */
2069 if (oap->oap_cmd & OBD_BRW_WRITE) {
2070 osc_process_ar(&cli->cl_ar, xid, rc);
2071 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2074 if (rc == 0 && oa != NULL) {
2075 if (oa->o_valid & OBD_MD_FLBLOCKS)
2076 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2077 if (oa->o_valid & OBD_MD_FLMTIME)
2078 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2079 if (oa->o_valid & OBD_MD_FLATIME)
2080 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2081 if (oa->o_valid & OBD_MD_FLCTIME)
2082 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2085 rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2086 oap->oap_cmd, oa, rc);
2088 /* ll_ap_completion (from llite) drops PG_locked. so, a new
2089 * I/O on the page could start, but OSC calls it under lock
2090 * and thus we can add oap back to pending safely */
2092 /* upper layer wants to leave the page on pending queue */
2093 osc_oap_to_pending(oap);
2095 osc_exit_cache(cli, oap, sent);
/* Reply interpreter for async brw RPCs (installed as rq_interpret_reply).
 * Finishes the request via osc_brw_fini_request(), redoes it on recoverable
 * error, then under cl_loi_list_lock updates in-flight counters, completes
 * every oap (or releases grants for the async_internal path), wakes cache
 * waiters and kicks osc_check_rpcs() to send more RPCs. */
2099 static int brw_interpret(const struct lu_env *env,
2100 struct ptlrpc_request *req, void *data, int rc)
2102 struct osc_brw_async_args *aa = data;
2103 struct client_obd *cli;
2107 rc = osc_brw_fini_request(req, rc);
2108 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2109 if (osc_recoverable_error(rc)) {
2110 rc = osc_brw_redo_request(req, aa);
2116 capa_put(aa->aa_ocapa);
2117 aa->aa_ocapa = NULL;
2122 client_obd_list_lock(&cli->cl_loi_list_lock);
2124 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2125 * is called so we know whether to go to sync BRWs or wait for more
2126 * RPCs to complete */
2127 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2128 cli->cl_w_in_flight--;
2130 cli->cl_r_in_flight--;
/* An empty oap list distinguishes the async_internal() path (no
 * per-page completion callbacks) from osc_send_oap_rpc(). */
2132 async = list_empty(&aa->aa_oaps);
2133 if (!async) { /* from osc_send_oap_rpc() */
2134 struct osc_async_page *oap, *tmp;
2135 /* the caller may re-use the oap after the completion call so
2136 * we need to clean it up a little */
2137 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2138 list_del_init(&oap->oap_rpc_item);
2139 osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2141 OBDO_FREE(aa->aa_oa);
2142 } else { /* from async_internal() */
2144 for (i = 0; i < aa->aa_page_count; i++)
2145 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2147 osc_wake_cache_waiters(cli);
2148 osc_check_rpcs(env, cli);
2149 client_obd_list_unlock(&cli->cl_loi_list_lock);
2151 cl_req_completion(env, aa->aa_clerq, rc);
2152 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
/* Build a brw ptlrpc request from a list of ready oaps: allocate the pga,
 * create a cl_req covering the pages, fill the obdo attributes (including
 * the DLM lock handle when present), sort pages by offset and hand them to
 * osc_brw_prep_request().  On success the new request's async args own the
 * oap list; on failure every oap is completed with the error.  Returns the
 * request or an ERR_PTR. */
2156 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2157 struct client_obd *cli,
2158 struct list_head *rpc_list,
2159 int page_count, int cmd)
2161 struct ptlrpc_request *req;
2162 struct brw_page **pga = NULL;
2163 struct osc_brw_async_args *aa;
2164 struct obdo *oa = NULL;
2165 const struct obd_async_page_ops *ops = NULL;
2166 void *caller_data = NULL;
2167 struct osc_async_page *oap;
2168 struct osc_async_page *tmp;
2169 struct ost_body *body;
2170 struct cl_req *clerq = NULL;
2171 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2172 struct ldlm_lock *lock = NULL;
2173 struct cl_req_attr crattr;
2177 LASSERT(!list_empty(rpc_list));
2179 memset(&crattr, 0, sizeof crattr);
2180 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2182 GOTO(out, req = ERR_PTR(-ENOMEM));
2186 GOTO(out, req = ERR_PTR(-ENOMEM));
/* First pass over the oaps: allocate the cl_req on the first page,
 * point pga entries at each oap's brw_page, and add pages to clerq. */
2189 list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2190 struct cl_page *page = osc_oap2cl_page(oap);
2192 ops = oap->oap_caller_ops;
2193 caller_data = oap->oap_caller_data;
2195 clerq = cl_req_alloc(env, page, crt,
2196 1 /* only 1-object rpcs for
2199 GOTO(out, req = (void *)clerq);
2200 lock = oap->oap_ldlm_lock;
2202 pga[i] = &oap->oap_brw_page;
2203 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2204 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2205 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2207 cl_req_page_add(env, clerq, page);
2210 /* always get the data for the obdo for the rpc */
2211 LASSERT(ops != NULL);
2213 crattr.cra_capa = NULL;
2214 cl_req_attr_set(env, clerq, &crattr, ~0ULL);
/* Pass the DLM lock handle so the server can match the extent lock. */
2216 oa->o_handle = lock->l_remote_handle;
2217 oa->o_valid |= OBD_MD_FLHANDLE;
2220 rc = cl_req_prep(env, clerq);
2222 CERROR("cl_req_prep failed: %d\n", rc);
2223 GOTO(out, req = ERR_PTR(rc));
2226 sort_brw_pages(pga, page_count);
2227 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2228 pga, &req, crattr.cra_capa, 1);
2230 CERROR("prep_req failed: %d\n", rc);
2231 GOTO(out, req = ERR_PTR(rc));
2234 /* Need to update the timestamps after the request is built in case
2235 * we race with setattr (locally or in queue at OST). If OST gets
2236 * later setattr before earlier BRW (as determined by the request xid),
2237 * the OST will not use BRW timestamps. Sadly, there is no obvious
2238 * way to do this in a single call. bug 10150 */
2239 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2240 cl_req_attr_set(env, clerq, &crattr,
2241 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
/* Transfer the oap list into the request's async args; the request
 * now owns the oaps and the clerq. */
2243 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2244 aa = ptlrpc_req_async_args(req);
2245 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2246 list_splice(rpc_list, &aa->aa_oaps);
2247 CFS_INIT_LIST_HEAD(rpc_list);
2248 aa->aa_clerq = clerq;
2250 capa_put(crattr.cra_capa);
/* Error path: free the pga and complete every queued oap with the
 * error (or -EINTR when the page was interrupted meanwhile). */
2255 OBD_FREE(pga, sizeof(*pga) * page_count);
2256 /* this should happen rarely and is pretty bad, it makes the
2257 * pending list not follow the dirty order */
2258 client_obd_list_lock(&cli->cl_loi_list_lock);
2259 list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2260 list_del_init(&oap->oap_rpc_item);
2262 /* queued sync pages can be torn down while the pages
2263 * were between the pending list and the rpc */
2264 if (oap->oap_interrupted) {
2265 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2266 osc_ap_completion(env, cli, NULL, oap, 0,
2270 osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2272 if (clerq && !IS_ERR(clerq))
2273 cl_req_completion(env, clerq, PTR_ERR(req));
2279 * prepare pages for ASYNC io and put pages in send queue.
2283 * \param cmd - OBD_BRW_* macroses
2284 * \param lop - pending pages
2286 * \return zero if pages successfully add to send queue.
2287 * \return not zere if error occurring.
/* Walks the object's pending list collecting pages that can go into one
 * RPC (stopping at SRVLOCK mismatches, not-ready pages, intra-page gaps,
 * the per-RPC page limit and PTLRPC_MAX_BRW_SIZE boundaries), builds the
 * request with osc_build_req(), updates lprocfs stats and in-flight
 * counters, and hands the request to ptlrpcd.  Called and returns with
 * cl_loi_list_lock held; drops it around request building. */
2290 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2291 struct lov_oinfo *loi,
2292 int cmd, struct loi_oap_pages *lop)
2294 struct ptlrpc_request *req;
2295 obd_count page_count = 0;
2296 struct osc_async_page *oap = NULL, *tmp;
2297 struct osc_brw_async_args *aa;
2298 const struct obd_async_page_ops *ops;
2299 CFS_LIST_HEAD(rpc_list);
2300 unsigned int ending_offset;
2301 unsigned starting_offset = 0;
2303 struct cl_object *clob = NULL;
2306 /* If there are HP OAPs we need to handle at least 1 of them,
2307 * move it the beginning of the pending list for that. */
2308 if (!list_empty(&lop->lop_urgent)) {
2309 oap = list_entry(lop->lop_urgent.next,
2310 struct osc_async_page, oap_urgent_item);
2311 if (oap->oap_async_flags & ASYNC_HP)
2312 list_move(&oap->oap_pending_item, &lop->lop_pending);
2315 /* first we find the pages we're allowed to work with */
2316 list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2318 ops = oap->oap_caller_ops;
2320 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2321 "magic 0x%x\n", oap, oap->oap_magic);
2324 /* pin object in memory, so that completion call-backs
2325 * can be safely called under client_obd_list lock. */
2326 clob = osc_oap2cl_page(oap)->cp_obj;
2327 cl_object_get(clob);
/* All pages in one RPC must agree on the SRVLOCK flag. */
2330 if (page_count != 0 &&
2331 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2332 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2333 " oap %p, page %p, srvlock %u\n",
2334 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2337 /* in llite being 'ready' equates to the page being locked
2338 * until completion unlocks it. commit_write submits a page
2339 * as not ready because its unlock will happen unconditionally
2340 * as the call returns. if we race with commit_write giving
2341 * us that page we dont' want to create a hole in the page
2342 * stream, so we stop and leave the rpc to be fired by
2343 * another dirtier or kupdated interval (the not ready page
2344 * will still be on the dirty list). we could call in
2345 * at the end of ll_file_write to process the queue again. */
2346 if (!(oap->oap_async_flags & ASYNC_READY)) {
2347 int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2350 CDEBUG(D_INODE, "oap %p page %p returned %d "
2351 "instead of ready\n", oap,
2355 /* llite is telling us that the page is still
2356 * in commit_write and that we should try
2357 * and put it in an rpc again later. we
2358 * break out of the loop so we don't create
2359 * a hole in the sequence of pages in the rpc
2364 /* the io isn't needed.. tell the checks
2365 * below to complete the rpc with EINTR */
2366 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2367 oap->oap_count = -EINTR;
2370 oap->oap_async_flags |= ASYNC_READY;
2373 LASSERTF(0, "oap %p page %p returned %d "
2374 "from make_ready\n", oap,
2382 * Page submitted for IO has to be locked. Either by
2383 * ->ap_make_ready() or by higher layers.
2385 #if defined(__KERNEL__) && defined(__linux__)
2387 struct cl_page *page;
2389 page = osc_oap2cl_page(oap);
2391 if (page->cp_type == CPT_CACHEABLE &&
2392 !(PageLocked(oap->oap_page) &&
2393 (CheckWriteback(oap->oap_page, cmd)))) {
2394 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2396 (long)oap->oap_page->flags,
2397 oap->oap_async_flags);
2402 /* If there is a gap at the start of this page, it can't merge
2403 * with any previous page, so we'll hand the network a
2404 * "fragmented" page array that it can't transfer in 1 RDMA */
2405 if (page_count != 0 && oap->oap_page_off != 0)
2408 /* take the page out of our book-keeping */
2409 list_del_init(&oap->oap_pending_item);
2410 lop_update_pending(cli, lop, cmd, -1);
2411 list_del_init(&oap->oap_urgent_item);
2413 if (page_count == 0)
2414 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2415 (PTLRPC_MAX_BRW_SIZE - 1);
2417 /* ask the caller for the size of the io as the rpc leaves. */
2418 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2420 ops->ap_refresh_count(env, oap->oap_caller_data,
2422 LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
/* Page became empty (e.g. truncated): complete it now instead
 * of sending it. */
2424 if (oap->oap_count <= 0) {
2425 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2427 osc_ap_completion(env, cli, NULL,
2428 oap, 0, oap->oap_count);
2432 /* now put the page back in our accounting */
2433 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2434 if (page_count == 0)
2435 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2436 if (++page_count >= cli->cl_max_pages_per_rpc)
2439 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2440 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2441 * have the same alignment as the initial writes that allocated
2442 * extents on the server. */
2443 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2444 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2445 if (ending_offset == 0)
2448 /* If there is a gap at the end of this page, it can't merge
2449 * with any subsequent pages, so we'll hand the network a
2450 * "fragmented" page array that it can't transfer in 1 RDMA */
2451 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2455 osc_wake_cache_waiters(cli);
2457 loi_list_maint(cli, loi);
/* Drop the list lock while building the request (may allocate). */
2459 client_obd_list_unlock(&cli->cl_loi_list_lock);
2462 cl_object_put(env, clob);
2464 if (page_count == 0) {
2465 client_obd_list_lock(&cli->cl_loi_list_lock);
2469 req = osc_build_req(env, cli, &rpc_list, page_count, cmd);
2471 LASSERT(list_empty(&rpc_list));
2472 loi_list_maint(cli, loi);
2473 RETURN(PTR_ERR(req));
2476 aa = ptlrpc_req_async_args(req);
/* lprocfs stats: page counts, in-flight RPC counts, start offsets. */
2478 if (cmd == OBD_BRW_READ) {
2479 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2480 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2481 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2482 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2484 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2485 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2486 cli->cl_w_in_flight);
2487 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2488 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2490 ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2492 client_obd_list_lock(&cli->cl_loi_list_lock);
2494 if (cmd == OBD_BRW_READ)
2495 cli->cl_r_in_flight++;
2497 cli->cl_w_in_flight++;
2499 /* queued sync pages can be torn down while the pages
2500 * were between the pending list and the rpc */
2502 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2503 /* only one oap gets a request reference */
2506 if (oap->oap_interrupted && !req->rq_intr) {
2507 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2509 ptlrpc_mark_interrupted(req);
2513 tmp->oap_request = ptlrpc_request_addref(req);
2515 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2516 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
/* brw_interpret() will run when the reply (or error) arrives. */
2518 req->rq_interpret_reply = brw_interpret;
2519 ptlrpcd_add_req(req, PSCOPE_BRW);
/* Debug helper: dumps an lov_oinfo's readiness state (ready/hp-ready list
 * membership, pending/urgent counts for both read and write lops) followed
 * by a caller-supplied format string.  NOTE(review): the macro's final
 * continuation line (the args expansion) is outside this view. */
2523 #define LOI_DEBUG(LOI, STR, args...) \
2524 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2525 !list_empty(&(LOI)->loi_ready_item) || \
2526 !list_empty(&(LOI)->loi_hp_ready_item), \
2527 (LOI)->loi_write_lop.lop_num_pending, \
2528 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2529 (LOI)->loi_read_lop.lop_num_pending, \
2530 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2533 /* This is called by osc_check_rpcs() to find which objects have pages that
2534 * we could be sending. These lists are maintained by lop_makes_rpc(). */
/* Selection priority (top to bottom): objects with blocked (high-priority)
 * locks, then objects with RPC-ready pages, then — if writers are waiting
 * for cache space — any object with queued writes, and finally, when the
 * import is invalid, anything at all so dirty state can be flushed.
 * NOTE(review): presumably called with cl_loi_list_lock held — confirm. */
2535 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2539 /* First return objects that have blocked locks so that they
2540 * will be flushed quickly and other clients can get the lock,
2541 * then objects which have pages ready to be stuffed into RPCs */
2542 if (!list_empty(&cli->cl_loi_hp_ready_list))
2543 RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
2544 struct lov_oinfo, loi_hp_ready_item));
2545 if (!list_empty(&cli->cl_loi_ready_list))
2546 RETURN(list_entry(cli->cl_loi_ready_list.next,
2547 struct lov_oinfo, loi_ready_item));
2549 /* then if we have cache waiters, return all objects with queued
2550 * writes. This is especially important when many small files
2551 * have filled up the cache and not been fired into rpcs because
2552 * they don't pass the nr_pending/object threshold */
2553 if (!list_empty(&cli->cl_cache_waiters) &&
2554 !list_empty(&cli->cl_loi_write_list))
2555 RETURN(list_entry(cli->cl_loi_write_list.next,
2556 struct lov_oinfo, loi_write_item));
2558 /* then return all queued objects when we have an invalid import
2559 * so that they get flushed */
2560 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2561 if (!list_empty(&cli->cl_loi_write_list))
2562 RETURN(list_entry(cli->cl_loi_write_list.next,
2563 struct lov_oinfo, loi_write_item));
2564 if (!list_empty(&cli->cl_loi_read_list))
2565 RETURN(list_entry(cli->cl_loi_read_list.next,
2566 struct lov_oinfo, loi_read_item));
2571 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2573 struct osc_async_page *oap;
2576 if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
2577 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2578 struct osc_async_page, oap_urgent_item);
2579 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2582 if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
2583 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2584 struct osc_async_page, oap_urgent_item);
2585 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2588 return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2591 /* called with the loi list lock held */
/* Main RPC-dispatch loop: repeatedly picks the next object with work
 * (osc_next_loi), respects the max-RPCs-in-flight cap, alternates write
 * and read RPC generation per object, then rotates the object off the
 * ready lists for inter-object fairness. */
2592 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2594 struct lov_oinfo *loi;
2595 int rc = 0, race_counter = 0;
2598 while ((loi = osc_next_loi(cli)) != NULL) {
2599 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2601 if (osc_max_rpc_in_flight(cli, loi))
2604 /* attempt some read/write balancing by alternating between
2605 * reads and writes in an object. The makes_rpc checks here
2606 * would be redundant if we were getting read/write work items
2607 * instead of objects. we don't want send_oap_rpc to drain a
2608 * partial read pending queue when we're given this object to
2609 * do io on writes while there are cache waiters */
2610 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2611 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2612 &loi->loi_write_lop);
2620 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2621 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2622 &loi->loi_read_lop);
2631 /* attempt some inter-object balancing by issuing rpcs
2632 * for each object in turn */
2633 if (!list_empty(&loi->loi_hp_ready_item))
2634 list_del_init(&loi->loi_hp_ready_item);
2635 if (!list_empty(&loi->loi_ready_item))
2636 list_del_init(&loi->loi_ready_item);
2637 if (!list_empty(&loi->loi_write_item))
2638 list_del_init(&loi->loi_write_item);
2639 if (!list_empty(&loi->loi_read_item))
2640 list_del_init(&loi->loi_read_item);
2642 loi_list_maint(cli, loi);
2644 /* send_oap_rpc fails with 0 when make_ready tells it to
2645 * back off. llite's make_ready does this when it tries
2646 * to lock a page queued for write that is already locked.
2647 * we want to try sending rpcs from many objects, but we
2648 * don't want to spin failing with 0. */
2649 if (race_counter == 10)
2655 /* we're trying to queue a page in the osc so we're subject to the
2656 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2657 * If the osc's queued pages are already at that limit, then we want to sleep
2658 * until there is space in the osc's queue for us. We also may be waiting for
2659 * write credits from the OST if there are RPCs in flight that may return some
2660 * before we fall back to sync writes.
2662 * We need this to know our allocation was granted in the presence of signals */
/* l_wait_event() condition: true once this waiter has been removed from the
 * cache-waiter list (i.e. grant/space arrived) or there are no RPCs left in
 * flight that could ever free space — taken under cl_loi_list_lock. */
2663 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2667 client_obd_list_lock(&cli->cl_loi_list_lock);
2668 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2669 client_obd_list_unlock(&cli->cl_loi_list_lock);
2674 * Non-blocking version of osc_enter_cache() that consumes grant only when it
/* If enough avail_grant exists for one page, consume it for this oap; for
 * "transient" pages the dirty accounting is tracked separately and the page
 * is marked OBD_BRW_NOCACHE so the server does not cache it. */
2677 int osc_enter_cache_try(const struct lu_env *env,
2678 struct client_obd *cli, struct lov_oinfo *loi,
2679 struct osc_async_page *oap, int transient)
2683 has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2685 osc_consume_write_grant(cli, &oap->oap_brw_page);
/* transient path: account the page as "in transit" rather than dirty */
2687 cli->cl_dirty_transit += CFS_PAGE_SIZE;
2688 atomic_inc(&obd_dirty_transit_pages);
2689 oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2695 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2696 * grant or cache space. */
/* Blocking admission control for a dirty page: fast path succeeds when both
 * the per-client and global dirty limits allow it; otherwise, if writes are
 * in flight that may return grant, queue an osc_cache_waiter and sleep via
 * l_wait_event()/ocw_granted(), dropping cl_loi_list_lock across the wait. */
2697 static int osc_enter_cache(const struct lu_env *env,
2698 struct client_obd *cli, struct lov_oinfo *loi,
2699 struct osc_async_page *oap)
2701 struct osc_cache_waiter ocw;
2702 struct l_wait_info lwi = { 0 };
2706 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2707 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2708 cli->cl_dirty_max, obd_max_dirty_pages,
2709 cli->cl_lost_grant, cli->cl_avail_grant);
2711 /* force the caller to try sync io. this can jump the list
2712 * of queued writes and create a discontiguous rpc stream */
2713 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2714 loi->loi_ar.ar_force_sync)
2717 /* Hopefully normal case - cache space and write credits available */
2718 if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2719 atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2720 osc_enter_cache_try(env, cli, loi, oap, 0))
2723 /* Make sure that there are write rpcs in flight to wait for. This
2724 * is a little silly as this object may not have any pending but
2725 * other objects sure might. */
2726 if (cli->cl_w_in_flight) {
2727 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2728 cfs_waitq_init(&ocw.ocw_waitq);
2732 loi_list_maint(cli, loi);
2733 osc_check_rpcs(env, cli);
2734 client_obd_list_unlock(&cli->cl_loi_list_lock);
2736 CDEBUG(D_CACHE, "sleeping for cache space\n");
2737 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2739 client_obd_list_lock(&cli->cl_loi_list_lock);
/* still on the waiter list here means we were woken without a grant */
2740 if (!list_empty(&ocw.ocw_entry)) {
2741 list_del(&ocw.ocw_entry);
/* Initialize an osc_async_page for a page-cache page: records the caller's
 * ops/data, page and object offset, and empty list heads.  When called with
 * no page (size query path) it returns the rounded size of the structure.
 * Local clients with CAP_SYS_RESOURCE bypass quota via OBD_BRW_NOQUOTA. */
2751 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2752 struct lov_oinfo *loi, cfs_page_t *page,
2753 obd_off offset, const struct obd_async_page_ops *ops,
2754 void *data, void **res, int nocache,
2755 struct lustre_handle *lockh)
2757 struct osc_async_page *oap;
2762 return size_round(sizeof(*oap));
2765 oap->oap_magic = OAP_MAGIC;
2766 oap->oap_cli = &exp->exp_obd->u.cli;
2769 oap->oap_caller_ops = ops;
2770 oap->oap_caller_data = data;
2772 oap->oap_page = page;
2773 oap->oap_obj_off = offset;
2774 if (!client_is_remote(exp) &&
2775 cfs_capable(CFS_CAP_SYS_RESOURCE))
2776 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
/* object offset must be page-aligned */
2778 LASSERT(!(offset & ~CFS_PAGE_MASK));
2780 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2781 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2782 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2783 CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2785 spin_lock_init(&oap->oap_lock);
2786 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
/* Recover the osc_async_page from an opaque cookie, validating the magic
 * so a corrupt/foreign pointer is rejected with -EINVAL instead of used. */
2790 struct osc_async_page *oap_from_cookie(void *cookie)
2792 struct osc_async_page *oap = cookie;
2793 if (oap->oap_magic != OAP_MAGIC)
2794 return ERR_PTR(-EINVAL);
/* Queue a page for async I/O: validates the cookie, rejects pages already
 * queued, checks quota for writes, enters the dirty-page cache (possibly
 * sleeping for grant), then moves the oap onto the pending lists and kicks
 * osc_check_rpcs() to try to form an RPC. */
2798 int osc_queue_async_io(const struct lu_env *env,
2799 struct obd_export *exp, struct lov_stripe_md *lsm,
2800 struct lov_oinfo *loi, void *cookie,
2801 int cmd, obd_off off, int count,
2802 obd_flag brw_flags, enum async_flags async_flags)
2804 struct client_obd *cli = &exp->exp_obd->u.cli;
2805 struct osc_async_page *oap;
2809 oap = oap_from_cookie(cookie);
2811 RETURN(PTR_ERR(oap));
2813 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
/* refuse a page that is already on any pending/urgent/rpc list */
2816 if (!list_empty(&oap->oap_pending_item) ||
2817 !list_empty(&oap->oap_urgent_item) ||
2818 !list_empty(&oap->oap_rpc_item))
2821 /* check if the file's owner/group is over quota */
2822 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2823 struct cl_object *obj;
2824 struct cl_attr attr; /* XXX put attr into thread info */
2826 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
2828 cl_object_attr_lock(obj);
2829 rc = cl_object_attr_get(env, obj, &attr);
2830 cl_object_attr_unlock(obj);
2832 if (rc == 0 && lquota_chkdq(quota_interface, cli, attr.cat_uid,
2833 attr.cat_gid) == NO_QUOTA)
2840 loi = lsm->lsm_oinfo[0];
2842 client_obd_list_lock(&cli->cl_loi_list_lock);
/* (off, count) must stay within a single page */
2844 LASSERT(off + count <= CFS_PAGE_SIZE);
2846 oap->oap_page_off = off;
2847 oap->oap_count = count;
2848 oap->oap_brw_flags = brw_flags;
2849 oap->oap_async_flags = async_flags;
2851 if (cmd & OBD_BRW_WRITE) {
2852 rc = osc_enter_cache(env, cli, loi, oap);
2854 client_obd_list_unlock(&cli->cl_loi_list_lock);
2859 osc_oap_to_pending(oap);
2860 loi_list_maint(cli, loi);
2862 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2865 osc_check_rpcs(env, cli);
2866 client_obd_list_unlock(&cli->cl_loi_list_lock);
2871 /* aka (~was & now & flag), but this is more clear :) */
2872 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
/* Apply newly-set async flags to a pending oap.  ASYNC_READY is recorded
 * directly; newly-set ASYNC_URGENT moves the page onto the lop's urgent
 * list (head of list for ASYNC_HP pages, tail otherwise).  No-op if the
 * page is not pending or no new flags are being set. */
2874 int osc_set_async_flags_base(struct client_obd *cli,
2875 struct lov_oinfo *loi, struct osc_async_page *oap,
2876 obd_flag async_flags)
2878 struct loi_oap_pages *lop;
2881 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2884 if (oap->oap_cmd & OBD_BRW_WRITE) {
2885 lop = &loi->loi_write_lop;
2887 lop = &loi->loi_read_lop;
2890 if (list_empty(&oap->oap_pending_item))
/* all requested flags already set — nothing to do */
2893 if ((oap->oap_async_flags & async_flags) == async_flags)
2896 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2897 oap->oap_async_flags |= ASYNC_READY;
2899 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
2900 list_empty(&oap->oap_rpc_item)) {
2901 if (oap->oap_async_flags & ASYNC_HP)
2902 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2904 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2905 oap->oap_async_flags |= ASYNC_URGENT;
2906 loi_list_maint(cli, loi);
2909 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2910 oap->oap_async_flags);
/* Remove a queued page from OSC bookkeeping before it is sent: returns
 * -EBUSY if it already belongs to an RPC, otherwise releases its cache
 * grant, wakes cache waiters, and drops it from the urgent/pending lists
 * (updating lop_num_pending accordingly). */
2914 int osc_teardown_async_page(struct obd_export *exp,
2915 struct lov_stripe_md *lsm,
2916 struct lov_oinfo *loi, void *cookie)
2918 struct client_obd *cli = &exp->exp_obd->u.cli;
2919 struct loi_oap_pages *lop;
2920 struct osc_async_page *oap;
2924 oap = oap_from_cookie(cookie);
2926 RETURN(PTR_ERR(oap));
2929 loi = lsm->lsm_oinfo[0];
2931 if (oap->oap_cmd & OBD_BRW_WRITE) {
2932 lop = &loi->loi_write_lop;
2934 lop = &loi->loi_read_lop;
2937 client_obd_list_lock(&cli->cl_loi_list_lock);
/* already part of an RPC in flight — caller must wait */
2939 if (!list_empty(&oap->oap_rpc_item))
2940 GOTO(out, rc = -EBUSY);
2942 osc_exit_cache(cli, oap, 0);
2943 osc_wake_cache_waiters(cli);
2945 if (!list_empty(&oap->oap_urgent_item)) {
2946 list_del_init(&oap->oap_urgent_item);
2947 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
2949 if (!list_empty(&oap->oap_pending_item)) {
2950 list_del_init(&oap->oap_pending_item);
2951 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2953 loi_list_maint(cli, loi);
2954 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2956 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Attach einfo->ei_cbdata to an LDLM lock's l_ast_data after sanity-checking
 * that the lock's ASTs/type match the enqueue info.  osc_ast_guard serializes
 * l_ast_data updates; it must be NULL or already equal to the new value. */
2960 static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
2961 struct ldlm_enqueue_info *einfo,
2964 void *data = einfo->ei_cbdata;
2966 LASSERT(lock != NULL);
2967 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2968 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2969 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2970 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2972 lock_res_and_lock(lock);
2973 spin_lock(&osc_ast_guard);
2974 LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
2975 lock->l_ast_data = data;
2976 spin_unlock(&osc_ast_guard);
2977 unlock_res_and_lock(lock);
/* Handle-based wrapper for osc_set_lock_data_with_check(); a dead handle
 * (lookup fails) only logs — typically means the client was evicted. */
2980 static void osc_set_data_with_check(struct lustre_handle *lockh,
2981 struct ldlm_enqueue_info *einfo,
2984 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2987 osc_set_lock_data_with_check(lock, einfo, flags);
2988 LDLM_LOCK_PUT(lock);
2990 CERROR("lockh %p, data %p - client evicted?\n",
2991 lockh, einfo->ei_cbdata);
/* Iterate all LDLM locks on the object's resource, applying 'replace' with
 * 'data' to each (used to swap/clear l_ast_data for every cached lock). */
2994 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2995 ldlm_iterator_t replace, void *data)
2997 struct ldlm_res_id res_id;
2998 struct obd_device *obd = class_exp2obd(exp);
3000 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3001 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
/* Finish an enqueue: for an intent enqueue aborted by the server, pull the
 * real result out of the lock_policy_res1 reply field; on success mark the
 * LVB ready and log it; finally deliver rc to the caller's upcall. */
3005 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3006 obd_enqueue_update_f upcall, void *cookie,
3009 int intent = *flags & LDLM_FL_HAS_INTENT;
3013 /* The request was created before ldlm_cli_enqueue call. */
3014 if (rc == ELDLM_LOCK_ABORTED) {
3015 struct ldlm_reply *rep;
3016 rep = req_capsule_server_get(&req->rq_pill,
3019 LASSERT(rep != NULL);
3020 if (rep->lock_policy_res1)
3021 rc = rep->lock_policy_res1;
3025 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3026 *flags |= LDLM_FL_LVB_READY;
3027 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3028 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3031 /* Call the update callback. */
3032 rc = (*upcall)(cookie, rc);
/* Async-enqueue completion interpreter: copies the lock handle/mode locally
 * (aa->* may be freed once the upcall runs), completes the enqueue via
 * ldlm_cli_enqueue_fini() and osc_enqueue_fini(), then drops the extra lock
 * references taken for the async path. */
3036 static int osc_enqueue_interpret(const struct lu_env *env,
3037 struct ptlrpc_request *req,
3038 struct osc_enqueue_args *aa, int rc)
3040 struct ldlm_lock *lock;
3041 struct lustre_handle handle;
3044 /* Make a local copy of a lock handle and a mode, because aa->oa_*
3045 * might be freed anytime after lock upcall has been called. */
3046 lustre_handle_copy(&handle, aa->oa_lockh);
3047 mode = aa->oa_ei->ei_mode;
3049 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3051 lock = ldlm_handle2lock(&handle);
3053 /* Take an additional reference so that a blocking AST that
3054 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3055 * to arrive after an upcall has been executed by
3056 * osc_enqueue_fini(). */
3057 ldlm_lock_addref(&handle, mode);
3059 /* Complete obtaining the lock procedure. */
3060 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3061 mode, aa->oa_flags, aa->oa_lvb,
3062 sizeof(*aa->oa_lvb), lustre_swab_ost_lvb,
3064 /* Complete osc stuff. */
3065 rc = osc_enqueue_fini(req, aa->oa_lvb,
3066 aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
3067 /* Release the lock for async request. */
3068 if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3070 * Releases a reference taken by ldlm_cli_enqueue(), if it is
3071 * not already released by
3072 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3074 ldlm_lock_decref(&handle, mode);
3076 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3077 aa->oa_lockh, req, aa);
3078 ldlm_lock_decref(&handle, mode);
3079 LDLM_LOCK_PUT(lock);
/* After an enqueue completes, fold the server's LVB into the lov_oinfo and
 * extend the known-minimum-size (KMS): a lock on [x,y] proves at most y+1
 * bytes, so KMS is capped at the lock's extent end + 1.  A glimpse that was
 * aborted with intent still refreshes the cached LVB. */
3083 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3084 struct lov_oinfo *loi, int flags,
3085 struct ost_lvb *lvb, __u32 mode, int rc)
3087 if (rc == ELDLM_OK) {
3088 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3091 LASSERT(lock != NULL);
3092 loi->loi_lvb = *lvb;
3093 tmp = loi->loi_lvb.lvb_size;
3094 /* Extend KMS up to the end of this lock and no further
3095 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3096 if (tmp > lock->l_policy_data.l_extent.end)
3097 tmp = lock->l_policy_data.l_extent.end + 1;
3098 if (tmp >= loi->loi_kms) {
3099 LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3100 ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3101 loi_kms_set(loi, tmp);
3103 LDLM_DEBUG(lock, "lock acquired, setting rss="
3104 LPU64"; leaving kms="LPU64", end="LPU64,
3105 loi->loi_lvb.lvb_size, loi->loi_kms,
3106 lock->l_policy_data.l_extent.end);
3108 ldlm_lock_allow_match(lock);
3109 LDLM_LOCK_PUT(lock);
3110 } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3111 loi->loi_lvb = *lvb;
3112 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3113 " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3117 EXPORT_SYMBOL(osc_update_enqueue);
3119 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3121 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3122 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3123 * other synchronous requests, however keeping some locks and trying to obtain
3124 * others may take a considerable amount of time in a case of ost failure; and
3125 * when other sync requests do not get released lock from a client, the client
3126 * is excluded from the cluster -- such scenarios make the life difficult, so
3127 * release locks just after they are obtained. */
/* Core extent-lock enqueue: round the extent to page boundaries, first try
 * to match an already-cached lock (PR requests may match PW), and only if
 * that fails build and send an LDLM_ENQUEUE request — async via rqset /
 * ptlrpcd with osc_enqueue_interpret, or synchronously followed by
 * osc_enqueue_fini().  The result is delivered through 'upcall'. */
3128 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3129 int *flags, ldlm_policy_data_t *policy,
3130 struct ost_lvb *lvb, int kms_valid,
3131 obd_enqueue_update_f upcall, void *cookie,
3132 struct ldlm_enqueue_info *einfo,
3133 struct lustre_handle *lockh,
3134 struct ptlrpc_request_set *rqset, int async)
3136 struct obd_device *obd = exp->exp_obd;
3137 struct ptlrpc_request *req = NULL;
3138 int intent = *flags & LDLM_FL_HAS_INTENT;
3143 /* Filesystem lock extents are extended to page boundaries so that
3144 * dealing with the page cache is a little smoother. */
3145 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3146 policy->l_extent.end |= ~CFS_PAGE_MASK;
3149 * kms is not valid when either object is completely fresh (so that no
3150 * locks are cached), or object was evicted. In the latter case cached
3151 * lock cannot be used, because it would prime inode state with
3152 * potentially stale LVB.
3157 /* Next, search for already existing extent locks that will cover us */
3158 /* If we're trying to read, we also search for an existing PW lock. The
3159 * VFS and page cache already protect us locally, so lots of readers/
3160 * writers can share a single PW lock.
3162 * There are problems with conversion deadlocks, so instead of
3163 * converting a read lock to a write lock, we'll just enqueue a new
3166 * At some point we should cancel the read lock instead of making them
3167 * send us a blocking callback, but there are problems with canceling
3168 * locks out from other users right now, too. */
3169 mode = einfo->ei_mode;
3170 if (einfo->ei_mode == LCK_PR)
3172 mode = ldlm_lock_match(obd->obd_namespace,
3173 *flags | LDLM_FL_LVB_READY, res_id,
3174 einfo->ei_type, policy, mode, lockh, 0);
3176 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3178 if (matched->l_ast_data == NULL ||
3179 matched->l_ast_data == einfo->ei_cbdata) {
3180 /* addref the lock only if not async requests and PW
3181 * lock is matched whereas we asked for PR. */
3182 if (!rqset && einfo->ei_mode != mode)
3183 ldlm_lock_addref(lockh, LCK_PR);
3184 osc_set_lock_data_with_check(matched, einfo, *flags);
3186 /* I would like to be able to ASSERT here that
3187 * rss <= kms, but I can't, for reasons which
3188 * are explained in lov_enqueue() */
3191 /* We already have a lock, and it's referenced */
3192 (*upcall)(cookie, ELDLM_OK);
3194 /* For async requests, decref the lock. */
3195 if (einfo->ei_mode != mode)
3196 ldlm_lock_decref(lockh, LCK_PW);
3198 ldlm_lock_decref(lockh, einfo->ei_mode);
3199 LDLM_LOCK_PUT(matched);
/* matched lock belongs to someone else's cbdata — give it back */
3202 ldlm_lock_decref(lockh, mode);
3203 LDLM_LOCK_PUT(matched);
3208 CFS_LIST_HEAD(cancels);
3209 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3210 &RQF_LDLM_ENQUEUE_LVB);
3214 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3218 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3220 ptlrpc_request_set_replen(req);
3223 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3224 *flags &= ~LDLM_FL_BLOCK_GRANTED;
3226 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3227 sizeof(*lvb), lustre_swab_ost_lvb, lockh, async);
3230 struct osc_enqueue_args *aa;
3231 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3232 aa = ptlrpc_req_async_args(req);
3235 aa->oa_flags = flags;
3236 aa->oa_upcall = upcall;
3237 aa->oa_cookie = cookie;
3239 aa->oa_lockh = lockh;
3241 req->rq_interpret_reply =
3242 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3243 if (rqset == PTLRPCD_SET)
3244 ptlrpcd_add_req(req, PSCOPE_OTHER);
3246 ptlrpc_set_add_req(rqset, req);
3247 } else if (intent) {
3248 ptlrpc_req_finished(req);
3253 rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3255 ptlrpc_req_finished(req);
/* obd_ops enqueue entry point: builds the resource name from the stripe
 * metadata and delegates to osc_enqueue_base(); async iff rqset != NULL. */
3260 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3261 struct ldlm_enqueue_info *einfo,
3262 struct ptlrpc_request_set *rqset)
3264 struct ldlm_res_id res_id;
3268 osc_build_res_name(oinfo->oi_md->lsm_object_id,
3269 oinfo->oi_md->lsm_object_gr, &res_id);
3271 rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3272 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3273 oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3274 oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3275 rqset, rqset != NULL);
/* Try to match an existing extent lock (page-rounded) without enqueuing.
 * On a match, attach 'data' to the lock; if a PW lock satisfied a PR
 * request (and this isn't a TEST_LOCK probe), convert the reference from
 * PW to PR so the decref mode matches what the caller will release. */
3279 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3280 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3281 int *flags, void *data, struct lustre_handle *lockh,
3284 struct obd_device *obd = exp->exp_obd;
3285 int lflags = *flags;
3289 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3292 /* Filesystem lock extents are extended to page boundaries so that
3293 * dealing with the page cache is a little smoother */
3294 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3295 policy->l_extent.end |= ~CFS_PAGE_MASK;
3297 /* Next, search for already existing extent locks that will cover us */
3298 /* If we're trying to read, we also search for an existing PW lock. The
3299 * VFS and page cache already protect us locally, so lots of readers/
3300 * writers can share a single PW lock. */
3304 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3305 res_id, type, policy, rc, lockh, unref);
3308 osc_set_data_with_check(lockh, data, lflags);
3309 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3310 ldlm_lock_addref(lockh, LCK_PR);
3311 ldlm_lock_decref(lockh, LCK_PW);
/* Release a lock reference; GROUP locks are cancelled outright since they
 * are not kept in the LRU. */
3318 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3322 if (unlikely(mode == LCK_GROUP))
3323 ldlm_lock_decref_and_cancel(lockh, mode);
3325 ldlm_lock_decref(lockh, mode);
/* obd_ops cancel entry point — thin wrapper over osc_cancel_base(). */
3330 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3331 __u32 mode, struct lustre_handle *lockh)
3334 RETURN(osc_cancel_base(lockh, mode));
/* Cancel unused cached locks — restricted to one object's resource when
 * lsm is given, across the whole namespace otherwise (resp stays NULL). */
3337 static int osc_cancel_unused(struct obd_export *exp,
3338 struct lov_stripe_md *lsm, int flags,
3341 struct obd_device *obd = class_exp2obd(exp);
3342 struct ldlm_res_id res_id, *resp = NULL;
3345 resp = osc_build_res_name(lsm->lsm_object_id,
3346 lsm->lsm_object_gr, &res_id);
3349 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
/* Completion handler for async OST_STATFS: copies the server's obd_statfs
 * into the caller's buffer (-EPROTO if the reply lacks it) and invokes the
 * caller's oi_cb_up callback with the result. */
3352 static int osc_statfs_interpret(const struct lu_env *env,
3353 struct ptlrpc_request *req,
3354 struct osc_async_args *aa, int rc)
3356 struct obd_statfs *msfs;
3362 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3364 GOTO(out, rc = -EPROTO);
3367 *aa->aa_oi->oi_osfs = *msfs;
3369 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Issue OST_STATFS asynchronously via the caller's request set; the reply
 * is consumed by osc_statfs_interpret().  OBD_STATFS_NODELAY requests are
 * marked no-resend/no-delay so procfs readers cannot block. */
3373 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3374 __u64 max_age, struct ptlrpc_request_set *rqset)
3376 struct ptlrpc_request *req;
3377 struct osc_async_args *aa;
3381 /* We could possibly pass max_age in the request (as an absolute
3382 * timestamp or a "seconds.usec ago") so the target can avoid doing
3383 * extra calls into the filesystem if that isn't necessary (e.g.
3384 * during mount that would help a bit). Having relative timestamps
3385 * is not so great if request processing is slow, while absolute
3386 * timestamps are not ideal because they need time synchronization. */
3387 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3391 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3393 ptlrpc_request_free(req);
3396 ptlrpc_request_set_replen(req);
3397 req->rq_request_portal = OST_CREATE_PORTAL;
3398 ptlrpc_at_set_req_timeout(req);
3400 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3401 /* procfs requests must not wait or resend, to avoid deadlock */
3402 req->rq_no_resend = 1;
3403 req->rq_no_delay = 1;
3406 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3407 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3408 aa = ptlrpc_req_async_args(req);
3411 ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_STATFS: takes a reference on the import under cl_sem
 * (the import may be torn down concurrently by disconnect), sends the
 * request with ptlrpc_queue_wait(), and copies the obd_statfs reply out. */
3415 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3416 __u64 max_age, __u32 flags)
3418 struct obd_statfs *msfs;
3419 struct ptlrpc_request *req;
3420 struct obd_import *imp = NULL;
3424 /* Since the request might also come from lprocfs, we need to
3425 * synchronize with client_disconnect_export() (Bug 15684). */
3426 down_read(&obd->u.cli.cl_sem);
3427 if (obd->u.cli.cl_import)
3428 imp = class_import_get(obd->u.cli.cl_import);
3429 up_read(&obd->u.cli.cl_sem);
3433 /* We could possibly pass max_age in the request (as an absolute
3434 * timestamp or a "seconds.usec ago") so the target can avoid doing
3435 * extra calls into the filesystem if that isn't necessary (e.g.
3436 * during mount that would help a bit). Having relative timestamps
3437 * is not so great if request processing is slow, while absolute
3438 * timestamps are not ideal because they need time synchronization. */
3439 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3441 class_import_put(imp);
3446 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3448 ptlrpc_request_free(req);
3451 ptlrpc_request_set_replen(req);
3452 req->rq_request_portal = OST_CREATE_PORTAL;
3453 ptlrpc_at_set_req_timeout(req);
3455 if (flags & OBD_STATFS_NODELAY) {
3456 /* procfs requests must not wait or resend, to avoid deadlock */
3457 req->rq_no_resend = 1;
3458 req->rq_no_delay = 1;
3461 rc = ptlrpc_queue_wait(req);
3465 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3467 GOTO(out, rc = -EPROTO);
3474 ptlrpc_req_finished(req);
3478 /* Retrieve object striping information.
3480 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3481 * the maximum number of OST indices which will fit in the user buffer.
3482 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3484 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3486 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3487 struct lov_user_md_v3 lum, *lumk;
3488 struct lov_user_ost_data_v1 *lmm_objects;
3489 int rc = 0, lum_size;
3495 /* we only need the header part from user space to get lmm_magic and
3496 * lmm_stripe_count, (the header part is common to v1 and v3) */
3497 lum_size = sizeof(struct lov_user_md_v1);
3498 if (copy_from_user(&lum, lump, lum_size))
3501 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3502 (lum.lmm_magic != LOV_USER_MAGIC_V3))
3505 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3506 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3507 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3508 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3510 /* we can use lov_mds_md_size() to compute lum_size
3511 * because lov_user_md_vX and lov_mds_md_vX have the same size */
3512 if (lum.lmm_stripe_count > 0) {
3513 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3514 OBD_ALLOC(lumk, lum_size);
/* v1 and v3 layouts place lmm_objects at different offsets */
3518 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3519 lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3521 lmm_objects = &(lumk->lmm_objects[0]);
3522 lmm_objects->l_object_id = lsm->lsm_object_id;
3524 lum_size = lov_mds_md_size(0, lum.lmm_magic);
/* an OSC object always reports exactly one stripe */
3528 lumk->lmm_object_id = lsm->lsm_object_id;
3529 lumk->lmm_object_gr = lsm->lsm_object_gr;
3530 lumk->lmm_stripe_count = 1;
3532 if (copy_to_user(lump, lumk, lum_size))
3536 OBD_FREE(lumk, lum_size);
/* ioctl dispatcher for the OSC device.  Pins the module for the duration
 * of the call; unknown commands return -ENOTTY. */
3542 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3543 void *karg, void *uarg)
3545 struct obd_device *obd = exp->exp_obd;
3546 struct obd_ioctl_data *data = karg;
3550 if (!try_module_get(THIS_MODULE)) {
3551 CERROR("Can't get module. Is it alive?");
/* report this OSC as a single-target LOV configuration */
3555 case OBD_IOC_LOV_GET_CONFIG: {
3557 struct lov_desc *desc;
3558 struct obd_uuid uuid;
3562 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3563 GOTO(out, err = -EINVAL);
3565 data = (struct obd_ioctl_data *)buf;
3567 if (sizeof(*desc) > data->ioc_inllen1) {
3568 obd_ioctl_freedata(buf, len);
3569 GOTO(out, err = -EINVAL);
3572 if (data->ioc_inllen2 < sizeof(uuid)) {
3573 obd_ioctl_freedata(buf, len);
3574 GOTO(out, err = -EINVAL);
3577 desc = (struct lov_desc *)data->ioc_inlbuf1;
3578 desc->ld_tgt_count = 1;
3579 desc->ld_active_tgt_count = 1;
3580 desc->ld_default_stripe_count = 1;
3581 desc->ld_default_stripe_size = 0;
3582 desc->ld_default_stripe_offset = 0;
3583 desc->ld_pattern = 0;
3584 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3586 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3588 err = copy_to_user((void *)uarg, buf, len);
3591 obd_ioctl_freedata(buf, len);
3594 case LL_IOC_LOV_SETSTRIPE:
3595 err = obd_alloc_memmd(exp, karg);
3599 case LL_IOC_LOV_GETSTRIPE:
3600 err = osc_getstripe(karg, uarg);
3602 case OBD_IOC_CLIENT_RECOVER:
3603 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3608 case IOC_OSC_SET_ACTIVE:
3609 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3612 case OBD_IOC_POLL_QUOTACHECK:
3613 err = lquota_poll_check(quota_interface, exp,
3614 (struct if_quotacheck *)karg);
3616 case OBD_IOC_PING_TARGET:
3617 err = ptlrpc_obd_ping(obd);
3620 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3621 cmd, cfs_curproc_comm());
3622 GOTO(out, err = -ENOTTY);
3625 module_put(THIS_MODULE);
/* osc_get_info(): obd_ops o_get_info handler.
 * Answers a small set of keys, either locally (KEY_LOCK_TO_STRIPE) or
 * by issuing an OST_GET_INFO RPC to the server (KEY_LAST_ID, KEY_FIEMAP).
 * NOTE(review): some lines (braces, RETURN paths, 'out:' labels) are
 * elided from this view. */
3629 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3630 void *key, __u32 *vallen, void *val,
3631 struct lov_stripe_md *lsm)
/* Both an output buffer and its length are mandatory. */
3634 if (!vallen || !val)
/* KEY_LOCK_TO_STRIPE: answered locally — on a single OSC the lock
 * always maps to stripe 0 (value elided from this view). */
3637 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3638 __u32 *stripe = val;
3639 *vallen = sizeof(*stripe);
/* KEY_LAST_ID: ask the OST for its last allocated object id. */
3642 } else if (KEY_IS(KEY_LAST_ID)) {
3643 struct ptlrpc_request *req;
3648 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3649 &RQF_OST_GET_INFO_LAST_ID);
/* Size the request key field to the caller's key length
 * before packing. */
3653 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3654 RCL_CLIENT, keylen);
3655 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3657 ptlrpc_request_free(req);
3661 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3662 memcpy(tmp, key, keylen);
/* Synchronous RPC: wait for the OST's reply. */
3664 ptlrpc_request_set_replen(req);
3665 rc = ptlrpc_queue_wait(req);
3669 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3671 GOTO(out, rc = -EPROTO);
3673 *((obd_id *)val) = *reply;
3675 ptlrpc_req_finished(req);
/* KEY_FIEMAP: forward a fiemap query; 'val' carries the request in
 * and receives the mapping back, so it is sized for both directions. */
3677 } else if (KEY_IS(KEY_FIEMAP)) {
3678 struct ptlrpc_request *req;
3679 struct ll_user_fiemap *reply;
3683 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3684 &RQF_OST_GET_INFO_FIEMAP);
3688 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3689 RCL_CLIENT, keylen);
/* FIEMAP_VAL is sized on both client and server sides: the same
 * buffer layout travels in the request and in the reply. */
3690 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3691 RCL_CLIENT, *vallen);
3692 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3693 RCL_SERVER, *vallen);
3695 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3697 ptlrpc_request_free(req);
3701 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3702 memcpy(tmp, key, keylen);
3703 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3704 memcpy(tmp, val, *vallen);
3706 ptlrpc_request_set_replen(req);
3707 rc = ptlrpc_queue_wait(req);
3711 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3713 GOTO(out1, rc = -EPROTO);
/* Copy the server's fiemap result back into the caller's buffer. */
3715 memcpy(val, reply, *vallen);
3717 ptlrpc_req_finished(req);
/* osc_setinfo_mds_conn_interpret(): RPC interpret callback run after the
 * KEY_MDS_CONN set_info reply arrives.  Connects the MDS->OST llog
 * initiator and marks the import as an MDS connection (server timeout
 * semantics + pingable).
 * NOTE(review): the early-error check on 'rc' and RETURN lines are
 * elided from this view. */
3725 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3726 struct ptlrpc_request *req,
3729 struct llog_ctxt *ctxt;
3730 struct obd_import *imp = req->rq_import;
3736 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3739 rc = llog_initiator_connect(ctxt);
3741 CERROR("cannot establish connection for "
3742 "ctxt %p: %d\n", ctxt, rc);
/* Drop the context ref; then flag the import under imp_lock. */
3745 llog_ctxt_put(ctxt);
3746 spin_lock(&imp->imp_lock);
3747 imp->imp_server_timeout = 1;
3748 imp->imp_pingable = 1;
3749 spin_unlock(&imp->imp_lock);
3750 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
/* osc_set_info_async(): obd_ops o_set_info_async handler.
 * A number of keys are handled purely client-side (NEXT_ID, UNLINKED,
 * INIT_RECOV, CHECKSUM, SPTLRPC_CONF, FLUSH_CTX); anything else is
 * forwarded to the OST as an OST_SET_INFO RPC, with special casing for
 * KEY_MDS_CONN (llog connect on reply) and KEY_GRANT_SHRINK (sent via
 * ptlrpcd instead of the caller's request set).
 * NOTE(review): RETURN statements, some braces and error branches are
 * elided from this view. */
3755 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3756 void *key, obd_count vallen, void *val,
3757 struct ptlrpc_request_set *set)
3759 struct ptlrpc_request *req;
3760 struct obd_device *obd = exp->exp_obd;
3761 struct obd_import *imp = class_exp2cliimp(exp);
3766 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
/* KEY_NEXT_ID: update the local object-precreate cursor; +1 because
 * the supplied value is the last used id, not the next free one. */
3768 if (KEY_IS(KEY_NEXT_ID)) {
3769 if (vallen != sizeof(obd_id))
3773 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3774 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3775 exp->exp_obd->obd_name,
3776 obd->u.cli.cl_oscc.oscc_next_id);
/* KEY_UNLINKED: objects were freed — clear the no-space flag so
 * precreation may resume. */
3781 if (KEY_IS(KEY_UNLINKED)) {
3782 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3783 spin_lock(&oscc->oscc_lock);
3784 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3785 spin_unlock(&oscc->oscc_lock);
3789 if (KEY_IS(KEY_INIT_RECOV)) {
3790 if (vallen != sizeof(int))
3792 spin_lock(&imp->imp_lock);
3793 imp->imp_initial_recov = *(int *)val;
3794 spin_unlock(&imp->imp_lock);
3795 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3796 exp->exp_obd->obd_name,
3797 imp->imp_initial_recov);
/* KEY_CHECKSUM: toggle bulk-data checksumming; normalize to 0/1. */
3801 if (KEY_IS(KEY_CHECKSUM)) {
3802 if (vallen != sizeof(int))
3804 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3808 if (KEY_IS(KEY_SPTLRPC_CONF)) {
3809 sptlrpc_conf_client_adapt(obd);
3813 if (KEY_IS(KEY_FLUSH_CTX)) {
3814 sptlrpc_import_flush_my_ctx(imp);
/* All remaining keys need a request set — except GRANT_SHRINK,
 * which goes through ptlrpcd (see below). */
3818 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3821 /* We pass all other commands directly to OST. Since nobody calls osc
3822 methods directly and everybody is supposed to go through LOV, we
3823 assume lov checked invalid values for us.
3824 The only recognised values so far are evict_by_nid and mds_conn.
3825 Even if something bad goes through, we'd get a -EINVAL from OST
3828 if (KEY_IS(KEY_GRANT_SHRINK))
3829 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
3831 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
3836 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3837 RCL_CLIENT, keylen);
3838 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3839 RCL_CLIENT, vallen);
3840 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3842 ptlrpc_request_free(req);
3846 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3847 memcpy(tmp, key, keylen);
3848 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
3849 memcpy(tmp, val, vallen);
/* KEY_MDS_CONN: record the MDS object group for precreation and
 * hook the llog-connect interpret callback onto the reply. */
3851 if (KEY_IS(KEY_MDS_CONN)) {
3852 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3854 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3855 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3856 LASSERT_MDS_GROUP(oscc->oscc_oa.o_gr);
3857 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3858 } else if (KEY_IS(KEY_GRANT_SHRINK)) {
3859 struct osc_grant_args *aa;
/* Stash async args in the request's embedded scratch space;
 * CLASSERT guarantees they fit at compile time. */
3862 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3863 aa = ptlrpc_req_async_args(req);
3866 ptlrpc_req_finished(req);
3869 *oa = ((struct ost_body *)val)->oa;
3871 req->rq_interpret_reply = osc_shrink_grant_interpret;
3874 ptlrpc_request_set_replen(req);
/* Normal keys: queue on the caller's set.  GRANT_SHRINK: hand to
 * the ptlrpcd daemon so no caller has to wait on it. */
3875 if (!KEY_IS(KEY_GRANT_SHRINK)) {
3876 LASSERT(set != NULL);
3877 ptlrpc_set_add_req(set, req);
3878 ptlrpc_check_set(NULL, set);
3880 ptlrpcd_add_req(req, PSCOPE_OTHER);
/* llog operations for the size-replication context: only cancel is
 * implemented, delegated to the generic obd replicator cancel. */
3886 static struct llog_operations osc_size_repl_logops = {
3887 lop_cancel: llog_obd_repl_cancel
/* Origin-side llog ops; populated lazily on first osc_llog_init() call. */
3890 static struct llog_operations osc_mds_ost_orig_logops;
/* osc_llog_init(): set up the two llog contexts this OSC uses —
 * MDS_OST_ORIG (origin catalog, lvfs-backed) and SIZE_REPL (cancel-only).
 * NOTE(review): some error-path lines are elided from this view. */
3891 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3892 struct obd_device *tgt, int count,
3893 struct llog_catid *catid, struct obd_uuid *uuid)
3898 LASSERT(olg == &obd->obd_olg);
/* One-time init of the origin logops table, guarded by obd_dev_lock;
 * 'lop_setup' doubles as the already-initialized sentinel. */
3899 spin_lock(&obd->obd_dev_lock);
3900 if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3901 osc_mds_ost_orig_logops = llog_lvfs_ops;
3902 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3903 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3904 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3905 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3907 spin_unlock(&obd->obd_dev_lock);
3909 rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3910 &catid->lci_logid, &osc_mds_ost_orig_logops);
3912 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3916 rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
3917 NULL, &osc_size_repl_logops);
/* On SIZE_REPL failure, tear down the ORIG context set up above. */
3919 struct llog_ctxt *ctxt =
3920 llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3923 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3928 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3929 obd->obd_name, tgt->obd_name, count, catid, rc);
3930 CERROR("logid "LPX64":0x%x\n",
3931 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
/* osc_llog_finish(): tear down both llog contexts created by
 * osc_llog_init().  Cleans each independently; rc/rc2 keep the two
 * results separate (combining logic elided from this view). */
3936 static int osc_llog_finish(struct obd_device *obd, int count)
3938 struct llog_ctxt *ctxt;
3939 int rc = 0, rc2 = 0;
3942 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3944 rc = llog_cleanup(ctxt);
3946 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3948 rc2 = llog_cleanup(ctxt);
/* osc_reconnect(): o_reconnect handler.  If the peer supports the grant
 * protocol, report our current grant state in the connect data: either
 * the remaining available grant or, when none, a fresh request sized to
 * two max-sized RPCs.  Lost grant is consumed (zeroed) under the
 * loi_list_lock and only logged. */
3955 static int osc_reconnect(const struct lu_env *env,
3956 struct obd_export *exp, struct obd_device *obd,
3957 struct obd_uuid *cluuid,
3958 struct obd_connect_data *data,
3961 struct client_obd *cli = &obd->u.cli;
3963 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3966 client_obd_list_lock(&cli->cl_loi_list_lock);
/* ?: (GNU elvis) — reuse leftover grant, else ask for 2 RPCs worth. */
3967 data->ocd_grant = cli->cl_avail_grant ?:
3968 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3969 lost_grant = cli->cl_lost_grant;
3970 cli->cl_lost_grant = 0;
3971 client_obd_list_unlock(&cli->cl_loi_list_lock);
3973 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3974 "cl_lost_grant: %ld\n", data->ocd_grant,
3975 cli->cl_avail_grant, lost_grant);
3976 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3977 " ocd_grant: %d\n", data->ocd_connect_flags,
3978 data->ocd_version, data->ocd_grant);
/* osc_disconnect(): o_disconnect handler.  On the last connection,
 * flush pending size-replication llog cancels to the OST, then run the
 * generic client disconnect.  The grant-shrink list removal must happen
 * only after the import is gone — see the BUG18662 race note below. */
3984 static int osc_disconnect(struct obd_export *exp)
3986 struct obd_device *obd = class_exp2obd(exp);
3987 struct llog_ctxt *ctxt;
3990 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
/* Last user of this connection: sync outstanding llog cancels. */
3992 if (obd->u.cli.cl_conn_count == 1) {
3993 /* Flush any remaining cancel messages out to the
3995 llog_sync(ctxt, exp);
3997 llog_ctxt_put(ctxt);
3999 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4003 rc = client_disconnect_export(exp);
4005 * Initially we put del_shrink_grant before disconnect_export, but it
4006 * causes the following problem if setup (connect) and cleanup
4007 * (disconnect) are tangled together.
4008 * connect p1 disconnect p2
4009 * ptlrpc_connect_import
4010 * ............... class_manual_cleanup
4013 * ptlrpc_connect_interrupt
4015 * add this client to shrink list
4017 * Bang! pinger trigger the shrink.
4018 * So the osc should be disconnected from the shrink list, after we
4019 * are sure the import has been destroyed. BUG18662
/* cl_import == NULL is the "import destroyed" condition the comment
 * above requires before touching the shrink list. */
4021 if (obd->u.cli.cl_import == NULL)
4022 osc_del_shrink_grant(&obd->u.cli);
/* osc_import_event(): o_import_event handler — reacts to import state
 * transitions (disconnect, inactive, invalidate, active, OCD received).
 * NOTE(review): 'cli' assignment, 'switch (event)', break statements and
 * RETURN are elided from this view. */
4026 static int osc_import_event(struct obd_device *obd,
4027 struct obd_import *imp,
4028 enum obd_import_event event)
4030 struct client_obd *cli;
4034 LASSERT(imp->imp_obd == obd);
4037 case IMP_EVENT_DISCON: {
4038 /* Only do this on the MDS OSC's */
/* imp_server_timeout is set only for MDS connections (see
 * osc_setinfo_mds_conn_interpret), hence the comment above. */
4039 if (imp->imp_server_timeout) {
4040 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4042 spin_lock(&oscc->oscc_lock);
4043 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4044 spin_unlock(&oscc->oscc_lock);
/* Grant state is meaningless across a disconnect — reset it. */
4047 client_obd_list_lock(&cli->cl_loi_list_lock);
4048 cli->cl_avail_grant = 0;
4049 cli->cl_lost_grant = 0;
4050 client_obd_list_unlock(&cli->cl_loi_list_lock);
4053 case IMP_EVENT_INACTIVE: {
4054 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4057 case IMP_EVENT_INVALIDATE: {
4058 struct ldlm_namespace *ns = obd->obd_namespace;
4062 env = cl_env_get(&refcheck);
4066 client_obd_list_lock(&cli->cl_loi_list_lock);
4067 /* all pages go to failing rpcs due to the invalid
/* Kick the RPC engine so queued pages fail out promptly. */
4069 osc_check_rpcs(env, cli);
4070 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Drop all locks locally; the server side is gone anyway. */
4072 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4073 cl_env_put(env, &refcheck);
4078 case IMP_EVENT_ACTIVE: {
4079 /* Only do this on the MDS OSC's */
4080 if (imp->imp_server_timeout) {
4081 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4083 spin_lock(&oscc->oscc_lock);
4084 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
4085 spin_unlock(&oscc->oscc_lock);
4087 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
/* IMP_EVENT_OCD: connect data arrived — (re)initialize grant and
 * request-portal settings from the negotiated flags. */
4090 case IMP_EVENT_OCD: {
4091 struct obd_connect_data *ocd = &imp->imp_connect_data;
4093 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4094 osc_init_grant(&obd->u.cli, ocd);
4097 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4098 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
4100 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4104 CERROR("Unknown import event %d\n", event);
/* osc_setup(): o_setup handler — device initialization.  Takes a
 * ptlrpcd reference, runs the generic client setup, then registers
 * lprocfs entries, pre-allocates a request pool for brw_interpret
 * re-issue, and initializes grant-shrink bookkeeping.
 * NOTE(review): error-path lines and RETURN are elided from this view. */
4110 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4116 rc = ptlrpcd_addref();
4120 rc = client_obd_setup(obd, lcfg);
4124 struct lprocfs_static_vars lvars = { 0 };
4125 struct client_obd *cli = &obd->u.cli;
4127 lprocfs_osc_init_vars(&lvars);
4128 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4129 lproc_osc_attach_seqstat(obd);
4130 sptlrpc_lprocfs_cliobd_attach(obd);
4131 ptlrpc_lprocfs_register_obd(obd);
4135 /* We need to allocate a few requests more, because
4136 brw_interpret tries to create new requests before freeing
4137 previous ones. Ideally we want to have 2x max_rpcs_in_flight
4138 reserved, but I afraid that might be too much wasted RAM
4139 in fact, so 2 is just my guess and still should work. */
4140 cli->cl_import->imp_rq_pool =
4141 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4143 ptlrpc_add_rqs_to_pool);
/* Grant-shrink list + its binary semaphore (mutex-style, count 1). */
4145 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4146 sema_init(&cli->cl_grant_sem, 1);
/* osc_precleanup(): o_precleanup handler, staged teardown.
 * EARLY: deactivate the import and stop pinging it.
 * EXPORTS: invalidate and destroy the import (covers the set-up-but-
 * never-connected case), free its request pool, and finish the llogs.
 * NOTE(review): 'switch (stage)' and break/RETURN lines are elided. */
4152 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4158 case OBD_CLEANUP_EARLY: {
4159 struct obd_import *imp;
4160 imp = obd->u.cli.cl_import;
4161 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4162 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4163 ptlrpc_deactivate_import(imp);
/* Stop the pinger from touching a dying import. */
4164 spin_lock(&imp->imp_lock);
4165 imp->imp_pingable = 0;
4166 spin_unlock(&imp->imp_lock);
4169 case OBD_CLEANUP_EXPORTS: {
4170 /* If we set up but never connected, the
4171 client import will not have been cleaned. */
4172 if (obd->u.cli.cl_import) {
4173 struct obd_import *imp;
/* cl_sem (write) serializes against concurrent connect paths. */
4174 down_write(&obd->u.cli.cl_sem);
4175 imp = obd->u.cli.cl_import;
4176 CDEBUG(D_CONFIG, "%s: client import never connected\n",
4178 ptlrpc_invalidate_import(imp);
4179 if (imp->imp_rq_pool) {
4180 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4181 imp->imp_rq_pool = NULL;
4183 class_destroy_import(imp);
4184 up_write(&obd->u.cli.cl_sem);
4185 obd->u.cli.cl_import = NULL;
4187 rc = obd_llog_finish(obd, 0);
4189 CERROR("failed to cleanup llogging subsystems\n");
/* osc_cleanup(): final o_cleanup handler.  Unregisters lprocfs, flags
 * the object creator as exiting (so precreation stops), releases the
 * quota cache, then runs the generic client cleanup. */
4196 int osc_cleanup(struct obd_device *obd)
4198 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4202 ptlrpc_lprocfs_unregister_obd(obd);
4203 lprocfs_obd_cleanup(obd);
/* EXITING supersedes RECOVERING: no more precreates from here on. */
4205 spin_lock(&oscc->oscc_lock);
4206 oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
4207 oscc->oscc_flags |= OSCC_FLAG_EXITING;
4208 spin_unlock(&oscc->oscc_lock);
4210 /* free memory of osc quota cache */
4211 lquota_cleanup(quota_interface, obd);
4213 rc = client_obd_cleanup(obd);
/* osc_process_config_base(): apply a config-log record to this device.
 * Currently routes proc-parameter records through
 * class_process_proc_param (the visible case; others elided). */
4219 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4221 struct lprocfs_static_vars lvars = { 0 };
4224 lprocfs_osc_init_vars(&lvars);
4226 switch (lcfg->lcfg_command) {
4228 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
/* osc_process_config(): thin o_process_config adapter — 'buf' carries
 * the lustre_cfg; 'len' is unused here. */
4238 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4240 return osc_process_config_base(obd, buf);
/* obd_ops vtable for the OSC device type.  Connection management is
 * delegated to the generic client_* helpers; everything else is the
 * osc_* implementations in this file. */
4243 struct obd_ops osc_obd_ops = {
4244 .o_owner = THIS_MODULE,
4245 .o_setup = osc_setup,
4246 .o_precleanup = osc_precleanup,
4247 .o_cleanup = osc_cleanup,
4248 .o_add_conn = client_import_add_conn,
4249 .o_del_conn = client_import_del_conn,
4250 .o_connect = client_connect_import,
4251 .o_reconnect = osc_reconnect,
4252 .o_disconnect = osc_disconnect,
4253 .o_statfs = osc_statfs,
4254 .o_statfs_async = osc_statfs_async,
4255 .o_packmd = osc_packmd,
4256 .o_unpackmd = osc_unpackmd,
4257 .o_precreate = osc_precreate,
4258 .o_create = osc_create,
4259 .o_destroy = osc_destroy,
4260 .o_getattr = osc_getattr,
4261 .o_getattr_async = osc_getattr_async,
4262 .o_setattr = osc_setattr,
4263 .o_setattr_async = osc_setattr_async,
4265 .o_punch = osc_punch,
4267 .o_enqueue = osc_enqueue,
4268 .o_change_cbdata = osc_change_cbdata,
4269 .o_cancel = osc_cancel,
4270 .o_cancel_unused = osc_cancel_unused,
4271 .o_iocontrol = osc_iocontrol,
4272 .o_get_info = osc_get_info,
4273 .o_set_info_async = osc_set_info_async,
4274 .o_import_event = osc_import_event,
4275 .o_llog_init = osc_llog_init,
4276 .o_llog_finish = osc_llog_finish,
4277 .o_process_config = osc_process_config,
4280 extern struct lu_kmem_descr osc_caches[];
4281 extern spinlock_t osc_ast_guard;
4282 extern struct lock_class_key osc_ast_guard_class;
/* osc_init(): module init.  Order matters: cl-object caches, then the
 * quota interface, then the obd type registration (which makes the
 * device usable), then the AST guard lock.
 * NOTE(review): some error-check lines and RETURN are elided. */
4284 int __init osc_init(void)
4286 struct lprocfs_static_vars lvars = { 0 };
4290 /* print an address of _any_ initialized kernel symbol from this
4291 * module, to allow debugging with gdb that doesn't support data
4292 * symbols from modules.*/
4293 CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4295 rc = lu_kmem_init(osc_caches);
4297 lprocfs_osc_init_vars(&lvars);
/* Quota is optional: PORTAL_SYMBOL_GET may yield NULL if the lquota
 * module is absent; downstream lquota_* calls tolerate that. */
4299 request_module("lquota");
4300 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4301 lquota_init(quota_interface);
4302 init_obd_quota_ops(quota_interface, &osc_obd_ops);
4304 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4305 LUSTRE_OSC_NAME, &osc_device_type);
/* Registration failed: undo the quota ref and the kmem caches. */
4307 if (quota_interface)
4308 PORTAL_SYMBOL_PUT(osc_quota_interface);
4309 lu_kmem_fini(osc_caches);
4313 spin_lock_init(&osc_ast_guard);
4314 lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
/* osc_exit(): module unload — reverse of osc_init(): device type fini,
 * quota teardown (symbol ref dropped only if it was obtained), type
 * unregistration, cache fini. */
4320 static void /*__exit*/ osc_exit(void)
4322 lu_device_type_fini(&osc_device_type);
4324 lquota_exit(quota_interface);
4325 if (quota_interface)
4326 PORTAL_SYMBOL_PUT(osc_quota_interface);
4328 class_unregister_type(LUSTRE_OSC_NAME);
4329 lu_kmem_fini(osc_caches);
/* Standard kernel-module metadata and the libcfs module entry-point
 * registration (binds osc_init/osc_exit). */
4332 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4333 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4334 MODULE_LICENSE("GPL");
4336 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);