1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_OSC
42 #include <libcfs/libcfs.h>
45 # include <liblustre.h>
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include "osc_internal.h"
/* File-scope state and forward declarations. */
66 static quota_interface_t *quota_interface = NULL;
/* Declared extern: its definition is not in the visible part of this file. */
67 extern quota_interface_t osc_quota_interface;
/* Forward declarations; the bodies are not in the visible part of this file. */
69 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
70 static int brw_interpret(const struct lu_env *env,
71 struct ptlrpc_request *req, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
74 /* Pack OSC object metadata for disk storage (LE byte order). */
/*
 * Visible steps: the buffer size is sizeof(**lmmp); *lmmp is released with
 * OBD_FREE or obtained with OBD_ALLOC; the object id and group of lsm are
 * asserted and then stored into *lmmp through cpu_to_le64().
 *
 * NOTE(review): this listing omits lines (the conditions that choose between
 * the free and allocate paths, and every return statement). Confirm against
 * the complete file before relying on this summary.
 */
75 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
76 struct lov_stripe_md *lsm)
81 lmm_size = sizeof(**lmmp);
86 OBD_FREE(*lmmp, lmm_size);
92 OBD_ALLOC(*lmmp, lmm_size);
/* Both the object id and the group are checked before they are packed. */
98 LASSERT(lsm->lsm_object_id);
99 LASSERT_MDS_GROUP(lsm->lsm_object_gr);
/* On-disk fields are stored little-endian. */
100 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
101 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
107 /* Unpack OSC object metadata from disk storage (LE byte order). */
/*
 * Visible steps: lmm_bytes is compared with sizeof(*lmm) and a zero
 * lmm_object_id is reported with CERROR; the in-memory size comes from
 * lov_stripe_md_size(1); an existing *lsmp with a NULL lmm is freed; otherwise
 * *lsmp and its single lsm_oinfo[0] are allocated, the object id and group
 * are converted from little-endian, and lsm_maxbytes is set to
 * LUSTRE_STRIPE_MAXBYTES.
 *
 * NOTE(review): this listing omits lines (guards around the lmm checks, the
 * allocation failure checks for *lsmp, and every return statement).
 */
108 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
109 struct lov_mds_md *lmm, int lmm_bytes)
/*
 * NOTE(review): lmm_bytes is an int and sizeof yields an unsigned type, so
 * the comparison is done unsigned; a negative lmm_bytes would not be seen as
 * too small here. Confirm callers never pass a negative length.
 */
115 if (lmm_bytes < sizeof (*lmm)) {
116 CERROR("lov_mds_md too small: %d, need %d\n",
117 lmm_bytes, (int)sizeof(*lmm));
120 /* XXX LOV_MAGIC etc check? */
122 if (lmm->lmm_object_id == 0) {
123 CERROR("lov_mds_md: zero lmm_object_id\n");
/* An OSC descriptor always describes exactly one stripe. */
128 lsm_size = lov_stripe_md_size(1);
/* Free request: a descriptor exists and there is nothing to unpack. */
132 if (*lsmp != NULL && lmm == NULL) {
133 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
134 OBD_FREE(*lsmp, lsm_size);
140 OBD_ALLOC(*lsmp, lsm_size);
143 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
/* Roll back the descriptor allocation when the oinfo allocation fails. */
144 if ((*lsmp)->lsm_oinfo[0] == NULL) {
145 OBD_FREE(*lsmp, lsm_size);
148 loi_init((*lsmp)->lsm_oinfo[0]);
152 /* XXX zero *lsmp? */
153 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
154 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
155 LASSERT((*lsmp)->lsm_object_id);
156 LASSERT_MDS_GROUP((*lsmp)->lsm_object_gr);
159 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/*
 * Pack a capability into the RMF_CAPA1 field of a request.
 *
 * Visible steps: capa is viewed as a struct obd_capa, the client-side
 * RMF_CAPA1 buffer is fetched from the request capsule, OBD_MD_FLOSSCAPA is
 * set in body->oa.o_valid and the packed capability is traced.
 *
 * NOTE(review): the lines between the declarations and the capsule lookup,
 * and between the lookup and the flag update, are missing from this listing;
 * presumably they handle a NULL capa and copy it into c - confirm.
 */
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165 struct ost_body *body, void *capa)
167 struct obd_capa *oc = (struct obd_capa *)capa;
168 struct lustre_capa *c;
173 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
/* Tell the receiver that the obdo is accompanied by a capability. */
176 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177 DEBUG_CAPA(D_SEC, c, "pack");
/*
 * Fill the RMF_OST_BODY field of req from oinfo.
 *
 * The obdo is copied by value from *oinfo->oi_oa, so later changes to
 * oinfo->oi_oa do not alter the packed body. The capability in
 * oinfo->oi_capa is then packed with osc_pack_capa().
 */
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181 struct obd_info *oinfo)
183 struct ost_body *body;
185 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
/* Structure copy: snapshot of the caller obdo at this point. */
188 body->oa = *oinfo->oi_oa;
189 osc_pack_capa(req, body, oinfo->oi_capa);
/*
 * Adjust the client-side size of a capability field before the request is
 * packed.
 *
 * The visible statement sets the size of field to 0 for RCL_CLIENT.
 * NOTE(review): the third parameter and the condition guarding this call are
 * missing from this listing; callers pass a capability as third argument, so
 * presumably the size is zeroed only when no capability is given - confirm.
 */
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193 const struct req_msg_field *field,
197 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
199 /* it is already calculated as sizeof struct obd_capa */
/*
 * Reply handler for an asynchronous OST_GETATTR request.
 *
 * Visible steps: the reply body is unpacked with lustre_swab_repbuf(); the
 * returned obdo is copied into aa->aa_oi->oi_oa and the block size is forced
 * to PTLRPC_MAX_BRW_SIZE; another path logs an unpack failure and clears
 * o_valid; finally the oi_cb_up callback is invoked with rc and its result
 * replaces rc.
 *
 * NOTE(review): the branch conditions and the return statement are missing
 * from this listing. Also note that osc_setattr_interpret() and
 * osc_punch_interpret() fetch the body with req_capsule_server_get() while
 * this handler uses lustre_swab_repbuf() - confirm whether that difference is
 * intended.
 */
203 static int osc_getattr_interpret(const struct lu_env *env,
204 struct ptlrpc_request *req,
205 struct osc_async_args *aa, int rc)
207 struct ost_body *body;
213 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
214 lustre_swab_ost_body);
216 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
217 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
219 /* This should really be sent by the OST */
220 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
221 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
223 CDEBUG(D_INFO, "can't unpack ost_body\n");
/* No attribute in the caller obdo is valid on this path. */
225 aa->aa_oi->oi_oa->o_valid = 0;
/* Hand the outcome to the upper layer; its return value becomes rc. */
228 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Queue an asynchronous OST_GETATTR request on a request set.
 *
 * Visible steps: allocate an RQF_OST_GETATTR request on the import of exp,
 * size the capability field, pack the request (freeing it on one path), pack
 * the body from oinfo, set the reply length, install
 * osc_getattr_interpret() as reply handler and add the request to set.
 *
 * NOTE(review): the allocation and pack failure checks, the assignments into
 * the async args and the return statements are missing from this listing.
 */
232 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
233 struct ptlrpc_request_set *set)
235 struct ptlrpc_request *req;
236 struct osc_async_args *aa;
240 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
244 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
245 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
247 ptlrpc_request_free(req);
251 osc_pack_req_body(req, oinfo);
253 ptlrpc_request_set_replen(req);
254 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
/* Compile-time check that the handler arguments fit in the request. */
256 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
257 aa = ptlrpc_req_async_args(req);
260 ptlrpc_set_add_req(set, req);
/*
 * Synchronous OST_GETATTR.
 *
 * Visible steps: allocate and pack an RQF_OST_GETATTR request, pack the body
 * from oinfo, send it with ptlrpc_queue_wait(), fetch the reply body
 * (-EPROTO on one path), copy the returned obdo into *oinfo->oi_oa, force
 * the block size to PTLRPC_MAX_BRW_SIZE and drop the request reference.
 *
 * NOTE(review): the failure checks, the out label and the return statements
 * are missing from this listing.
 */
264 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
266 struct ptlrpc_request *req;
267 struct ost_body *body;
271 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
275 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
276 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
278 ptlrpc_request_free(req);
282 osc_pack_req_body(req, oinfo);
284 ptlrpc_request_set_replen(req);
/* Blocks until the reply arrives or the request fails. */
286 rc = ptlrpc_queue_wait(req);
290 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
292 GOTO(out, rc = -EPROTO);
294 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
295 *oinfo->oi_oa = body->oa;
297 /* This should really be sent by the OST */
298 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
299 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
303 ptlrpc_req_finished(req);
/*
 * Synchronous OST_SETATTR.
 *
 * Visible steps: assert that a group, when flagged valid, passes
 * CHECK_MDS_GROUP; allocate and pack an RQF_OST_SETATTR request; pack the
 * body from oinfo; send it with ptlrpc_queue_wait(); fetch the reply body
 * (-EPROTO on one path); copy the returned obdo into *oinfo->oi_oa; drop the
 * request reference. The oti parameter is not referenced by any visible line.
 *
 * NOTE(review): the failure checks, the out label and the return statements
 * are missing from this listing.
 */
307 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
308 struct obd_trans_info *oti)
310 struct ptlrpc_request *req;
311 struct ost_body *body;
/* Either no group is supplied, or the supplied group must be an MDS group. */
315 LASSERTF(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
316 CHECK_MDS_GROUP(oinfo->oi_oa->o_gr),
317 "oinfo->oi_oa->o_valid="LPU64" oinfo->oi_oa->o_gr="LPU64"\n",
318 oinfo->oi_oa->o_valid, oinfo->oi_oa->o_gr);
320 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
324 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
325 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
327 ptlrpc_request_free(req);
331 osc_pack_req_body(req, oinfo);
333 ptlrpc_request_set_replen(req);
335 rc = ptlrpc_queue_wait(req);
339 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
341 GOTO(out, rc = -EPROTO);
343 *oinfo->oi_oa = body->oa;
347 ptlrpc_req_finished(req);
/*
 * Reply handler for an asynchronous OST_SETATTR request.
 *
 * Visible steps: fetch the reply body (-EPROTO on one path), copy the
 * returned obdo into *aa->aa_oi->oi_oa, then invoke the oi_cb_up callback
 * with rc; the callback result replaces rc.
 *
 * NOTE(review): the checks on rc and body, the out label and the return
 * statement are missing from this listing.
 */
351 static int osc_setattr_interpret(const struct lu_env *env,
352 struct ptlrpc_request *req,
353 struct osc_async_args *aa, int rc)
355 struct ost_body *body;
361 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
363 GOTO(out, rc = -EPROTO);
365 *aa->aa_oi->oi_oa = body->oa;
367 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Asynchronous OST_SETATTR.
 *
 * Visible steps: allocate and pack an RQF_OST_SETATTR request, pack the body
 * from oinfo, set the reply length, copy the first log cookie of oti into
 * oinfo->oi_oa when OBD_MD_FLCOOKIE is set, then either hand the request to
 * ptlrpcd without a reply handler or install osc_setattr_interpret() and add
 * the request to rqset.
 *
 * NOTE(review): the failure checks, the condition that chooses between the
 * ptlrpcd path and the rqset path, the async args assignments and the return
 * statements are missing from this listing.
 */
371 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
372 struct obd_trans_info *oti,
373 struct ptlrpc_request_set *rqset)
375 struct ptlrpc_request *req;
376 struct osc_async_args *aa;
380 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
384 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
385 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
387 ptlrpc_request_free(req);
391 osc_pack_req_body(req, oinfo);
393 ptlrpc_request_set_replen(req);
/*
 * NOTE(review): osc_pack_req_body() above copied *oinfo->oi_oa into the
 * request body by value, so the cookie stored below changes only the caller
 * obdo and is not part of the body packed above. osc_destroy() sets the
 * cookie before packing. Confirm whether this ordering is intended.
 */
395 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
397 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
400 /* do mds to ost setattr asynchronously */
402 /* Do not wait for response. */
403 ptlrpcd_add_req(req, PSCOPE_OTHER);
405 req->rq_interpret_reply =
406 (ptlrpc_interpterer_t)osc_setattr_interpret;
/* Compile-time check that the handler arguments fit in the request. */
408 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
409 aa = ptlrpc_req_async_args(req);
412 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous OST_CREATE.
 *
 * Visible steps: obtain a stripe descriptor with obd_alloc_memmd(); allocate
 * and pack an RQF_OST_CREATE request; fetch the client body; set the reply
 * length; for a request whose flags equal OBD_FL_DELORPHAN disable resend and
 * delay; send with ptlrpc_queue_wait(); fetch the reply body (-EPROTO on one
 * path); force the block size; record the object id and group in lsm; record
 * the transaction number and log cookie in oti; drop the request reference;
 * release the descriptor with obd_free_memmd() on one path.
 *
 * NOTE(review): this listing omits many lines (argument checks, the copy of
 * oa into and out of the body, failure checks, labels, the store through ea
 * and the return statements). The multi-line XXX comment near the end has no
 * visible closing line. Confirm against the complete file.
 */
418 int osc_real_create(struct obd_export *exp, struct obdo *oa,
419 struct lov_stripe_md **ea, struct obd_trans_info *oti)
421 struct ptlrpc_request *req;
422 struct ost_body *body;
423 struct lov_stripe_md *lsm;
432 rc = obd_alloc_memmd(exp, &lsm);
437 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
439 GOTO(out, rc = -ENOMEM);
441 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
443 ptlrpc_request_free(req);
447 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
451 ptlrpc_request_set_replen(req);
/* Exact match on o_flags: only a pure orphan-deletion request qualifies. */
453 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
454 oa->o_flags == OBD_FL_DELORPHAN) {
456 "delorphan from OST integration");
457 /* Don't resend the delorphan req */
458 req->rq_no_resend = req->rq_no_delay = 1;
461 rc = ptlrpc_queue_wait(req);
465 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
467 GOTO(out_req, rc = -EPROTO);
471 /* This should really be sent by the OST */
472 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
473 oa->o_valid |= OBD_MD_FLBLKSZ;
475 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
476 * have valid lsm_oinfo data structs, so don't go touching that.
477 * This needs to be fixed in a big way.
479 lsm->lsm_object_id = oa->o_id;
480 lsm->lsm_object_gr = oa->o_gr;
484 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
486 if (oa->o_valid & OBD_MD_FLCOOKIE) {
487 if (!oti->oti_logcookies)
488 oti_alloc_cookies(oti, 1);
489 *oti->oti_logcookies = oa->o_lcookie;
493 CDEBUG(D_HA, "transno: "LPD64"\n",
494 lustre_msg_get_transno(req->rq_repmsg));
496 ptlrpc_req_finished(req);
499 obd_free_memmd(exp, &lsm);
/*
 * Reply handler for an asynchronous OST_PUNCH request.
 *
 * Visible steps: fetch the reply body (-EPROTO on one path), copy the
 * returned obdo into *aa->pa_oa, then invoke aa->pa_upcall with the stored
 * cookie and rc; the upcall result replaces rc.
 *
 * NOTE(review): the checks on rc and body, the out label and the return
 * statement are missing from this listing.
 */
503 static int osc_punch_interpret(const struct lu_env *env,
504 struct ptlrpc_request *req,
505 struct osc_punch_args *aa, int rc)
507 struct ost_body *body;
513 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
515 GOTO(out, rc = -EPROTO);
517 *aa->pa_oa = body->oa;
519 rc = aa->pa_upcall(aa->pa_cookie, rc);
/*
 * Build and queue an asynchronous OST_PUNCH request.
 *
 * Visible steps: allocate and pack an RQF_OST_PUNCH request; route it to
 * OST_IO_PORTAL and refresh its timeout; fetch the client body and pack the
 * capability; set the reply length; install osc_punch_interpret() with
 * upcall and cookie as its arguments; queue the request through ptlrpcd
 * when rqset is PTLRPCD_SET, or add it to rqset.
 *
 * NOTE(review): the failure checks, the copy of oa into the body, the
 * pa_oa assignment, the else keyword before ptlrpc_set_add_req() and the
 * return statements are missing from this listing.
 */
523 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
524 struct obd_capa *capa,
525 obd_enqueue_update_f upcall, void *cookie,
526 struct ptlrpc_request_set *rqset)
528 struct ptlrpc_request *req;
529 struct osc_punch_args *aa;
530 struct ost_body *body;
534 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
538 osc_set_capa_size(req, &RMF_CAPA1, capa);
539 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
541 ptlrpc_request_free(req);
544 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
/* Called after the portal change above. */
545 ptlrpc_at_set_req_timeout(req);
547 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
550 osc_pack_capa(req, body, capa);
552 ptlrpc_request_set_replen(req);
555 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
/* Compile-time check that the handler arguments fit in the request. */
556 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
557 aa = ptlrpc_req_async_args(req);
559 aa->pa_upcall = upcall;
560 aa->pa_cookie = cookie;
561 if (rqset == PTLRPCD_SET)
562 ptlrpcd_add_req(req, PSCOPE_OTHER);
564 ptlrpc_set_add_req(rqset, req);
/*
 * Punch (truncate) wrapper around osc_punch_base().
 *
 * The extent to punch is carried in the obdo: o_size receives the extent
 * start and o_blocks the extent end, and both fields are flagged valid. The
 * oi_cb_up callback of oinfo is used as upcall with oinfo itself as cookie.
 * The oti parameter is not referenced. Returns the osc_punch_base() result.
 */
569 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
570 struct obd_trans_info *oti,
571 struct ptlrpc_request_set *rqset)
573 oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
574 oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
575 oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
576 return osc_punch_base(exp, oinfo->oi_oa, oinfo->oi_capa,
577 oinfo->oi_cb_up, oinfo, rqset);
/*
 * Synchronous OST_SYNC over the byte range start..end.
 *
 * Visible steps: log when oa is NULL; allocate and pack an RQF_OST_SYNC
 * request; store start in o_size and end in o_blocks of the request body
 * and flag both valid; pack the capability; send with ptlrpc_queue_wait();
 * fetch the reply body (-EPROTO on one path); drop the request reference.
 * The md parameter is not referenced by any visible line.
 *
 * NOTE(review): the last parameter (the capa used below), the failure
 * checks, the copies between oa and the body, the out label and the return
 * statements are missing from this listing.
 */
580 static int osc_sync(struct obd_export *exp, struct obdo *oa,
581 struct lov_stripe_md *md, obd_size start, obd_size end,
584 struct ptlrpc_request *req;
585 struct ost_body *body;
590 CDEBUG(D_INFO, "oa NULL\n");
594 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
598 osc_set_capa_size(req, &RMF_CAPA1, capa);
599 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
601 ptlrpc_request_free(req);
605 /* overload the size and blocks fields in the oa with start/end */
606 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
609 body->oa.o_size = start;
610 body->oa.o_blocks = end;
611 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
612 osc_pack_capa(req, body, capa);
614 ptlrpc_request_set_replen(req);
616 rc = ptlrpc_queue_wait(req);
620 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
622 GOTO(out, rc = -EPROTO);
628 ptlrpc_req_finished(req);
632 /* Find and cancel locally locks matched by @mode in the resource found by
633 * @objid. Found locks are added into @cancel list. Returns the amount of
634 * locks added to @cancels list. */
/*
 * The resource name is built from oa->o_id and oa->o_gr and looked up in the
 * namespace of the export device. The reference taken by the lookup is
 * dropped with ldlm_resource_putref() after the cancel call.
 *
 * NOTE(review): the last parameter (lock_flags, used below), the handling of
 * a failed lookup and the return statement are missing from this listing.
 */
635 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
636 struct list_head *cancels, ldlm_mode_t mode,
639 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
640 struct ldlm_res_id res_id;
641 struct ldlm_resource *res;
645 osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
646 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
650 LDLM_RESOURCE_ADDREF(res);
651 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
652 lock_flags, 0, NULL);
653 LDLM_RESOURCE_DELREF(res);
654 ldlm_resource_putref(res);
/*
 * Reply handler for a throttled OST_DESTROY request.
 *
 * Decrements the in-flight destroy counter of the client obd that owns the
 * request import and signals cl_destroy_waitq so that a sender waiting in
 * osc_destroy() can retry.
 *
 * NOTE(review): the last parameter and the return statement are missing from
 * this listing.
 */
658 static int osc_destroy_interpret(const struct lu_env *env,
659 struct ptlrpc_request *req, void *data,
662 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
664 atomic_dec(&cli->cl_destroy_in_flight);
665 cfs_waitq_signal(&cli->cl_destroy_waitq);
/*
 * Try to reserve a slot for one more in-flight destroy RPC.
 *
 * The counter is incremented first; when the new value does not exceed
 * cl_max_rpcs_in_flight the slot is kept. Otherwise the increment is undone,
 * and when the value after the decrement is below the limit the wait queue
 * is signalled.
 *
 * NOTE(review): the return statements are missing from this listing, and the
 * inner comment has lost its opening and closing lines; presumably the
 * function returns non-zero when the slot was reserved - confirm.
 */
669 static int osc_can_send_destroy(struct client_obd *cli)
671 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
672 cli->cl_max_rpcs_in_flight) {
673 /* The destroy request can be sent */
/* Over the limit: give the slot back. */
676 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
677 cli->cl_max_rpcs_in_flight) {
679 * The counter has been modified between the two atomic
682 cfs_waitq_signal(&cli->cl_destroy_waitq);
687 /* Destroy requests can be async always on the client, and we don't even really
688 * care about the return code since the client cannot do anything at all about
690 * When the MDS is unlinking a filename, it saves the file objects into a
691 * recovery llog, and these object records are cancelled when the OST reports
692 * they were destroyed and sync'd to disk (i.e. transaction committed).
693 * If the client dies, or the OST is down when the object should be destroyed,
694 * the records are not cancelled, and when the OST reconnects to the MDS next,
695 * it will retrieve the llog unlink logs and then sends the log cancellation
696 * cookies to the MDS after committing destroy transactions. */
/*
 * Visible steps: log when oa is NULL; collect unused LCK_PW locks on the
 * object for cancellation, flagged LDLM_FL_DISCARD_DATA; allocate an
 * RQF_OST_DESTROY request (releasing the collected locks on one path);
 * prepare it with ldlm_prep_elc_req(); route it to OST_IO_PORTAL; copy the
 * first log cookie of oti into oa when requested; pack the capability; when
 * the import was not connected with OBD_CONNECT_MDS, throttle by waiting on
 * cl_destroy_waitq until osc_can_send_destroy() succeeds; finally queue the
 * request through ptlrpcd.
 *
 * NOTE(review): the failure checks, the remaining ldlm_prep_elc_req()
 * arguments, the copy of oa into the body and the return statements are
 * missing from this listing. The ea and md_export parameters are not
 * referenced by any visible line.
 */
697 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
698 struct lov_stripe_md *ea, struct obd_trans_info *oti,
699 struct obd_export *md_export, void *capa)
701 struct client_obd *cli = &exp->exp_obd->u.cli;
702 struct ptlrpc_request *req;
703 struct ost_body *body;
704 CFS_LIST_HEAD(cancels);
709 CDEBUG(D_INFO, "oa NULL\n");
713 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
714 LDLM_FL_DISCARD_DATA);
716 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* Drop the references on the locks collected above. */
718 ldlm_lock_list_put(&cancels, l_bl_ast, count);
722 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
723 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
726 ptlrpc_request_free(req);
730 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
731 ptlrpc_at_set_req_timeout(req);
/* The cookie is stored in oa before the body is fetched below. */
733 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
734 oa->o_lcookie = *oti->oti_logcookies;
735 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
739 osc_pack_capa(req, body, (struct obd_capa *)capa);
740 ptlrpc_request_set_replen(req);
742 /* don't throttle destroy RPCs for the MDT */
743 if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
/* The handler releases the slot reserved by osc_can_send_destroy(). */
744 req->rq_interpret_reply = osc_destroy_interpret;
745 if (!osc_can_send_destroy(cli)) {
746 struct l_wait_info lwi = { 0 };
749 * Wait until the number of on-going destroy RPCs drops
750 * under max_rpc_in_flight
752 l_wait_event_exclusive(cli->cl_destroy_waitq,
753 osc_can_send_destroy(cli), &lwi);
757 /* Do not wait for response */
758 ptlrpcd_add_req(req, PSCOPE_OTHER);
/*
 * Report the client cache and grant state to the server through oa.
 *
 * Under cl_loi_list_lock: o_dirty is set to cl_dirty; three consistency
 * checks log an error (per-client dirty over cl_dirty_max, system-wide dirty
 * pages over obd_max_dirty_pages, and an implausibly large gap between
 * cl_dirty_max and cl_dirty); in the remaining case o_undirty is set to the
 * larger of cl_dirty_max and the byte size of (max RPCs in flight + 1) full
 * RPCs. o_grant and o_dropped are filled from cl_avail_grant and
 * cl_lost_grant, and cl_lost_grant is reset to 0.
 *
 * NOTE(review): the last parameter, the update of o_valid with bits and the
 * o_undirty assignments in the three error branches are missing from this
 * listing.
 */
762 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
765 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
/* The caller must not have set these bits already. */
767 LASSERT(!(oa->o_valid & bits));
770 client_obd_list_lock(&cli->cl_loi_list_lock);
771 oa->o_dirty = cli->cl_dirty;
772 if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
773 CERROR("dirty %lu - %lu > dirty_max %lu\n",
774 cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
776 } else if (atomic_read(&obd_dirty_pages) -
777 atomic_read(&obd_dirty_transit_pages) > obd_max_dirty_pages){
778 CERROR("dirty %d - %d > system dirty_max %d\n",
779 atomic_read(&obd_dirty_pages),
780 atomic_read(&obd_dirty_transit_pages),
781 obd_max_dirty_pages);
783 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
784 CERROR("dirty %lu - dirty_max %lu too big???\n",
785 cli->cl_dirty, cli->cl_dirty_max);
/* Bytes covered by one more RPC than the in-flight limit allows. */
788 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
789 (cli->cl_max_rpcs_in_flight + 1);
790 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
792 oa->o_grant = cli->cl_avail_grant;
/* Lost grant is reported once and then forgotten locally. */
793 oa->o_dropped = cli->cl_lost_grant;
794 cli->cl_lost_grant = 0;
795 client_obd_list_unlock(&cli->cl_loi_list_lock);
796 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
797 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/*
 * Schedule the next grant shrink: cl_next_shrink_grant is set to
 * cfs_time_shift(GRANT_SHRINK_INTERVAL) and the new value is traced.
 */
801 static void osc_update_next_shrink(struct client_obd *cli)
803 int time = GRANT_SHRINK_INTERVAL;
804 cli->cl_next_shrink_grant = cfs_time_shift(time);
805 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
806 cli->cl_next_shrink_grant);
809 /* caller must hold loi_list_lock */
/*
 * Charge one page of write grant to pga.
 *
 * The page must not already be marked OBD_BRW_FROM_GRANT. The global dirty
 * page counter and cl_dirty grow by one page, cl_avail_grant shrinks by one
 * page, and the page is marked OBD_BRW_FROM_GRANT. The available grant is
 * asserted to be non-negative after the decrement, and the next grant
 * shrink time is pushed back.
 */
810 static void osc_consume_write_grant(struct client_obd *cli,
811 struct brw_page *pga)
813 LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
814 atomic_inc(&obd_dirty_pages);
815 cli->cl_dirty += CFS_PAGE_SIZE;
816 cli->cl_avail_grant -= CFS_PAGE_SIZE;
817 pga->flag |= OBD_BRW_FROM_GRANT;
818 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
819 CFS_PAGE_SIZE, pga, pga->pg);
820 LASSERT(cli->cl_avail_grant >= 0);
821 osc_update_next_shrink(cli);
824 /* the companion to osc_consume_write_grant, called when a brw has completed.
825 * must be called with the loi lock held. */
/*
 * Visible steps: pages not marked OBD_BRW_FROM_GRANT take a separate path;
 * otherwise the mark is cleared and the dirty accounting is reduced by one
 * page (plus the transit accounting when OBD_BRW_NOCACHE is set, which is
 * also cleared). One branch adds a whole page to cl_lost_grant; the other
 * branch, taken for a partial page when the page size differs from the
 * server block size, adds only the part of the page not covered by the
 * blocks the write touched.
 *
 * NOTE(review): the body of the first if, the condition of the whole-page
 * lost-grant branch (presumably the sent parameter) and the check guarding
 * the end adjustment are missing from this listing.
 */
826 static void osc_release_write_grant(struct client_obd *cli,
827 struct brw_page *pga, int sent)
/* Falls back to 4096 when the cached statfs block size is zero. */
829 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
832 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
837 pga->flag &= ~OBD_BRW_FROM_GRANT;
838 atomic_dec(&obd_dirty_pages);
839 cli->cl_dirty -= CFS_PAGE_SIZE;
840 if (pga->flag & OBD_BRW_NOCACHE) {
841 pga->flag &= ~OBD_BRW_NOCACHE;
842 atomic_dec(&obd_dirty_transit_pages);
843 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
846 cli->cl_lost_grant += CFS_PAGE_SIZE;
847 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
848 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
849 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
850 /* For short writes we shouldn't count parts of pages that
851 * span a whole block on the OST side, or our accounting goes
852 * wrong. Should match the code in filter_grant_check. */
/* offset: position inside the page; count: bytes rounded out to blocks. */
853 int offset = pga->off & ~CFS_PAGE_MASK;
854 int count = pga->count + (offset & (blocksize - 1));
855 int end = (offset + pga->count) & (blocksize - 1);
857 count += blocksize - end;
859 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
860 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
861 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
862 cli->cl_avail_grant, cli->cl_dirty);
/* Total number of read plus write RPCs currently in flight for cli. */
868 static unsigned long rpcs_in_flight(struct client_obd *cli)
870 return cli->cl_r_in_flight + cli->cl_w_in_flight;
873 /* caller must hold loi_list_lock */
/*
 * Walk the list of cache waiters and wake those that can make progress.
 *
 * For each waiter: when another dirty page would exceed the per-client or
 * the system-wide dirty limit, or when writes are in flight and less than
 * one page of grant is available, the condition is only traced here.
 * Otherwise the waiter is unlinked; with less than one page of grant its
 * ocw_rc is set to -EDQUOT (sync IO), in the other branch grant is consumed
 * for its page; then the waiter is signalled.
 *
 * NOTE(review): the statements that leave the loop in the two trace-only
 * cases and the else keyword before osc_consume_write_grant() are missing
 * from this listing.
 */
874 void osc_wake_cache_waiters(struct client_obd *cli)
876 struct list_head *l, *tmp;
877 struct osc_cache_waiter *ocw;
/* Safe iteration: entries are removed from the list inside the loop. */
880 list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
881 /* if we can't dirty more, we must wait until some is written */
882 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
883 (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
884 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
885 "osc max %ld, sys max %d\n", cli->cl_dirty,
886 cli->cl_dirty_max, obd_max_dirty_pages);
890 /* if still dirty cache but no grant wait for pending RPCs that
891 * may yet return us some grant before doing sync writes */
892 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
893 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
894 cli->cl_w_in_flight);
898 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
899 list_del_init(&ocw->ocw_entry);
900 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
901 /* no more RPCs in flight to return grant, do sync IO */
902 ocw->ocw_rc = -EDQUOT;
903 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
905 osc_consume_write_grant(cli,
906 &ocw->ocw_oap->oap_brw_page);
909 cfs_waitq_signal(&ocw->ocw_waitq);
/*
 * Add the grant carried in a reply body to the available grant of cli.
 *
 * The addition happens under cl_loi_list_lock and only when the reply flags
 * o_grant as valid (OBD_MD_FLGRANT). The trace line prints o_grant before
 * that flag is checked.
 */
915 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
917 client_obd_list_lock(&cli->cl_loi_list_lock);
918 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
919 if (body->oa.o_valid & OBD_MD_FLGRANT)
920 cli->cl_avail_grant += body->oa.o_grant;
921 /* waiters are woken in brw_interpret */
922 client_obd_list_unlock(&cli->cl_loi_list_lock);
/*
 * Forward declaration; osc_shrink_grant() calls this before the definition,
 * which is not in the visible part of this file.
 */
925 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
926 void *key, obd_count vallen, void *val,
927 struct ptlrpc_request_set *set);
/*
 * Reply handler for a grant shrink request.
 *
 * Visible steps: one path adds oa->o_grant back to cl_avail_grant under
 * cl_loi_list_lock; another path fetches the reply body and applies
 * osc_update_grant().
 *
 * NOTE(review): the remaining parameters (aa and rc are presumably among
 * them), the condition selecting the give-back path (presumably a non-zero
 * rc), the release of oa and the return statement are missing from this
 * listing.
 */
929 static int osc_shrink_grant_interpret(const struct lu_env *env,
930 struct ptlrpc_request *req,
933 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
934 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
935 struct ost_body *body;
/* Return the grant that was set aside for the shrink request. */
938 client_obd_list_lock(&cli->cl_loi_list_lock);
939 cli->cl_avail_grant += oa->o_grant;
940 client_obd_list_unlock(&cli->cl_loi_list_lock);
944 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
946 osc_update_grant(cli, body);
/*
 * Set aside a quarter of the available grant for return to the server.
 *
 * Under cl_loi_list_lock, oa->o_grant receives cl_avail_grant / 4 and that
 * amount is removed from cl_avail_grant. oa is then flagged
 * OBD_FL_SHRINK_GRANT and the next shrink time is rescheduled.
 */
952 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
954 client_obd_list_lock(&cli->cl_loi_list_lock);
955 oa->o_grant = cli->cl_avail_grant / 4;
956 cli->cl_avail_grant -= oa->o_grant;
957 client_obd_list_unlock(&cli->cl_loi_list_lock);
958 oa->o_flags |= OBD_FL_SHRINK_GRANT;
959 osc_update_next_shrink(cli);
/*
 * Send a grant shrink request to the server.
 *
 * Visible steps: fill body->oa with the cache state and with the grant to
 * give back, send it as the KEY_GRANT_SHRINK value through
 * osc_set_info_async() on the self export of the import device, and on one
 * path add the grant back to cl_avail_grant under cl_loi_list_lock.
 *
 * NOTE(review): the allocation and release of body, the condition guarding
 * the give-back (presumably a non-zero rc) and the return statement are
 * missing from this listing.
 */
962 static int osc_shrink_grant(struct client_obd *cli)
965 struct ost_body *body;
972 osc_announce_cached(cli, &body->oa, 0);
973 osc_shrink_grant_local(cli, &body->oa);
974 rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
975 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
976 sizeof(*body), body, NULL);
/* Undo the local deduction made by osc_shrink_grant_local(). */
978 client_obd_list_lock(&cli->cl_loi_list_lock);
979 cli->cl_avail_grant += body->oa.o_grant;
980 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Grant is only shrunk while more than this amount is available. */
987 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
/*
 * Decide whether it is time to shrink the grant of client.
 *
 * The shrink time counts as reached from 5 ticks before
 * cl_next_shrink_grant. At that point the import must be in state
 * LUSTRE_IMP_FULL and the available grant must exceed GRANT_SHRINK_LIMIT;
 * another path reschedules the next shrink time.
 *
 * NOTE(review): the return statements and the else keyword before
 * osc_update_next_shrink() are missing from this listing.
 */
988 static int osc_should_shrink_grant(struct client_obd *client)
990 cfs_time_t time = cfs_time_current();
991 cfs_time_t next_shrink = client->cl_next_shrink_grant;
992 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
993 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
994 client->cl_avail_grant > GRANT_SHRINK_LIMIT)
997 osc_update_next_shrink(client);
/*
 * Timeout callback: for every client obd linked on item->ti_obd_list, send
 * a grant shrink request when osc_should_shrink_grant() says so. The result
 * of osc_shrink_grant() is not checked. The data parameter is not
 * referenced by any visible line.
 *
 * NOTE(review): the return statement is missing from this listing.
 */
1002 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1004 struct client_obd *client;
1006 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
1007 if (osc_should_shrink_grant(client))
1008 osc_shrink_grant(client);
/*
 * Register client for periodic grant shrinking.
 *
 * The client is added as a timeout client with interval
 * GRANT_SHRINK_INTERVAL and callback osc_grant_shrink_grant_cb(). One path
 * logs an error with rc; the other traces the registration and schedules
 * the first shrink time.
 *
 * NOTE(review): one argument of ptlrpc_add_timeout_client(), the check on
 * rc and the return statements are missing from this listing.
 */
1013 static int osc_add_shrink_grant(struct client_obd *client)
1017 rc = ptlrpc_add_timeout_client(GRANT_SHRINK_INTERVAL,
1019 osc_grant_shrink_grant_cb, NULL,
1020 &client->cl_grant_shrink_list);
1022 CERROR("add grant client %s error %d\n",
1023 client->cl_import->imp_obd->obd_name, rc);
1026 CDEBUG(D_CACHE, "add grant client %s \n",
1027 client->cl_import->imp_obd->obd_name);
1028 osc_update_next_shrink(client);
/*
 * Unregister client from periodic grant shrinking; returns the result of
 * ptlrpc_del_timeout_client().
 *
 * NOTE(review): the remaining argument of the call is missing from this
 * listing.
 */
1032 static int osc_del_shrink_grant(struct client_obd *client)
1034 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/*
 * Initialise the grant state of cli from connect data.
 *
 * cl_avail_grant is set to ocd->ocd_grant under cl_loi_list_lock. When the
 * connection advertises OBD_CONNECT_GRANT_SHRINK and the client is not yet
 * on a shrink list, it is registered for periodic shrinking; the result of
 * that registration is not checked. The available grant is asserted to be
 * non-negative.
 */
1038 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1040 client_obd_list_lock(&cli->cl_loi_list_lock);
1041 cli->cl_avail_grant = ocd->ocd_grant;
1042 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* An empty list head means the client is not registered yet. */
1044 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1045 list_empty(&cli->cl_grant_shrink_list))
1046 osc_add_shrink_grant(cli);
1048 CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1049 cli->cl_avail_grant, cli->cl_lost_grant);
1050 LASSERT(cli->cl_avail_grant >= 0);
1053 /* We assume that the reason this OSC got a short read is because it read
1054 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1055 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1056 * this stripe never got written at or beyond this stripe offset yet. */
/*
 * Zero-fill the part of the page array that a short read did not cover.
 *
 * First loop: walk pages while bytes read remain; in the page where the
 * data ends, zero the tail of that page from the end of the data; for fully
 * read pages subtract their byte count. Second loop: zero every remaining
 * page over its count bytes.
 *
 * NOTE(review): the declarations of ptr and i, the updates of i and
 * page_count and the statement leaving the first loop are missing from this
 * listing.
 */
1057 static void handle_short_read(int nob_read, obd_count page_count,
1058 struct brw_page **pga)
1063 /* skip bytes read OK */
1064 while (nob_read > 0) {
1065 LASSERT (page_count > 0);
1067 if (pga[i]->count > nob_read) {
1068 /* EOF inside this page */
/* Map the page and point at the start of the data of this brw page. */
1069 ptr = cfs_kmap(pga[i]->pg) +
1070 (pga[i]->off & ~CFS_PAGE_MASK);
1071 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1072 cfs_kunmap(pga[i]->pg);
1078 nob_read -= pga[i]->count;
1083 /* zero remaining pages */
1084 while (page_count-- > 0) {
1085 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1086 memset(ptr, 0, pga[i]->count);
1087 cfs_kunmap(pga[i]->pg);
/*
 * Validate the per-niobuf return codes of a BRW write reply.
 *
 * Visible steps: fetch the vector of niocount return codes from the reply
 * (a missing or short vector is traced); byte-swap each code when the reply
 * message was swabbed; return the first negative code; trace any non-zero
 * positive code as invalid; report a mismatch between the bytes transferred
 * by the bulk descriptor and requested_nob. The page_count and pga
 * parameters are not referenced by any visible line.
 *
 * NOTE(review): the local declarations and the return statements of the
 * error and success paths are missing from this listing.
 */
1092 static int check_write_rcs(struct ptlrpc_request *req,
1093 int requested_nob, int niocount,
1094 obd_count page_count, struct brw_page **pga)
1098 /* return error if any niobuf was in error */
/* No swab callback is passed; swabbing is done by hand below. */
1099 remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
1100 sizeof(*remote_rcs) * niocount, NULL);
1101 if (remote_rcs == NULL) {
1102 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1105 if (lustre_msg_swabbed(req->rq_repmsg))
1106 for (i = 0; i < niocount; i++)
1107 __swab32s(&remote_rcs[i]);
1109 for (i = 0; i < niocount; i++) {
1110 if (remote_rcs[i] < 0)
1111 return(remote_rcs[i]);
1113 if (remote_rcs[i] != 0) {
1114 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1115 i, remote_rcs[i], req);
1120 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1121 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1122 req->rq_bulk->bd_nob_transferred, requested_nob);
/*
 * Tell whether p2 can share a remote niobuf with p1.
 *
 * The final return requires p2 to start exactly where p1 ends. When the
 * flags of the two pages differ, a difference outside OBD_BRW_FROM_GRANT,
 * OBD_BRW_NOCACHE and OBD_BRW_SYNC is reported with CERROR.
 *
 * NOTE(review): the statement that ends the differing-flags branch is
 * missing from this listing; presumably pages with different flags are
 * never merged - confirm.
 */
1129 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1131 if (p1->flag != p2->flag) {
/* Flags in this set are ignored when deciding whether to warn. */
1132 unsigned mask = ~(OBD_BRW_FROM_GRANT|
1133 OBD_BRW_NOCACHE|OBD_BRW_SYNC);
1135 /* warn if we try to combine flags that we don't know to be
1136 * safe to combine */
1137 if ((p1->flag & mask) != (p2->flag & mask))
1138 CERROR("is it ok to have flags 0x%x and 0x%x in the "
1139 "same brw?\n", p1->flag, p2->flag);
1143 return (p1->off + p1->count == p2->off);
/*
 * Compute the checksum of the first nob bytes of a page array.
 *
 * The checksum is seeded with init_checksum(cksum_type) and updated page by
 * page with compute_checksum() over at most the remaining nob bytes of each
 * page. Two fault-injection hooks exist: for a read, the first page data can
 * be overwritten before checksumming; for a write, only the returned
 * checksum is altered.
 *
 * NOTE(review): the local declarations, the updates of i and pg_count, the
 * statement that alters the checksum for the write fault and the return
 * statement are missing from this listing.
 */
1146 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1147 struct brw_page **pga, int opc,
1148 cksum_type_t cksum_type)
1153 LASSERT (pg_count > 0);
1154 cksum = init_checksum(cksum_type);
1155 while (nob > 0 && pg_count > 0) {
1156 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1157 int off = pga[i]->off & ~CFS_PAGE_MASK;
/* Never checksum past the number of bytes still expected. */
1158 int count = pga[i]->count > nob ? nob : pga[i]->count;
1160 /* corrupt the data before we compute the checksum, to
1161 * simulate an OST->client data error */
1162 if (i == 0 && opc == OST_READ &&
1163 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1164 memcpy(ptr + off, "bad1", min(4, nob));
1165 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1166 cfs_kunmap(pga[i]->pg);
1167 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1170 nob -= pga[i]->count;
1174 /* For sending we only compute the wrong checksum instead
1175 * of corrupting the data so it is still correct on a redo */
1176 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/*
 * Build a bulk read or write (BRW) request for page_count pages.
 *
 * Visible steps:
 *  - two fault-injection hooks return -ENOMEM and -EINVAL;
 *  - a write request is allocated from the import request pool, another
 *    path uses ptlrpc_request_alloc() with RQF_OST_BRW;
 *  - niocount starts at 1 and the page array is scanned with
 *    can_merge_pages() to size the remote niobuf array;
 *  - the request is packed, routed to OST_IO_PORTAL, and a bulk descriptor
 *    is prepared (BULK_GET_SOURCE for OST_WRITE, BULK_PUT_SINK otherwise);
 *  - body, ioobj and niobuf buffers are fetched and filled; each page is
 *    checked, added to the bulk descriptor and either merged into the
 *    current niobuf or written to a new one;
 *  - cache and grant state is announced in the body and grant is shrunk
 *    when due;
 *  - checksum flags (and for writes the checksum itself) are set when
 *    checksums are enabled and the security flavor has no bulk protection;
 *  - the reply size is set and the async args are filled in.
 *
 * NOTE(review): this listing omits many lines (the opc assignments, the
 * niocount increment, failure checks, the copy of oa into the body, the
 * pg_prev and niobuf bookkeeping inside the loop, several else keywords,
 * the stores through reqp, the out label and the return statements).
 * Confirm against the complete file before relying on this summary. The lsm
 * parameter is not referenced by any visible line.
 */
1182 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1183 struct lov_stripe_md *lsm, obd_count page_count,
1184 struct brw_page **pga,
1185 struct ptlrpc_request **reqp,
1186 struct obd_capa *ocapa, int reserve)
1188 struct ptlrpc_request *req;
1189 struct ptlrpc_bulk_desc *desc;
1190 struct ost_body *body;
1191 struct obd_ioobj *ioobj;
1192 struct niobuf_remote *niobuf;
1193 int niocount, i, requested_nob, opc, rc;
1194 struct osc_brw_async_args *aa;
1195 struct req_capsule *pill;
1196 struct brw_page *pg_prev;
1199 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1200 RETURN(-ENOMEM); /* Recoverable */
1201 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1202 RETURN(-EINVAL); /* Fatal */
1204 if ((cmd & OBD_BRW_WRITE) != 0) {
1206 req = ptlrpc_request_alloc_pool(cli->cl_import,
1207 cli->cl_import->imp_rq_pool,
1211 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
/* Count the remote niobufs needed; the first page always needs one. */
1216 for (niocount = i = 1; i < page_count; i++) {
1217 if (!can_merge_pages(pga[i - 1], pga[i]))
1221 pill = &req->rq_pill;
1222 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1223 niocount * sizeof(*niobuf));
1224 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1226 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1228 ptlrpc_request_free(req);
1231 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1232 ptlrpc_at_set_req_timeout(req);
1234 if (opc == OST_WRITE)
1235 desc = ptlrpc_prep_bulk_imp(req, page_count,
1236 BULK_GET_SOURCE, OST_BULK_PORTAL);
1238 desc = ptlrpc_prep_bulk_imp(req, page_count,
1239 BULK_PUT_SINK, OST_BULK_PORTAL);
1242 GOTO(out, rc = -ENOMEM);
1243 /* NB request now owns desc and will free it when it gets freed */
1245 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1246 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1247 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1248 LASSERT(body && ioobj && niobuf);
1252 obdo_to_ioobj(oa, ioobj);
1253 ioobj->ioo_bufcnt = niocount;
1254 osc_pack_capa(req, body, ocapa);
1255 LASSERT (page_count > 0);
/* One pass over the pages: validate, add to the bulk, fill niobufs. */
1257 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1258 struct brw_page *pg = pga[i];
1260 LASSERT(pg->count > 0);
/* A brw page must not extend past the end of its memory page. */
1261 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1262 "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1263 pg->off, pg->count);
/* Pages must arrive in strictly increasing offset order. */
1265 LASSERTF(i == 0 || pg->off > pg_prev->off,
1266 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1267 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1269 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1270 pg_prev->pg, page_private(pg_prev->pg),
1271 pg_prev->pg->index, pg_prev->off);
1273 LASSERTF(i == 0 || pg->off > pg_prev->off,
1274 "i %d p_c %u\n", i, page_count);
/* All pages of one request must agree on OBD_BRW_SRVLOCK. */
1276 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1277 (pg->flag & OBD_BRW_SRVLOCK));
1279 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1281 requested_nob += pg->count;
1283 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1285 niobuf->len += pg->count;
1287 niobuf->offset = pg->off;
1288 niobuf->len = pg->count;
1289 niobuf->flags = pg->flag;
/* After the loop niobuf must sit exactly niocount entries past the start. */
1294 LASSERTF((void *)(niobuf - niocount) ==
1295 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1296 niocount * sizeof(*niobuf)),
1297 "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
1298 REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
1299 (void *)(niobuf - niocount));
1301 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1302 if (osc_should_shrink_grant(cli))
1303 osc_shrink_grant_local(cli, &body->oa);
1305 /* size[REQ_REC_OFF] still sizeof (*body) */
1306 if (opc == OST_WRITE) {
1307 if (unlikely(cli->cl_checksum) &&
1308 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1309 /* store cl_cksum_type in a local variable since
1310 * it can be changed via lprocfs */
1311 cksum_type_t cksum_type = cli->cl_cksum_type;
/* Start from clean flags when the body carries no valid flags yet. */
1313 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1314 oa->o_flags = body->oa.o_flags = 0;
1315 body->oa.o_flags |= cksum_type_pack(cksum_type);
1316 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1317 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1321 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1323 /* save this in 'oa', too, for later checking */
1324 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1325 oa->o_flags |= cksum_type_pack(cksum_type);
1327 /* clear out the checksum flag, in case this is a
1328 * resend but cl_checksum is no longer set. b=11238 */
1329 oa->o_valid &= ~OBD_MD_FLCKSUM;
1331 oa->o_cksum = body->oa.o_cksum;
1332 /* 1 RC per niobuf */
1333 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
1334 sizeof(__u32) * niocount);
1336 if (unlikely(cli->cl_checksum) &&
1337 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1338 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1339 body->oa.o_flags = 0;
1340 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1341 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1343 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
1344 /* 1 RC for the whole I/O */
1346 ptlrpc_request_set_replen(req);
/* Compile-time check that the async args fit in the request. */
1348 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1349 aa = ptlrpc_req_async_args(req);
1351 aa->aa_requested_nob = requested_nob;
1352 aa->aa_nio_count = niocount;
1353 aa->aa_page_count = page_count;
1357 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
/* A capability reference is taken only when the caller asked to reserve. */
1358 if (ocapa && reserve)
1359 aa->aa_ocapa = capa_get(ocapa);
1365 ptlrpc_req_finished(req);
/* Report and classify a checksum mismatch detected on a bulk write.
 *
 * oa                - obdo from the server reply; supplies the checksum type
 *                     (o_flags) and the ids printed in the console message
 * peer              - server the reply came from (printed as a NID)
 * client_cksum      - checksum the client computed when sending
 * server_cksum      - checksum the server reported
 * nob, page_count, pga - the data that was written, checksummed again here
 * client_cksum_type - checksum type the client asked for
 *
 * If the two checksums agree only a debug message is emitted.  Otherwise the
 * pages are checksummed again and the three values are compared to pick a
 * message describing where the data most likely changed.
 *
 * NOTE(review): the return statements are not visible in this listing; the
 * caller tests the result as a truth value, so presumably 0 means the
 * checksums matched and non-zero means a mismatch was reported - confirm. */
1369 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1370 __u32 client_cksum, __u32 server_cksum, int nob,
1371 obd_count page_count, struct brw_page **pga,
1372 cksum_type_t client_cksum_type)
1376 cksum_type_t cksum_type;
1378 if (server_cksum == client_cksum) {
1379 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* Take the checksum type from the reply flags when the server set them,
 * otherwise fall back to CRC32. */
1383 if (oa->o_valid & OBD_MD_FLFLAGS)
1384 cksum_type = cksum_type_unpack(oa->o_flags);
1386 cksum_type = OBD_CKSUM_CRC32;
/* Checksum the pages again as they are now, to compare with both the
 * original client value and the server value. */
1388 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
/* Message selection:
 *  - type mismatch: the server used a different algorithm than requested;
 *  - new == server: the pages now match what the server saw, so they changed
 *    on the client after the original checksum was taken;
 *  - new == client: the pages still match the original checksum, so the data
 *    changed on the way to the server;
 *  - otherwise: all three values differ. */
1391 if (cksum_type != client_cksum_type)
1392 msg = "the server did not use the checksum type specified in "
1393 "the original request - likely a protocol problem";
1394 else if (new_cksum == server_cksum)
1395 msg = "changed on the client after we checksummed it - "
1396 "likely false positive due to mmap IO (bug 11742)";
1397 else if (new_cksum == client_cksum)
1398 msg = "changed in transit before arrival at OST";
1400 msg = "changed in transit AND doesn't match the original - "
1401 "likely false positive due to mmap IO (bug 11742)";
/* The extent printed ends at the last byte of the last page in pga. */
1403 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1404 LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1405 "["LPU64"-"LPU64"]\n",
1406 msg, libcfs_nid2str(peer->nid),
1407 oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1408 oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1411 oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1413 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1414 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1415 "client csum now %x\n", client_cksum, client_cksum_type,
1416 server_cksum, cksum_type, new_cksum);
/* Post-process the reply of a bulk read/write RPC.
 *
 * req - the completed request; its async args (struct osc_brw_async_args)
 *       carry the client, the obdo, the page array and the requested size
 * rc  - result of the RPC so far (see the note below)
 *
 * Visible steps: unpack the ost_body from the reply, update quota flags and
 * grant from it, then
 *  - for OST_WRITE: unwrap the bulk, verify the write checksum if one was
 *    sent, and derive rc from check_write_rcs();
 *  - for OST_READ: unwrap the bulk, sanity-check the byte count against the
 *    requested and transferred sizes, handle a short read, and verify the
 *    read checksum when the server supplied one.
 * Finally the obdo from the reply is copied over aa->aa_oa.
 *
 * NOTE(review): several early-exit statements are not visible in this
 * listing, so the exact error codes returned cannot be stated here. */
1420 /* Note rc enters this function as number of bytes transferred */
1421 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1423 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1424 const lnet_process_id_t *peer =
1425 &req->rq_import->imp_connection->c_peer;
1426 struct client_obd *cli = aa->aa_cli;
1427 struct ost_body *body;
1428 __u32 client_cksum = 0;
/* -EDQUOT is the one error for which the reply is still processed below. */
1431 if (rc < 0 && rc != -EDQUOT)
1434 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1435 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1436 lustre_swab_ost_body);
1438 CDEBUG(D_INFO, "Can't unpack body\n");
1442 /* set/clear over quota flag for a uid/gid */
1443 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1444 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1445 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1446 body->oa.o_gid, body->oa.o_valid,
/* aa_oa is overwritten with the reply obdo at the end of this function, so
 * the checksum stored in it at send time is copied out first. */
1452 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1453 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1455 osc_update_grant(cli, body);
1457 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1459 CERROR("Unexpected +ve rc %d\n", rc);
1462 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1464 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
/* Only verify when a non-zero checksum was actually sent with the write. */
1467 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1468 check_write_checksum(&body->oa, peer, client_cksum,
1469 body->oa.o_cksum, aa->aa_requested_nob,
1470 aa->aa_page_count, aa->aa_ppga,
1471 cksum_type_unpack(aa->aa_oa->o_flags)))
1474 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1475 aa->aa_page_count, aa->aa_ppga);
1479 /* The rest of this function executes only for OST_READs */
1481 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
/* From here rc is compared as a byte count: it must not exceed what was
 * requested and must equal what the bulk descriptor says was transferred. */
1485 if (rc > aa->aa_requested_nob) {
1486 CERROR("Unexpected rc %d (%d requested)\n", rc,
1487 aa->aa_requested_nob);
1491 if (rc != req->rq_bulk->bd_nob_transferred) {
1492 CERROR ("Unexpected rc %d (%d transferred)\n",
1493 rc, req->rq_bulk->bd_nob_transferred);
1497 if (rc < aa->aa_requested_nob)
1498 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
/* The server supplied a checksum: compute the same over the received pages
 * and compare. */
1500 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1501 static int cksum_counter;
1502 __u32 server_cksum = body->oa.o_cksum;
1505 cksum_type_t cksum_type;
1507 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1508 cksum_type = cksum_type_unpack(body->oa.o_flags);
1510 cksum_type = OBD_CKSUM_CRC32;
1511 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1512 aa->aa_ppga, OST_READ,
/* NOTE(review): when the bulk sender is not the peer itself, the sender NID
 * is presumably reported as a router in the message below - the branch
 * structure is not fully visible here. */
1515 if (peer->nid == req->rq_bulk->bd_sender) {
1519 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1522 if (server_cksum == ~0 && rc > 0) {
1523 CERROR("Protocol error: server %s set the 'checksum' "
1524 "bit, but didn't send a checksum. Not fatal, "
1525 "but please notify on http://bugzilla.lustre.org/\n",
1526 libcfs_nid2str(peer->nid));
1527 } else if (server_cksum != client_cksum) {
1528 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1529 "%s%s%s inum "LPU64"/"LPU64" object "
1530 LPU64"/"LPU64" extent "
1531 "["LPU64"-"LPU64"]\n",
1532 req->rq_import->imp_obd->obd_name,
1533 libcfs_nid2str(peer->nid),
1535 body->oa.o_valid & OBD_MD_FLFID ?
1536 body->oa.o_fid : (__u64)0,
1537 body->oa.o_valid & OBD_MD_FLFID ?
1538 body->oa.o_generation :(__u64)0,
1540 body->oa.o_valid & OBD_MD_FLGROUP ?
1541 body->oa.o_gr : (__u64)0,
1542 aa->aa_ppga[0]->off,
1543 aa->aa_ppga[aa->aa_page_count-1]->off +
1544 aa->aa_ppga[aa->aa_page_count-1]->count -
1546 CERROR("client %x, server %x, cksum_type %x\n",
1547 client_cksum, server_cksum, cksum_type);
1549 aa->aa_oa->o_cksum = client_cksum;
1553 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* No checksum in the reply although client_cksum is non-zero, i.e. one was
 * saved from the request obdo above. */
1556 } else if (unlikely(client_cksum)) {
1557 static int cksum_missed;
/* (x & -x) == x holds only when x is zero or a power of two, which limits
 * how often this message is printed. */
1560 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1561 CERROR("Checksum %u requested from %s but not sent\n",
1562 cksum_missed, libcfs_nid2str(peer->nid));
/* Hand the attributes returned by the server back through aa_oa. */
1568 *aa->aa_oa = body->oa;
/* Perform one synchronous bulk read/write RPC for the pages in pga.
 *
 * The request is built with osc_brw_prep_request(), sent and waited for with
 * ptlrpc_queue_wait(), and its reply is processed by osc_brw_fini_request().
 * For a recoverable error, osc_should_resend() decides whether another
 * attempt is allowed; if so the caller sleeps for "resends" seconds first.
 *
 * NOTE(review): the retry jump, the increment of "resends" and the return
 * statements are not visible in this listing. */
1573 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1574 struct lov_stripe_md *lsm,
1575 obd_count page_count, struct brw_page **pga,
1576 struct obd_capa *ocapa)
1578 struct ptlrpc_request *req;
1582 struct l_wait_info lwi;
/* waitq is never signalled in the visible code; it is only used below as
 * something to sleep on until the timeout expires. */
1586 cfs_waitq_init(&waitq);
1589 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1590 page_count, pga, &req, ocapa, 0);
1594 rc = ptlrpc_queue_wait(req);
/* A timed-out bulk that is flagged for resend is dropped here without
 * looking at its reply. */
1596 if (rc == -ETIMEDOUT && req->rq_resend) {
1597 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1598 ptlrpc_req_finished(req);
1602 rc = osc_brw_fini_request(req, rc);
1604 ptlrpc_req_finished(req);
1605 if (osc_recoverable_error(rc)) {
1607 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1608 CERROR("too many resend retries, returning error\n");
/* Back off for a number of seconds equal to the resend count; the wait
 * condition is constant 0, so only the timeout (or an interruption) ends it. */
1612 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1613 l_wait_event(waitq, 0, &lwi);
/* Re-issue an asynchronous bulk RPC that failed with a recoverable error.
 *
 * request - the failed request
 * aa      - its async args (pages, obdo, oap list, capability)
 *
 * A new request is built for the same pages and obdo; the interpret callback,
 * async args, oap list, per-oap request references and the capability are
 * moved from the old request to the new one, and the new request is added to
 * the same request set.  If any oap that references the old request has been
 * interrupted, the new request is dropped instead.
 *
 * NOTE(review): return statements are not visible in this listing. */
1621 int osc_brw_redo_request(struct ptlrpc_request *request,
1622 struct osc_brw_async_args *aa)
1624 struct ptlrpc_request *new_req;
1625 struct ptlrpc_request_set *set = request->rq_set;
1626 struct osc_brw_async_args *new_aa;
1627 struct osc_async_page *oap;
1631 if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1632 CERROR("too many resend retries, returning error\n");
1636 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
/* The opcode of the old request decides whether the new one is a write or a
 * read. */
1638 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1639 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1640 aa->aa_cli, aa->aa_oa,
1641 NULL /* lsm unused by osc currently */,
1642 aa->aa_page_count, aa->aa_ppga,
1643 &new_req, aa->aa_ocapa, 0);
1647 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
/* Under the list lock: give up on the redo if a page holding a reference to
 * the old request was interrupted in the meantime. */
1649 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1650 if (oap->oap_request != NULL) {
1651 LASSERTF(request == oap->oap_request,
1652 "request %p != oap_request %p\n",
1653 request, oap->oap_request);
1654 if (oap->oap_interrupted) {
1655 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1656 ptlrpc_req_finished(new_req);
1661 /* New request takes over pga and oaps from old request.
1662 * Note that copying a list_head doesn't work, need to move it... */
1664 new_req->rq_interpret_reply = request->rq_interpret_reply;
1665 new_req->rq_async_args = request->rq_async_args;
/* rq_sent is set aa_resends seconds into the future. */
1666 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1668 new_aa = ptlrpc_req_async_args(new_req);
/* The struct copy above duplicated the list head by value; re-initialise it
 * and splice the entries across properly, leaving the old list empty. */
1670 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1671 list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1672 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
/* Swap every oap reference on the old request for one on the new request. */
1674 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1675 if (oap->oap_request) {
1676 ptlrpc_req_finished(oap->oap_request);
1677 oap->oap_request = ptlrpc_request_addref(new_req);
/* Ownership of the capability moves to the new request. */
1681 new_aa->aa_ocapa = aa->aa_ocapa;
1682 aa->aa_ocapa = NULL;
1684 /* use ptlrpc_set_add_req is safe because interpret functions work
1685 * in check_set context. only one way exist with access to request
1686 * from different thread got -EINTR - this way protected with
1687 * cl_loi_list_lock */
1688 ptlrpc_set_add_req(set, new_req);
1690 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1692 DEBUG_REQ(D_INFO, new_req, "new request");
/* sort_brw_pages(): sort an array of brw_page pointers in place by ascending
 * ->off using a shellsort.
 *
 * array - the pointers to sort
 * num   - number of entries
 *
 * The first loop grows the stride through 1, 4, 13, ... (stride * 3 + 1)
 * until it is no longer below num; the do/while that follows runs a strided
 * insertion sort for each stride and stops once a pass with stride 1 is done.
 * NOTE(review): the statement that shrinks the stride between passes and the
 * handling of very small arrays are on lines not visible in this listing. */
1697 * ugh, we want disk allocation on the target to happen in offset order. we'll
1698 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1699 * fine for our small page arrays and doesn't require allocation. its an
1700 * insertion sort that swaps elements that are strides apart, shrinking the
1701 * stride down until its '1' and the array is sorted.
1703 static void sort_brw_pages(struct brw_page **array, int num)
1706 struct brw_page *tmp;
1710 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1715 for (i = stride ; i < num ; i++) {
1718 while (j >= stride && array[j - stride]->off > tmp->off) {
1719 array[j] = array[j - stride];
1724 } while (stride > 1);
/* Count how many leading entries of pg form one unfragmented run.
 *
 * pg    - page array (callers in this file sort it by offset first)
 * pages - number of entries to consider; must be greater than zero
 *
 * The scan stops at the first page that does not end on a page boundary, or
 * whose successor does not start on one, since such a gap cannot be part of a
 * single contiguous transfer.
 * NOTE(review): the loop construct, the counters and the final return are on
 * lines not visible here; presumably the value returned is the number of
 * pages accepted so far. */
1727 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1733 LASSERT (pages > 0);
/* Offset of the current page within its memory page. */
1734 offset = pg[i]->off & ~CFS_PAGE_MASK;
1738 if (pages == 0) /* that's all */
1741 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1742 return count; /* doesn't end on page boundary */
1745 offset = pg[i]->off & ~CFS_PAGE_MASK;
1746 if (offset != 0) /* doesn't start on page boundary */
/* Allocate an array of count brw_page pointers for the pages in pga.
 *
 * The array is released with osc_release_ppga() using the same count.
 * NOTE(review): the allocation-failure check, the loop body and the return
 * are not visible in this listing; presumably entry i is set to point at
 * element i of pga and NULL is returned when the allocation fails. */
1753 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1755 struct brw_page **ppga;
1758 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1762 for (i = 0; i < count; i++)
/* Free a brw_page pointer array of count entries.
 *
 * Only the pointer array is freed; the pages it refers to are not touched.
 * count must match the value used when the array was allocated, because it
 * determines the size passed to OBD_FREE().  ppga must not be NULL. */
1767 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1769 LASSERT(ppga != NULL);
1770 OBD_FREE(ppga, sizeof(*ppga) * count);
/* Synchronous bulk read/write of an arbitrary number of pages.
 *
 * cmd        - OBD_BRW_* flags; with OBD_BRW_CHECK only the import state is
 *              examined
 * exp        - export whose client import is used
 * oinfo      - supplies the obdo (oi_oa), stripe md (oi_md) and capability
 * page_count - number of entries in pga
 * pga        - the pages to transfer
 * oti        - not referenced in the visible code
 *
 * The pages are sorted by offset and sent in chunks through
 * osc_brw_internal().  Each chunk holds at most cl_max_pages_per_rpc pages
 * and is further trimmed by max_unfragmented_pages().
 *
 * NOTE(review): the return statements and the error check after each chunk
 * are not visible in this listing. */
1773 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1774 obd_count page_count, struct brw_page *pga,
1775 struct obd_trans_info *oti)
1777 struct obdo *saved_oa = NULL;
1778 struct brw_page **ppga, **orig;
1779 struct obd_import *imp = class_exp2cliimp(exp);
1780 struct client_obd *cli;
1781 int rc, page_count_orig;
1784 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1785 cli = &imp->imp_obd->u.cli;
1787 if (cmd & OBD_BRW_CHECK) {
1788 /* The caller just wants to know if there's a chance that this
1789 * I/O can succeed */
1791 if (imp->imp_invalid)
1796 /* test_brw with a failed create can trip this, maybe others. */
1797 LASSERT(cli->cl_max_pages_per_rpc);
/* ppga advances through the array as chunks are sent; orig keeps the start
 * of the allocation (and page_count_orig its size) for the final release. */
1801 orig = ppga = osc_build_ppga(pga, page_count);
1804 page_count_orig = page_count;
1806 sort_brw_pages(ppga, page_count);
1807 while (page_count) {
1808 obd_count pages_per_brw;
1810 if (page_count > cli->cl_max_pages_per_rpc)
1811 pages_per_brw = cli->cl_max_pages_per_rpc;
1813 pages_per_brw = page_count;
1815 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
/* Each RPC overwrites the obdo.  When more than one RPC is needed, a copy is
 * taken before the first and restored before every later one. */
1817 if (saved_oa != NULL) {
1818 /* restore previously saved oa */
1819 *oinfo->oi_oa = *saved_oa;
1820 } else if (page_count > pages_per_brw) {
1821 /* save a copy of oa (brw will clobber it) */
1822 OBDO_ALLOC(saved_oa);
1823 if (saved_oa == NULL)
1824 GOTO(out, rc = -ENOMEM);
1825 *saved_oa = *oinfo->oi_oa;
1828 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1829 pages_per_brw, ppga, oinfo->oi_capa);
1834 page_count -= pages_per_brw;
1835 ppga += pages_per_brw;
1839 osc_release_ppga(orig, page_count_orig);
1841 if (saved_oa != NULL)
1842 OBDO_FREE(saved_oa);
1847 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1848 * the dirty accounting. Writeback completes or truncate happens before
1849 * writing starts. Must be called with the loi lock held. */
/* The only visible work is a call to osc_release_write_grant() for the
 * brw_page embedded in the oap; "sent" is passed through unchanged. */
1850 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1853 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1857 /* This maintains the lists of pending pages to read/write for a given object
1858 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1859 * to quickly find objects that are ready to send an RPC. */
/* Decide whether the pages queued on lop justify sending an RPC now.
 *
 * The checks, in order: nothing pending; import missing or invalid; an
 * urgent page is queued; (writes only) someone is waiting for cache space;
 * and finally whether the number of pending pages has reached the
 * "optimal" RPC size, cl_max_pages_per_rpc, possibly adjusted for writes.
 * NOTE(review): the return statements are not visible in this listing;
 * loi_list_maint() uses the result as a truth value. */
1860 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1866 if (lop->lop_num_pending == 0)
1869 /* if we have an invalid import we want to drain the queued pages
1870 * by forcing them through rpcs that immediately fail and complete
1871 * the pages. recovery relies on this to empty the queued pages
1872 * before canceling the locks and evicting down the llite pages */
1873 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1876 /* stream rpcs in queue order as long as there is an urgent page
1877 * queued. this is our cheap solution for good batching in the case
1878 * where writepage marks some random page in the middle of the file
1879 * as urgent because of, say, memory pressure */
1880 if (!list_empty(&lop->lop_urgent)) {
1881 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1884 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1885 optimal = cli->cl_max_pages_per_rpc;
1886 if (cmd & OBD_BRW_WRITE) {
1887 /* trigger a write rpc stream as long as there are dirtiers
1888 * waiting for space. as they're waiting, they're not going to
1889 * create more pages to coalesce with what's waiting.. */
1890 if (!list_empty(&cli->cl_cache_waiters)) {
1891 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1894 /* +16 to avoid triggering rpcs that would want to include pages
1895 * that are being queued but which can't be made ready until
1896 * the queuer finishes with the page. this is a wart for
1897 * llite::commit_write() */
1900 if (lop->lop_num_pending >= optimal)
/* Check whether lop has an ASYNC_HP page at the head of its urgent list.
 *
 * Only the first entry is examined; osc_oap_to_pending() inserts ASYNC_HP
 * pages at the head of lop_urgent and other urgent pages at the tail.
 * NOTE(review): the return statements are not visible in this listing;
 * loi_list_maint() uses the result as a truth value. */
1906 static int lop_makes_hprpc(struct loi_oap_pages *lop)
1908 struct osc_async_page *oap;
1911 if (list_empty(&lop->lop_urgent))
1914 oap = list_entry(lop->lop_urgent.next,
1915 struct osc_async_page, oap_urgent_item);
1917 if (oap->oap_async_flags & ASYNC_HP) {
1918 CDEBUG(D_CACHE, "hp request forcing RPC\n");
/* Make the linked state of item agree with should_be_on.
 *
 * If item is currently unlinked and should be on the list, it is appended to
 * the tail of list.  If it is linked and should not be, it is unlinked and
 * re-initialised.  In the two remaining cases nothing is done, so an item
 * that is already linked is never moved. */
1925 static void on_list(struct list_head *item, struct list_head *list,
1928 if (list_empty(item) && should_be_on)
1929 list_add_tail(item, list);
1930 else if (!list_empty(item) && !should_be_on)
1931 list_del_init(item);
1934 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1935 * can find pages to build into rpcs quickly */
/* Four client-wide lists are kept in step with the state of loi:
 *  - cl_loi_hp_ready_list / cl_loi_ready_list: an object with an ASYNC_HP
 *    page at the head of an urgent list goes on the hp list only; otherwise
 *    it goes on the ready list when its write or read queue warrants an RPC;
 *  - cl_loi_write_list / cl_loi_read_list: membership follows whether the
 *    respective queue has any pending pages. */
1936 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1938 if (lop_makes_hprpc(&loi->loi_write_lop) ||
1939 lop_makes_hprpc(&loi->loi_read_lop)) {
1941 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
1942 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
1944 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
1945 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
1946 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
1947 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
/* The pending counts are used directly as the truth value. */
1950 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1951 loi->loi_write_lop.lop_num_pending);
1953 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1954 loi->loi_read_lop.lop_num_pending);
/* Adjust the pending-page counters by delta (which may be negative).
 *
 * The per-object counter lop->lop_num_pending is always updated; the
 * client-wide counter is cl_pending_w_pages when cmd has OBD_BRW_WRITE set
 * and cl_pending_r_pages otherwise. */
1957 static void lop_update_pending(struct client_obd *cli,
1958 struct loi_oap_pages *lop, int cmd, int delta)
1960 lop->lop_num_pending += delta;
1961 if (cmd & OBD_BRW_WRITE)
1962 cli->cl_pending_w_pages += delta;
1964 cli->cl_pending_r_pages += delta;
/* osc_oap_interrupted(): mark oap as interrupted and get its waiter woken.
 *
 * Two cases are visible:
 *  - the page already belongs to an RPC (oap_request is set): the request is
 *    marked interrupted, ptlrpcd is woken, and the oap reference on the
 *    request is dropped;
 *  - the page is still on the pending list: it is removed from the pending
 *    and urgent lists, the counters and list memberships are updated, and the
 *    completion callback is invoked with -EINTR.
 * Must not be called on an oap that is already marked interrupted.
 * NOTE(review): the branch structure between the two cases, the assignment
 * of loi, and the return statements are not visible in this listing. */
1968 * this is called when a sync waiter receives an interruption. Its job is to
1969 * get the caller woken as soon as possible. If its page hasn't been put in an
1970 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
1971 * desiring interruption which will forcefully complete the rpc once the rpc
1974 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
1976 struct loi_oap_pages *lop;
1977 struct lov_oinfo *loi;
1981 LASSERT(!oap->oap_interrupted);
1982 oap->oap_interrupted = 1;
1984 /* ok, it's been put in an rpc. only one oap gets a request reference */
1985 if (oap->oap_request != NULL) {
1986 ptlrpc_mark_interrupted(oap->oap_request);
1987 ptlrpcd_wake(oap->oap_request);
1988 ptlrpc_req_finished(oap->oap_request);
1989 oap->oap_request = NULL;
1993 * page completion may be called only if ->cpo_prep() method was
1994 * executed by osc_io_submit(), that also adds page the to pending list
1996 if (!list_empty(&oap->oap_pending_item)) {
1997 list_del_init(&oap->oap_pending_item);
1998 list_del_init(&oap->oap_urgent_item);
2001 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2002 &loi->loi_write_lop : &loi->loi_read_lop;
2003 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2004 loi_list_maint(oap->oap_cli, oap->oap_loi);
2005 rc = oap->oap_caller_ops->ap_completion(env,
2006 oap->oap_caller_data,
2007 oap->oap_cmd, NULL, -EINTR);
2013 /* this is trying to propagate async writeback errors back up to the
2014 * application. As an async write fails we record the error code for later if
2015 * the app does an fsync. As long as errors persist we force future rpcs to be
2016 * sync so that the app can get a sync error and break the cycle of queueing
2017 * pages for which writeback will fail. */
/* ar  - the error-tracking record to update
 * xid - xid of the request that just completed
 *
 * Visible behaviour: ar_force_sync is set together with ar_min_xid, which is
 * sampled from the next xid to be issued; ar_force_sync is cleared again once
 * a request with an xid at or beyond ar_min_xid is processed here.
 * NOTE(review): the conditions guarding these two steps (presumably rc being
 * non-zero for the first) are on lines not visible in this listing. */
2018 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2025 ar->ar_force_sync = 1;
2026 ar->ar_min_xid = ptlrpc_sample_next_xid();
2031 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2032 ar->ar_force_sync = 0;
/* Queue oap on the pending list of its object.
 *
 * The write or read queue of oap->oap_loi is chosen from oap_cmd.  The page
 * is appended to lop_pending and the pending counters are incremented.  If
 * the page is flagged ASYNC_HP it is also put at the head of lop_urgent; if
 * it is flagged ASYNC_URGENT (and not ASYNC_HP) it is put at the tail.  No
 * locking is done here. */
2035 void osc_oap_to_pending(struct osc_async_page *oap)
2037 struct loi_oap_pages *lop;
2039 if (oap->oap_cmd & OBD_BRW_WRITE)
2040 lop = &oap->oap_loi->loi_write_lop;
2042 lop = &oap->oap_loi->loi_read_lop;
2044 if (oap->oap_async_flags & ASYNC_HP)
2045 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2046 else if (oap->oap_async_flags & ASYNC_URGENT)
2047 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2048 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2049 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2052 /* this must be called holding the loi list lock to give coverage to exit_cache,
2053 * async_flag maintenance, and oap_request */
/* Complete one async page.
 *
 * cli  - client the page belongs to
 * oa   - attributes returned by the server, or NULL
 * oap  - the page being completed
 * sent - passed on to osc_exit_cache()
 * rc   - result to report to the upper layer
 *
 * Drops the oap reference on its request, clears the async flags and the
 * interrupted mark, feeds the result of a write into the error tracking of
 * both the client and the object, copies block count and timestamps from oa
 * into the object lvb on success, and calls the ap_completion callback.
 * NOTE(review): the test on the callback result that selects between
 * re-queueing the page and osc_exit_cache() is not visible in this listing. */
2054 static void osc_ap_completion(const struct lu_env *env,
2055 struct client_obd *cli, struct obdo *oa,
2056 struct osc_async_page *oap, int sent, int rc)
/* The xid is read before the reference is dropped; it is used by
 * osc_process_ar() below. */
2061 if (oap->oap_request != NULL) {
2062 xid = ptlrpc_req_xid(oap->oap_request);
2063 ptlrpc_req_finished(oap->oap_request);
2064 oap->oap_request = NULL;
2067 oap->oap_async_flags = 0;
2068 oap->oap_interrupted = 0;
2070 if (oap->oap_cmd & OBD_BRW_WRITE) {
2071 osc_process_ar(&cli->cl_ar, xid, rc);
2072 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
/* Only fields the server marked valid are copied into the lvb. */
2075 if (rc == 0 && oa != NULL) {
2076 if (oa->o_valid & OBD_MD_FLBLOCKS)
2077 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2078 if (oa->o_valid & OBD_MD_FLMTIME)
2079 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2080 if (oa->o_valid & OBD_MD_FLATIME)
2081 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2082 if (oa->o_valid & OBD_MD_FLCTIME)
2083 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2086 rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2087 oap->oap_cmd, oa, rc);
2089 /* ll_ap_completion (from llite) drops PG_locked. so, a new
2090 * I/O on the page could start, but OSC calls it under lock
2091 * and thus we can add oap back to pending safely */
2093 /* upper layer wants to leave the page on pending queue */
2094 osc_oap_to_pending(oap);
2096 osc_exit_cache(cli, oap, sent);
/* Reply-interpret callback for asynchronous bulk read/write RPCs.
 *
 * req  - the completed request
 * data - its struct osc_brw_async_args
 * rc   - result of the RPC
 *
 * The reply is processed by osc_brw_fini_request(); a recoverable error is
 * handed to osc_brw_redo_request().  Otherwise the capability is released,
 * the in-flight counter for the operation is decremented, every page of the
 * request is completed, cache waiters are woken, further RPCs are started
 * via osc_check_rpcs(), and the cl_req and page array are released.
 * NOTE(review): the handling of the redo result, the assignment of cli and
 * the return statements are not visible in this listing. */
2100 static int brw_interpret(const struct lu_env *env,
2101 struct ptlrpc_request *req, void *data, int rc)
2103 struct osc_brw_async_args *aa = data;
2104 struct client_obd *cli;
2108 rc = osc_brw_fini_request(req, rc);
2109 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2110 if (osc_recoverable_error(rc)) {
2111 rc = osc_brw_redo_request(req, aa);
2117 capa_put(aa->aa_ocapa);
2118 aa->aa_ocapa = NULL;
2123 client_obd_list_lock(&cli->cl_loi_list_lock);
2125 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2126 * is called so we know whether to go to sync BRWs or wait for more
2127 * RPCs to complete */
2128 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2129 cli->cl_w_in_flight--;
2131 cli->cl_r_in_flight--;
/* Note the sense of the flag: "async" is non-zero when the request carries
 * NO oaps, so the first branch below handles requests that do have oaps. */
2133 async = list_empty(&aa->aa_oaps);
2134 if (!async) { /* from osc_send_oap_rpc() */
2135 struct osc_async_page *oap, *tmp;
2136 /* the caller may re-use the oap after the completion call so
2137 * we need to clean it up a little */
2138 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2139 list_del_init(&oap->oap_rpc_item);
2140 osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2142 OBDO_FREE(aa->aa_oa);
2143 } else { /* from async_internal() */
2145 for (i = 0; i < aa->aa_page_count; i++)
2146 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2148 osc_wake_cache_waiters(cli);
2149 osc_check_rpcs(env, cli);
2150 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Done outside the list lock. */
2152 cl_req_completion(env, aa->aa_clerq, rc);
2153 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
/* Build one bulk RPC from the async pages on rpc_list.
 *
 * env        - execution environment passed through to the cl_* calls
 * cli        - client the pages belong to
 * rpc_list   - list of oaps (linked through oap_rpc_item); must not be empty
 * page_count - number of pages on rpc_list
 * cmd        - OBD_BRW_WRITE selects a write request, otherwise a read
 *
 * A brw_page pointer array and an obdo are allocated, a cl_req is created
 * and every page added to it, the pages are sorted by offset, and the
 * request is prepared with osc_brw_prep_request().  On success the oaps are
 * moved from rpc_list into the async args of the request.  On the visible
 * failure path the pointer array is freed and every oap on rpc_list is
 * completed with an error under cl_loi_list_lock.
 *
 * Failures are reported as ERR_PTR() values in "req".
 * NOTE(review): the return statements, the success-path exit and several
 * condition lines are not visible in this listing. */
2157 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2158 struct client_obd *cli,
2159 struct list_head *rpc_list,
2160 int page_count, int cmd)
2162 struct ptlrpc_request *req;
2163 struct brw_page **pga = NULL;
2164 struct osc_brw_async_args *aa;
2165 struct obdo *oa = NULL;
2166 const struct obd_async_page_ops *ops = NULL;
2167 void *caller_data = NULL;
2168 struct osc_async_page *oap;
2169 struct osc_async_page *tmp;
2170 struct ost_body *body;
2171 struct cl_req *clerq = NULL;
2172 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2173 struct ldlm_lock *lock = NULL;
2174 struct cl_req_attr crattr;
2178 LASSERT(!list_empty(rpc_list));
2180 memset(&crattr, 0, sizeof crattr);
2181 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2183 GOTO(out, req = ERR_PTR(-ENOMEM));
2187 GOTO(out, req = ERR_PTR(-ENOMEM));
/* Fill pga from the oaps; the absolute offset of each page is the object
 * offset plus the offset within the page. */
2190 list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2191 struct cl_page *page = osc_oap2cl_page(oap);
2193 ops = oap->oap_caller_ops;
2194 caller_data = oap->oap_caller_data;
2196 clerq = cl_req_alloc(env, page, crt,
2197 1 /* only 1-object rpcs for
2200 GOTO(out, req = (void *)clerq);
2201 lock = oap->oap_ldlm_lock;
2203 pga[i] = &oap->oap_brw_page;
2204 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2205 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2206 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2208 cl_req_page_add(env, clerq, page);
2211 /* always get the data for the obdo for the rpc */
2212 LASSERT(ops != NULL);
2214 crattr.cra_capa = NULL;
2215 cl_req_attr_set(env, clerq, &crattr, ~0ULL);
/* Pass the remote handle of the lock recorded from the oaps to the server. */
2217 oa->o_handle = lock->l_remote_handle;
2218 oa->o_valid |= OBD_MD_FLHANDLE;
2221 rc = cl_req_prep(env, clerq);
2223 CERROR("cl_req_prep failed: %d\n", rc);
2224 GOTO(out, req = ERR_PTR(rc));
2227 sort_brw_pages(pga, page_count);
2228 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2229 pga, &req, crattr.cra_capa, 1);
2231 CERROR("prep_req failed: %d\n", rc);
2232 GOTO(out, req = ERR_PTR(rc));
2235 /* Need to update the timestamps after the request is built in case
2236 * we race with setattr (locally or in queue at OST). If OST gets
2237 * later setattr before earlier BRW (as determined by the request xid),
2238 * the OST will not use BRW timestamps. Sadly, there is no obvious
2239 * way to do this in a single call. bug 10150 */
2240 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2241 cl_req_attr_set(env, clerq, &crattr,
2242 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
/* Move the oaps from rpc_list into the request; rpc_list is left empty. */
2244 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2245 aa = ptlrpc_req_async_args(req);
2246 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2247 list_splice(rpc_list, &aa->aa_oaps);
2248 CFS_INIT_LIST_HEAD(rpc_list);
2249 aa->aa_clerq = clerq;
2251 capa_put(crattr.cra_capa);
/* Failure cleanup (the guarding condition is not visible here). */
2256 OBD_FREE(pga, sizeof(*pga) * page_count);
2257 /* this should happen rarely and is pretty bad, it makes the
2258 * pending list not follow the dirty order */
2259 client_obd_list_lock(&cli->cl_loi_list_lock);
2260 list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2261 list_del_init(&oap->oap_rpc_item);
2263 /* queued sync pages can be torn down while the pages
2264 * were between the pending list and the rpc */
2265 if (oap->oap_interrupted) {
2266 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2267 osc_ap_completion(env, cli, NULL, oap, 0,
2271 osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
/* clerq may itself hold an error pointer from cl_req_alloc(). */
2273 if (clerq && !IS_ERR(clerq))
2274 cl_req_completion(env, clerq, PTR_ERR(req));
2280 * Prepare pages for async I/O and put them in the send queue.
2284 * \param cmd - OBD_BRW_* macros
2285 * \param lop - pending pages
2287 * \return zero if the pages were successfully added to the send queue.
2288 * \return non-zero if an error occurred.
/* Collect ready pages from the pending list of lop into one bulk RPC and
 * hand it to ptlrpcd.
 *
 * Pages are taken in list order.  Visible reasons to stop collecting are:
 * cl_max_pages_per_rpc reached, the batch ends on a PTLRPC_MAX_BRW_SIZE
 * boundary, a page that starts or ends with a gap inside its memory page,
 * a page whose OBD_BRW_SRVLOCK flag differs from the first page, and a page
 * that cannot be made ready.  The request is built by osc_build_req() with
 * cl_loi_list_lock dropped; the lock is taken again before the in-flight
 * counters are updated.
 *
 * NOTE(review): the function unlocks and re-locks cl_loi_list_lock but never
 * takes it first, so it appears to be entered and left with the lock held -
 * confirm against the callers.  Many short lines (breaks, returns, switch
 * labels) are not visible in this listing. */
2291 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2292 struct lov_oinfo *loi,
2293 int cmd, struct loi_oap_pages *lop)
2295 struct ptlrpc_request *req;
2296 obd_count page_count = 0;
2297 struct osc_async_page *oap = NULL, *tmp;
2298 struct osc_brw_async_args *aa;
2299 const struct obd_async_page_ops *ops;
2300 CFS_LIST_HEAD(rpc_list);
2301 unsigned int ending_offset;
2302 unsigned starting_offset = 0;
2304 struct cl_object *clob = NULL;
2307 /* If there are HP OAPs we need to handle at least 1 of them,
2308 * move it the beginning of the pending list for that. */
2309 if (!list_empty(&lop->lop_urgent)) {
2310 oap = list_entry(lop->lop_urgent.next,
2311 struct osc_async_page, oap_urgent_item);
2312 if (oap->oap_async_flags & ASYNC_HP)
2313 list_move(&oap->oap_pending_item, &lop->lop_pending);
2316 /* first we find the pages we're allowed to work with */
2317 list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2319 ops = oap->oap_caller_ops;
2321 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2322 "magic 0x%x\n", oap, oap->oap_magic);
2325 /* pin object in memory, so that completion call-backs
2326 * can be safely called under client_obd_list lock. */
2327 clob = osc_oap2cl_page(oap)->cp_obj;
2328 cl_object_get(clob);
/* All pages of one RPC must agree on OBD_BRW_SRVLOCK; srvlock is recorded
 * from the first page added to the batch (further down). */
2331 if (page_count != 0 &&
2332 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2333 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2334 " oap %p, page %p, srvlock %u\n",
2335 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2338 /* in llite being 'ready' equates to the page being locked
2339 * until completion unlocks it. commit_write submits a page
2340 * as not ready because its unlock will happen unconditionally
2341 * as the call returns. if we race with commit_write giving
2342 * us that page we don't want to create a hole in the page
2343 * stream, so we stop and leave the rpc to be fired by
2344 * another dirtier or kupdated interval (the not ready page
2345 * will still be on the dirty list). we could call in
2346 * at the end of ll_file_write to process the queue again. */
2347 if (!(oap->oap_async_flags & ASYNC_READY)) {
2348 int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2351 CDEBUG(D_INODE, "oap %p page %p returned %d "
2352 "instead of ready\n", oap,
2356 /* llite is telling us that the page is still
2357 * in commit_write and that we should try
2358 * and put it in an rpc again later. we
2359 * break out of the loop so we don't create
2360 * a hole in the sequence of pages in the rpc
2365 /* the io isn't needed.. tell the checks
2366 * below to complete the rpc with EINTR */
2367 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2368 oap->oap_count = -EINTR;
2371 oap->oap_async_flags |= ASYNC_READY;
2374 LASSERTF(0, "oap %p page %p returned %d "
2375 "from make_ready\n", oap,
2383 * Page submitted for IO has to be locked. Either by
2384 * ->ap_make_ready() or by higher layers.
2386 #if defined(__KERNEL__) && defined(__linux__)
2388 struct cl_page *page;
2390 page = osc_oap2cl_page(oap);
2392 if (page->cp_type == CPT_CACHEABLE &&
2393 !(PageLocked(oap->oap_page) &&
2394 (CheckWriteback(oap->oap_page, cmd)))) {
2395 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2397 (long)oap->oap_page->flags,
2398 oap->oap_async_flags);
2403 /* If there is a gap at the start of this page, it can't merge
2404 * with any previous page, so we'll hand the network a
2405 * "fragmented" page array that it can't transfer in 1 RDMA */
2406 if (page_count != 0 && oap->oap_page_off != 0)
2409 /* take the page out of our book-keeping */
2410 list_del_init(&oap->oap_pending_item);
2411 lop_update_pending(cli, lop, cmd, -1);
2412 list_del_init(&oap->oap_urgent_item);
/* Offset of the first page within a PTLRPC_MAX_BRW_SIZE-sized window; it is
 * only used for the offset histograms below. */
2414 if (page_count == 0)
2415 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2416 (PTLRPC_MAX_BRW_SIZE - 1);
2418 /* ask the caller for the size of the io as the rpc leaves. */
2419 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2421 ops->ap_refresh_count(env, oap->oap_caller_data,
2423 LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
/* A count of zero or below means there is nothing to send for this page
 * (the count may carry an error code such as the -EINTR stored above); the
 * page is completed right away with that value as its result. */
2425 if (oap->oap_count <= 0) {
2426 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2428 osc_ap_completion(env, cli, NULL,
2429 oap, 0, oap->oap_count);
2433 /* now put the page back in our accounting */
2434 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2435 if (page_count == 0)
2436 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2437 if (++page_count >= cli->cl_max_pages_per_rpc)
2440 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2441 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2442 * have the same alignment as the initial writes that allocated
2443 * extents on the server. */
2444 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2445 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2446 if (ending_offset == 0)
2449 /* If there is a gap at the end of this page, it can't merge
2450 * with any subsequent pages, so we'll hand the network a
2451 * "fragmented" page array that it can't transfer in 1 RDMA */
2452 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2456 osc_wake_cache_waiters(cli);
2458 loi_list_maint(cli, loi);
/* The request is built without the list lock held. */
2460 client_obd_list_unlock(&cli->cl_loi_list_lock);
2463 cl_object_put(env, clob);
/* Nothing collected: take the lock back before leaving. */
2465 if (page_count == 0) {
2466 client_obd_list_lock(&cli->cl_loi_list_lock);
2470 req = osc_build_req(env, cli, &rpc_list, page_count, cmd);
2472 LASSERT(list_empty(&rpc_list));
2473 loi_list_maint(cli, loi);
2474 RETURN(PTR_ERR(req));
2477 aa = ptlrpc_req_async_args(req);
/* Statistics: pages per RPC, RPCs in flight and starting offset. */
2479 if (cmd == OBD_BRW_READ) {
2480 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2481 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2482 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2483 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2485 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2486 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2487 cli->cl_w_in_flight);
2488 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2489 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2491 ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2493 client_obd_list_lock(&cli->cl_loi_list_lock);
2495 if (cmd == OBD_BRW_READ)
2496 cli->cl_r_in_flight++;
2498 cli->cl_w_in_flight++;
2500 /* queued sync pages can be torn down while the pages
2501 * were between the pending list and the rpc */
2503 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2504 /* only one oap gets a request reference */
2507 if (oap->oap_interrupted && !req->rq_intr) {
2508 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2510 ptlrpc_mark_interrupted(req);
/* NOTE(review): tmp is assigned on lines not visible here; it is the one oap
 * that receives the request reference. */
2514 tmp->oap_request = ptlrpc_request_addref(req);
2516 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2517 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2519 req->rq_interpret_reply = brw_interpret;
2520 ptlrpcd_add_req(req, PSCOPE_BRW);
/* Debug helper: emits one D_INODE CDEBUG line for LOI.  The fixed part of the
 * format reports whether the object sits on the ready or hp-ready list, then
 * the pending count and urgent-list state of its write queue, then the same
 * pair for its read queue.  STR is pasted onto the end of the format string
 * and its arguments follow the fixed ones. */
2524 #define LOI_DEBUG(LOI, STR, args...) \
2525 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2526 !list_empty(&(LOI)->loi_ready_item) || \
2527 !list_empty(&(LOI)->loi_hp_ready_item), \
2528 (LOI)->loi_write_lop.lop_num_pending, \
2529 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2530 (LOI)->loi_read_lop.lop_num_pending, \
2531 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2534 /* This is called by osc_check_rpcs() to find which objects have pages that
2535 * we could be sending. These lists are maintained by lop_makes_rpc(). */
/* Every branch below returns the first entry of one per-client list.  The
 * lists are tried in this order: hp-ready, ready, write (only while there
 * are cache waiters), and finally write then read when the import is
 * missing or marked invalid.
 * NOTE(review): the return taken when no list qualifies is not shown here;
 * osc_check_rpcs() compares the result with NULL. */
2536 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2540 /* First return objects that have blocked locks so that they
2541 * will be flushed quickly and other clients can get the lock,
2542 * then objects which have pages ready to be stuffed into RPCs */
2543 if (!list_empty(&cli->cl_loi_hp_ready_list))
2544 RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
2545 struct lov_oinfo, loi_hp_ready_item));
2546 if (!list_empty(&cli->cl_loi_ready_list))
2547 RETURN(list_entry(cli->cl_loi_ready_list.next,
2548 struct lov_oinfo, loi_ready_item));
2550 /* then if we have cache waiters, return all objects with queued
2551 * writes. This is especially important when many small files
2552 * have filled up the cache and not been fired into rpcs because
2553 * they don't pass the nr_pending/object threshold */
2554 if (!list_empty(&cli->cl_cache_waiters) &&
2555 !list_empty(&cli->cl_loi_write_list))
2556 RETURN(list_entry(cli->cl_loi_write_list.next,
2557 struct lov_oinfo, loi_write_item));
2559 /* then return all queued objects when we have an invalid import
2560 * so that they get flushed */
2561 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2562 if (!list_empty(&cli->cl_loi_write_list))
2563 RETURN(list_entry(cli->cl_loi_write_list.next,
2564 struct lov_oinfo, loi_write_item));
2565 if (!list_empty(&cli->cl_loi_read_list))
2566 RETURN(list_entry(cli->cl_loi_read_list.next,
2567 struct lov_oinfo, loi_read_item));
2572 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2574 struct osc_async_page *oap;
2577 if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
2578 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2579 struct osc_async_page, oap_urgent_item);
2580 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2583 if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
2584 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2585 struct osc_async_page, oap_urgent_item);
2586 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2589 return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2592 /* called with the loi list lock held */
/* Work loop that turns queued pages into RPCs.  Each iteration takes the
 * next candidate object from osc_next_loi(), consults
 * osc_max_rpc_in_flight(), offers the object's write queue and then its read
 * queue to osc_send_oap_rpc() when lop_makes_rpc() says an RPC is warranted,
 * and finally unlinks the object from all four client lists before
 * loi_list_maint() is called on it.
 * NOTE(review): the statements that act on the in-flight check, on rc and on
 * race_counter are not shown here; confirm the exit conditions against the
 * full source. */
2593 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2595 struct lov_oinfo *loi;
2596 int rc = 0, race_counter = 0;
2599 while ((loi = osc_next_loi(cli)) != NULL) {
2600 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2602 if (osc_max_rpc_in_flight(cli, loi))
2605 /* attempt some read/write balancing by alternating between
2606 * reads and writes in an object. The makes_rpc checks here
2607 * would be redundant if we were getting read/write work items
2608 * instead of objects. we don't want send_oap_rpc to drain a
2609 * partial read pending queue when we're given this object to
2610 * do io on writes while there are cache waiters */
2611 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2612 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2613 &loi->loi_write_lop);
2621 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2622 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2623 &loi->loi_read_lop);
2632 /* attempt some inter-object balancing by issuing rpcs
2633 * for each object in turn */
2634 if (!list_empty(&loi->loi_hp_ready_item))
2635 list_del_init(&loi->loi_hp_ready_item);
2636 if (!list_empty(&loi->loi_ready_item))
2637 list_del_init(&loi->loi_ready_item);
2638 if (!list_empty(&loi->loi_write_item))
2639 list_del_init(&loi->loi_write_item);
2640 if (!list_empty(&loi->loi_read_item))
2641 list_del_init(&loi->loi_read_item);
2643 loi_list_maint(cli, loi);
2645 /* send_oap_rpc fails with 0 when make_ready tells it to
2646 * back off. llite's make_ready does this when it tries
2647 * to lock a page queued for write that is already locked.
2648 * we want to try sending rpcs from many objects, but we
2649 * don't want to spin failing with 0. */
2650 if (race_counter == 10)
2656 /* we're trying to queue a page in the osc so we're subject to the
2657 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2658 * If the osc's queued pages are already at that limit, then we want to sleep
2659 * until there is space in the osc's queue for us. We also may be waiting for
2660 * write credits from the OST if there are RPCs in flight that may return some
2661 * before we fall back to sync writes.
2663 * We need this to know our allocation was granted in the presence of signals */
/* Wait condition used by osc_enter_cache().  Evaluated with cl_loi_list_lock
 * held: true once the waiter is no longer linked through ocw_entry, or once
 * no RPCs remain in flight. */
2664 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2668 client_obd_list_lock(&cli->cl_loi_list_lock);
2669 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2670 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Account one page of write grant for oap without sleeping.  has_grant
 * records whether cl_avail_grant covers at least CFS_PAGE_SIZE.  The visible
 * statements either consume write grant for the page, or add the page to the
 * dirty-transit accounting and mark it OBD_BRW_NOCACHE.
 * NOTE(review): the conditionals choosing between those paths (presumably on
 * has_grant and transient) and the return statement are not shown here;
 * osc_enter_cache() tests the result for truth -- confirm. */
2675 * Non-blocking version of osc_enter_cache() that consumes grant only when it
2678 int osc_enter_cache_try(const struct lu_env *env,
2679 struct client_obd *cli, struct lov_oinfo *loi,
2680 struct osc_async_page *oap, int transient)
2684 has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2686 osc_consume_write_grant(cli, &oap->oap_brw_page);
2688 cli->cl_dirty_transit += CFS_PAGE_SIZE;
2689 atomic_inc(&obd_dirty_transit_pages);
2690 oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2696 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2697 * grant or cache space. */
/* Reserve cache space and grant for one page about to be queued for write.
 * Three stages are visible: a check that forces the caller towards sync io,
 * a fast path through osc_enter_cache_try(), and a slow path that queues a
 * stack-allocated waiter on cl_cache_waiters and sleeps until ocw_granted()
 * becomes true.
 * NOTE(review): the values returned from each stage and the remaining
 * initialisation of ocw are not shown here. */
2698 static int osc_enter_cache(const struct lu_env *env,
2699 struct client_obd *cli, struct lov_oinfo *loi,
2700 struct osc_async_page *oap)
2702 struct osc_cache_waiter ocw;
2703 struct l_wait_info lwi = { 0 };
2707 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2708 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2709 cli->cl_dirty_max, obd_max_dirty_pages,
2710 cli->cl_lost_grant, cli->cl_avail_grant);
2712 /* force the caller to try sync io. this can jump the list
2713 * of queued writes and create a discontiguous rpc stream */
2714 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2715 loi->loi_ar.ar_force_sync)
2718 /* Hopefully normal case - cache space and write credits available */
2719 if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2720 atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2721 osc_enter_cache_try(env, cli, loi, oap, 0))
2724 /* Make sure that there are write rpcs in flight to wait for. This
2725 * is a little silly as this object may not have any pending but
2726 * other objects sure might. */
2727 if (cli->cl_w_in_flight) {
2728 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2729 cfs_waitq_init(&ocw.ocw_waitq);
/* Kick the RPC engine before sleeping, then drop the list lock for the
 * duration of the wait. */
2733 loi_list_maint(cli, loi);
2734 osc_check_rpcs(env, cli);
2735 client_obd_list_unlock(&cli->cl_loi_list_lock);
2737 CDEBUG(D_CACHE, "sleeping for cache space\n");
2738 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2740 client_obd_list_lock(&cli->cl_loi_list_lock);
/* Still linked after the wait: unlink the on-stack waiter ourselves. */
2741 if (!list_empty(&ocw.ocw_entry)) {
2742 list_del(&ocw.ocw_entry);
/* Initialise an osc_async_page for `page` at object offset `offset`.
 * The visible code records the magic, the owning client, the caller's ops
 * and data, the page and the offset; sets oap_brw_flags to OBD_BRW_NOQUOTA
 * when the export is not remote and the caller has CFS_CAP_SYS_RESOURCE;
 * asserts that offset is page aligned; and initialises the four list heads
 * and the spinlock.
 * The early return yields size_round(sizeof(*oap)).
 * NOTE(review): the condition guarding that return and the statement that
 * assigns oap are not shown here -- confirm against the full source. */
2752 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2753 struct lov_oinfo *loi, cfs_page_t *page,
2754 obd_off offset, const struct obd_async_page_ops *ops,
2755 void *data, void **res, int nocache,
2756 struct lustre_handle *lockh)
2758 struct osc_async_page *oap;
2763 return size_round(sizeof(*oap));
2766 oap->oap_magic = OAP_MAGIC;
2767 oap->oap_cli = &exp->exp_obd->u.cli;
2770 oap->oap_caller_ops = ops;
2771 oap->oap_caller_data = data;
2773 oap->oap_page = page;
2774 oap->oap_obj_off = offset;
2775 if (!client_is_remote(exp) &&
2776 cfs_capable(CFS_CAP_SYS_RESOURCE))
2777 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2779 LASSERT(!(offset & ~CFS_PAGE_MASK));
2781 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2782 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2783 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2784 CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2786 spin_lock_init(&oap->oap_lock);
2787 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
/* Convert an opaque cookie back into an osc_async_page.  The cookie is
 * reinterpreted in place and validated through its magic field; a mismatch
 * yields ERR_PTR(-EINVAL).
 * NOTE(review): the success return is not shown here. */
2791 struct osc_async_page *oap_from_cookie(void *cookie)
2793 struct osc_async_page *oap = cookie;
2794 if (oap->oap_magic != OAP_MAGIC)
2795 return ERR_PTR(-EINVAL);
/* Queue the page identified by cookie for asynchronous read or write.
 * Visible steps: validate the cookie; check the import and that the page is
 * not already on a pending, urgent or rpc list; for writes not flagged
 * OBD_BRW_NOQUOTA, look up the object's uid/gid and check quota; then, under
 * cl_loi_list_lock, record off/count/flags in the oap, reserve cache for
 * writes with osc_enter_cache(), move the oap to its pending list and call
 * osc_check_rpcs().
 * NOTE(review): the error codes returned by the early checks are not shown
 * here. */
2799 int osc_queue_async_io(const struct lu_env *env,
2800 struct obd_export *exp, struct lov_stripe_md *lsm,
2801 struct lov_oinfo *loi, void *cookie,
2802 int cmd, obd_off off, int count,
2803 obd_flag brw_flags, enum async_flags async_flags)
2805 struct client_obd *cli = &exp->exp_obd->u.cli;
2806 struct osc_async_page *oap;
2810 oap = oap_from_cookie(cookie);
2812 RETURN(PTR_ERR(oap));
2814 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2817 if (!list_empty(&oap->oap_pending_item) ||
2818 !list_empty(&oap->oap_urgent_item) ||
2819 !list_empty(&oap->oap_rpc_item))
2822 /* check if the file's owner/group is over quota */
2823 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2824 struct cl_object *obj;
2825 struct cl_attr attr; /* XXX put attr into thread info */
2827 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
2829 cl_object_attr_lock(obj);
2830 rc = cl_object_attr_get(env, obj, &attr);
2831 cl_object_attr_unlock(obj);
2833 if (rc == 0 && lquota_chkdq(quota_interface, cli, attr.cat_uid,
2834 attr.cat_gid) == NO_QUOTA)
2841 loi = lsm->lsm_oinfo[0];
2843 client_obd_list_lock(&cli->cl_loi_list_lock);
/* The io must fit inside a single page. */
2845 LASSERT(off + count <= CFS_PAGE_SIZE);
2847 oap->oap_page_off = off;
2848 oap->oap_count = count;
2849 oap->oap_brw_flags = brw_flags;
2850 oap->oap_async_flags = async_flags;
2852 if (cmd & OBD_BRW_WRITE) {
2853 rc = osc_enter_cache(env, cli, loi, oap);
2855 client_obd_list_unlock(&cli->cl_loi_list_lock);
2860 osc_oap_to_pending(oap);
2861 loi_list_maint(cli, loi);
2863 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2866 osc_check_rpcs(env, cli);
2867 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* True when 'flag' is clear in 'was' and set in 'now', i.e. the flag is being
 * newly set; the boolean form of (~was & now & flag).  Every argument is
 * parenthesized so that compound expressions such as (A | B) expand with the
 * intended precedence. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
/* Raise ASYNC_READY and/or ASYNC_URGENT on a page that is already queued.
 * The queue (write or read) is selected from oap_cmd.  A newly set
 * ASYNC_URGENT on a page that is not yet part of an RPC links it on
 * lop_urgent, at the head when the page already carries ASYNC_HP and
 * (presumably, the else is not shown) at the tail otherwise, and then
 * refreshes the object's list membership.
 * NOTE(review): the return values of the early checks (invalid import, page
 * not pending, flags already set) are not shown here. */
2875 int osc_set_async_flags_base(struct client_obd *cli,
2876 struct lov_oinfo *loi, struct osc_async_page *oap,
2877 obd_flag async_flags)
2879 struct loi_oap_pages *lop;
2882 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2885 if (oap->oap_cmd & OBD_BRW_WRITE) {
2886 lop = &loi->loi_write_lop;
2888 lop = &loi->loi_read_lop;
2891 if (list_empty(&oap->oap_pending_item))
2894 if ((oap->oap_async_flags & async_flags) == async_flags)
2897 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2898 oap->oap_async_flags |= ASYNC_READY;
2900 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
2901 list_empty(&oap->oap_rpc_item)) {
2902 if (oap->oap_async_flags & ASYNC_HP)
2903 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2905 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2906 oap->oap_async_flags |= ASYNC_URGENT;
2907 loi_list_maint(cli, loi);
2910 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2911 oap->oap_async_flags);
/* Remove a page from OSC book-keeping.  Fails with -EBUSY when the page is
 * currently part of an RPC (oap_rpc_item is linked).  Otherwise, under
 * cl_loi_list_lock, it releases the page's cache accounting, wakes cache
 * waiters, unlinks the page from the urgent list (clearing ASYNC_URGENT and
 * ASYNC_HP) and from the pending list (decrementing the pending count), and
 * refreshes the object's list membership.
 * NOTE(review): the `out` label and the final return are not shown here. */
2915 int osc_teardown_async_page(struct obd_export *exp,
2916 struct lov_stripe_md *lsm,
2917 struct lov_oinfo *loi, void *cookie)
2919 struct client_obd *cli = &exp->exp_obd->u.cli;
2920 struct loi_oap_pages *lop;
2921 struct osc_async_page *oap;
2925 oap = oap_from_cookie(cookie);
2927 RETURN(PTR_ERR(oap));
2930 loi = lsm->lsm_oinfo[0];
2932 if (oap->oap_cmd & OBD_BRW_WRITE) {
2933 lop = &loi->loi_write_lop;
2935 lop = &loi->loi_read_lop;
2938 client_obd_list_lock(&cli->cl_loi_list_lock);
2940 if (!list_empty(&oap->oap_rpc_item))
2941 GOTO(out, rc = -EBUSY);
2943 osc_exit_cache(cli, oap, 0);
2944 osc_wake_cache_waiters(cli);
2946 if (!list_empty(&oap->oap_urgent_item)) {
2947 list_del_init(&oap->oap_urgent_item);
2948 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
2950 if (!list_empty(&oap->oap_pending_item)) {
2951 list_del_init(&oap->oap_pending_item);
2952 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2954 loi_list_maint(cli, loi);
2955 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2957 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Attach einfo->ei_cbdata to lock as its l_ast_data.  First asserts that the
 * lock's blocking, completion and glimpse callbacks and its resource type
 * match einfo.  The store is made with both the resource/lock lock and
 * osc_ast_guard held, and asserts that l_ast_data was either unset or
 * already equal to the new value. */
2961 static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
2962 struct ldlm_enqueue_info *einfo,
2965 void *data = einfo->ei_cbdata;
2967 LASSERT(lock != NULL);
2968 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2969 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2970 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2971 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2973 lock_res_and_lock(lock);
2974 spin_lock(&osc_ast_guard);
2975 LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
2976 lock->l_ast_data = data;
2977 spin_unlock(&osc_ast_guard);
2978 unlock_res_and_lock(lock);
/* Handle-based wrapper around osc_set_lock_data_with_check(): resolves lockh
 * to a lock, sets its ast data and drops the reference taken by the lookup.
 * The CERROR reports the case where the handle does not resolve.
 * NOTE(review): the if/else that separates the two paths is not shown
 * here. */
2981 static void osc_set_data_with_check(struct lustre_handle *lockh,
2982 struct ldlm_enqueue_info *einfo,
2985 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2988 osc_set_lock_data_with_check(lock, einfo, flags);
2989 LDLM_LOCK_PUT(lock);
2991 CERROR("lockh %p, data %p - client evicted?\n",
2992 lockh, einfo->ei_cbdata);
/* Run the iterator `replace` with argument `data` over the locks of the
 * resource named after lsm's object id and group, within the namespace of
 * the export's obd device.
 * NOTE(review): the return statement is not shown here. */
2995 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2996 ldlm_iterator_t replace, void *data)
2998 struct ldlm_res_id res_id;
2999 struct obd_device *obd = class_exp2obd(exp);
3001 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3002 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
/* Common tail of a lock enqueue.  When the enqueue was aborted, the server
 * reply is fetched and a non-zero lock_policy_res1 replaces rc.  On success,
 * or on an aborted intent enqueue, LDLM_FL_LVB_READY is set in *flags and
 * the LVB is logged.  Finally the caller's upcall is invoked with the
 * cookie and rc, and its result becomes the new rc.
 * NOTE(review): some conditionals around the reply lookup and the final
 * return are not shown here. */
3006 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3007 obd_enqueue_update_f upcall, void *cookie,
3010 int intent = *flags & LDLM_FL_HAS_INTENT;
3014 /* The request was created before ldlm_cli_enqueue call. */
3015 if (rc == ELDLM_LOCK_ABORTED) {
3016 struct ldlm_reply *rep;
3017 rep = req_capsule_server_get(&req->rq_pill,
3020 LASSERT(rep != NULL);
3021 if (rep->lock_policy_res1)
3022 rc = rep->lock_policy_res1;
3026 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3027 *flags |= LDLM_FL_LVB_READY;
3028 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3029 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3032 /* Call the update callback. */
3033 rc = (*upcall)(cookie, rc);
/* Reply interpreter for asynchronous lock enqueues.  It works on a local
 * copy of the lock handle and mode, pins the lock with an extra reference
 * around the upcall, finishes the ldlm side with ldlm_cli_enqueue_fini() and
 * the osc side with osc_enqueue_fini(), then drops the references again.
 * NOTE(review): the final return is not shown here. */
3037 static int osc_enqueue_interpret(const struct lu_env *env,
3038 struct ptlrpc_request *req,
3039 struct osc_enqueue_args *aa, int rc)
3041 struct ldlm_lock *lock;
3042 struct lustre_handle handle;
3045 /* Make a local copy of a lock handle and a mode, because aa->oa_*
3046 * might be freed anytime after lock upcall has been called. */
3047 lustre_handle_copy(&handle, aa->oa_lockh);
3048 mode = aa->oa_ei->ei_mode;
3050 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3052 lock = ldlm_handle2lock(&handle);
3054 /* Take an additional reference so that a blocking AST that
3055 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3056 * to arrive after an upcall has been executed by
3057 * osc_enqueue_fini(). */
3058 ldlm_lock_addref(&handle, mode);
3060 /* Complete obtaining the lock procedure. */
3061 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3062 mode, aa->oa_flags, aa->oa_lvb,
3063 sizeof(*aa->oa_lvb), lustre_swab_ost_lvb,
3065 /* Complete osc stuff. */
3066 rc = osc_enqueue_fini(req, aa->oa_lvb,
3067 aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
3069 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3071 /* Release the lock for async request. */
3072 if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3074 * Releases a reference taken by ldlm_cli_enqueue(), if it is
3075 * not already released by
3076 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3078 ldlm_lock_decref(&handle, mode);
/* Drop the extra reference taken before ldlm_cli_enqueue_fini() and the one
 * obtained from ldlm_handle2lock(). */
3080 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3081 aa->oa_lockh, req, aa);
3082 ldlm_lock_decref(&handle, mode);
3083 LDLM_LOCK_PUT(lock);
/* Fold the result of an enqueue into the object's cached attributes.
 * On ELDLM_OK the LVB is copied into loi and a candidate KMS is computed
 * from the LVB size, capped at one byte past the end of the lock's extent.
 * The KMS is only raised, never lowered, and the lock is then made
 * available for matching.  On an aborted intent (glimpse) enqueue only the
 * LVB is copied and the KMS is left alone. */
3087 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3088 struct lov_oinfo *loi, int flags,
3089 struct ost_lvb *lvb, __u32 mode, int rc)
3091 if (rc == ELDLM_OK) {
3092 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3095 LASSERT(lock != NULL);
3096 loi->loi_lvb = *lvb;
3097 tmp = loi->loi_lvb.lvb_size;
3098 /* Extend KMS up to the end of this lock and no further
3099 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3100 if (tmp > lock->l_policy_data.l_extent.end)
3101 tmp = lock->l_policy_data.l_extent.end + 1;
3102 if (tmp >= loi->loi_kms) {
3103 LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3104 ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3105 loi_kms_set(loi, tmp);
3107 LDLM_DEBUG(lock, "lock acquired, setting rss="
3108 LPU64"; leaving kms="LPU64", end="LPU64,
3109 loi->loi_lvb.lvb_size, loi->loi_kms,
3110 lock->l_policy_data.l_extent.end);
3112 ldlm_lock_allow_match(lock);
3113 LDLM_LOCK_PUT(lock);
3114 } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3115 loi->loi_lvb = *lvb;
3116 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3117 " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3121 EXPORT_SYMBOL(osc_update_enqueue);
/* Sentinel request-set value, not a real set: osc_enqueue_base() compares
 * its rqset argument against it and, on a match, hands the request to
 * ptlrpcd_add_req() instead of adding it to rqset. */
3123 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3125 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3126 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3127 * other synchronous requests, however keeping some locks and trying to obtain
3128 * others may take a considerable amount of time in a case of ost failure; and
3129 * when other sync requests do not get released lock from a client, the client
3130 * is excluded from the cluster -- such scenarios make the life difficult, so
3131 * release locks just after they are obtained. */
/* Outline of the visible flow: widen the requested extent to page
 * boundaries; try to satisfy the request from an already granted lock with
 * ldlm_lock_match(), in which case the upcall is invoked directly with
 * ELDLM_OK; otherwise prepare an enqueue request when needed and call
 * ldlm_cli_enqueue().  For asynchronous requests the interpret callback and
 * its arguments are stored in the request, which is queued on ptlrpcd or on
 * rqset; otherwise the result is completed through osc_enqueue_fini().
 * NOTE(review): several conditionals and all return statements are not
 * shown here; confirm branch structure against the full source. */
3132 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3133 int *flags, ldlm_policy_data_t *policy,
3134 struct ost_lvb *lvb, int kms_valid,
3135 obd_enqueue_update_f upcall, void *cookie,
3136 struct ldlm_enqueue_info *einfo,
3137 struct lustre_handle *lockh,
3138 struct ptlrpc_request_set *rqset, int async)
3140 struct obd_device *obd = exp->exp_obd;
3141 struct ptlrpc_request *req = NULL;
3142 int intent = *flags & LDLM_FL_HAS_INTENT;
3147 /* Filesystem lock extents are extended to page boundaries so that
3148 * dealing with the page cache is a little smoother. */
3149 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3150 policy->l_extent.end |= ~CFS_PAGE_MASK;
3153 * kms is not valid when either object is completely fresh (so that no
3154 * locks are cached), or object was evicted. In the latter case cached
3155 * lock cannot be used, because it would prime inode state with
3156 * potentially stale LVB.
3161 /* Next, search for already existing extent locks that will cover us */
3162 /* If we're trying to read, we also search for an existing PW lock. The
3163 * VFS and page cache already protect us locally, so lots of readers/
3164 * writers can share a single PW lock.
3166 * There are problems with conversion deadlocks, so instead of
3167 * converting a read lock to a write lock, we'll just enqueue a new
3170 * At some point we should cancel the read lock instead of making them
3171 * send us a blocking callback, but there are problems with canceling
3172 * locks out from other users right now, too. */
3173 mode = einfo->ei_mode;
3174 if (einfo->ei_mode == LCK_PR)
3176 mode = ldlm_lock_match(obd->obd_namespace,
3177 *flags | LDLM_FL_LVB_READY, res_id,
3178 einfo->ei_type, policy, mode, lockh, 0);
3180 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
/* A matched lock is reused only if it carries no ast data yet or already
 * carries ours. */
3182 if (matched->l_ast_data == NULL ||
3183 matched->l_ast_data == einfo->ei_cbdata) {
3184 /* addref the lock only if not async requests and PW
3185 * lock is matched whereas we asked for PR. */
3186 if (!rqset && einfo->ei_mode != mode)
3187 ldlm_lock_addref(lockh, LCK_PR);
3188 osc_set_lock_data_with_check(matched, einfo, *flags);
3190 /* I would like to be able to ASSERT here that
3191 * rss <= kms, but I can't, for reasons which
3192 * are explained in lov_enqueue() */
3195 /* We already have a lock, and it's referenced */
3196 (*upcall)(cookie, ELDLM_OK);
3198 /* For async requests, decref the lock. */
3199 if (einfo->ei_mode != mode)
3200 ldlm_lock_decref(lockh, LCK_PW);
3202 ldlm_lock_decref(lockh, einfo->ei_mode);
3203 LDLM_LOCK_PUT(matched);
3206 ldlm_lock_decref(lockh, mode);
3207 LDLM_LOCK_PUT(matched);
3212 CFS_LIST_HEAD(cancels);
3213 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3214 &RQF_LDLM_ENQUEUE_LVB);
3218 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3222 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3224 ptlrpc_request_set_replen(req);
3227 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3228 *flags &= ~LDLM_FL_BLOCK_GRANTED;
3230 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3231 sizeof(*lvb), lustre_swab_ost_lvb, lockh, async);
3234 struct osc_enqueue_args *aa;
3235 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3236 aa = ptlrpc_req_async_args(req);
3239 aa->oa_flags = flags;
3240 aa->oa_upcall = upcall;
3241 aa->oa_cookie = cookie;
3243 aa->oa_lockh = lockh;
3245 req->rq_interpret_reply =
3246 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3247 if (rqset == PTLRPCD_SET)
3248 ptlrpcd_add_req(req, PSCOPE_OTHER);
3250 ptlrpc_set_add_req(rqset, req);
3251 } else if (intent) {
3252 ptlrpc_req_finished(req);
3257 rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3259 ptlrpc_req_finished(req);
/* obd_info based front end to osc_enqueue_base().  Builds the resource name
 * from the stripe md's object id and group, and passes the first stripe's
 * LVB and kms_valid flag, the caller's upcall with oinfo itself as cookie,
 * and the lock handle.  The enqueue is asynchronous exactly when rqset is
 * non-NULL.
 * NOTE(review): the return statement is not shown here. */
3264 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3265 struct ldlm_enqueue_info *einfo,
3266 struct ptlrpc_request_set *rqset)
3268 struct ldlm_res_id res_id;
3272 osc_build_res_name(oinfo->oi_md->lsm_object_id,
3273 oinfo->oi_md->lsm_object_gr, &res_id);
3275 rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3276 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3277 oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3278 oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3279 rqset, rqset != NULL);
/* Look for an already granted lock covering the page-aligned extent in
 * policy.  The match is made with LDLM_FL_LVB_READY added to the caller's
 * flags.  On a hit the lock's ast data is set through
 * osc_set_data_with_check(); when the matched mode differs from the
 * requested one and LDLM_FL_TEST_LOCK is not set, the PW reference is
 * swapped for a PR reference.
 * NOTE(review): the OBD_FAIL_OSC_MATCH branch body, the setup of rc before
 * the match call and the return are not shown here. */
3283 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3284 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3285 int *flags, void *data, struct lustre_handle *lockh,
3288 struct obd_device *obd = exp->exp_obd;
3289 int lflags = *flags;
3293 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3296 /* Filesystem lock extents are extended to page boundaries so that
3297 * dealing with the page cache is a little smoother */
3298 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3299 policy->l_extent.end |= ~CFS_PAGE_MASK;
3301 /* Next, search for already existing extent locks that will cover us */
3302 /* If we're trying to read, we also search for an existing PW lock. The
3303 * VFS and page cache already protect us locally, so lots of readers/
3304 * writers can share a single PW lock. */
3308 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3309 res_id, type, policy, rc, lockh, unref);
3312 osc_set_data_with_check(lockh, data, lflags);
3313 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3314 ldlm_lock_addref(lockh, LCK_PR);
3315 ldlm_lock_decref(lockh, LCK_PW);
/* Drop one reference of the given mode on lockh.  LCK_GROUP locks are
 * released with ldlm_lock_decref_and_cancel(); the other visible call is a
 * plain ldlm_lock_decref().
 * NOTE(review): the else joining the two calls and the return are not shown
 * here. */
3322 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3326 if (unlikely(mode == LCK_GROUP))
3327 ldlm_lock_decref_and_cancel(lockh, mode);
3329 ldlm_lock_decref(lockh, mode);
/* Forwards to osc_cancel_base(); exp and md are not used by the visible
 * code. */
3334 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3335 __u32 mode, struct lustre_handle *lockh)
3338 RETURN(osc_cancel_base(lockh, mode));
/* Cancel unused locks in the export's namespace through
 * ldlm_cli_cancel_unused().  resp starts out NULL and is pointed at a
 * resource name built from lsm's object id and group.
 * NOTE(review): the condition guarding that assignment (presumably a
 * non-NULL lsm) is not shown here. */
3341 static int osc_cancel_unused(struct obd_export *exp,
3342 struct lov_stripe_md *lsm, int flags,
3345 struct obd_device *obd = class_exp2obd(exp);
3346 struct ldlm_res_id res_id, *resp = NULL;
3349 resp = osc_build_res_name(lsm->lsm_object_id,
3350 lsm->lsm_object_gr, &res_id);
3353 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
/* Reply interpreter for osc_statfs_async().  It special-cases -ENOTCONN and
 * -EAGAIN when the request was issued with OBD_STATFS_NODELAY, fails with
 * -EPROTO when the reply carries no obd_statfs buffer, copies the reply
 * into the caller's oi_osfs and finally reports rc through the oi_cb_up
 * callback.
 * NOTE(review): the body of the NODELAY branch, the NULL check on msfs and
 * the `out` label are not shown here. */
3356 static int osc_statfs_interpret(const struct lu_env *env,
3357 struct ptlrpc_request *req,
3358 struct osc_async_args *aa, int rc)
3360 struct obd_statfs *msfs;
3363 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3364 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3370 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3372 GOTO(out, rc = -EPROTO);
3375 *aa->aa_oi->oi_osfs = *msfs;
3377 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Build an OST_STATFS request on the client's import and add it to rqset;
 * the reply is handled by osc_statfs_interpret().  The request is sent to
 * OST_CREATE_PORTAL.  With OBD_STATFS_NODELAY it is marked no-resend and
 * no-delay.  max_age is not used by the visible code.
 * NOTE(review): the allocation-failure check, the setup of the async args
 * and the return are not shown here. */
3381 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3382 __u64 max_age, struct ptlrpc_request_set *rqset)
3384 struct ptlrpc_request *req;
3385 struct osc_async_args *aa;
3389 /* We could possibly pass max_age in the request (as an absolute
3390 * timestamp or a "seconds.usec ago") so the target can avoid doing
3391 * extra calls into the filesystem if that isn't necessary (e.g.
3392 * during mount that would help a bit). Having relative timestamps
3393 * is not so great if request processing is slow, while absolute
3394 * timestamps are not ideal because they need time synchronization. */
3395 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3399 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3401 ptlrpc_request_free(req);
3404 ptlrpc_request_set_replen(req);
3405 req->rq_request_portal = OST_CREATE_PORTAL;
3406 ptlrpc_at_set_req_timeout(req);
3408 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3409 /* procfs requests not want stat in wait for avoid deadlock */
3410 req->rq_no_resend = 1;
3411 req->rq_no_delay = 1;
3414 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3415 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3416 aa = ptlrpc_req_async_args(req);
3419 ptlrpc_set_add_req(rqset, req);
/* Synchronous statfs.  A reference on the client import is taken under
 * cl_sem (read side) so that the import cannot disappear underneath a
 * concurrent disconnect.  An OST_STATFS request is then built on that
 * import, sent to OST_CREATE_PORTAL and waited for with
 * ptlrpc_queue_wait().  A reply without an obd_statfs buffer yields
 * -EPROTO.
 * NOTE(review): the NULL-import and allocation-failure handling, the copy
 * into osfs and the return are not shown here. */
3423 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3424 __u64 max_age, __u32 flags)
3426 struct obd_statfs *msfs;
3427 struct ptlrpc_request *req;
3428 struct obd_import *imp = NULL;
3432 /*Since the request might also come from lprocfs, so we need
3433 *sync this with client_disconnect_export Bug15684*/
3434 down_read(&obd->u.cli.cl_sem);
3435 if (obd->u.cli.cl_import)
3436 imp = class_import_get(obd->u.cli.cl_import);
3437 up_read(&obd->u.cli.cl_sem);
3441 /* We could possibly pass max_age in the request (as an absolute
3442 * timestamp or a "seconds.usec ago") so the target can avoid doing
3443 * extra calls into the filesystem if that isn't necessary (e.g.
3444 * during mount that would help a bit). Having relative timestamps
3445 * is not so great if request processing is slow, while absolute
3446 * timestamps are not ideal because they need time synchronization. */
3447 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
/* The request has been allocated against the import; our own reference is
 * no longer needed. */
3449 class_import_put(imp);
3454 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3456 ptlrpc_request_free(req);
3459 ptlrpc_request_set_replen(req);
3460 req->rq_request_portal = OST_CREATE_PORTAL;
3461 ptlrpc_at_set_req_timeout(req);
3463 if (flags & OBD_STATFS_NODELAY) {
3464 /* procfs requests not want stat in wait for avoid deadlock */
3465 req->rq_no_resend = 1;
3466 req->rq_no_delay = 1;
3469 rc = ptlrpc_queue_wait(req);
3473 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3475 GOTO(out, rc = -EPROTO);
3482 ptlrpc_req_finished(req);
/* Summary of the visible flow of osc_getstripe(): copy the v1-sized header
 * of the user's lov_user_md into a local v3 struct, accept only the V1 and
 * V3 user magics, size the reply with lov_mds_md_size(), fill in the object
 * id, group and a stripe count of 1 (plus the first lmm_objects slot when
 * the caller supplied room for stripes), and copy the result back to user
 * space.  The reply buffer is allocated with OBD_ALLOC when
 * lmm_stripe_count is positive.
 * NOTE(review): the error returns, the buffer used when lmm_stripe_count is
 * zero, and the condition guarding OBD_FREE are not shown here. */
3486 /* Retrieve object striping information.
3488 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3489 * the maximum number of OST indices which will fit in the user buffer.
3490 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3492 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3494 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3495 struct lov_user_md_v3 lum, *lumk;
3496 struct lov_user_ost_data_v1 *lmm_objects;
3497 int rc = 0, lum_size;
3503 /* we only need the header part from user space to get lmm_magic and
3504 * lmm_stripe_count, (the header part is common to v1 and v3) */
3505 lum_size = sizeof(struct lov_user_md_v1);
3506 if (copy_from_user(&lum, lump, lum_size))
3509 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3510 (lum.lmm_magic != LOV_USER_MAGIC_V3))
3513 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3514 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3515 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3516 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3518 /* we can use lov_mds_md_size() to compute lum_size
3519 * because lov_user_md_vX and lov_mds_md_vX have the same size */
3520 if (lum.lmm_stripe_count > 0) {
3521 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3522 OBD_ALLOC(lumk, lum_size);
/* The objects array sits at a different offset in the v1 and v3 layouts, so
 * its address is taken through the matching struct type. */
3526 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3527 lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3529 lmm_objects = &(lumk->lmm_objects[0]);
3530 lmm_objects->l_object_id = lsm->lsm_object_id;
3532 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3536 lumk->lmm_object_id = lsm->lsm_object_id;
3537 lumk->lmm_object_gr = lsm->lsm_object_gr;
3538 lumk->lmm_stripe_count = 1;
3540 if (copy_to_user(lump, lumk, lum_size))
3544 OBD_FREE(lumk, lum_size);
/* ioctl dispatcher for an OSC export.  A module reference is held for the
 * duration of the call and released at the end.  Visible commands:
 *   OBD_IOC_LOV_GET_CONFIG  - fill a lov_desc describing a single target
 *                             and copy it, with the obd uuid, to user space
 *   LL_IOC_LOV_SETSTRIPE    - obd_alloc_memmd()
 *   LL_IOC_LOV_GETSTRIPE    - osc_getstripe()
 *   OBD_IOC_CLIENT_RECOVER  - ptlrpc_recover_import()
 *   IOC_OSC_SET_ACTIVE      - ptlrpc_set_import_active()
 *   OBD_IOC_POLL_QUOTACHECK - lquota_poll_check()
 *   OBD_IOC_PING_TARGET     - ptlrpc_obd_ping()
 * Any other command is logged and fails with -ENOTTY.
 * NOTE(review): the switch statement itself, the post-processing of err in
 * each case, the `out` label and the return are not shown here. */
3550 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3551 void *karg, void *uarg)
3553 struct obd_device *obd = exp->exp_obd;
3554 struct obd_ioctl_data *data = karg;
3558 if (!try_module_get(THIS_MODULE)) {
3559 CERROR("Can't get module. Is it alive?");
3563 case OBD_IOC_LOV_GET_CONFIG: {
3565 struct lov_desc *desc;
3566 struct obd_uuid uuid;
3570 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3571 GOTO(out, err = -EINVAL);
3573 data = (struct obd_ioctl_data *)buf;
/* Both inline buffers must be large enough for what is written into them
 * below. */
3575 if (sizeof(*desc) > data->ioc_inllen1) {
3576 obd_ioctl_freedata(buf, len);
3577 GOTO(out, err = -EINVAL);
3580 if (data->ioc_inllen2 < sizeof(uuid)) {
3581 obd_ioctl_freedata(buf, len);
3582 GOTO(out, err = -EINVAL);
3585 desc = (struct lov_desc *)data->ioc_inlbuf1;
3586 desc->ld_tgt_count = 1;
3587 desc->ld_active_tgt_count = 1;
3588 desc->ld_default_stripe_count = 1;
3589 desc->ld_default_stripe_size = 0;
3590 desc->ld_default_stripe_offset = 0;
3591 desc->ld_pattern = 0;
3592 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3594 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3596 err = copy_to_user((void *)uarg, buf, len);
3599 obd_ioctl_freedata(buf, len);
3602 case LL_IOC_LOV_SETSTRIPE:
3603 err = obd_alloc_memmd(exp, karg);
3607 case LL_IOC_LOV_GETSTRIPE:
3608 err = osc_getstripe(karg, uarg);
3610 case OBD_IOC_CLIENT_RECOVER:
3611 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3616 case IOC_OSC_SET_ACTIVE:
3617 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3620 case OBD_IOC_POLL_QUOTACHECK:
3621 err = lquota_poll_check(quota_interface, exp,
3622 (struct if_quotacheck *)karg);
3624 case OBD_IOC_PING_TARGET:
3625 err = ptlrpc_obd_ping(obd);
3628 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3629 cmd, cfs_curproc_comm());
3630 GOTO(out, err = -ENOTTY);
3633 module_put(THIS_MODULE);
/*
 * Fetch the value associated with a key for this export.
 *
 * exp    - export; class_exp2cliimp(exp) gives the import used for requests.
 * keylen - length of key in bytes.
 * key    - key naming the requested information.
 * vallen - length of val; checked for NULL, written by KEY_LOCK_TO_STRIPE,
 *          read by KEY_FIEMAP to size the request and reply buffers.
 * val    - value buffer; checked for NULL.  KEY_FIEMAP also sends its
 *          current contents to the server before overwriting it.
 * lsm    - not referenced in the lines visible here.
 *
 * KEY_LAST_ID and KEY_FIEMAP each build an OST_GET_INFO request, wait for
 * it with ptlrpc_queue_wait() and copy the reply into val.
 */
3637 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3638 void *key, __u32 *vallen, void *val,
3639 struct lov_stripe_md *lsm)
/* Both the length pointer and the value buffer are required. */
3642 if (!vallen || !val)
/* The stripe answer is a single __u32; report that size to the caller. */
3645 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3646 __u32 *stripe = val;
3647 *vallen = sizeof(*stripe);
3650 } else if (KEY_IS(KEY_LAST_ID)) {
3651 struct ptlrpc_request *req;
/* Allocate a request in the LAST_ID flavour of the GET_INFO format. */
3656 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3657 &RQF_OST_GET_INFO_LAST_ID);
/* The key field is variable length: size it before packing. */
3661 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3662 RCL_CLIENT, keylen);
3663 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3665 ptlrpc_request_free(req);
/* Copy the caller's key into the request buffer. */
3669 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3670 memcpy(tmp, key, keylen);
/* Set the expected reply length, then queue the request and wait. */
3672 ptlrpc_request_set_replen(req);
3673 rc = ptlrpc_queue_wait(req);
/*
 * Pull the object id out of the reply.  The -EPROTO exit below presumably
 * covers a missing field; its guarding condition is not visible here.
 */
3677 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3679 GOTO(out, rc = -EPROTO);
/* Return the id through val, interpreted as an obd_id. */
3681 *((obd_id *)val) = *reply;
3683 ptlrpc_req_finished(req);
3685 } else if (KEY_IS(KEY_FIEMAP)) {
3686 struct ptlrpc_request *req;
3687 struct ll_user_fiemap *reply;
3691 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3692 &RQF_OST_GET_INFO_FIEMAP);
/*
 * Size the key, and size the fiemap value on both the client (request)
 * and server (reply) side to *vallen bytes.
 */
3696 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3697 RCL_CLIENT, keylen);
3698 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3699 RCL_CLIENT, *vallen);
3700 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3701 RCL_SERVER, *vallen);
3703 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3705 ptlrpc_request_free(req);
/* Both the key and the current contents of val go out in the request. */
3709 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3710 memcpy(tmp, key, keylen);
3711 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3712 memcpy(tmp, val, *vallen);
3714 ptlrpc_request_set_replen(req);
3715 rc = ptlrpc_queue_wait(req);
/* Same pattern as above: fetch the reply field, -EPROTO on the error exit. */
3719 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3721 GOTO(out1, rc = -EPROTO);
/* Overwrite the caller's buffer with *vallen bytes of the reply. */
3723 memcpy(val, reply, *vallen);
3725 ptlrpc_req_finished(req);
/*
 * Reply callback for KEY_MDS_CONN set_info requests; osc_set_info_async()
 * installs it as rq_interpret_reply.
 *
 * Connects the llog initiator for the LLOG_MDS_OST_ORIG_CTXT context of the
 * import's device, then marks the import as using server timeouts and as
 * pingable.
 */
3733 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3734 struct ptlrpc_request *req,
3737 struct llog_ctxt *ctxt;
3738 struct obd_import *imp = req->rq_import;
/* Look up the origin llog context; it is released by llog_ctxt_put() below. */
3744 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3747 rc = llog_initiator_connect(ctxt);
/* Error report for the connect attempt; carries the context and rc. */
3749 CERROR("cannot establish connection for "
3750 "ctxt %p: %d\n", ctxt, rc);
3753 llog_ctxt_put(ctxt);
/* Both import flags are changed together under imp_lock. */
3754 spin_lock(&imp->imp_lock);
3755 imp->imp_server_timeout = 1;
3756 imp->imp_pingable = 1;
3757 spin_unlock(&imp->imp_lock);
3758 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
/*
 * Set a key/value pair on this OSC.
 *
 * exp    - export; supplies the obd device and, via class_exp2cliimp(),
 *          the import.
 * keylen - length of key in bytes.
 * key    - key naming the setting.
 * vallen - length of val in bytes; checked against the expected size for
 *          KEY_NEXT_ID, KEY_INIT_RECOV and KEY_CHECKSUM.
 * val    - value to apply.
 * set    - request set that forwarded requests are added to.  A NULL set is
 *          special-cased for KEY_GRANT_SHRINK, whose request is handed to
 *          ptlrpcd_add_req() instead.
 *
 * KEY_NEXT_ID, KEY_UNLINKED, KEY_INIT_RECOV, KEY_CHECKSUM, KEY_SPTLRPC_CONF
 * and KEY_FLUSH_CTX are handled against local state.  Any other key is
 * packed with its value into an OST_SET_INFO request.
 */
3763 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3764 void *key, obd_count vallen, void *val,
3765 struct ptlrpc_request_set *set)
3767 struct ptlrpc_request *req;
3768 struct obd_device *obd = exp->exp_obd;
3769 struct obd_import *imp = class_exp2cliimp(exp);
3774 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
/* KEY_NEXT_ID: val is an obd_id; the creator continues from the id after it. */
3776 if (KEY_IS(KEY_NEXT_ID)) {
3777 if (vallen != sizeof(obd_id))
3781 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3782 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3783 exp->exp_obd->obd_name,
3784 obd->u.cli.cl_oscc.oscc_next_id);
/* KEY_UNLINKED: clear OSCC_FLAG_NOSPC on the creator under oscc_lock. */
3789 if (KEY_IS(KEY_UNLINKED)) {
3790 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3791 spin_lock(&oscc->oscc_lock);
3792 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3793 spin_unlock(&oscc->oscc_lock);
/* KEY_INIT_RECOV: val is an int stored in imp_initial_recov under imp_lock. */
3797 if (KEY_IS(KEY_INIT_RECOV)) {
3798 if (vallen != sizeof(int))
3800 spin_lock(&imp->imp_lock);
3801 imp->imp_initial_recov = *(int *)val;
3802 spin_unlock(&imp->imp_lock);
3803 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3804 exp->exp_obd->obd_name,
3805 imp->imp_initial_recov);
/* KEY_CHECKSUM: val is an int, normalised to 0 or 1 before being stored. */
3809 if (KEY_IS(KEY_CHECKSUM)) {
3810 if (vallen != sizeof(int))
3812 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
/* KEY_SPTLRPC_CONF: handled by sptlrpc_conf_client_adapt() on the device. */
3816 if (KEY_IS(KEY_SPTLRPC_CONF)) {
3817 sptlrpc_conf_client_adapt(obd);
/* KEY_FLUSH_CTX: handled by sptlrpc_import_flush_my_ctx() on the import. */
3821 if (KEY_IS(KEY_FLUSH_CTX)) {
3822 sptlrpc_import_flush_my_ctx(imp);
/* From here on a request set is needed unless the key is KEY_GRANT_SHRINK. */
3826 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3829 /* We pass all other commands directly to OST. Since nobody calls osc
3830 methods directly and everybody is supposed to go through LOV, we
3831 assume lov checked invalid values for us.
3832 The only recognised values so far are evict_by_nid and mds_conn.
3833 Even if something bad goes through, we'd get a -EINVAL from OST
3836 if (KEY_IS(KEY_GRANT_SHRINK))
3837 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
3839 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
3844 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3845 RCL_CLIENT, keylen);
3846 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3847 RCL_CLIENT, vallen);
3848 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3850 ptlrpc_request_free(req);
3854 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3855 memcpy(tmp, key, keylen);
3856 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
3857 memcpy(tmp, val, vallen);
3859 if (KEY_IS(KEY_MDS_CONN)) {
3860 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3862 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3863 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3864 LASSERT_MDS_GROUP(oscc->oscc_oa.o_gr);
3865 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3866 } else if (KEY_IS(KEY_GRANT_SHRINK)) {
3867 struct osc_grant_args *aa;
3870 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3871 aa = ptlrpc_req_async_args(req);
3874 ptlrpc_req_finished(req);
3877 *oa = ((struct ost_body *)val)->oa;
3879 req->rq_interpret_reply = osc_shrink_grant_interpret;
3882 ptlrpc_request_set_replen(req);
3883 if (!KEY_IS(KEY_GRANT_SHRINK)) {
3884 LASSERT(set != NULL);
3885 ptlrpc_set_add_req(set, req);
3886 ptlrpc_check_set(NULL, set);
3888 ptlrpcd_add_req(req, PSCOPE_OTHER);
/*
 * llog operations passed to llog_setup() for the LLOG_SIZE_REPL_CTXT
 * context in osc_llog_init(); the cancel hook is llog_obd_repl_cancel.
 */
3894 static struct llog_operations osc_size_repl_logops = {
3895 lop_cancel: llog_obd_repl_cancel
/*
 * llog operations for the LLOG_MDS_OST_ORIG_CTXT context.  Left
 * zero-initialised here and filled in on first use by osc_llog_init().
 */
3898 static struct llog_operations osc_mds_ost_orig_logops;
/*
 * Set up the two llog contexts used by this OSC.
 *
 * obd   - OSC device owning the contexts.
 * olg   - llog group; asserted to be the one embedded in obd.
 * tgt   - target device passed through to llog_setup().
 * count - passed through to llog_setup().
 * catid - catalog id; its lci_logid is used for the origin context.
 * uuid  - not referenced in the lines visible here.
 */
3899 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3900 struct obd_device *tgt, int count,
3901 struct llog_catid *catid, struct obd_uuid *uuid)
3906 LASSERT(olg == &obd->obd_olg);
/*
 * Initialise the shared ops table once: start from llog_lvfs_ops and
 * override four hooks.  A lop_setup that already equals
 * llog_obd_origin_setup means the table was filled in earlier.
 */
3907 spin_lock(&obd->obd_dev_lock);
3908 if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3909 osc_mds_ost_orig_logops = llog_lvfs_ops;
3910 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3911 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3912 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3913 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3915 spin_unlock(&obd->obd_dev_lock);
/* First context: MDS-to-OST origin log, bound to the catalog log id. */
3917 rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3918 &catid->lci_logid, &osc_mds_ost_orig_logops);
3920 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
/* Second context: size replication, with no log id. */
3924 rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
3925 NULL, &osc_size_repl_logops);
/*
 * The origin context is looked up again here, presumably to undo the first
 * setup when the second one fails; the surrounding condition and cleanup
 * call are not visible here.
 */
3927 struct llog_ctxt *ctxt =
3928 llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3931 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
/* Diagnostic dump of the setup parameters and the catalog log id. */
3936 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3937 obd->obd_name, tgt->obd_name, count, catid, rc);
3938 CERROR("logid "LPX64":0x%x\n",
3939 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
/*
 * Clean up the LLOG_MDS_OST_ORIG_CTXT and LLOG_SIZE_REPL_CTXT contexts of
 * obd.  The two llog_cleanup() results are kept separately in rc and rc2.
 * count is not referenced in the lines visible here.
 */
3944 static int osc_llog_finish(struct obd_device *obd, int count)
3946 struct llog_ctxt *ctxt;
3947 int rc = 0, rc2 = 0;
/* Origin context first. */
3950 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3952 rc = llog_cleanup(ctxt);
/* Then the size replication context. */
3954 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3956 rc2 = llog_cleanup(ctxt);
/*
 * Reconnect hook: fill in the grant request carried by the connect data.
 *
 * Only acts when data is non-NULL and OBD_CONNECT_GRANT is set in its
 * connect flags.  env, exp and cluuid are not referenced in the lines
 * visible here.
 */
3963 static int osc_reconnect(const struct lu_env *env,
3964 struct obd_export *exp, struct obd_device *obd,
3965 struct obd_uuid *cluuid,
3966 struct obd_connect_data *data,
3969 struct client_obd *cli = &obd->u.cli;
3971 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
/*
 * Under the loi list lock: ask for the grant currently available, or, when
 * that is zero, for twice cl_max_pages_per_rpc pages expressed in bytes.
 * The lost grant counter is copied to a local for logging and then reset.
 */
3974 client_obd_list_lock(&cli->cl_loi_list_lock);
3975 data->ocd_grant = cli->cl_avail_grant ?:
3976 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3977 lost_grant = cli->cl_lost_grant;
3978 cli->cl_lost_grant = 0;
3979 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Trace the requested grant and the connect parameters. */
3981 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3982 "cl_lost_grant: %ld\n", data->ocd_grant,
3983 cli->cl_avail_grant, lost_grant);
3984 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3985 " ocd_grant: %d\n", data->ocd_connect_flags,
3986 data->ocd_version, data->ocd_grant);
/*
 * Disconnect an export from this OSC.
 *
 * The LLOG_SIZE_REPL_CTXT context is looked up first; when cl_conn_count
 * is 1 it is synced with llog_sync() before the context reference is put.
 * The export is then disconnected through client_disconnect_export(), and
 * osc_del_shrink_grant() is called once cl_import is NULL.
 */
3992 static int osc_disconnect(struct obd_export *exp)
3994 struct obd_device *obd = class_exp2obd(exp);
3995 struct llog_ctxt *ctxt;
3998 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4000 if (obd->u.cli.cl_conn_count == 1) {
4001 /* Flush any remaining cancel messages out to the
4003 llog_sync(ctxt, exp);
4005 llog_ctxt_put(ctxt);
4007 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4011 rc = client_disconnect_export(exp);
4013 * Initially we put del_shrink_grant before disconnect_export, but it
4014 * causes the following problem if setup (connect) and cleanup
4015 * (disconnect) are tangled together.
4016 * connect p1 disconnect p2
4017 * ptlrpc_connect_import
4018 * ............... class_manual_cleanup
4021 * ptlrpc_connect_interrupt
4023 * add this client to shrink list
4025 * Bang! pinger trigger the shrink.
4026 * So the osc should be disconnected from the shrink list, after we
4027 * are sure the import has been destroyed. BUG18662
4029 if (obd->u.cli.cl_import == NULL)
4030 osc_del_shrink_grant(&obd->u.cli);
/*
 * React to a state change of the import belonging to this OSC.
 *
 * obd   - OSC device; asserted to be the device the import points at.
 * imp   - import the event refers to.
 * event - one of the IMP_EVENT_* cases handled below; anything else is
 *         reported with CERROR.
 *
 * INACTIVE, ACTIVE and OCD forward a matching OBD_NOTIFY_* notification
 * through obd_notify_observer().
 */
4034 static int osc_import_event(struct obd_device *obd,
4035 struct obd_import *imp,
4036 enum obd_import_event event)
4038 struct client_obd *cli;
4042 LASSERT(imp->imp_obd == obd);
4045 case IMP_EVENT_DISCON: {
4046 /* Only do this on the MDS OSC's */
4047 if (imp->imp_server_timeout) {
4048 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4050 spin_lock(&oscc->oscc_lock);
4051 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4052 spin_unlock(&oscc->oscc_lock);
/* Zero both the available and the lost grant counters under the list lock. */
4055 client_obd_list_lock(&cli->cl_loi_list_lock);
4056 cli->cl_avail_grant = 0;
4057 cli->cl_lost_grant = 0;
4058 client_obd_list_unlock(&cli->cl_loi_list_lock);
4061 case IMP_EVENT_INACTIVE: {
4062 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
/*
 * Invalidate: with a cl environment held, run osc_check_rpcs() under the
 * list lock, then clean up the lock namespace with LDLM_FL_LOCAL_ONLY.
 */
4065 case IMP_EVENT_INVALIDATE: {
4066 struct ldlm_namespace *ns = obd->obd_namespace;
4070 env = cl_env_get(&refcheck);
4074 client_obd_list_lock(&cli->cl_loi_list_lock);
4075 /* all pages go to failing rpcs due to the invalid
4077 osc_check_rpcs(env, cli);
4078 client_obd_list_unlock(&cli->cl_loi_list_lock);
4080 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4081 cl_env_put(env, &refcheck);
4086 case IMP_EVENT_ACTIVE: {
4087 /* Only do this on the MDS OSC's */
4088 if (imp->imp_server_timeout) {
4089 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
/* Clear OSCC_FLAG_NOSPC on the creator under oscc_lock. */
4091 spin_lock(&oscc->oscc_lock);
4092 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
4093 spin_unlock(&oscc->oscc_lock);
4095 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
/* Connect data arrived: apply the negotiated features it advertises. */
4098 case IMP_EVENT_OCD: {
4099 struct obd_connect_data *ocd = &imp->imp_connect_data;
/* Grant negotiated: initialise client grant state from the connect data. */
4101 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4102 osc_init_grant(&obd->u.cli, ocd);
/* Request portal negotiated: send requests to OST_REQUEST_PORTAL. */
4105 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4106 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
4108 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
/* Any event without a case above is reported as an error. */
4112 CERROR("Unknown import event %d\n", event);
/*
 * Set up an OSC device from its configuration record.
 *
 * Takes a ptlrpcd reference, runs the generic client_obd_setup(), registers
 * the lprocfs entries, creates the request pool of the import and
 * initialises the grant shrink list and semaphore.
 */
4118 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4124 rc = ptlrpcd_addref();
4128 rc = client_obd_setup(obd, lcfg);
4132 struct lprocfs_static_vars lvars = { 0 };
4133 struct client_obd *cli = &obd->u.cli;
/* The extra proc entries are attached only if lprocfs_obd_setup() succeeds. */
4135 lprocfs_osc_init_vars(&lvars);
4136 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4137 lproc_osc_attach_seqstat(obd);
4138 sptlrpc_lprocfs_cliobd_attach(obd);
4139 ptlrpc_lprocfs_register_obd(obd);
4143 /* We need to allocate a few requests more, because
4144 brw_interpret tries to create new requests before freeing
4145 previous ones. Ideally we want to have 2x max_rpcs_in_flight
4146 reserved, but I am afraid that might be too much wasted RAM
4147 in fact, so 2 is just my guess and still should work. */
4148 cli->cl_import->imp_rq_pool =
4149 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4151 ptlrpc_add_rqs_to_pool);
/* Grant shrinking state: an empty list node and a semaphore starting at 1. */
4153 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4154 sema_init(&cli->cl_grant_sem, 1);
/*
 * Staged pre-cleanup of an OSC device.
 *
 * OBD_CLEANUP_EARLY deactivates the import and clears imp_pingable.
 * OBD_CLEANUP_EXPORTS destroys an import that is still attached to the
 * device and then finishes the llog contexts with obd_llog_finish().
 */
4160 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4166 case OBD_CLEANUP_EARLY: {
4167 struct obd_import *imp;
4168 imp = obd->u.cli.cl_import;
4169 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4170 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4171 ptlrpc_deactivate_import(imp);
/* Clear the pingable flag under imp_lock. */
4172 spin_lock(&imp->imp_lock);
4173 imp->imp_pingable = 0;
4174 spin_unlock(&imp->imp_lock);
4177 case OBD_CLEANUP_EXPORTS: {
4178 /* If we set up but never connected, the
4179 client import will not have been cleaned. */
4180 if (obd->u.cli.cl_import) {
4181 struct obd_import *imp;
/* cl_sem is held for writing while the import is torn down. */
4182 down_write(&obd->u.cli.cl_sem);
4183 imp = obd->u.cli.cl_import;
4184 CDEBUG(D_CONFIG, "%s: client import never connected\n",
4186 ptlrpc_invalidate_import(imp);
/* Free the request pool created in osc_setup(), if there is one. */
4187 if (imp->imp_rq_pool) {
4188 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4189 imp->imp_rq_pool = NULL;
4191 class_destroy_import(imp);
4192 up_write(&obd->u.cli.cl_sem);
/* NOTE(review): cl_import is cleared after cl_sem is released. */
4193 obd->u.cli.cl_import = NULL;
4195 rc = obd_llog_finish(obd, 0);
4197 CERROR("failed to cleanup llogging subsystems\n");
/*
 * Final cleanup of an OSC device: unregister the lprocfs entries, switch
 * the object creator from RECOVERING to EXITING, release the quota cache
 * and run the generic client_obd_cleanup().
 */
4204 int osc_cleanup(struct obd_device *obd)
4206 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4210 ptlrpc_lprocfs_unregister_obd(obd);
4211 lprocfs_obd_cleanup(obd);
/* Both flag changes are made in one oscc_lock critical section. */
4213 spin_lock(&oscc->oscc_lock);
4214 oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
4215 oscc->oscc_flags |= OSCC_FLAG_EXITING;
4216 spin_unlock(&oscc->oscc_lock);
4218 /* free memory of osc quota cache */
4219 lquota_cleanup(quota_interface, obd);
4221 rc = client_obd_cleanup(obd);
/*
 * Apply a configuration record to an OSC device.  The command in
 * lcfg->lcfg_command selects the action; the visible branch hands the
 * record to class_process_proc_param() with the PARAM_OSC prefix and the
 * lprocfs variable table of this device type.
 */
4227 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4229 struct lprocfs_static_vars lvars = { 0 };
/* Load the OSC lprocfs variable tables into lvars. */
4232 lprocfs_osc_init_vars(&lvars);
4234 switch (lcfg->lcfg_command) {
4236 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
/*
 * o_process_config hook: forwards buf as the configuration record to
 * osc_process_config_base().  len is not used.
 */
4246 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4248 return osc_process_config_base(obd, buf);
/*
 * Method table that plugs the OSC into the generic obd_ops interface.
 * Connection add/delete and connect use the generic client_* helpers;
 * every other hook listed here is an osc_* function.  The table is
 * registered by osc_init() through class_register_type().
 */
4251 struct obd_ops osc_obd_ops = {
4252 .o_owner = THIS_MODULE,
/* Device lifecycle. */
4253 .o_setup = osc_setup,
4254 .o_precleanup = osc_precleanup,
4255 .o_cleanup = osc_cleanup,
/* Connection management. */
4256 .o_add_conn = client_import_add_conn,
4257 .o_del_conn = client_import_del_conn,
4258 .o_connect = client_connect_import,
4259 .o_reconnect = osc_reconnect,
4260 .o_disconnect = osc_disconnect,
4261 .o_statfs = osc_statfs,
4262 .o_statfs_async = osc_statfs_async,
/* Object metadata packing and object operations. */
4263 .o_packmd = osc_packmd,
4264 .o_unpackmd = osc_unpackmd,
4265 .o_precreate = osc_precreate,
4266 .o_create = osc_create,
4267 .o_destroy = osc_destroy,
4268 .o_getattr = osc_getattr,
4269 .o_getattr_async = osc_getattr_async,
4270 .o_setattr = osc_setattr,
4271 .o_setattr_async = osc_setattr_async,
4273 .o_punch = osc_punch,
/* Lock enqueue and cancel. */
4275 .o_enqueue = osc_enqueue,
4276 .o_change_cbdata = osc_change_cbdata,
4277 .o_cancel = osc_cancel,
4278 .o_cancel_unused = osc_cancel_unused,
/* Control, key/value info, import events, llog and configuration. */
4279 .o_iocontrol = osc_iocontrol,
4280 .o_get_info = osc_get_info,
4281 .o_set_info_async = osc_set_info_async,
4282 .o_import_event = osc_import_event,
4283 .o_llog_init = osc_llog_init,
4284 .o_llog_finish = osc_llog_finish,
4285 .o_process_config = osc_process_config,
/*
 * Objects defined outside this file and used by osc_init()/osc_exit():
 * the kmem cache descriptors, and a spinlock with its lockdep class key.
 */
4288 extern struct lu_kmem_descr osc_caches[];
4289 extern spinlock_t osc_ast_guard;
4290 extern struct lock_class_key osc_ast_guard_class;
/*
 * Module initialisation: create the kmem caches, bind the quota interface
 * from the lquota module, register the OSC obd type and initialise the
 * osc_ast_guard spinlock.
 */
4292 int __init osc_init(void)
4294 struct lprocfs_static_vars lvars = { 0 };
4298 /* print an address of _any_ initialized kernel symbol from this
4299 * module, to allow debugging with gdb that doesn't support data
4300 * symbols from modules.*/
4301 CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4303 rc = lu_kmem_init(osc_caches);
4305 lprocfs_osc_init_vars(&lvars);
/*
 * Try to load lquota and take a reference on its OSC interface symbol,
 * then let it initialise and install its hooks into osc_obd_ops.
 */
4307 request_module("lquota");
4308 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4309 lquota_init(quota_interface);
4310 init_obd_quota_ops(quota_interface, &osc_obd_ops);
/* Register the obd type under LUSTRE_OSC_NAME with its device type. */
4312 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4313 LUSTRE_OSC_NAME, &osc_device_type);
/*
 * Unwind: drop the quota symbol reference and destroy the caches.
 * Presumably the registration-failure path; the guarding condition is not
 * visible here.
 */
4315 if (quota_interface)
4316 PORTAL_SYMBOL_PUT(osc_quota_interface);
4317 lu_kmem_fini(osc_caches);
/* Initialise the guard lock and give it its own lockdep class. */
4321 spin_lock_init(&osc_ast_guard);
4322 lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
/*
 * Module teardown, undoing osc_init(): finalise the device type, shut down
 * and release the quota interface, unregister the obd type and destroy the
 * kmem caches.
 */
4328 static void /*__exit*/ osc_exit(void)
4330 lu_device_type_fini(&osc_device_type);
/* The symbol reference is dropped only if one was obtained in osc_init(). */
4332 lquota_exit(quota_interface);
4333 if (quota_interface)
4334 PORTAL_SYMBOL_PUT(osc_quota_interface);
4336 class_unregister_type(LUSTRE_OSC_NAME);
4337 lu_kmem_fini(osc_caches);
/* Module metadata, and registration of osc_init/osc_exit as entry points. */
4340 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4341 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4342 MODULE_LICENSE("GPL");
4344 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);