1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_OSC
42 #include <libcfs/libcfs.h>
45 # include <liblustre.h>
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include "osc_internal.h"
/* File-local pointer to a quota interface (starts out NULL) and the
 * externally defined OSC quota interface object. */
66 static quota_interface_t *quota_interface = NULL;
67 extern quota_interface_t osc_quota_interface;
/* Prototypes for osc_release_ppga(), brw_interpret() and osc_cleanup();
 * their definitions are not adjacent to these declarations. */
69 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
70 static int brw_interpret(const struct lu_env *env,
71 struct ptlrpc_request *req, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
74 /* Pack OSC object metadata for disk storage (LE byte order). */
/* The on-disk buffer *lmmp is sizeof(**lmmp) bytes.  The function contains
 * both a free path (OBD_FREE) and an allocate path (OBD_ALLOC) for *lmmp,
 * and stores lsm_object_id / lsm_object_gr into it as little-endian 64-bit
 * values.  lsm_object_id must be non-zero and the group must satisfy
 * LASSERT_MDS_GROUP.
 * NOTE(review): the conditions selecting the free/allocate/fill paths and
 * the return values are not visible here - confirm against the full body. */
75 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
76 struct lov_stripe_md *lsm)
81 lmm_size = sizeof(**lmmp);
86 OBD_FREE(*lmmp, lmm_size);
92 OBD_ALLOC(*lmmp, lmm_size);
98 LASSERT(lsm->lsm_object_id);
99 LASSERT_MDS_GROUP(lsm->lsm_object_gr);
/* Convert from CPU byte order to the little-endian disk format. */
100 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
101 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
107 /* Unpack OSC object metadata from disk storage (LE byte order). */
/* Validates the on-disk lmm (minimum size, non-zero object id), manages the
 * in-memory stripe descriptor *lsmp (sized by lov_stripe_md_size(1), with a
 * single separately allocated lsm_oinfo[0]), and fills it from lmm with the
 * id and group converted from little-endian.  lsm_maxbytes is set to
 * LUSTRE_STRIPE_MAXBYTES.
 * NOTE(review): lmm_bytes is a signed int compared against sizeof(), so a
 * negative lmm_bytes would be converted to a large unsigned value and pass
 * the size check - confirm callers never pass a negative length.
 * NOTE(review): return values and several branch conditions are not visible
 * here. */
108 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
109 struct lov_mds_md *lmm, int lmm_bytes)
115 if (lmm_bytes < sizeof (*lmm)) {
116 CERROR("lov_mds_md too small: %d, need %d\n",
117 lmm_bytes, (int)sizeof(*lmm));
120 /* XXX LOV_MAGIC etc check? */
122 if (lmm->lmm_object_id == 0) {
123 CERROR("lov_mds_md: zero lmm_object_id\n");
128 lsm_size = lov_stripe_md_size(1);
/* An existing *lsmp with no lmm to unpack: release the descriptor and its
 * single oinfo. */
132 if (*lsmp != NULL && lmm == NULL) {
133 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
134 OBD_FREE(*lsmp, lsm_size);
140 OBD_ALLOC(*lsmp, lsm_size);
143 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
/* Undo the descriptor allocation when the oinfo allocation fails. */
144 if ((*lsmp)->lsm_oinfo[0] == NULL) {
145 OBD_FREE(*lsmp, lsm_size);
148 loi_init((*lsmp)->lsm_oinfo[0]);
152 /* XXX zero *lsmp? */
153 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
154 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
155 LASSERT((*lsmp)->lsm_object_id);
156 LASSERT_MDS_GROUP((*lsmp)->lsm_object_gr);
159 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/* Pack a capability into the RMF_CAPA1 field of a request.
 * capa is treated as a struct obd_capa pointer; the request buffer for
 * RMF_CAPA1 is fetched from the capsule and OBD_MD_FLOSSCAPA is set in
 * body->oa.o_valid to mark the capability as present.
 * NOTE(review): the copy of oc into c and any NULL-capa early return are not
 * visible here - confirm. */
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165 struct ost_body *body, void *capa)
167 struct obd_capa *oc = (struct obd_capa *)capa;
168 struct lustre_capa *c;
173 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
176 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177 DEBUG_CAPA(D_SEC, c, "pack");
/* Fill the RMF_OST_BODY field of req from oinfo: the obdo in oinfo->oi_oa is
 * converted to wire form with lustre_set_wire_obdo() and the capability in
 * oinfo->oi_capa is packed with osc_pack_capa(). */
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181 struct obd_info *oinfo)
183 struct ost_body *body;
185 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
188 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
189 osc_pack_capa(req, body, oinfo->oi_capa);
/* Adjust the client-side size of a capability field before the request is
 * packed.  The visible statement sets the size of field to 0.
 * NOTE(review): presumably this happens only when no capability is supplied
 * and the size is left untouched otherwise; the condition and the last
 * parameter are not visible here - confirm. */
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193 const struct req_msg_field *field,
197 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
199 /* it is already calculated as sizeof struct obd_capa */
/* Reply interpreter for an asynchronous OST_GETATTR request.
 * The ost_body is taken from the reply with lustre_swab_repbuf(), the wire
 * obdo is copied into aa->aa_oi->oi_oa, and the block size is set to
 * PTLRPC_MAX_BRW_SIZE with OBD_MD_FLBLKSZ flagged valid.  A separate path
 * logs that the body could not be unpacked and clears o_valid.  The
 * oi_cb_up callback is then called with rc and its result replaces rc.
 * NOTE(review): the branch conditions and the returned value are not visible
 * here.  Other interpreters in this file use req_capsule_server_get() for
 * the body; confirm whether this one should as well. */
203 static int osc_getattr_interpret(const struct lu_env *env,
204 struct ptlrpc_request *req,
205 struct osc_async_args *aa, int rc)
207 struct ost_body *body;
213 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
214 lustre_swab_ost_body);
216 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
217 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
219 /* This should really be sent by the OST */
220 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
221 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
223 CDEBUG(D_INFO, "can't unpack ost_body\n");
225 aa->aa_oi->oi_oa->o_valid = 0;
228 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an asynchronous OST_GETATTR request on the request set.
 * Allocates an RQF_OST_GETATTR request on the import of exp, sizes the
 * capability field, packs the request (a path frees the request with
 * ptlrpc_request_free, presumably when packing fails), fills the body from
 * oinfo, installs osc_getattr_interpret as the reply interpreter and adds
 * the request to set.  The per-request async args area is used for aa; the
 * CLASSERT checks at compile time that it is large enough.
 * NOTE(review): the assignment into aa and the return statements are not
 * visible here. */
232 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
233 struct ptlrpc_request_set *set)
235 struct ptlrpc_request *req;
236 struct osc_async_args *aa;
240 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
244 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
245 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
247 ptlrpc_request_free(req);
251 osc_pack_req_body(req, oinfo);
253 ptlrpc_request_set_replen(req);
254 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
256 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
257 aa = ptlrpc_req_async_args(req);
260 ptlrpc_set_add_req(set, req);
/* Synchronous OST_GETATTR: build the request as in the async variant, wait
 * for the reply with ptlrpc_queue_wait(), then copy the reply obdo into
 * oinfo->oi_oa and set the block size to PTLRPC_MAX_BRW_SIZE.
 * A path jumps to the out label with -EPROTO (presumably when the reply
 * body is missing - the condition is not visible here).  The request
 * reference is dropped with ptlrpc_req_finished(). */
264 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
266 struct ptlrpc_request *req;
267 struct ost_body *body;
271 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
275 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
276 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
278 ptlrpc_request_free(req);
282 osc_pack_req_body(req, oinfo);
284 ptlrpc_request_set_replen(req);
286 rc = ptlrpc_queue_wait(req);
290 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
292 GOTO(out, rc = -EPROTO);
294 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
295 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
297 /* This should really be sent by the OST */
298 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
299 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
303 ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR.  The obdo in oinfo->oi_oa must have
 * OBD_MD_FLGROUP set in o_valid (asserted).  The request is allocated,
 * packed and sent with ptlrpc_queue_wait(); the reply obdo is copied back
 * into oinfo->oi_oa and the request reference is dropped.
 * A path jumps to out with -EPROTO (presumably for a missing reply body -
 * the condition is not visible here).  oti is not referenced in the visible
 * statements. */
307 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
308 struct obd_trans_info *oti)
310 struct ptlrpc_request *req;
311 struct ost_body *body;
315 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
317 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
321 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
322 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
324 ptlrpc_request_free(req);
328 osc_pack_req_body(req, oinfo);
330 ptlrpc_request_set_replen(req);
332 rc = ptlrpc_queue_wait(req);
336 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
338 GOTO(out, rc = -EPROTO);
340 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
344 ptlrpc_req_finished(req);
/* Reply interpreter for an asynchronous OST_SETATTR request.
 * Fetches the reply ost_body, copies its obdo into aa->aa_oi->oi_oa and then
 * calls the oi_cb_up callback with rc; the callback result replaces rc.
 * A path jumps to out with -EPROTO (presumably when the body is missing -
 * the condition is not visible here). */
348 static int osc_setattr_interpret(const struct lu_env *env,
349 struct ptlrpc_request *req,
350 struct osc_async_args *aa, int rc)
352 struct ost_body *body;
358 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
360 GOTO(out, rc = -EPROTO);
362 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
364 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Asynchronous OST_SETATTR.
 * Builds the request and then dispatches it one of two ways: handed to
 * ptlrpcd with no reply handling, or added to rqset with
 * osc_setattr_interpret installed as the reply interpreter.
 * NOTE(review): the condition choosing between the two dispatch paths, the
 * assignment into aa and the return statements are not visible here.
 * NOTE(review): o_lcookie is written into oinfo->oi_oa after
 * osc_pack_req_body() has already copied oi_oa into the request body, so
 * the wire copy may not carry the cookie - confirm the intended order. */
368 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
369 struct obd_trans_info *oti,
370 struct ptlrpc_request_set *rqset)
372 struct ptlrpc_request *req;
373 struct osc_async_args *aa;
377 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
381 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
382 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
384 ptlrpc_request_free(req);
388 osc_pack_req_body(req, oinfo);
390 ptlrpc_request_set_replen(req);
/* Take the llog cookie from the transaction info when the caller flagged
 * one as valid. */
392 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
394 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
397 /* do mds to ost setattr asynchronously */
399 /* Do not wait for response. */
400 ptlrpcd_add_req(req, PSCOPE_OTHER);
402 req->rq_interpret_reply =
403 (ptlrpc_interpterer_t)osc_setattr_interpret;
405 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
406 aa = ptlrpc_req_async_args(req);
409 ptlrpc_set_add_req(rqset, req);
/* Create an object on the OST with a synchronous OST_CREATE request.
 * A stripe descriptor may be allocated with obd_alloc_memmd(); the request
 * body is filled from oa and sent with ptlrpc_queue_wait().  For a request
 * whose flags equal OBD_FL_DELORPHAN, resend and delay are disabled.  On
 * reply the obdo is copied back into oa, the block size is set to
 * PTLRPC_MAX_BRW_SIZE, the new object id and group are stored in lsm, the
 * reply transno is stored in oti, and when OBD_MD_FLCOOKIE is valid the
 * llog cookie is copied into oti (allocating room for one cookie when oti
 * has none).  Cleanup drops the request and may free the descriptor.
 * NOTE(review): the result of oti_alloc_cookies() is not checked in the
 * visible statements before oti_logcookies is dereferenced - confirm.
 * NOTE(review): branch conditions, labels and return statements are not
 * visible here. */
415 int osc_real_create(struct obd_export *exp, struct obdo *oa,
416 struct lov_stripe_md **ea, struct obd_trans_info *oti)
418 struct ptlrpc_request *req;
419 struct ost_body *body;
420 struct lov_stripe_md *lsm;
429 rc = obd_alloc_memmd(exp, &lsm);
434 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
436 GOTO(out, rc = -ENOMEM);
438 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
440 ptlrpc_request_free(req);
444 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
446 lustre_set_wire_obdo(&body->oa, oa);
448 ptlrpc_request_set_replen(req);
450 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
451 oa->o_flags == OBD_FL_DELORPHAN) {
453 "delorphan from OST integration");
454 /* Don't resend the delorphan req */
455 req->rq_no_resend = req->rq_no_delay = 1;
458 rc = ptlrpc_queue_wait(req);
462 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
464 GOTO(out_req, rc = -EPROTO);
466 lustre_get_wire_obdo(oa, &body->oa);
468 /* This should really be sent by the OST */
469 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
470 oa->o_valid |= OBD_MD_FLBLKSZ;
472 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
473 * have valid lsm_oinfo data structs, so don't go touching that.
474 * This needs to be fixed in a big way.
476 lsm->lsm_object_id = oa->o_id;
477 lsm->lsm_object_gr = oa->o_gr;
481 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
483 if (oa->o_valid & OBD_MD_FLCOOKIE) {
484 if (!oti->oti_logcookies)
485 oti_alloc_cookies(oti, 1);
486 *oti->oti_logcookies = oa->o_lcookie;
490 CDEBUG(D_HA, "transno: "LPD64"\n",
491 lustre_msg_get_transno(req->rq_repmsg));
493 ptlrpc_req_finished(req);
496 obd_free_memmd(exp, &lsm);
/* Reply interpreter for an OST_PUNCH request.
 * Copies the reply obdo into aa->pa_oa and then invokes the pa_upcall
 * callback with pa_cookie and rc; the upcall result replaces rc.
 * A path jumps to out with -EPROTO (presumably when the reply body is
 * missing - the condition is not visible here). */
500 static int osc_punch_interpret(const struct lu_env *env,
501 struct ptlrpc_request *req,
502 struct osc_punch_args *aa, int rc)
504 struct ost_body *body;
510 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
512 GOTO(out, rc = -EPROTO);
514 lustre_get_wire_obdo(aa->pa_oa, &body->oa);
516 rc = aa->pa_upcall(aa->pa_cookie, rc);
/* Build and dispatch an asynchronous OST_PUNCH request.
 * The request is routed to OST_IO_PORTAL and its timeout is set with
 * ptlrpc_at_set_req_timeout().  The body is filled from oa and capa, and
 * osc_punch_interpret is installed with upcall/cookie stored in the async
 * args.  When rqset is PTLRPCD_SET the request goes to ptlrpcd, otherwise
 * it is added to rqset.
 * NOTE(review): the store of oa into the async args and the return
 * statements are not visible here. */
520 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
521 struct obd_capa *capa,
522 obd_enqueue_update_f upcall, void *cookie,
523 struct ptlrpc_request_set *rqset)
525 struct ptlrpc_request *req;
526 struct osc_punch_args *aa;
527 struct ost_body *body;
531 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
535 osc_set_capa_size(req, &RMF_CAPA1, capa);
536 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
538 ptlrpc_request_free(req);
541 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
542 ptlrpc_at_set_req_timeout(req);
544 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
546 lustre_set_wire_obdo(&body->oa, oa);
547 osc_pack_capa(req, body, capa);
549 ptlrpc_request_set_replen(req);
552 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
553 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
554 aa = ptlrpc_req_async_args(req);
556 aa->pa_upcall = upcall;
557 aa->pa_cookie = cookie;
558 if (rqset == PTLRPCD_SET)
559 ptlrpcd_add_req(req, PSCOPE_OTHER);
561 ptlrpc_set_add_req(rqset, req);
/* Punch wrapper over osc_punch_base().
 * The extent to punch is carried in the obdo: o_size holds the extent start
 * and o_blocks the extent end from oinfo->oi_policy, and both fields are
 * flagged valid.  oinfo->oi_cb_up is the completion upcall, with oinfo
 * itself as the cookie.  oti is not used. */
566 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
567 struct obd_trans_info *oti,
568 struct ptlrpc_request_set *rqset)
570 oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
571 oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
572 oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
573 return osc_punch_base(exp, oinfo->oi_oa, oinfo->oi_capa,
574 oinfo->oi_cb_up, oinfo, rqset);
/* Synchronous OST_SYNC for the byte range start..end of an object.
 * The range travels in the request obdo: o_size carries start and o_blocks
 * carries end, with both flagged valid.  After ptlrpc_queue_wait() the
 * reply obdo is copied back into oa and the request reference is dropped.
 * A path logs that oa is NULL (presumably an early error return - the
 * condition and return are not visible here); another jumps to out with
 * -EPROTO.  md is not referenced in the visible statements. */
577 static int osc_sync(struct obd_export *exp, struct obdo *oa,
578 struct lov_stripe_md *md, obd_size start, obd_size end,
581 struct ptlrpc_request *req;
582 struct ost_body *body;
587 CDEBUG(D_INFO, "oa NULL\n");
591 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
595 osc_set_capa_size(req, &RMF_CAPA1, capa);
596 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
598 ptlrpc_request_free(req);
602 /* overload the size and blocks fields in the oa with start/end */
603 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
605 lustre_set_wire_obdo(&body->oa, oa);
606 body->oa.o_size = start;
607 body->oa.o_blocks = end;
608 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
609 osc_pack_capa(req, body, capa);
611 ptlrpc_request_set_replen(req);
613 rc = ptlrpc_queue_wait(req);
617 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
619 GOTO(out, rc = -EPROTO);
621 lustre_get_wire_obdo(oa, &body->oa);
625 ptlrpc_req_finished(req);
629 /* Find and cancel locally locks matched by @mode in the resource found by
630 * @objid. Found locks are added into @cancel list. Returns the amount of
631 * locks added to @cancels list. */
/* The resource name is built from oa->o_id and oa->o_gr and looked up in the
 * namespace of the export obd without creating it (both trailing arguments
 * of ldlm_resource_get are 0).  The resource reference taken by the lookup
 * is dropped with ldlm_resource_putref() after the local cancel.
 * NOTE(review): handling of a failed lookup and the return statement are
 * not visible here. */
632 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
633 struct list_head *cancels, ldlm_mode_t mode,
636 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
637 struct ldlm_res_id res_id;
638 struct ldlm_resource *res;
642 osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
643 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
647 LDLM_RESOURCE_ADDREF(res);
648 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
649 lock_flags, 0, NULL);
650 LDLM_RESOURCE_DELREF(res);
651 ldlm_resource_putref(res);
/* Reply interpreter for throttled OST_DESTROY requests: decrements the
 * in-flight destroy counter of the client obd behind the request import and
 * signals cl_destroy_waitq so a waiting sender can retry. */
655 static int osc_destroy_interpret(const struct lu_env *env,
656 struct ptlrpc_request *req, void *data,
659 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
661 atomic_dec(&cli->cl_destroy_in_flight);
662 cfs_waitq_signal(&cli->cl_destroy_waitq);
/* Try to reserve a slot for one more in-flight destroy RPC.
 * The counter is incremented optimistically; if the new value is within
 * cl_max_rpcs_in_flight the slot is kept.  Otherwise the increment is
 * undone, and if the counter is then below the limit another thread must
 * have released a slot in between, so the wait queue is signalled.
 * NOTE(review): the return statements are not visible here; presumably
 * non-zero means the destroy may be sent - confirm. */
666 static int osc_can_send_destroy(struct client_obd *cli)
668 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
669 cli->cl_max_rpcs_in_flight) {
670 /* The destroy request can be sent */
673 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
674 cli->cl_max_rpcs_in_flight) {
676 * The counter has been modified between the two atomic
679 cfs_waitq_signal(&cli->cl_destroy_waitq);
684 /* Destroy requests can be async always on the client, and we don't even really
685 * care about the return code since the client cannot do anything at all about
687 * When the MDS is unlinking a filename, it saves the file objects into a
688 * recovery llog, and these object records are cancelled when the OST reports
689 * they were destroyed and sync'd to disk (i.e. transaction committed).
690 * If the client dies, or the OST is down when the object should be destroyed,
691 * the records are not cancelled, and when the OST reconnects to the MDS next,
692 * it will retrieve the llog unlink logs and then sends the log cancellation
693 * cookies to the MDS after committing destroy transactions. */
/* Implementation outline: unused PW locks on the object are collected for
 * cancellation with LDLM_FL_DISCARD_DATA, and the OST_DESTROY request is
 * prepared through ldlm_prep_elc_req().  If request allocation fails the
 * collected locks are released with ldlm_lock_list_put().  The request is
 * routed to OST_IO_PORTAL and handed to ptlrpcd without waiting.  Unless
 * the import was connected with OBD_CONNECT_MDS, destroys are throttled
 * through osc_can_send_destroy() and cl_destroy_waitq.
 * NOTE(review): several conditions, the remaining arguments of
 * ldlm_prep_elc_req and the return statements are not visible here. */
694 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
695 struct lov_stripe_md *ea, struct obd_trans_info *oti,
696 struct obd_export *md_export, void *capa)
698 struct client_obd *cli = &exp->exp_obd->u.cli;
699 struct ptlrpc_request *req;
700 struct ost_body *body;
701 CFS_LIST_HEAD(cancels);
706 CDEBUG(D_INFO, "oa NULL\n");
710 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
711 LDLM_FL_DISCARD_DATA);
713 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
715 ldlm_lock_list_put(&cancels, l_bl_ast, count);
719 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
720 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
723 ptlrpc_request_free(req);
727 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
728 ptlrpc_at_set_req_timeout(req);
/* The cookie is copied into oa before the obdo is packed into the body. */
730 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
731 oa->o_lcookie = *oti->oti_logcookies;
732 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
734 lustre_set_wire_obdo(&body->oa, oa);
736 osc_pack_capa(req, body, (struct obd_capa *)capa);
737 ptlrpc_request_set_replen(req);
739 /* don't throttle destroy RPCs for the MDT */
740 if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
741 req->rq_interpret_reply = osc_destroy_interpret;
742 if (!osc_can_send_destroy(cli)) {
743 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
747 * Wait until the number of on-going destroy RPCs drops
748 * under max_rpc_in_flight
750 l_wait_event_exclusive(cli->cl_destroy_waitq,
751 osc_can_send_destroy(cli), &lwi);
755 /* Do not wait for response */
756 ptlrpcd_add_req(req, PSCOPE_OTHER);
/* Report the client cache and grant state to the OST in an outgoing obdo.
 * Under cl_loi_list_lock: o_dirty is set from cl_dirty; three sanity checks
 * log an error when the dirty accounting looks inconsistent (per-client
 * limit, system-wide limit, or an implausibly large gap to dirty_max);
 * otherwise o_undirty is the larger of cl_dirty_max and the bytes needed
 * for max_rpcs_in_flight + 1 full RPCs.  o_grant reports cl_avail_grant and
 * o_dropped reports cl_lost_grant, which is then reset to 0.
 * The obdo must not already have OBD_MD_FLBLOCKS or OBD_MD_FLGRANT set
 * (asserted).
 * NOTE(review): what the error branches store in o_undirty, where the bits
 * are added to o_valid, and the last parameter are not visible here. */
760 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
763 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
765 LASSERT(!(oa->o_valid & bits));
768 client_obd_list_lock(&cli->cl_loi_list_lock);
769 oa->o_dirty = cli->cl_dirty;
770 if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
771 CERROR("dirty %lu - %lu > dirty_max %lu\n",
772 cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
774 } else if (atomic_read(&obd_dirty_pages) -
775 atomic_read(&obd_dirty_transit_pages) > obd_max_dirty_pages){
776 CERROR("dirty %d - %d > system dirty_max %d\n",
777 atomic_read(&obd_dirty_pages),
778 atomic_read(&obd_dirty_transit_pages),
779 obd_max_dirty_pages);
781 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
782 CERROR("dirty %lu - dirty_max %lu too big???\n",
783 cli->cl_dirty, cli->cl_dirty_max);
/* Pages per RPC converted to bytes, times the number of RPCs plus one. */
786 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
787 (cli->cl_max_rpcs_in_flight + 1);
788 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
790 oa->o_grant = cli->cl_avail_grant;
791 oa->o_dropped = cli->cl_lost_grant;
792 cli->cl_lost_grant = 0;
793 client_obd_list_unlock(&cli->cl_loi_list_lock);
794 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
795 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Push the next grant-shrink deadline out by cl_grant_shrink_interval from
 * now (via cfs_time_shift) and log the new deadline. */
799 static void osc_update_next_shrink(struct client_obd *cli)
801 cli->cl_next_shrink_grant =
802 cfs_time_shift(cli->cl_grant_shrink_interval);
803 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
804 cli->cl_next_shrink_grant);
807 /* caller must hold loi_list_lock */
/* Charge one page of write grant to pga: the global dirty page count and
 * cl_dirty grow by one page, cl_avail_grant shrinks by CFS_PAGE_SIZE, and
 * the page is flagged OBD_BRW_FROM_GRANT (it must not be flagged already).
 * The available grant must not go negative (asserted).  Consuming grant
 * also postpones the next grant shrink. */
808 static void osc_consume_write_grant(struct client_obd *cli,
809 struct brw_page *pga)
811 LASSERT(client_obd_list_is_locked(&cli->cl_loi_list_lock));
812 LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
813 atomic_inc(&obd_dirty_pages);
814 cli->cl_dirty += CFS_PAGE_SIZE;
815 cli->cl_avail_grant -= CFS_PAGE_SIZE;
816 pga->flag |= OBD_BRW_FROM_GRANT;
817 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
818 CFS_PAGE_SIZE, pga, pga->pg);
819 LASSERT(cli->cl_avail_grant >= 0);
820 osc_update_next_shrink(cli);
823 /* the companion to osc_consume_write_grant, called when a brw has completed.
824 * must be called with the loi lock held. */
/* Pages not flagged OBD_BRW_FROM_GRANT take an early branch (its body is
 * not visible here).  Otherwise the flag is cleared and the dirty
 * accounting is reduced by one page; OBD_BRW_NOCACHE pages also reduce the
 * transit counters.  Grant is then added to cl_lost_grant in one of two
 * cases: a full page, or for partial-page writes on an OST whose block size
 * differs from the page size, the part of the page outside the
 * block-aligned span that was written.
 * The OST block size comes from obd_osfs.os_bsize, falling back to 4096
 * when that is zero.
 * NOTE(review): the condition of the first lost-grant branch (presumably
 * the not-sent case) and the condition guarding the end adjustment are not
 * visible here - confirm. */
825 static void osc_release_write_grant(struct client_obd *cli,
826 struct brw_page *pga, int sent)
828 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
831 LASSERT(client_obd_list_is_locked(&cli->cl_loi_list_lock));
832 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
837 pga->flag &= ~OBD_BRW_FROM_GRANT;
838 atomic_dec(&obd_dirty_pages);
839 cli->cl_dirty -= CFS_PAGE_SIZE;
840 if (pga->flag & OBD_BRW_NOCACHE) {
841 pga->flag &= ~OBD_BRW_NOCACHE;
842 atomic_dec(&obd_dirty_transit_pages);
843 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
846 cli->cl_lost_grant += CFS_PAGE_SIZE;
847 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
848 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
849 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
850 /* For short writes we shouldn't count parts of pages that
851 * span a whole block on the OST side, or our accounting goes
852 * wrong. Should match the code in filter_grant_check. */
853 int offset = pga->off & ~CFS_PAGE_MASK;
854 int count = pga->count + (offset & (blocksize - 1));
855 int end = (offset + pga->count) & (blocksize - 1);
857 count += blocksize - end;
859 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
860 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
861 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
862 cli->cl_avail_grant, cli->cl_dirty);
/* Total number of read plus write RPCs currently in flight for cli. */
868 static unsigned long rpcs_in_flight(struct client_obd *cli)
870 return cli->cl_r_in_flight + cli->cl_w_in_flight;
873 /* caller must hold loi_list_lock */
/* Walk the list of threads waiting for cache space and wake those that can
 * proceed.  A waiter is removed from the list and signalled once there is
 * room to dirty one more page; it is either given a page of grant or, when
 * less than one page of grant is left, told to fall back to sync IO by
 * setting ocw_rc to -EDQUOT.
 * NOTE(review): the two early checks log and presumably stop the walk; the
 * statements that leave the loop are not visible here - confirm. */
874 void osc_wake_cache_waiters(struct client_obd *cli)
876 struct list_head *l, *tmp;
877 struct osc_cache_waiter *ocw;
880 list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
881 /* if we can't dirty more, we must wait until some is written */
882 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
883 (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
884 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
885 "osc max %ld, sys max %d\n", cli->cl_dirty,
886 cli->cl_dirty_max, obd_max_dirty_pages);
890 /* if still dirty cache but no grant wait for pending RPCs that
891 * may yet return us some grant before doing sync writes */
892 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
893 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
894 cli->cl_w_in_flight);
898 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
899 list_del_init(&ocw->ocw_entry);
900 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
901 /* no more RPCs in flight to return grant, do sync IO */
902 ocw->ocw_rc = -EDQUOT;
903 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
905 osc_consume_write_grant(cli,
906 &ocw->ocw_oap->oap_brw_page);
909 cfs_waitq_signal(&ocw->ocw_waitq);
/* Add grant to cl_avail_grant while holding cl_loi_list_lock.  The lock is
 * taken here, so the caller must not already hold it. */
915 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
917 client_obd_list_lock(&cli->cl_loi_list_lock);
918 cli->cl_avail_grant += grant;
919 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Credit the client with the extra grant carried in a reply body, but only
 * when the body flags o_grant as valid (OBD_MD_FLGRANT). */
922 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
924 if (body->oa.o_valid & OBD_MD_FLGRANT) {
925 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
926 __osc_update_grant(cli, body->oa.o_grant);
/* Forward declaration: osc_shrink_grant_to_target() below calls this before
 * its definition appears. */
930 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
931 void *key, obd_count vallen, void *val,
932 struct ptlrpc_request_set *set);
/* Reply interpreter for a grant-shrink request.
 * One path gives the grant recorded in the request obdo (aa_oa->o_grant)
 * back to the client; another fetches the reply body and applies any grant
 * it carries through osc_update_grant().
 * NOTE(review): presumably the first path runs when the request failed and
 * the second on success; the conditions, cleanup and return value are not
 * visible here - confirm. */
934 static int osc_shrink_grant_interpret(const struct lu_env *env,
935 struct ptlrpc_request *req,
938 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
939 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
940 struct ost_body *body;
943 __osc_update_grant(cli, oa->o_grant);
947 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
949 osc_update_grant(cli, body);
/* Give back a quarter of the available grant by piggy-backing on an
 * outgoing obdo: o_grant carries the returned amount, which is deducted
 * from cl_avail_grant under cl_loi_list_lock, and OBD_FL_SHRINK_GRANT is
 * set in o_flags.  The next shrink deadline is then pushed out. */
955 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
957 client_obd_list_lock(&cli->cl_loi_list_lock);
958 oa->o_grant = cli->cl_avail_grant / 4;
959 cli->cl_avail_grant -= oa->o_grant;
960 client_obd_list_unlock(&cli->cl_loi_list_lock);
961 oa->o_flags |= OBD_FL_SHRINK_GRANT;
962 osc_update_next_shrink(cli);
965 /* Shrink the current grant, either from some large amount to enough for a
966 * full set of in-flight RPCs, or if we have already shrunk to that limit
967 * then to enough for a single RPC. This avoids keeping more grant than
968 * needed, and avoids shrinking the grant piecemeal. */
/* NOTE(review): target is computed from cl_max_pages_per_rpc without the
 * CFS_PAGE_SHIFT conversion that osc_announce_cached() applies, yet it is
 * compared with cl_avail_grant, which this file adjusts in CFS_PAGE_SIZE
 * steps.  This looks like a page-count versus byte-count mismatch -
 * confirm the intended unit.
 * The comparison is made under cl_loi_list_lock, which is released before
 * osc_shrink_grant_to_target() takes it again. */
969 static int osc_shrink_grant(struct client_obd *cli)
971 long target = (cli->cl_max_rpcs_in_flight + 1) *
972 cli->cl_max_pages_per_rpc;
974 client_obd_list_lock(&cli->cl_loi_list_lock);
975 if (cli->cl_avail_grant <= target)
976 target = cli->cl_max_pages_per_rpc;
977 client_obd_list_unlock(&cli->cl_loi_list_lock);
979 return osc_shrink_grant_to_target(cli, target);
/* Shrink cl_avail_grant down to target and tell the OST about it.
 * target is raised to at least cl_max_pages_per_rpc; nothing is done when
 * the available grant is already at or below target.  Otherwise an
 * ost_body is allocated, the cache state is announced into it, the surplus
 * (avail - target) is placed in o_grant with OBD_FL_SHRINK_GRANT set, and
 * the body is sent with osc_set_info_async() under KEY_GRANT_SHRINK.  A
 * path adds o_grant back to the client (presumably when the send fails -
 * the condition is not visible here).
 * NOTE(review): target is compared with cl_max_pages_per_rpc (a page count
 * by name) and with cl_avail_grant (adjusted in CFS_PAGE_SIZE steps
 * elsewhere in this file) - confirm the unit of target.
 * NOTE(review): the release of body and the return statements are not
 * visible here. */
982 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
985 struct ost_body *body;
988 client_obd_list_lock(&cli->cl_loi_list_lock);
989 /* Don't shrink if we are already above or below the desired limit
990 * We don't want to shrink below a single RPC, as that will negatively
991 * impact block allocation and long-term performance. */
992 if (target < cli->cl_max_pages_per_rpc)
993 target = cli->cl_max_pages_per_rpc;
995 if (target >= cli->cl_avail_grant) {
996 client_obd_list_unlock(&cli->cl_loi_list_lock);
999 client_obd_list_unlock(&cli->cl_loi_list_lock);
1001 OBD_ALLOC_PTR(body);
1005 osc_announce_cached(cli, &body->oa, 0);
/* The lock was dropped around the allocation, so the surplus is computed
 * from the current cl_avail_grant rather than the value checked above. */
1007 client_obd_list_lock(&cli->cl_loi_list_lock);
1008 body->oa.o_grant = cli->cl_avail_grant - target;
1009 cli->cl_avail_grant = target;
1010 client_obd_list_unlock(&cli->cl_loi_list_lock);
1011 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
1012 osc_update_next_shrink(cli);
1014 rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
1015 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
1016 sizeof(*body), body, NULL);
1018 __osc_update_grant(cli, body->oa.o_grant);
/* Grant is only considered for shrinking while more than this is held. */
1023 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
/* Decide whether it is time to shrink the grant of this client.
 * The deadline counts as reached from 5 ticks before cl_next_shrink_grant.
 * When reached, shrinking is wanted only if the import is in
 * LUSTRE_IMP_FULL state and the available grant exceeds GRANT_SHRINK_LIMIT;
 * a further path pushes the deadline out instead.
 * NOTE(review): cl_avail_grant is read here without cl_loi_list_lock, and
 * the return statements are not visible here - confirm. */
1024 static int osc_should_shrink_grant(struct client_obd *client)
1026 cfs_time_t time = cfs_time_current();
1027 cfs_time_t next_shrink = client->cl_next_shrink_grant;
1028 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1029 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1030 client->cl_avail_grant > GRANT_SHRINK_LIMIT)
1033 osc_update_next_shrink(client);
/* Timeout callback: for every client obd linked on the obd list of the
 * timeout item, shrink its grant when osc_should_shrink_grant() says it is
 * due.  The result of osc_shrink_grant() is ignored; data is unused. */
1038 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1040 struct client_obd *client;
1042 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
1043 if (osc_should_shrink_grant(client))
1044 osc_shrink_grant(client);
/* Register this client for periodic grant shrinking: a timeout client is
 * added with the shrink interval, osc_grant_shrink_grant_cb as callback and
 * cl_grant_shrink_list as the list link.  One path logs the error code rc;
 * the other logs the registration and arms the first shrink deadline.
 * NOTE(review): one argument of ptlrpc_add_timeout_client, the failure
 * condition and the return statements are not visible here. */
1049 static int osc_add_shrink_grant(struct client_obd *client)
1053 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1055 osc_grant_shrink_grant_cb, NULL,
1056 &client->cl_grant_shrink_list);
1058 CERROR("add grant client %s error %d\n",
1059 client->cl_import->imp_obd->obd_name, rc);
1062 CDEBUG(D_CACHE, "add grant client %s \n",
1063 client->cl_import->imp_obd->obd_name);
1064 osc_update_next_shrink(client);
/* Remove this client from the periodic grant-shrink timeout list and return
 * the result of ptlrpc_del_timeout_client().
 * NOTE(review): the remaining argument of the call is not visible here. */
1068 static int osc_del_shrink_grant(struct client_obd *client)
1070 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/* Initialise the grant state from the connect data returned by the server.
 * cl_avail_grant is set to ocd_grant under cl_loi_list_lock.  When the
 * connection negotiated OBD_CONNECT_GRANT_SHRINK and the client is not yet
 * on a shrink list, it is registered for periodic shrinking (the result of
 * osc_add_shrink_grant is ignored).  The grant must be non-negative
 * (asserted). */
1074 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1076 client_obd_list_lock(&cli->cl_loi_list_lock);
1077 cli->cl_avail_grant = ocd->ocd_grant;
1078 client_obd_list_unlock(&cli->cl_loi_list_lock);
1080 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1081 list_empty(&cli->cl_grant_shrink_list))
1082 osc_add_shrink_grant(cli);
1084 CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1085 cli->cl_avail_grant, cli->cl_lost_grant);
1086 LASSERT(cli->cl_avail_grant >= 0);
1089 /* We assume that the reason this OSC got a short read is because it read
1090 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1091 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1092 * this stripe never got written at or beyond this stripe offset yet. */
/* Zero-fill the part of the page array that the short read did not cover.
 * First loop: step over pages fully covered by nob_read; in the page where
 * the data ends, zero the tail from nob_read up to the page count.
 * Second loop: zero every remaining page in full.
 * Pages are mapped with cfs_kmap() and addressed at their in-page offset.
 * NOTE(review): the index and page_count updates and the loop exits are not
 * visible here. */
1093 static void handle_short_read(int nob_read, obd_count page_count,
1094 struct brw_page **pga)
1099 /* skip bytes read OK */
1100 while (nob_read > 0) {
1101 LASSERT (page_count > 0);
1103 if (pga[i]->count > nob_read) {
1104 /* EOF inside this page */
1105 ptr = cfs_kmap(pga[i]->pg) +
1106 (pga[i]->off & ~CFS_PAGE_MASK);
1107 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1108 cfs_kunmap(pga[i]->pg);
1114 nob_read -= pga[i]->count;
1119 /* zero remaining pages */
1120 while (page_count-- > 0) {
1121 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1122 memset(ptr, 0, pga[i]->count);
1123 cfs_kunmap(pga[i]->pg);
/* Validate the reply of a bulk write.
 * The reply carries one return code per remote niobuf (niocount entries),
 * byte-swapped here when the reply message was swabbed.  The first negative
 * code is returned as the error; any other non-zero code is logged as
 * invalid.  The number of bytes transferred by the bulk must also equal
 * requested_nob, otherwise an error is logged.
 * NOTE(review): the values returned from the missing-vector, invalid-code,
 * size-mismatch and success paths are not visible here. */
1128 static int check_write_rcs(struct ptlrpc_request *req,
1129 int requested_nob, int niocount,
1130 obd_count page_count, struct brw_page **pga)
1134 /* return error if any niobuf was in error */
1135 remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
1136 sizeof(*remote_rcs) * niocount, NULL);
1137 if (remote_rcs == NULL) {
1138 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1141 if (lustre_msg_swabbed(req->rq_repmsg))
1142 for (i = 0; i < niocount; i++)
1143 __swab32s(&remote_rcs[i]);
1145 for (i = 0; i < niocount; i++) {
1146 if (remote_rcs[i] < 0)
1147 return(remote_rcs[i]);
1149 if (remote_rcs[i] != 0) {
1150 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1151 i, remote_rcs[i], req);
1156 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1157 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1158 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Tell whether p2 can share a remote niobuf with p1.
 * The final test is byte contiguity: p1 must end exactly where p2 starts.
 * When the flags differ, a difference outside OBD_BRW_FROM_GRANT,
 * OBD_BRW_NOCACHE and OBD_BRW_SYNC is logged as suspicious.
 * NOTE(review): what the differing-flags branch returns is not visible
 * here - confirm. */
1165 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1167 if (p1->flag != p2->flag) {
1168 unsigned mask = ~(OBD_BRW_FROM_GRANT|
1169 OBD_BRW_NOCACHE|OBD_BRW_SYNC);
1171 /* warn if we try to combine flags that we don't know to be
1172 * safe to combine */
1173 if ((p1->flag & mask) != (p2->flag & mask))
1174 CERROR("is it ok to have flags 0x%x and 0x%x in the "
1175 "same brw?\n", p1->flag, p2->flag);
1179 return (p1->off + p1->count == p2->off);
/* Compute the checksum of up to nob bytes spread over the pages in pga,
 * using the algorithm selected by cksum_type.
 * Each page is mapped, and the bytes from its in-page offset are folded
 * into the running checksum; the last page is clipped to the bytes left.
 * Two fault-injection hooks exist: for OST_READ the first page data is
 * overwritten before checksumming, and for OST_WRITE a separate path
 * handles OBD_FAIL_OSC_CHECKSUM_SEND (its body is not visible here).
 * NOTE(review): the updates of i and pg_count and the return statement are
 * not visible here. */
1182 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1183 struct brw_page **pga, int opc,
1184 cksum_type_t cksum_type)
1189 LASSERT (pg_count > 0);
1190 cksum = init_checksum(cksum_type);
1191 while (nob > 0 && pg_count > 0) {
1192 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1193 int off = pga[i]->off & ~CFS_PAGE_MASK;
1194 int count = pga[i]->count > nob ? nob : pga[i]->count;
1196 /* corrupt the data before we compute the checksum, to
1197 * simulate an OST->client data error */
1198 if (i == 0 && opc == OST_READ &&
1199 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1200 memcpy(ptr + off, "bad1", min(4, nob));
1201 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1202 cfs_kunmap(pga[i]->pg);
1203 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1206 nob -= pga[i]->count;
1210 /* For sending we only compute the wrong checksum instead
1211 * of corrupting the data so it is still correct on a redo */
1212 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/* Build a bulk read/write (BRW) request for page_count pages of pga.
 * Steps visible in this listing: allocate the request (writes use the
 * request pool of the import), size and pack it, attach a bulk descriptor,
 * fill the body, ioobj and niobuf array, set up checksum fields when
 * cl_checksum is enabled, and record bookkeeping in the async args.
 * NOTE(review): the request is presumably handed back through reqp and rc
 * returned; those statements are not part of this listing - confirm. */
1218 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1219 struct lov_stripe_md *lsm, obd_count page_count,
1220 struct brw_page **pga,
1221 struct ptlrpc_request **reqp,
1222 struct obd_capa *ocapa, int reserve)
1224 struct ptlrpc_request *req;
1225 struct ptlrpc_bulk_desc *desc;
1226 struct ost_body *body;
1227 struct obd_ioobj *ioobj;
1228 struct niobuf_remote *niobuf;
1229 int niocount, i, requested_nob, opc, rc;
1230 struct osc_brw_async_args *aa;
1231 struct req_capsule *pill;
1232 struct brw_page *pg_prev;
/* OBD_FAIL_CHECK hooks that force a recoverable or a fatal error return. */
1235 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1236 RETURN(-ENOMEM); /* Recoverable */
1237 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1238 RETURN(-EINVAL); /* Fatal */
/* Write requests are allocated from the request pool of the import. */
1240 if ((cmd & OBD_BRW_WRITE) != 0) {
1242 req = ptlrpc_request_alloc_pool(cli->cl_import,
1243 cli->cl_import->imp_rq_pool,
1247 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
/* niocount starts at 1; every adjacent pair of pages is tested with
 * can_merge_pages(). NOTE(review): the increment is not shown here. */
1252 for (niocount = i = 1; i < page_count; i++) {
1253 if (!can_merge_pages(pga[i - 1], pga[i]))
/* Size the client-side niobuf buffer for niocount entries before packing. */
1257 pill = &req->rq_pill;
1258 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1259 niocount * sizeof(*niobuf));
1260 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1262 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1264 ptlrpc_request_free(req);
1267 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1268 ptlrpc_at_set_req_timeout(req);
/* The bulk descriptor type depends on the opcode: BULK_GET_SOURCE for
 * OST_WRITE, BULK_PUT_SINK for the other call. */
1270 if (opc == OST_WRITE)
1271 desc = ptlrpc_prep_bulk_imp(req, page_count,
1272 BULK_GET_SOURCE, OST_BULK_PORTAL);
1274 desc = ptlrpc_prep_bulk_imp(req, page_count,
1275 BULK_PUT_SINK, OST_BULK_PORTAL);
1278 GOTO(out, rc = -ENOMEM);
1279 /* NB request now owns desc and will free it when it gets freed */
1281 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1282 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1283 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1284 LASSERT(body && ioobj && niobuf);
1286 lustre_set_wire_obdo(&body->oa, oa);
1288 obdo_to_ioobj(oa, ioobj);
1289 ioobj->ioo_bufcnt = niocount;
1290 osc_pack_capa(req, body, ocapa);
1291 LASSERT (page_count > 0);
/* Per page: sanity checks (fits in one page, offsets strictly increasing,
 * same SRVLOCK flag as page 0), add the page to the bulk descriptor,
 * accumulate the byte total, then either extend a niobuf length (page
 * merges with its predecessor) or fill a niobuf from the page. */
1293 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1294 struct brw_page *pg = pga[i];
1296 LASSERT(pg->count > 0);
1297 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1298 "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1299 pg->off, pg->count);
1301 LASSERTF(i == 0 || pg->off > pg_prev->off,
1302 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1303 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1305 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1306 pg_prev->pg, page_private(pg_prev->pg),
1307 pg_prev->pg->index, pg_prev->off);
1309 LASSERTF(i == 0 || pg->off > pg_prev->off,
1310 "i %d p_c %u\n", i, page_count);
1312 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1313 (pg->flag & OBD_BRW_SRVLOCK));
1315 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1317 requested_nob += pg->count;
1319 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1321 niobuf->len += pg->count;
1323 niobuf->offset = pg->off;
1324 niobuf->len = pg->count;
1325 niobuf->flags = pg->flag;
/* After the loop niobuf must sit exactly niocount entries past the start
 * of the niobuf buffer inside the request message. */
1330 LASSERTF((void *)(niobuf - niocount) ==
1331 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1332 niocount * sizeof(*niobuf)),
1333 "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
1334 REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
1335 (void *)(niobuf - niocount));
/* The byte count is passed to osc_announce_cached() only for writes;
 * any other opcode passes 0. */
1337 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1338 if (osc_should_shrink_grant(cli))
1339 osc_shrink_grant_local(cli, &body->oa);
1341 /* size[REQ_REC_OFF] still sizeof (*body) */
/* Write path: when checksums are enabled and the security flavor does not
 * already cover bulk data, checksum the outgoing pages and store the value
 * and type in both the wire body and the caller oa. */
1342 if (opc == OST_WRITE) {
1343 if (unlikely(cli->cl_checksum) &&
1344 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1345 /* store cl_cksum_type in a local variable since
1346 * it can be changed via lprocfs */
1347 cksum_type_t cksum_type = cli->cl_cksum_type;
1349 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1350 oa->o_flags &= OBD_FL_LOCAL_MASK;
1351 body->oa.o_flags = 0;
1353 body->oa.o_flags |= cksum_type_pack(cksum_type);
1354 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1355 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1359 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1361 /* save this in 'oa', too, for later checking */
1362 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1363 oa->o_flags |= cksum_type_pack(cksum_type);
1365 /* clear out the checksum flag, in case this is a
1366 * resend but cl_checksum is no longer set. b=11238 */
1367 oa->o_valid &= ~OBD_MD_FLCKSUM;
1369 oa->o_cksum = body->oa.o_cksum;
1370 /* 1 RC per niobuf */
1371 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
1372 sizeof(__u32) * niocount);
/* Remaining branch: only the checksum type and valid bits are set in the
 * body, and the server-side niobuf buffer is sized to 0. */
1374 if (unlikely(cli->cl_checksum) &&
1375 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1376 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1377 body->oa.o_flags = 0;
1378 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1379 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1381 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
1382 /* 1 RC for the whole I/O */
1384 ptlrpc_request_set_replen(req);
/* The async args live inside the request; the compile-time assertion
 * guards that the args structure fits in rq_async_args. */
1386 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1387 aa = ptlrpc_req_async_args(req);
1389 aa->aa_requested_nob = requested_nob;
1390 aa->aa_nio_count = niocount;
1391 aa->aa_page_count = page_count;
1395 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
/* A capa reference is taken only when the caller passes a nonzero reserve. */
1396 if (ocapa && reserve)
1397 aa->aa_ocapa = capa_get(ocapa);
1403 ptlrpc_req_finished(req);
/* Compare the checksum the client computed for a write with the one the
 * server reported. A match is only logged at debug level. On a mismatch the
 * checksum is recomputed over the same pages using the type found in
 * oa->o_flags (OBD_CKSUM_CRC32 when OBD_MD_FLFLAGS is not set), a diagnostic
 * message is chosen from the three values, and the error is logged.
 * NOTE(review): the return statements are not part of this listing; the
 * caller uses the result as a boolean - presumably nonzero means mismatch. */
1407 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1408 __u32 client_cksum, __u32 server_cksum, int nob,
1409 obd_count page_count, struct brw_page **pga,
1410 cksum_type_t client_cksum_type)
1414 cksum_type_t cksum_type;
1416 if (server_cksum == client_cksum) {
1417 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* Checksum type reported in the reply, with CRC32 as the fallback. */
1421 if (oa->o_valid & OBD_MD_FLFLAGS)
1422 cksum_type = cksum_type_unpack(oa->o_flags);
1424 cksum_type = OBD_CKSUM_CRC32;
1426 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
/* Classify the mismatch: type disagreement first, then which of the
 * server or original client value the fresh checksum equals. */
1429 if (cksum_type != client_cksum_type)
1430 msg = "the server did not use the checksum type specified in "
1431 "the original request - likely a protocol problem";
1432 else if (new_cksum == server_cksum)
1433 msg = "changed on the client after we checksummed it - "
1434 "likely false positive due to mmap IO (bug 11742)";
1435 else if (new_cksum == client_cksum)
1436 msg = "changed in transit before arrival at OST";
1438 msg = "changed in transit AND doesn't match the original - "
1439 "likely false positive due to mmap IO (bug 11742)";
1441 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1442 LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1443 "["LPU64"-"LPU64"]\n",
1444 msg, libcfs_nid2str(peer->nid),
1445 oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1446 oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1449 oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1451 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1452 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1453 "client csum now %x\n", client_cksum, client_cksum_type,
1454 server_cksum, cksum_type, new_cksum);
1458 /* Note rc enters this function as number of bytes transferred */
/* Post-process the reply of a BRW request. Visible steps: unpack the reply
 * body, update quota flags for writes, update the grant, then for writes
 * verify the bulk, the checksum and the per-niobuf return codes; for reads
 * validate the byte counts, handle short reads and verify the checksum.
 * Finally the wire obdo is copied into aa->aa_oa.
 * NOTE(review): most return and goto statements are not part of this
 * listing, so the exact error codes are not documented here. */
1459 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1461 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1462 const lnet_process_id_t *peer =
1463 &req->rq_import->imp_connection->c_peer;
1464 struct client_obd *cli = aa->aa_cli;
1465 struct ost_body *body;
1466 __u32 client_cksum = 0;
/* -EDQUOT is the one negative rc that is excluded by this test. */
1469 if (rc < 0 && rc != -EDQUOT)
1472 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1473 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1474 lustre_swab_ost_body);
1476 CDEBUG(D_INFO, "Can't unpack body\n");
1480 /* set/clear over quota flag for a uid/gid */
1481 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1482 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1483 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1485 lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
/* The checksum stored in aa_oa is saved here and compared further down. */
1492 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1493 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1495 osc_update_grant(cli, body);
/* Write replies: the bulk byte count must equal what was requested. */
1497 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1499 CERROR("Unexpected +ve rc %d\n", rc);
1502 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1504 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1507 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1508 check_write_checksum(&body->oa, peer, client_cksum,
1509 body->oa.o_cksum, aa->aa_requested_nob,
1510 aa->aa_page_count, aa->aa_ppga,
1511 cksum_type_unpack(aa->aa_oa->o_flags)))
1514 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1515 aa->aa_page_count, aa->aa_ppga);
1519 /* The rest of this function executes only for OST_READs */
1521 /* if unwrap_bulk failed, return -EAGAIN to retry */
1522 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1524 GOTO(out, rc = -EAGAIN);
/* rc is checked against both the requested and the transferred byte
 * counts; a smaller rc than requested is handled as a short read. */
1526 if (rc > aa->aa_requested_nob) {
1527 CERROR("Unexpected rc %d (%d requested)\n", rc,
1528 aa->aa_requested_nob);
1532 if (rc != req->rq_bulk->bd_nob_transferred) {
1533 CERROR ("Unexpected rc %d (%d transferred)\n",
1534 rc, req->rq_bulk->bd_nob_transferred);
1538 if (rc < aa->aa_requested_nob)
1539 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
/* Read checksum verification: recompute over the received pages with the
 * type from the reply flags (CRC32 fallback) and compare with the server. */
1541 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1542 static int cksum_counter;
1543 __u32 server_cksum = body->oa.o_cksum;
1546 cksum_type_t cksum_type;
1548 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1549 cksum_type = cksum_type_unpack(body->oa.o_flags);
1551 cksum_type = OBD_CKSUM_CRC32;
1552 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1553 aa->aa_ppga, OST_READ,
1556 if (peer->nid == req->rq_bulk->bd_sender) {
1560 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1563 if (server_cksum == ~0 && rc > 0) {
1564 CERROR("Protocol error: server %s set the 'checksum' "
1565 "bit, but didn't send a checksum. Not fatal, "
1566 "but please notify on http://bugzilla.lustre.org/\n",
1567 libcfs_nid2str(peer->nid));
1568 } else if (server_cksum != client_cksum) {
1569 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1570 "%s%s%s inum "LPU64"/"LPU64" object "
1571 LPU64"/"LPU64" extent "
1572 "["LPU64"-"LPU64"]\n",
1573 req->rq_import->imp_obd->obd_name,
1574 libcfs_nid2str(peer->nid),
1576 body->oa.o_valid & OBD_MD_FLFID ?
1577 body->oa.o_fid : (__u64)0,
1578 body->oa.o_valid & OBD_MD_FLFID ?
1579 body->oa.o_generation :(__u64)0,
1581 body->oa.o_valid & OBD_MD_FLGROUP ?
1582 body->oa.o_gr : (__u64)0,
1583 aa->aa_ppga[0]->off,
1584 aa->aa_ppga[aa->aa_page_count-1]->off +
1585 aa->aa_ppga[aa->aa_page_count-1]->count -
1587 CERROR("client %x, server %x, cksum_type %x\n",
1588 client_cksum, server_cksum, cksum_type);
1590 aa->aa_oa->o_cksum = client_cksum;
1594 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* A checksum was saved from the request but the reply carries none. The
 * test below is true only when cksum_missed is zero or a power of two. */
1597 } else if (unlikely(client_cksum)) {
1598 static int cksum_missed;
1601 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1602 CERROR("Checksum %u requested from %s but not sent\n",
1603 cksum_missed, libcfs_nid2str(peer->nid));
1609 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
/* Synchronous BRW: prepare a request (reserve argument 0), send it with
 * ptlrpc_queue_wait() and post-process it with osc_brw_fini_request().
 * On a recoverable error the code checks osc_should_resend() and waits
 * with a timeout of resends seconds.
 * NOTE(review): the retry jump and the resends increment are not part of
 * this listing - confirm the loop shape against the full file. */
1614 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1615 struct lov_stripe_md *lsm,
1616 obd_count page_count, struct brw_page **pga,
1617 struct obd_capa *ocapa)
1619 struct ptlrpc_request *req;
1623 struct l_wait_info lwi;
1627 cfs_waitq_init(&waitq);
1630 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1631 page_count, pga, &req, ocapa, 0);
1635 rc = ptlrpc_queue_wait(req);
/* A timed-out request that is flagged for resend is dropped here. */
1637 if (rc == -ETIMEDOUT && req->rq_resend) {
1638 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1639 ptlrpc_req_finished(req);
1643 rc = osc_brw_fini_request(req, rc);
1645 ptlrpc_req_finished(req);
1646 if (osc_recoverable_error(rc)) {
1648 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1649 CERROR("too many resend retries, returning error\n");
/* Wait on a private wait queue with a condition of 0, so only the
 * timeout (or an interrupt) ends the wait. */
1653 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1654 l_wait_event(waitq, 0, &lwi);
/* Re-issue a BRW request after a recoverable error. A new request is built
 * for the same pages and obdo, the interpret callback and async args are
 * copied from the old request, the oap list and the capa reference are moved
 * to the new request, and the new request is added to the same request set.
 * The list handling runs under cl_loi_list_lock.
 * NOTE(review): return statements are not part of this listing. */
1662 int osc_brw_redo_request(struct ptlrpc_request *request,
1663 struct osc_brw_async_args *aa)
1665 struct ptlrpc_request *new_req;
1666 struct ptlrpc_request_set *set = request->rq_set;
1667 struct osc_brw_async_args *new_aa;
1668 struct osc_async_page *oap;
1672 if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1673 CERROR("too many resend retries, returning error\n");
1677 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
/* The command is derived from the opcode of the old request; the reserve
 * argument is 0. */
1679 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1680 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1681 aa->aa_cli, aa->aa_oa,
1682 NULL /* lsm unused by osc currently */,
1683 aa->aa_page_count, aa->aa_ppga,
1684 &new_req, aa->aa_ocapa, 0);
1688 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
/* If the oap that holds a reference on the old request was interrupted,
 * the lock is released and the new request is dropped. */
1690 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1691 if (oap->oap_request != NULL) {
1692 LASSERTF(request == oap->oap_request,
1693 "request %p != oap_request %p\n",
1694 request, oap->oap_request);
1695 if (oap->oap_interrupted) {
1696 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1697 ptlrpc_req_finished(new_req);
1702 /* New request takes over pga and oaps from old request.
1703 * Note that copying a list_head doesn't work, need to move it... */
1705 new_req->rq_interpret_reply = request->rq_interpret_reply;
1706 new_req->rq_async_args = request->rq_async_args;
/* The send time is pushed back by aa_resends seconds from now. */
1707 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1709 new_aa = ptlrpc_req_async_args(new_req);
1711 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1712 list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1713 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
/* Any oap holding a reference on the old request swaps it for a
 * reference on the new one. */
1715 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1716 if (oap->oap_request) {
1717 ptlrpc_req_finished(oap->oap_request);
1718 oap->oap_request = ptlrpc_request_addref(new_req);
/* The capa pointer moves to the new args; the old args are cleared. */
1722 new_aa->aa_ocapa = aa->aa_ocapa;
1723 aa->aa_ocapa = NULL;
1725 /* use ptlrpc_set_add_req is safe because interpret functions work
1726 * in check_set context. only one way exist with access to request
1727 * from different thread got -EINTR - this way protected with
1728 * cl_loi_list_lock */
1729 ptlrpc_set_add_req(set, new_req);
1731 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1733 DEBUG_REQ(D_INFO, new_req, "new request");
1738 * ugh, we want disk allocation on the target to happen in offset order. we'll
1739 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1740 * fine for our small page arrays and doesn't require allocation. it's an
1741 * insertion sort that swaps elements that are strides apart, shrinking the
1742 * stride down until it's '1' and the array is sorted.
/* Sort an array of num brw_page pointers in place by ascending off, using
 * a shellsort. The first loop grows the stride through 1, 4, 13, ... until
 * it reaches or exceeds num; the do/while then runs strided insertion
 * passes until a pass with stride 1 has been done.
 * NOTE(review): the stride reduction statement is not part of this listing. */
1744 static void sort_brw_pages(struct brw_page **array, int num)
1747 struct brw_page *tmp;
1751 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1756 for (i = stride ; i < num ; i++) {
/* Shift larger elements one stride up to open a slot for tmp. */
1759 while (j >= stride && array[j - stride]->off > tmp->off) {
1760 array[j] = array[j - stride];
1765 } while (stride > 1);
/* Return how many leading pages of pg (at most pages, which must be > 0)
 * form a run without an internal gap. The scan stops at a page that does
 * not end on a page boundary, or when the following page does not start on
 * one.
 * NOTE(review): the loop header and counter updates are not part of this
 * listing; the description follows the visible tests and comments. */
1768 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1774 LASSERT (pages > 0);
1775 offset = pg[i]->off & ~CFS_PAGE_MASK;
1779 if (pages == 0) /* that's all */
1782 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1783 return count; /* doesn't end on page boundary */
1786 offset = pg[i]->off & ~CFS_PAGE_MASK;
1787 if (offset != 0) /* doesn't start on page boundary */
/* Allocate an array of count brw_page pointers for the flat array pga.
 * NOTE(review): the loop body and the return are not part of this listing;
 * presumably each entry is pointed at the matching element of pga and the
 * array (or NULL on allocation failure) is returned - confirm. The array is
 * sized for release with osc_release_ppga() using the same count. */
1794 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1796 struct brw_page **ppga;
1799 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1803 for (i = 0; i < count; i++)
/* Free a page pointer array of count entries. ppga must not be NULL, and
 * count must be the value the array was allocated with because the free
 * size is computed from it. */
1808 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1810 LASSERT(ppga != NULL);
1811 OBD_FREE(ppga, sizeof(*ppga) * count);
/* Synchronous BRW entry point. Builds a pointer array over pga, sorts it by
 * offset and sends the pages in chunks through osc_brw_internal(). Each
 * chunk holds at most cl_max_pages_per_rpc pages and is further limited by
 * max_unfragmented_pages(). Because a BRW clobbers the obdo, a copy is
 * saved before the first chunk when more than one chunk is needed and
 * restored before each later chunk.
 * NOTE(review): the OBD_BRW_CHECK return values and the error exits are
 * not part of this listing. */
1814 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1815 obd_count page_count, struct brw_page *pga,
1816 struct obd_trans_info *oti)
1818 struct obdo *saved_oa = NULL;
1819 struct brw_page **ppga, **orig;
1820 struct obd_import *imp = class_exp2cliimp(exp);
1821 struct client_obd *cli;
1822 int rc, page_count_orig;
1825 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1826 cli = &imp->imp_obd->u.cli;
1828 if (cmd & OBD_BRW_CHECK) {
1829 /* The caller just wants to know if there's a chance that this
1830 * I/O can succeed */
1832 if (imp->imp_invalid)
1837 /* test_brw with a failed create can trip this, maybe others. */
1838 LASSERT(cli->cl_max_pages_per_rpc);
/* orig and page_count_orig keep the start and size of the array for the
 * final release, since ppga and page_count advance in the loop. */
1842 orig = ppga = osc_build_ppga(pga, page_count);
1845 page_count_orig = page_count;
1847 sort_brw_pages(ppga, page_count);
1848 while (page_count) {
1849 obd_count pages_per_brw;
1851 if (page_count > cli->cl_max_pages_per_rpc)
1852 pages_per_brw = cli->cl_max_pages_per_rpc;
1854 pages_per_brw = page_count;
1856 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1858 if (saved_oa != NULL) {
1859 /* restore previously saved oa */
1860 *oinfo->oi_oa = *saved_oa;
1861 } else if (page_count > pages_per_brw) {
1862 /* save a copy of oa (brw will clobber it) */
1863 OBDO_ALLOC(saved_oa);
1864 if (saved_oa == NULL)
1865 GOTO(out, rc = -ENOMEM);
1866 *saved_oa = *oinfo->oi_oa;
1869 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1870 pages_per_brw, ppga, oinfo->oi_capa);
/* Advance past the pages that were just handed to osc_brw_internal(). */
1875 page_count -= pages_per_brw;
1876 ppga += pages_per_brw;
1880 osc_release_ppga(orig, page_count_orig);
1882 if (saved_oa != NULL)
1883 OBDO_FREE(saved_oa);
1888 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1889 * the dirty accounting. Writeback completes or truncate happens before
1890 * writing starts. Must be called with the loi lock held. */
/* Release the write grant held for the brw_page embedded in oap by calling
 * osc_release_write_grant(), passing sent through unchanged. */
1891 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1894 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1898 /* This maintains the lists of pending pages to read/write for a given object
1899 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1900 * to quickly find objects that are ready to send an RPC. */
/* Decide whether the pending pages of lop justify sending an RPC now. The
 * visible tests, in order: nothing pending; missing or invalid import;
 * a non-empty urgent list; for writes, waiters on the cache; and finally
 * the pending count reaching the optimal RPC size.
 * NOTE(review): the return statements are not part of this listing. */
1901 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1907 if (lop->lop_num_pending == 0)
1910 /* if we have an invalid import we want to drain the queued pages
1911 * by forcing them through rpcs that immediately fail and complete
1912 * the pages. recovery relies on this to empty the queued pages
1913 * before canceling the locks and evicting down the llite pages */
1914 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1917 /* stream rpcs in queue order as long as there is an urgent page
1918 * queued. this is our cheap solution for good batching in the case
1919 * where writepage marks some random page in the middle of the file
1920 * as urgent because of, say, memory pressure */
1921 if (!list_empty(&lop->lop_urgent)) {
1922 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1925 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1926 optimal = cli->cl_max_pages_per_rpc;
1927 if (cmd & OBD_BRW_WRITE) {
1928 /* trigger a write rpc stream as long as there are dirtiers
1929 * waiting for space. as they're waiting, they're not going to
1930 * create more pages to coalesce with what's waiting.. */
1931 if (!list_empty(&cli->cl_cache_waiters)) {
1932 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1935 /* +16 to avoid triggering rpcs that would want to include pages
1936 * that are being queued but which can't be made ready until
1937 * the queuer finishes with the page. this is a wart for
1938 * llite::commit_write() */
1941 if (lop->lop_num_pending >= optimal)
/* Check whether lop should trigger a high-priority RPC. Only the first
 * entry of the urgent list is inspected for the ASYNC_HP flag; an empty
 * urgent list is tested first.
 * NOTE(review): the return statements are not part of this listing. */
1947 static int lop_makes_hprpc(struct loi_oap_pages *lop)
1949 struct osc_async_page *oap;
1952 if (list_empty(&lop->lop_urgent))
1955 oap = list_entry(lop->lop_urgent.next,
1956 struct osc_async_page, oap_urgent_item);
1958 if (oap->oap_async_flags & ASYNC_HP) {
1959 CDEBUG(D_CACHE, "hp request forcing RPC\n");
/* Make the list membership of item match should_be_on: an unlinked item is
 * appended to list when it should be on it, and a linked item is removed
 * (and re-initialised) when it should not. Otherwise nothing changes. */
1966 static void on_list(struct list_head *item, struct list_head *list,
1969 if (list_empty(item) && should_be_on)
1970 list_add_tail(item, list);
1971 else if (!list_empty(item) && !should_be_on)
1972 list_del_init(item);
1975 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1976 * can find pages to build into rpcs quickly */
/* Refresh the membership of loi on the per-client lists. When either lop
 * qualifies for a high-priority RPC the loi goes on the hp-ready list and
 * off the ready list; the other visible pair of calls takes it off the
 * hp-ready list and puts it on the ready list according to lop_makes_rpc().
 * The write and read lists track whether any pages are pending. */
1977 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1979 if (lop_makes_hprpc(&loi->loi_write_lop) ||
1980 lop_makes_hprpc(&loi->loi_read_lop)) {
1982 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
1983 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
1985 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
1986 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
1987 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
1988 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
/* The pending counts are passed directly as the membership flag. */
1991 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1992 loi->loi_write_lop.lop_num_pending);
1994 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1995 loi->loi_read_lop.lop_num_pending);
/* Add delta (which may be negative) to the pending count of lop and to the
 * matching per-client counter: cl_pending_w_pages when cmd has
 * OBD_BRW_WRITE set, cl_pending_r_pages in the other visible statement. */
1998 static void lop_update_pending(struct client_obd *cli,
1999 struct loi_oap_pages *lop, int cmd, int delta)
2001 lop->lop_num_pending += delta;
2002 if (cmd & OBD_BRW_WRITE)
2003 cli->cl_pending_w_pages += delta;
2005 cli->cl_pending_r_pages += delta;
2009 * this is called when a sync waiter receives an interruption. Its job is to
2010 * get the caller woken as soon as possible. If its page hasn't been put in an
2011 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
2012 * desiring interruption which will forcefully complete the rpc once the rpc
/* Mark oap as interrupted (it must not be marked already). If the oap holds
 * a request reference, the request is marked interrupted, ptlrpcd is woken
 * and the reference is dropped. If the oap is still on a pending list it is
 * removed from the pending and urgent lists, the pending counters and list
 * membership are updated, and the completion callback runs with -EINTR.
 * NOTE(review): locking and the return statement are not part of this
 * listing. */
2015 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2017 struct loi_oap_pages *lop;
2018 struct lov_oinfo *loi;
2022 LASSERT(!oap->oap_interrupted);
2023 oap->oap_interrupted = 1;
2025 /* ok, it's been put in an rpc. only one oap gets a request reference */
2026 if (oap->oap_request != NULL) {
2027 ptlrpc_mark_interrupted(oap->oap_request);
2028 ptlrpcd_wake(oap->oap_request);
2029 ptlrpc_req_finished(oap->oap_request);
2030 oap->oap_request = NULL;
2034 * page completion may be called only if ->cpo_prep() method was
2035 * executed by osc_io_submit(), that also adds page the to pending list
2037 if (!list_empty(&oap->oap_pending_item)) {
2038 list_del_init(&oap->oap_pending_item);
2039 list_del_init(&oap->oap_urgent_item);
2042 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2043 &loi->loi_write_lop : &loi->loi_read_lop;
2044 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2045 loi_list_maint(oap->oap_cli, oap->oap_loi);
2046 rc = oap->oap_caller_ops->ap_completion(env,
2047 oap->oap_caller_data,
2048 oap->oap_cmd, NULL, -EINTR);
2054 /* this is trying to propagate async writeback errors back up to the
2055 * application. As an async write fails we record the error code for later if
2056 * the app does an fsync. As long as errors persist we force future rpcs to be
2057 * sync so that the app can get a sync error and break the cycle of queueing
2058 * pages for which writeback will fail. */
/* Update the async error record ar for a finished request with id xid.
 * One visible path sets ar_force_sync and samples the next xid into
 * ar_min_xid; the other clears ar_force_sync once xid is at or above
 * ar_min_xid.
 * NOTE(review): the conditions selecting these paths are not part of this
 * listing; presumably the first runs on error - confirm. */
2059 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2066 ar->ar_force_sync = 1;
2067 ar->ar_min_xid = ptlrpc_sample_next_xid();
2072 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2073 ar->ar_force_sync = 0;
/* Queue oap on the pending list of its object, choosing the write or read
 * lop from oap_cmd. ASYNC_HP pages are added at the head of the urgent
 * list, ASYNC_URGENT pages at its tail. The page is always appended to the
 * pending list and the pending counters are raised by one. */
2076 void osc_oap_to_pending(struct osc_async_page *oap)
2078 struct loi_oap_pages *lop;
2080 if (oap->oap_cmd & OBD_BRW_WRITE)
2081 lop = &oap->oap_loi->loi_write_lop;
2083 lop = &oap->oap_loi->loi_read_lop;
2085 if (oap->oap_async_flags & ASYNC_HP)
2086 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2087 else if (oap->oap_async_flags & ASYNC_URGENT)
2088 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2089 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2090 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2093 /* this must be called holding the loi list lock to give coverage to exit_cache,
2094 * async_flag maintenance, and oap_request */
/* Complete one async page. Drops the request reference held by the oap (if
 * any), clears its async flags and interrupted mark, feeds the result into
 * the async error records for writes, copies block count and timestamps
 * from oa into the lvb of the object on success, and calls the completion
 * callback of the caller. Depending on the callback result the page is
 * either put back on the pending queue or leaves the cache accounting.
 * NOTE(review): the test on the callback result is not part of this
 * listing. */
2095 static void osc_ap_completion(const struct lu_env *env,
2096 struct client_obd *cli, struct obdo *oa,
2097 struct osc_async_page *oap, int sent, int rc)
/* The xid is sampled before the request reference is released. */
2102 if (oap->oap_request != NULL) {
2103 xid = ptlrpc_req_xid(oap->oap_request);
2104 ptlrpc_req_finished(oap->oap_request);
2105 oap->oap_request = NULL;
2108 spin_lock(&oap->oap_lock);
2109 oap->oap_async_flags = 0;
2110 spin_unlock(&oap->oap_lock);
2111 oap->oap_interrupted = 0;
/* Writes update both the per-client and the per-object error record. */
2113 if (oap->oap_cmd & OBD_BRW_WRITE) {
2114 osc_process_ar(&cli->cl_ar, xid, rc);
2115 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
/* Only fields flagged valid in oa are copied into the lvb. */
2118 if (rc == 0 && oa != NULL) {
2119 if (oa->o_valid & OBD_MD_FLBLOCKS)
2120 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2121 if (oa->o_valid & OBD_MD_FLMTIME)
2122 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2123 if (oa->o_valid & OBD_MD_FLATIME)
2124 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2125 if (oa->o_valid & OBD_MD_FLCTIME)
2126 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2129 rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2130 oap->oap_cmd, oa, rc);
2132 /* ll_ap_completion (from llite) drops PG_locked. so, a new
2133 * I/O on the page could start, but OSC calls it under lock
2134 * and thus we can add oap back to pending safely */
2136 /* upper layer wants to leave the page on pending queue */
2137 osc_oap_to_pending(oap);
2139 osc_exit_cache(cli, oap, sent);
/* Reply interpreter for BRW requests. Finishes the request with
 * osc_brw_fini_request(), retries through osc_brw_redo_request() on a
 * recoverable error, and drops the capa reference. Under cl_loi_list_lock
 * it then decrements the in-flight counter for the opcode, completes the
 * pages (or releases write grants when no oaps are attached), wakes cache
 * waiters and looks for more RPCs to send. Finally the cl_req is completed
 * and the page pointer array is freed.
 * NOTE(review): the early returns after a redo are not part of this
 * listing. */
2143 static int brw_interpret(const struct lu_env *env,
2144 struct ptlrpc_request *req, void *data, int rc)
2146 struct osc_brw_async_args *aa = data;
2147 struct client_obd *cli;
2151 rc = osc_brw_fini_request(req, rc);
2152 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2153 if (osc_recoverable_error(rc)) {
2154 rc = osc_brw_redo_request(req, aa);
2160 capa_put(aa->aa_ocapa);
2161 aa->aa_ocapa = NULL;
2166 client_obd_list_lock(&cli->cl_loi_list_lock);
2168 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2169 * is called so we know whether to go to sync BRWs or wait for more
2170 * RPCs to complete */
2171 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2172 cli->cl_w_in_flight--;
2174 cli->cl_r_in_flight--;
/* async is true when no oaps are attached to the request. */
2176 async = list_empty(&aa->aa_oaps);
2177 if (!async) { /* from osc_send_oap_rpc() */
2178 struct osc_async_page *oap, *tmp;
2179 /* the caller may re-use the oap after the completion call so
2180 * we need to clean it up a little */
2181 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2182 list_del_init(&oap->oap_rpc_item);
2183 osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2185 OBDO_FREE(aa->aa_oa);
2186 } else { /* from async_internal() */
2188 for (i = 0; i < aa->aa_page_count; i++)
2189 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
/* In this branch the obdo is freed only when flagged OBD_FL_TEMPORARY. */
2191 if (aa->aa_oa->o_flags & OBD_FL_TEMPORARY)
2192 OBDO_FREE(aa->aa_oa);
2194 osc_wake_cache_waiters(cli);
2195 osc_check_rpcs(env, cli);
2196 client_obd_list_unlock(&cli->cl_loi_list_lock);
2198 cl_req_completion(env, aa->aa_clerq, rc);
2199 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
/* Build one BRW request from the async pages queued on rpc_list (which must
 * not be empty). Visible steps: allocate the page pointer array, walk the
 * list to fill it and add each page to a cl_req, set the request
 * attributes, sort the pages by offset, call osc_brw_prep_request() with
 * reserve set to 1, refresh the timestamps, and move the oaps from rpc_list
 * into the async args of the request. Failures are reported as ERR_PTR
 * values in req; on that path every oap still on rpc_list is completed with
 * an error and the cl_req, if one was created, is completed too.
 * NOTE(review): several allocation and return statements are not part of
 * this listing. */
2203 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2204 struct client_obd *cli,
2205 struct list_head *rpc_list,
2206 int page_count, int cmd)
2208 struct ptlrpc_request *req;
2209 struct brw_page **pga = NULL;
2210 struct osc_brw_async_args *aa;
2211 struct obdo *oa = NULL;
2212 const struct obd_async_page_ops *ops = NULL;
2213 void *caller_data = NULL;
2214 struct osc_async_page *oap;
2215 struct osc_async_page *tmp;
2216 struct ost_body *body;
2217 struct cl_req *clerq = NULL;
2218 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2219 struct ldlm_lock *lock = NULL;
2220 struct cl_req_attr crattr;
2224 LASSERT(!list_empty(rpc_list));
2226 memset(&crattr, 0, sizeof crattr);
2227 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2229 GOTO(out, req = ERR_PTR(-ENOMEM));
2233 GOTO(out, req = ERR_PTR(-ENOMEM));
/* Each queued oap contributes its embedded brw_page; the page offset in
 * the object is the object offset plus the offset within the page. */
2236 list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2237 struct cl_page *page = osc_oap2cl_page(oap);
2239 ops = oap->oap_caller_ops;
2240 caller_data = oap->oap_caller_data;
2242 clerq = cl_req_alloc(env, page, crt,
2243 1 /* only 1-object rpcs for
2246 GOTO(out, req = (void *)clerq);
2247 lock = oap->oap_ldlm_lock;
2249 pga[i] = &oap->oap_brw_page;
2250 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2251 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2252 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2254 cl_req_page_add(env, clerq, page);
2257 /* always get the data for the obdo for the rpc */
2258 LASSERT(ops != NULL);
2260 crattr.cra_capa = NULL;
2261 cl_req_attr_set(env, clerq, &crattr, ~0ULL);
/* The remote handle of the lock is stored in the obdo and flagged valid. */
2263 oa->o_handle = lock->l_remote_handle;
2264 oa->o_valid |= OBD_MD_FLHANDLE;
2267 rc = cl_req_prep(env, clerq);
2269 CERROR("cl_req_prep failed: %d\n", rc);
2270 GOTO(out, req = ERR_PTR(rc));
2273 sort_brw_pages(pga, page_count);
2274 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2275 pga, &req, crattr.cra_capa, 1);
2277 CERROR("prep_req failed: %d\n", rc);
2278 GOTO(out, req = ERR_PTR(rc));
2281 /* Need to update the timestamps after the request is built in case
2282 * we race with setattr (locally or in queue at OST). If OST gets
2283 * later setattr before earlier BRW (as determined by the request xid),
2284 * the OST will not use BRW timestamps. Sadly, there is no obvious
2285 * way to do this in a single call. bug 10150 */
2286 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2287 cl_req_attr_set(env, clerq, &crattr,
2288 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
/* Move the oaps from rpc_list into the async args and leave rpc_list
 * empty; the cl_req is remembered for completion. */
2290 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2291 aa = ptlrpc_req_async_args(req);
2292 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2293 list_splice(rpc_list, &aa->aa_oaps);
2294 CFS_INIT_LIST_HEAD(rpc_list);
2295 aa->aa_clerq = clerq;
2297 capa_put(crattr.cra_capa);
2302 OBD_FREE(pga, sizeof(*pga) * page_count);
2303 /* this should happen rarely and is pretty bad, it makes the
2304 * pending list not follow the dirty order */
2305 client_obd_list_lock(&cli->cl_loi_list_lock);
2306 list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2307 list_del_init(&oap->oap_rpc_item);
2309 /* queued sync pages can be torn down while the pages
2310 * were between the pending list and the rpc */
2311 if (oap->oap_interrupted) {
2312 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2313 osc_ap_completion(env, cli, NULL, oap, 0,
2317 osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
/* clerq may itself hold an error pointer, hence the IS_ERR test. */
2319 if (clerq && !IS_ERR(clerq))
2320 cl_req_completion(env, clerq, PTR_ERR(req));
2326 * prepare pages for ASYNC io and put pages in send queue.
2330 * \param cmd - OBD_BRW_* macros
2331 * \param lop - pending pages
2333 * \return zero if the pages were successfully added to the send queue.
2334 * \return nonzero if an error occurred.
2337 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2338 struct lov_oinfo *loi,
2339 int cmd, struct loi_oap_pages *lop)
2341 struct ptlrpc_request *req;
2342 obd_count page_count = 0;
2343 struct osc_async_page *oap = NULL, *tmp;
2344 struct osc_brw_async_args *aa;
2345 const struct obd_async_page_ops *ops;
2346 CFS_LIST_HEAD(rpc_list);
2347 CFS_LIST_HEAD(tmp_list);
2348 unsigned int ending_offset;
2349 unsigned starting_offset = 0;
2351 struct cl_object *clob = NULL;
2354 /* ASYNC_HP pages first. At present, when the lock the pages is
2355 * to be canceled, the pages covered by the lock will be sent out
2356 * with ASYNC_HP. We have to send out them as soon as possible. */
2357 list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2358 if (oap->oap_async_flags & ASYNC_HP)
2359 list_move(&oap->oap_pending_item, &tmp_list);
2361 list_move_tail(&oap->oap_pending_item, &tmp_list);
2362 if (++page_count >= cli->cl_max_pages_per_rpc)
2366 list_splice(&tmp_list, &lop->lop_pending);
2369 /* first we find the pages we're allowed to work with */
2370 list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2372 ops = oap->oap_caller_ops;
2374 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2375 "magic 0x%x\n", oap, oap->oap_magic);
2378 /* pin object in memory, so that completion call-backs
2379 * can be safely called under client_obd_list lock. */
2380 clob = osc_oap2cl_page(oap)->cp_obj;
2381 cl_object_get(clob);
2384 if (page_count != 0 &&
2385 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2386 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2387 " oap %p, page %p, srvlock %u\n",
2388 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2392 /* If there is a gap at the start of this page, it can't merge
2393 * with any previous page, so we'll hand the network a
2394 * "fragmented" page array that it can't transfer in 1 RDMA */
2395 if (page_count != 0 && oap->oap_page_off != 0)
2398 /* in llite being 'ready' equates to the page being locked
2399 * until completion unlocks it. commit_write submits a page
2400 * as not ready because its unlock will happen unconditionally
2401 * as the call returns. if we race with commit_write giving
2402 * us that page we dont' want to create a hole in the page
2403 * stream, so we stop and leave the rpc to be fired by
2404 * another dirtier or kupdated interval (the not ready page
2405 * will still be on the dirty list). we could call in
2406 * at the end of ll_file_write to process the queue again. */
2407 if (!(oap->oap_async_flags & ASYNC_READY)) {
2408 int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2411 CDEBUG(D_INODE, "oap %p page %p returned %d "
2412 "instead of ready\n", oap,
2416 /* llite is telling us that the page is still
2417 * in commit_write and that we should try
2418 * and put it in an rpc again later. we
2419 * break out of the loop so we don't create
2420 * a hole in the sequence of pages in the rpc
2425 /* the io isn't needed.. tell the checks
2426 * below to complete the rpc with EINTR */
2427 spin_lock(&oap->oap_lock);
2428 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2429 spin_unlock(&oap->oap_lock);
2430 oap->oap_count = -EINTR;
2433 spin_lock(&oap->oap_lock);
2434 oap->oap_async_flags |= ASYNC_READY;
2435 spin_unlock(&oap->oap_lock);
2438 LASSERTF(0, "oap %p page %p returned %d "
2439 "from make_ready\n", oap,
2447 * Page submitted for IO has to be locked. Either by
2448 * ->ap_make_ready() or by higher layers.
2450 #if defined(__KERNEL__) && defined(__linux__)
2452 struct cl_page *page;
2454 page = osc_oap2cl_page(oap);
2456 if (page->cp_type == CPT_CACHEABLE &&
2457 !(PageLocked(oap->oap_page) &&
2458 (CheckWriteback(oap->oap_page, cmd)))) {
2459 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2461 (long)oap->oap_page->flags,
2462 oap->oap_async_flags);
2468 /* take the page out of our book-keeping */
2469 list_del_init(&oap->oap_pending_item);
2470 lop_update_pending(cli, lop, cmd, -1);
2471 list_del_init(&oap->oap_urgent_item);
2473 if (page_count == 0)
2474 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2475 (PTLRPC_MAX_BRW_SIZE - 1);
2477 /* ask the caller for the size of the io as the rpc leaves. */
2478 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2480 ops->ap_refresh_count(env, oap->oap_caller_data,
2482 LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2484 if (oap->oap_count <= 0) {
2485 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2487 osc_ap_completion(env, cli, NULL,
2488 oap, 0, oap->oap_count);
2492 /* now put the page back in our accounting */
2493 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2494 if (page_count == 0)
2495 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2496 if (++page_count >= cli->cl_max_pages_per_rpc)
2499 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2500 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2501 * have the same alignment as the initial writes that allocated
2502 * extents on the server. */
2503 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2504 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2505 if (ending_offset == 0)
2508 /* If there is a gap at the end of this page, it can't merge
2509 * with any subsequent pages, so we'll hand the network a
2510 * "fragmented" page array that it can't transfer in 1 RDMA */
2511 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2515 osc_wake_cache_waiters(cli);
2517 loi_list_maint(cli, loi);
2519 client_obd_list_unlock(&cli->cl_loi_list_lock);
2522 cl_object_put(env, clob);
2524 if (page_count == 0) {
2525 client_obd_list_lock(&cli->cl_loi_list_lock);
2529 req = osc_build_req(env, cli, &rpc_list, page_count, cmd);
2531 LASSERT(list_empty(&rpc_list));
2532 loi_list_maint(cli, loi);
2533 RETURN(PTR_ERR(req));
2536 aa = ptlrpc_req_async_args(req);
2538 if (cmd == OBD_BRW_READ) {
2539 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2540 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2541 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2542 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2544 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2545 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2546 cli->cl_w_in_flight);
2547 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2548 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2550 ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2552 client_obd_list_lock(&cli->cl_loi_list_lock);
2554 if (cmd == OBD_BRW_READ)
2555 cli->cl_r_in_flight++;
2557 cli->cl_w_in_flight++;
2559 /* queued sync pages can be torn down while the pages
2560 * were between the pending list and the rpc */
2562 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2563 /* only one oap gets a request reference */
2566 if (oap->oap_interrupted && !req->rq_intr) {
2567 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2569 ptlrpc_mark_interrupted(req);
2573 tmp->oap_request = ptlrpc_request_addref(req);
2575 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2576 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2578 req->rq_interpret_reply = brw_interpret;
2579 ptlrpcd_add_req(req, PSCOPE_BRW);
/* Debug helper: emits one D_INODE CDEBUG line describing an object (LOI):
 * whether it is linked on a ready list (normal or high-priority), then, for
 * writes and for reads, the pending page count followed by whether the urgent
 * list is non-empty. STR is appended to the fixed format prefix.
 * NOTE(review): the final continuation line of the macro is not visible in
 * this view; presumably it passes the variadic args through to CDEBUG. */
2583 #define LOI_DEBUG(LOI, STR, args...) \
2584 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2585 !list_empty(&(LOI)->loi_ready_item) || \
2586 !list_empty(&(LOI)->loi_hp_ready_item), \
2587 (LOI)->loi_write_lop.lop_num_pending, \
2588 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2589 (LOI)->loi_read_lop.lop_num_pending, \
2590 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2593 /* This is called by osc_check_rpcs() to find which objects have pages that
2594 * we could be sending. These lists are maintained by lop_makes_rpc(). */
/* Selection order, as coded below: the head of cl_loi_hp_ready_list, then the
 * head of cl_loi_ready_list, then (only while cache waiters exist) the head
 * of cl_loi_write_list, and finally, when the import is missing or invalid,
 * the head of the write list followed by the head of the read list.
 * The entry is returned without being unlinked from its list.
 * NOTE(review): the fall-through return is not visible in this view; the
 * caller in osc_check_rpcs() loops until the result is NULL. */
2595 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2599 /* First return objects that have blocked locks so that they
2600 * will be flushed quickly and other clients can get the lock,
2601 * then objects which have pages ready to be stuffed into RPCs */
2602 if (!list_empty(&cli->cl_loi_hp_ready_list))
2603 RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
2604 struct lov_oinfo, loi_hp_ready_item));
2605 if (!list_empty(&cli->cl_loi_ready_list))
2606 RETURN(list_entry(cli->cl_loi_ready_list.next,
2607 struct lov_oinfo, loi_ready_item));
2609 /* then if we have cache waiters, return all objects with queued
2610 * writes. This is especially important when many small files
2611 * have filled up the cache and not been fired into rpcs because
2612 * they don't pass the nr_pending/object threshold */
2613 if (!list_empty(&cli->cl_cache_waiters) &&
2614 !list_empty(&cli->cl_loi_write_list))
2615 RETURN(list_entry(cli->cl_loi_write_list.next,
2616 struct lov_oinfo, loi_write_item));
2618 /* then return all queued objects when we have an invalid import
2619 * so that they get flushed */
2620 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2621 if (!list_empty(&cli->cl_loi_write_list))
2622 RETURN(list_entry(cli->cl_loi_write_list.next,
2623 struct lov_oinfo, loi_write_item));
2624 if (!list_empty(&cli->cl_loi_read_list))
2625 RETURN(list_entry(cli->cl_loi_read_list.next,
2626 struct lov_oinfo, loi_read_item));
/* Report whether the client has reached its limit of RPCs in flight.
 *
 * The limit is cl_max_rpcs_in_flight + hprpc, where hprpc becomes 1 when the
 * first page on the object's write urgent list carries ASYNC_HP or, failing
 * that, when the first page on its read urgent list does. A high-priority
 * page therefore gets one extra RPC slot.
 *
 * Returns non-zero when rpcs_in_flight(cli) is at or above that limit.
 * NOTE(review): the declaration of hprpc is not visible in this view;
 * presumably it is initialized to 0. */
2631 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2633 struct osc_async_page *oap;
/* only the head of the write urgent list is inspected */
2636 if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
2637 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2638 struct osc_async_page, oap_urgent_item);
2639 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
/* the read urgent list is consulted only if no HP write page was found */
2642 if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
2643 oap = list_entry(loi->loi_read_lop.lop_urgent.next,
2644 struct osc_async_page, oap_urgent_item);
2645 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2648 return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
/* Walk the objects handed out by osc_next_loi() and try to build RPCs from
 * their queued pages.
 *
 * For each object: check the in-flight limit with osc_max_rpc_in_flight(),
 * try a write RPC if lop_makes_rpc() says the write queue warrants one, then
 * a read RPC likewise; failures from osc_send_oap_rpc() are logged with
 * CERROR. The object is then unlinked from the hp-ready, ready, write and
 * read lists and loi_list_maint() is called for it.
 * NOTE(review): several statements are not visible in this view (the action
 * taken when the in-flight limit is hit, the handling of rc after each send,
 * the updates of race_counter and the action when it reaches 10); presumably
 * the limit check and the race_counter check both leave the loop. */
2651 /* called with the loi list lock held */
2652 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2654 struct lov_oinfo *loi;
2655 int rc = 0, race_counter = 0;
2658 while ((loi = osc_next_loi(cli)) != NULL) {
2659 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2661 if (osc_max_rpc_in_flight(cli, loi))
2664 /* attempt some read/write balancing by alternating between
2665 * reads and writes in an object. The makes_rpc checks here
2666 * would be redundant if we were getting read/write work items
2667 * instead of objects. we don't want send_oap_rpc to drain a
2668 * partial read pending queue when we're given this object to
2669 * do io on writes while there are cache waiters */
2670 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2671 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2672 &loi->loi_write_lop);
2674 CERROR("Write request failed with %d\n", rc);
2676 /* osc_send_oap_rpc failed, mostly because of
2679 * It can't break here, because if:
2680 * - a page was submitted by osc_io_submit, so
2682 * - no request in flight
2683 * - no subsequent request
2684 * The system will be in live-lock state,
2685 * because there is no chance to call
2686 * osc_io_unplug() and osc_check_rpcs() any
2687 * more. pdflush can't help in this case,
2688 * because it might be blocked at grabbing
2689 * the page lock as we mentioned.
2691 * Anyway, continue to drain pages. */
2700 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2701 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2702 &loi->loi_read_lop);
2704 CERROR("Read request failed with %d\n", rc);
2712 /* attempt some inter-object balancing by issuing rpcs
2713 * for each object in turn */
2714 if (!list_empty(&loi->loi_hp_ready_item))
2715 list_del_init(&loi->loi_hp_ready_item);
2716 if (!list_empty(&loi->loi_ready_item))
2717 list_del_init(&loi->loi_ready_item);
2718 if (!list_empty(&loi->loi_write_item))
2719 list_del_init(&loi->loi_write_item);
2720 if (!list_empty(&loi->loi_read_item))
2721 list_del_init(&loi->loi_read_item);
/* called right after the object was unlinked from every list above */
2723 loi_list_maint(cli, loi);
2725 /* send_oap_rpc fails with 0 when make_ready tells it to
2726 * back off. llite's make_ready does this when it tries
2727 * to lock a page queued for write that is already locked.
2728 * we want to try sending rpcs from many objects, but we
2729 * don't want to spin failing with 0. */
2730 if (race_counter == 10)
2736 /* we're trying to queue a page in the osc so we're subject to the
2737 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2738 * If the osc's queued pages are already at that limit, then we want to sleep
2739 * until there is space in the osc's queue for us. We also may be waiting for
2740 * write credits from the OST if there are RPCs in flight that may return some
2741 * before we fall back to sync writes.
2743 * We need this to know our allocation was granted in the presence of signals */
/* Wait condition used by osc_enter_cache(): evaluated under cl_loi_list_lock,
 * it is true when the waiter is no longer linked on a list (ocw_entry is
 * empty) or when no RPCs are in flight at all.
 * NOTE(review): the declaration and return of rc are not visible in this
 * view; presumably rc is what is returned. */
2744 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2748 client_obd_list_lock(&cli->cl_loi_list_lock);
2749 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2750 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Try to account one page against the client's write grant without sleeping.
 *
 * has_grant is computed as cl_avail_grant >= CFS_PAGE_SIZE. The visible
 * statements consume write grant for the page's brw_page and, separately,
 * add CFS_PAGE_SIZE to cl_dirty_transit, bump obd_dirty_transit_pages and set
 * OBD_BRW_NOCACHE on the page.
 * NOTE(review): the conditions guarding those statements and the return value
 * are not visible in this view; presumably grant is consumed only when
 * has_grant is true, the transit accounting applies only when @transient is
 * set, and has_grant is returned. The caller in osc_enter_cache() treats a
 * non-zero result as success. */
2755 * Non-blocking version of osc_enter_cache() that consumes grant only when it
2758 int osc_enter_cache_try(const struct lu_env *env,
2759 struct client_obd *cli, struct lov_oinfo *loi,
2760 struct osc_async_page *oap, int transient)
2764 has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2766 osc_consume_write_grant(cli, &oap->oap_brw_page);
2768 cli->cl_dirty_transit += CFS_PAGE_SIZE;
2769 atomic_inc(&obd_dirty_transit_pages);
2770 oap->oap_brw_flags |= OBD_BRW_NOCACHE;
/* Reserve cache space and write grant for one page about to be queued for
 * write, sleeping if necessary.
 *
 * Visible steps:
 *  - log the current dirty/grant counters;
 *  - detect the "force sync" cases (cl_dirty_max below one page, or the
 *    client's or the object's ar_force_sync set);
 *  - in the normal case, when both the per-client and the global dirty limits
 *    have room, try osc_enter_cache_try();
 *  - otherwise, if write RPCs are in flight, queue an on-stack waiter on
 *    cl_cache_waiters, kick osc_check_rpcs(), drop the list lock, sleep until
 *    ocw_granted() holds, retake the lock and unlink the waiter if it is
 *    still queued.
 * NOTE(review): the return statements and the handling of the waiter's result
 * are not visible in this view. */
2776 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2777 * grant or cache space. */
2778 static int osc_enter_cache(const struct lu_env *env,
2779 struct client_obd *cli, struct lov_oinfo *loi,
2780 struct osc_async_page *oap)
2782 struct osc_cache_waiter ocw;
2783 struct l_wait_info lwi = { 0 };
2787 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2788 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2789 cli->cl_dirty_max, obd_max_dirty_pages,
2790 cli->cl_lost_grant, cli->cl_avail_grant);
2792 /* force the caller to try sync io. this can jump the list
2793 * of queued writes and create a discontiguous rpc stream */
2794 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2795 loi->loi_ar.ar_force_sync)
2798 /* Hopefully normal case - cache space and write credits available */
2799 if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2800 atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2801 osc_enter_cache_try(env, cli, loi, oap, 0))
2804 /* Make sure that there are write rpcs in flight to wait for. This
2805 * is a little silly as this object may not have any pending but
2806 * other objects sure might. */
2807 if (cli->cl_w_in_flight) {
2808 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2809 cfs_waitq_init(&ocw.ocw_waitq);
2813 loi_list_maint(cli, loi);
2814 osc_check_rpcs(env, cli);
/* the list lock is dropped for the duration of the sleep */
2815 client_obd_list_unlock(&cli->cl_loi_list_lock);
2817 CDEBUG(D_CACHE, "sleeping for cache space\n");
2818 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2820 client_obd_list_lock(&cli->cl_loi_list_lock);
/* still queued after waking: take the waiter off the list ourselves */
2821 if (!list_empty(&ocw.ocw_entry)) {
2822 list_del(&ocw.ocw_entry);
/* Initialize an osc_async_page for @page at object offset @offset.
 *
 * The page is stamped with OAP_MAGIC, bound to the export's client_obd, and
 * given the caller's ops/data, the page pointer and the object offset. Its
 * pending, urgent, rpc and page list heads and its spinlock are initialized.
 * OBD_BRW_NOQUOTA is set in oap_brw_flags when the client is not remote and
 * the caller has CFS_CAP_SYS_RESOURCE. @offset must be page aligned
 * (asserted).
 * NOTE(review): how oap is obtained (presumably from @res), the condition
 * under which size_round(sizeof(*oap)) is returned instead (presumably a
 * size query), and the final return value are not visible in this view. */
2832 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2833 struct lov_oinfo *loi, cfs_page_t *page,
2834 obd_off offset, const struct obd_async_page_ops *ops,
2835 void *data, void **res, int nocache,
2836 struct lustre_handle *lockh)
2838 struct osc_async_page *oap;
2843 return size_round(sizeof(*oap));
2846 oap->oap_magic = OAP_MAGIC;
2847 oap->oap_cli = &exp->exp_obd->u.cli;
2850 oap->oap_caller_ops = ops;
2851 oap->oap_caller_data = data;
2853 oap->oap_page = page;
2854 oap->oap_obj_off = offset;
2855 if (!client_is_remote(exp) &&
2856 cfs_capable(CFS_CAP_SYS_RESOURCE))
2857 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
/* the object offset must not have any bits below the page boundary */
2859 LASSERT(!(offset & ~CFS_PAGE_MASK));
2861 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2862 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2863 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2864 CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2866 spin_lock_init(&oap->oap_lock);
2867 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
/* Convert an opaque cookie back into its osc_async_page.
 * The cookie is used directly as the page pointer and validated by its magic:
 * ERR_PTR(-EINVAL) is returned when oap_magic is not OAP_MAGIC.
 * NOTE(review): the success return is not visible in this view; presumably
 * the page pointer itself. @cookie is dereferenced unconditionally, so it
 * must not be NULL. */
2871 struct osc_async_page *oap_from_cookie(void *cookie)
2873 struct osc_async_page *oap = cookie;
2874 if (oap->oap_magic != OAP_MAGIC)
2875 return ERR_PTR(-EINVAL);
/* Queue one page (identified by @cookie) for asynchronous read or write.
 *
 * Visible steps:
 *  - resolve the cookie with oap_from_cookie(), returning PTR_ERR() of the
 *    result on the error path;
 *  - check for a missing or invalid import, and for a page that is already
 *    linked on a pending, urgent or rpc list;
 *  - for writes without OBD_BRW_NOQUOTA in @cmd, read uid/gid from the top
 *    cl_object's attributes (under the attr lock) and ask lquota_chkdq()
 *    whether the owner or group is out of quota;
 *  - under cl_loi_list_lock: record @off, @count, @brw_flags (plus
 *    OBD_BRW_MEMALLOC under memory pressure) and @async_flags on the page,
 *    reserve cache via osc_enter_cache() for writes, move the page to the
 *    pending list, refresh the object's list membership and run
 *    osc_check_rpcs().
 * @off + @count must not exceed CFS_PAGE_SIZE (asserted).
 * NOTE(review): the return values of the early-exit checks, the condition
 * around the lsm_oinfo[0] fallback, and the setting of oap_cmd are not
 * visible in this view. */
2879 int osc_queue_async_io(const struct lu_env *env,
2880 struct obd_export *exp, struct lov_stripe_md *lsm,
2881 struct lov_oinfo *loi, void *cookie,
2882 int cmd, obd_off off, int count,
2883 obd_flag brw_flags, enum async_flags async_flags)
2885 struct client_obd *cli = &exp->exp_obd->u.cli;
2886 struct osc_async_page *oap;
2890 oap = oap_from_cookie(cookie);
2892 RETURN(PTR_ERR(oap));
2894 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2897 if (!list_empty(&oap->oap_pending_item) ||
2898 !list_empty(&oap->oap_urgent_item) ||
2899 !list_empty(&oap->oap_rpc_item))
2902 /* check if the file's owner/group is over quota */
2903 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2904 struct cl_object *obj;
2905 struct cl_attr attr; /* XXX put attr into thread info */
2906 unsigned int qid[MAXQUOTAS];
2908 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
2910 cl_object_attr_lock(obj);
2911 rc = cl_object_attr_get(env, obj, &attr);
2912 cl_object_attr_unlock(obj);
2914 qid[USRQUOTA] = attr.cat_uid;
2915 qid[GRPQUOTA] = attr.cat_gid;
2917 lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
2924 loi = lsm->lsm_oinfo[0];
2926 client_obd_list_lock(&cli->cl_loi_list_lock);
2928 LASSERT(off + count <= CFS_PAGE_SIZE);
2930 oap->oap_page_off = off;
2931 oap->oap_count = count;
2932 oap->oap_brw_flags = brw_flags;
2933 /* Give a hint to OST that requests are coming from kswapd - bug19529 */
2934 if (libcfs_memory_pressure_get())
2935 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2936 spin_lock(&oap->oap_lock);
2937 oap->oap_async_flags = async_flags;
2938 spin_unlock(&oap->oap_lock);
2940 if (cmd & OBD_BRW_WRITE) {
2941 rc = osc_enter_cache(env, cli, loi, oap);
2943 client_obd_list_unlock(&cli->cl_loi_list_lock);
2948 osc_oap_to_pending(oap);
2949 loi_list_maint(cli, loi);
2951 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2954 osc_check_rpcs(env, cli);
2955 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* True when @flag is clear in @was and set in @now, i.e. the transition from
 * @was to @now turns @flag on. For a single-bit flag this is the truth value
 * of (~was & now & flag), but this is more clear :)
 * Every argument is parenthesized so that compound expressions such as
 * "A | B" expand with the intended precedence. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
/* Turn on additional async flags for a page that is already pending.
 *
 * The page must be on a pending list (asserted). The read or write
 * loi_oap_pages is chosen from oap_cmd. If every requested flag is already
 * set nothing further is needed. Otherwise ASYNC_READY is collected when it
 * is newly requested, and when ASYNC_URGENT is newly requested for a page
 * that is not in an RPC the page is linked onto lop_urgent and the object's
 * list membership is refreshed. The collected flags are ORed into
 * oap_async_flags under oap_lock.
 * NOTE(review): the early return, the initialization of the local flags, the
 * else branches and the final return are not visible in this view;
 * presumably ASYNC_HP pages go to the head of the urgent list and all others
 * to the tail. */
2963 int osc_set_async_flags_base(struct client_obd *cli,
2964 struct lov_oinfo *loi, struct osc_async_page *oap,
2965 obd_flag async_flags)
2967 struct loi_oap_pages *lop;
2971 LASSERT(!list_empty(&oap->oap_pending_item));
2973 if (oap->oap_cmd & OBD_BRW_WRITE) {
2974 lop = &loi->loi_write_lop;
2976 lop = &loi->loi_read_lop;
/* nothing to do when all requested flags are already present */
2979 if ((oap->oap_async_flags & async_flags) == async_flags)
2982 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2983 flags |= ASYNC_READY;
2985 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
2986 list_empty(&oap->oap_rpc_item)) {
2987 if (oap->oap_async_flags & ASYNC_HP)
2988 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2990 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2991 flags |= ASYNC_URGENT;
2992 loi_list_maint(cli, loi);
2994 spin_lock(&oap->oap_lock);
2995 oap->oap_async_flags |= flags;
2996 spin_unlock(&oap->oap_lock);
2998 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2999 oap->oap_async_flags);
/* Detach a page (identified by @cookie) from the OSC's queues.
 *
 * Under cl_loi_list_lock: a page that is currently part of an RPC cannot be
 * torn down and yields -EBUSY. Otherwise the page leaves the cache accounting
 * (osc_exit_cache), cache waiters are woken, and the page is unlinked from
 * the urgent list (clearing ASYNC_URGENT and ASYNC_HP under oap_lock) and
 * from the pending list (decrementing the pending count), after which the
 * object's list membership is refreshed.
 * Returns PTR_ERR() of the cookie lookup on the error path.
 * NOTE(review): the condition around the lsm_oinfo[0] fallback, the "out"
 * label and the final return are not visible in this view. */
3003 int osc_teardown_async_page(struct obd_export *exp,
3004 struct lov_stripe_md *lsm,
3005 struct lov_oinfo *loi, void *cookie)
3007 struct client_obd *cli = &exp->exp_obd->u.cli;
3008 struct loi_oap_pages *lop;
3009 struct osc_async_page *oap;
3013 oap = oap_from_cookie(cookie);
3015 RETURN(PTR_ERR(oap));
3018 loi = lsm->lsm_oinfo[0];
3020 if (oap->oap_cmd & OBD_BRW_WRITE) {
3021 lop = &loi->loi_write_lop;
3023 lop = &loi->loi_read_lop;
3026 client_obd_list_lock(&cli->cl_loi_list_lock);
/* a page already placed in an RPC cannot be torn down */
3028 if (!list_empty(&oap->oap_rpc_item))
3029 GOTO(out, rc = -EBUSY);
3031 osc_exit_cache(cli, oap, 0);
3032 osc_wake_cache_waiters(cli);
3034 if (!list_empty(&oap->oap_urgent_item)) {
3035 list_del_init(&oap->oap_urgent_item);
3036 spin_lock(&oap->oap_lock);
3037 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3038 spin_unlock(&oap->oap_lock);
3040 if (!list_empty(&oap->oap_pending_item)) {
3041 list_del_init(&oap->oap_pending_item);
3042 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3044 loi_list_maint(cli, loi);
3045 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3047 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Attach einfo->ei_cbdata to @lock as its AST data, with sanity checks.
 *
 * Asserts that the lock is non-NULL and that its blocking, completion and
 * glimpse callbacks and its resource type match those in @einfo. Then, with
 * the resource/lock locked and osc_ast_guard held, asserts that l_ast_data is
 * either unset or already equal to the new value, and stores it.
 * NOTE(review): the third parameter is not visible in this view and is not
 * used by any visible statement. */
3051 static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
3052 struct ldlm_enqueue_info *einfo,
3055 void *data = einfo->ei_cbdata;
3057 LASSERT(lock != NULL);
3058 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3059 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3060 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3061 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3063 lock_res_and_lock(lock);
3064 spin_lock(&osc_ast_guard);
/* the AST data may be set once, or re-set to the same value */
3065 LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
3066 lock->l_ast_data = data;
3067 spin_unlock(&osc_ast_guard);
3068 unlock_res_and_lock(lock);
/* Handle-based wrapper around osc_set_lock_data_with_check(): resolves @lockh
 * to a lock, sets its AST data from @einfo and drops the reference taken by
 * the lookup. A CERROR is logged for the case where the lock cannot be used.
 * NOTE(review): the branch condition is not visible in this view; presumably
 * the error is logged when ldlm_handle2lock() returns NULL. */
3071 static void osc_set_data_with_check(struct lustre_handle *lockh,
3072 struct ldlm_enqueue_info *einfo,
3075 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3078 osc_set_lock_data_with_check(lock, einfo, flags);
3079 LDLM_LOCK_PUT(lock);
3081 CERROR("lockh %p, data %p - client evicted?\n",
3082 lockh, einfo->ei_cbdata);
/* Run the @replace iterator, with @data as its argument, over the locks of
 * the DLM resource named by the stripe's object id and group in this
 * device's namespace.
 * NOTE(review): the return statement is not visible in this view. */
3085 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3086 ldlm_iterator_t replace, void *data)
3088 struct ldlm_res_id res_id;
3089 struct obd_device *obd = class_exp2obd(exp);
3091 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3092 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
/* Finish the OSC side of a lock enqueue and invoke the caller's upcall.
 *
 * When the enqueue was aborted (ELDLM_LOCK_ABORTED) the DLM reply is fetched
 * from the request and, if it carries a non-zero lock_policy_res1, that value
 * replaces @rc. When the result is success, or an intent enqueue was aborted,
 * LDLM_FL_LVB_READY is added to *@flags and the LVB contents are logged.
 * Finally @upcall is called with @cookie and the resulting rc, and its result
 * is stored in rc.
 * NOTE(review): the remaining parameters, any outer condition around the
 * reply inspection (presumably "intent") and the return statement are not
 * visible in this view. */
3096 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3097 obd_enqueue_update_f upcall, void *cookie,
3100 int intent = *flags & LDLM_FL_HAS_INTENT;
3104 /* The request was created before ldlm_cli_enqueue call. */
3105 if (rc == ELDLM_LOCK_ABORTED) {
3106 struct ldlm_reply *rep;
3107 rep = req_capsule_server_get(&req->rq_pill,
3110 LASSERT(rep != NULL);
3111 if (rep->lock_policy_res1)
3112 rc = rep->lock_policy_res1;
3116 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3117 *flags |= LDLM_FL_LVB_READY;
3118 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3119 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3122 /* Call the update callback. */
3123 rc = (*upcall)(cookie, rc);
/* Reply interpreter for an asynchronous lock enqueue.
 *
 * The lock handle and mode are copied out of the async args first, because
 * the args may be freed once the upcall has run. The lock is looked up from
 * the handle and an extra reference is taken so that a blocking AST for a
 * failed lock arrives only after the upcall. ldlm_cli_enqueue_fini() then
 * completes the DLM side and osc_enqueue_fini() the OSC side (including the
 * upcall). If the handle is still in use and the result is ELDLM_OK, the
 * reference taken by ldlm_cli_enqueue() is dropped; the extra reference and
 * the lookup reference are dropped at the end.
 * NOTE(review): several lines, including the declaration of mode, part of
 * the ldlm_cli_enqueue_fini() argument list and the return statement, are
 * not visible in this view. */
3127 static int osc_enqueue_interpret(const struct lu_env *env,
3128 struct ptlrpc_request *req,
3129 struct osc_enqueue_args *aa, int rc)
3131 struct ldlm_lock *lock;
3132 struct lustre_handle handle;
3135 /* Make a local copy of a lock handle and a mode, because aa->oa_*
3136 * might be freed anytime after lock upcall has been called. */
3137 lustre_handle_copy(&handle, aa->oa_lockh);
3138 mode = aa->oa_ei->ei_mode;
3140 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3142 lock = ldlm_handle2lock(&handle);
3144 /* Take an additional reference so that a blocking AST that
3145 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3146 * to arrive after an upcall has been executed by
3147 * osc_enqueue_fini(). */
3148 ldlm_lock_addref(&handle, mode);
3150 /* Complete obtaining the lock procedure. */
3151 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3152 mode, aa->oa_flags, aa->oa_lvb,
3153 sizeof(*aa->oa_lvb), lustre_swab_ost_lvb,
3155 /* Complete osc stuff. */
3156 rc = osc_enqueue_fini(req, aa->oa_lvb,
3157 aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
3159 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3161 /* Release the lock for async request. */
3162 if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3164 * Releases a reference taken by ldlm_cli_enqueue(), if it is
3165 * not already released by
3166 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3168 ldlm_lock_decref(&handle, mode);
3170 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3171 aa->oa_lockh, req, aa);
3172 ldlm_lock_decref(&handle, mode);
3173 LDLM_LOCK_PUT(lock);
/* Update an object's cached LVB and known minimum size (KMS) after an
 * enqueue.
 *
 * On ELDLM_OK the lock is looked up from @lov_lockhp (asserted non-NULL),
 * *@lvb is copied into loi_lvb, and a candidate KMS is derived from the LVB
 * size, capped at one byte past the end of the lock's extent. The KMS is only
 * ever raised: it is set when the candidate is >= the current loi_kms and
 * left alone otherwise. The lock is then made matchable and the lookup
 * reference is dropped.
 * On ELDLM_LOCK_ABORTED with LDLM_FL_HAS_INTENT in @flags (a glimpse) only
 * the LVB is copied; KMS is left unchanged.
 * NOTE(review): the declaration of tmp and @mode's use are not visible in
 * this view. */
3177 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3178 struct lov_oinfo *loi, int flags,
3179 struct ost_lvb *lvb, __u32 mode, int rc)
3181 if (rc == ELDLM_OK) {
3182 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3185 LASSERT(lock != NULL);
3186 loi->loi_lvb = *lvb;
3187 tmp = loi->loi_lvb.lvb_size;
3188 /* Extend KMS up to the end of this lock and no further
3189 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3190 if (tmp > lock->l_policy_data.l_extent.end)
3191 tmp = lock->l_policy_data.l_extent.end + 1;
3192 if (tmp >= loi->loi_kms) {
3193 LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3194 ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3195 loi_kms_set(loi, tmp);
3197 LDLM_DEBUG(lock, "lock acquired, setting rss="
3198 LPU64"; leaving kms="LPU64", end="LPU64,
3199 loi->loi_lvb.lvb_size, loi->loi_kms,
3200 lock->l_policy_data.l_extent.end);
3202 ldlm_lock_allow_match(lock);
3203 LDLM_LOCK_PUT(lock);
3204 } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3205 loi->loi_lvb = *lvb;
3206 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3207 " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3211 EXPORT_SYMBOL(osc_update_enqueue);
/* Sentinel request-set pointer (value 1, never dereferenced in the visible
 * code): osc_enqueue_base() compares its rqset argument against it and, on a
 * match, hands the request to ptlrpcd_add_req() instead of adding it to a
 * real set. */
3213 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3215 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3216 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3217 * other synchronous requests, however keeping some locks and trying to obtain
3218 * others may take a considerable amount of time in a case of ost failure; and
3219 * when other sync requests do not get released lock from a client, the client
3220 * is excluded from the cluster -- such scenarios make the life difficult, so
3221 * release locks just after they are obtained. */
/* Obtain an extent lock on @res_id, reusing a cached lock when possible.
 *
 * Visible steps:
 *  - round the requested extent outwards to page boundaries;
 *  - look for an already granted lock with ldlm_lock_match(); when one is
 *    found whose AST data is unset or equal to einfo->ei_cbdata, attach the
 *    callback data, run @upcall with ELDLM_OK and adjust references for the
 *    PR-request/PW-match and async cases; otherwise drop the matched
 *    reference;
 *  - failing a match, allocate and prepare an LDLM enqueue request (for the
 *    intent case), clear LDLM_FL_BLOCK_GRANTED and call ldlm_cli_enqueue();
 *  - for async requests, fill the request's osc_enqueue_args and hand the
 *    request to ptlrpcd (when @rqset is PTLRPCD_SET) or add it to @rqset,
 *    with osc_enqueue_interpret() as reply interpreter;
 *  - for sync requests, finish with osc_enqueue_fini().
 * NOTE(review): many branch conditions, the kms_valid handling, the mode
 * adjustment for PR requests and all return statements are not visible in
 * this view. */
3222 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3223 int *flags, ldlm_policy_data_t *policy,
3224 struct ost_lvb *lvb, int kms_valid,
3225 obd_enqueue_update_f upcall, void *cookie,
3226 struct ldlm_enqueue_info *einfo,
3227 struct lustre_handle *lockh,
3228 struct ptlrpc_request_set *rqset, int async)
3230 struct obd_device *obd = exp->exp_obd;
3231 struct ptlrpc_request *req = NULL;
3232 int intent = *flags & LDLM_FL_HAS_INTENT;
3237 /* Filesystem lock extents are extended to page boundaries so that
3238 * dealing with the page cache is a little smoother. */
3239 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3240 policy->l_extent.end |= ~CFS_PAGE_MASK;
3243 * kms is not valid when either object is completely fresh (so that no
3244 * locks are cached), or object was evicted. In the latter case cached
3245 * lock cannot be used, because it would prime inode state with
3246 * potentially stale LVB.
3251 /* Next, search for already existing extent locks that will cover us */
3252 /* If we're trying to read, we also search for an existing PW lock. The
3253 * VFS and page cache already protect us locally, so lots of readers/
3254 * writers can share a single PW lock.
3256 * There are problems with conversion deadlocks, so instead of
3257 * converting a read lock to a write lock, we'll just enqueue a new
3260 * At some point we should cancel the read lock instead of making them
3261 * send us a blocking callback, but there are problems with canceling
3262 * locks out from other users right now, too. */
3263 mode = einfo->ei_mode;
3264 if (einfo->ei_mode == LCK_PR)
3266 mode = ldlm_lock_match(obd->obd_namespace,
3267 *flags | LDLM_FL_LVB_READY, res_id,
3268 einfo->ei_type, policy, mode, lockh, 0);
3270 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3272 if (matched->l_ast_data == NULL ||
3273 matched->l_ast_data == einfo->ei_cbdata) {
3274 /* addref the lock only if not async requests and PW
3275 * lock is matched whereas we asked for PR. */
3276 if (!rqset && einfo->ei_mode != mode)
3277 ldlm_lock_addref(lockh, LCK_PR);
3278 osc_set_lock_data_with_check(matched, einfo, *flags);
3280 /* I would like to be able to ASSERT here that
3281 * rss <= kms, but I can't, for reasons which
3282 * are explained in lov_enqueue() */
3285 /* We already have a lock, and it's referenced */
3286 (*upcall)(cookie, ELDLM_OK);
3288 /* For async requests, decref the lock. */
3289 if (einfo->ei_mode != mode)
3290 ldlm_lock_decref(lockh, LCK_PW);
3292 ldlm_lock_decref(lockh, einfo->ei_mode);
3293 LDLM_LOCK_PUT(matched);
3296 ldlm_lock_decref(lockh, mode);
3297 LDLM_LOCK_PUT(matched);
3302 CFS_LIST_HEAD(cancels);
3303 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3304 &RQF_LDLM_ENQUEUE_LVB);
3308 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3312 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3314 ptlrpc_request_set_replen(req);
3317 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3318 *flags &= ~LDLM_FL_BLOCK_GRANTED;
3320 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3321 sizeof(*lvb), lustre_swab_ost_lvb, lockh, async);
3324 struct osc_enqueue_args *aa;
3325 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3326 aa = ptlrpc_req_async_args(req);
3329 aa->oa_flags = flags;
3330 aa->oa_upcall = upcall;
3331 aa->oa_cookie = cookie;
3333 aa->oa_lockh = lockh;
3335 req->rq_interpret_reply =
3336 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3337 if (rqset == PTLRPCD_SET)
3338 ptlrpcd_add_req(req, PSCOPE_OTHER);
3340 ptlrpc_set_add_req(rqset, req);
3341 } else if (intent) {
3342 ptlrpc_req_finished(req);
3347 rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3349 ptlrpc_req_finished(req);
/* obd-method wrapper around osc_enqueue_base(): builds the DLM resource name
 * from the stripe's object id and group, and forwards the flags, policy,
 * first stripe's LVB and kms_valid, upcall (with @oinfo itself as cookie) and
 * lock handle taken from @oinfo. The request is asynchronous exactly when
 * @rqset is non-NULL.
 * NOTE(review): the return statement is not visible in this view; presumably
 * the result of osc_enqueue_base(). */
3354 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3355 struct ldlm_enqueue_info *einfo,
3356 struct ptlrpc_request_set *rqset)
3358 struct ldlm_res_id res_id;
3362 osc_build_res_name(oinfo->oi_md->lsm_object_id,
3363 oinfo->oi_md->lsm_object_gr, &res_id);
3365 rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3366 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3367 oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3368 oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3369 rqset, rqset != NULL);
/* Look for an already granted lock covering @policy on @res_id.
 *
 * The extent is first rounded outwards to page boundaries, then
 * ldlm_lock_match() is called with the caller's flags. On a match the
 * callback data is attached via osc_set_data_with_check(); when the matched
 * mode differs from the requested @mode and LDLM_FL_TEST_LOCK is not set, the
 * PW reference is exchanged for a PR reference.
 * NOTE(review): the last parameter, the fail-check result, the computation of
 * the mode mask passed in rc and the return statements are not visible in
 * this view.
 * NOTE(review): @data is a void * passed where osc_set_data_with_check()
 * takes a struct ldlm_enqueue_info * and reads ->ei_cbdata from it; confirm
 * that callers really pass an enqueue info here. */
3373 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3374 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3375 int *flags, void *data, struct lustre_handle *lockh,
3378 struct obd_device *obd = exp->exp_obd;
3379 int lflags = *flags;
3383 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3386 /* Filesystem lock extents are extended to page boundaries so that
3387 * dealing with the page cache is a little smoother */
3388 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3389 policy->l_extent.end |= ~CFS_PAGE_MASK;
3391 /* Next, search for already existing extent locks that will cover us */
3392 /* If we're trying to read, we also search for an existing PW lock. The
3393 * VFS and page cache already protect us locally, so lots of readers/
3394 * writers can share a single PW lock. */
3398 rc = ldlm_lock_match(obd->obd_namespace, lflags,
3399 res_id, type, policy, rc, lockh, unref);
3402 osc_set_data_with_check(lockh, data, lflags);
3403 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3404 ldlm_lock_addref(lockh, LCK_PR);
3405 ldlm_lock_decref(lockh, LCK_PW);
/* Drop one reference of @mode on the lock behind @lockh. Group locks
 * (LCK_GROUP) use ldlm_lock_decref_and_cancel(); the plain
 * ldlm_lock_decref() call is also visible.
 * NOTE(review): the else keyword and the return statement are not visible in
 * this view; presumably the plain decref is the non-group branch. */
3412 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3416 if (unlikely(mode == LCK_GROUP))
3417 ldlm_lock_decref_and_cancel(lockh, mode);
3419 ldlm_lock_decref(lockh, mode);
/* obd-method wrapper: releases a lock reference by delegating to
 * osc_cancel_base(); @exp and @md are not used by the visible code. */
3424 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3425 __u32 mode, struct lustre_handle *lockh)
3428 RETURN(osc_cancel_base(lockh, mode));
/* Cancel unused locks in this device's namespace via
 * ldlm_cli_cancel_unused(). resp starts out NULL and is set to the resource
 * name built from @lsm's object id and group.
 * NOTE(review): the last parameter and the condition guarding the resource
 * name construction are not visible in this view; presumably resp stays NULL
 * (no resource restriction) when @lsm is NULL. */
3431 static int osc_cancel_unused(struct obd_export *exp,
3432 struct lov_stripe_md *lsm, int flags,
3435 struct obd_device *obd = class_exp2obd(exp);
3436 struct ldlm_res_id res_id, *resp = NULL;
3439 resp = osc_build_res_name(lsm->lsm_object_id,
3440 lsm->lsm_object_gr, &res_id);
3443 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
/* Reply interpreter for an asynchronous OST_STATFS request.
 *
 * -ENOTCONN and -EAGAIN get special handling when the caller asked for
 * OBD_STATFS_NODELAY. Otherwise the obd_statfs reply is fetched from the
 * request; a missing reply yields -EPROTO. The client's OSCC RDONLY and
 * DEGRADED flags are cleared and then set again from os_state under
 * oscc_lock, the reply is copied into the caller's oi_osfs buffer, and the
 * caller's oi_cb_up callback is invoked with the resulting rc.
 * NOTE(review): the bodies of the early checks, the "out" label and the
 * return statement are not visible in this view. */
3446 static int osc_statfs_interpret(const struct lu_env *env,
3447 struct ptlrpc_request *req,
3448 struct osc_async_args *aa, int rc)
3450 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3451 struct obd_statfs *msfs;
3454 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3455 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3461 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3463 GOTO(out, rc = -EPROTO);
3466 /* Reinitialize the RDONLY and DEGRADED flags at the client
3467 * on each statfs, so they don't stay set permanently. */
3468 spin_lock(&cli->cl_oscc.oscc_lock);
3469 cli->cl_oscc.oscc_flags &= ~(OSCC_FLAG_RDONLY | OSCC_FLAG_DEGRADED);
3470 if (msfs->os_state & OS_STATE_DEGRADED)
3471 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3473 if (msfs->os_state & OS_STATE_READONLY)
3474 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3475 spin_unlock(&cli->cl_oscc.oscc_lock);
3477 *aa->aa_oi->oi_osfs = *msfs;
3479 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Send an OST_STATFS request asynchronously as part of @rqset.
 *
 * The request is allocated on the client import, packed, directed at
 * OST_CREATE_PORTAL and given an adaptive timeout. With OBD_STATFS_NODELAY in
 * oinfo->oi_flags it is marked no-resend/no-delay. The reply is handled by
 * osc_statfs_interpret(). @max_age is not used by the visible code.
 * NOTE(review): the allocation-failure check, the condition around
 * ptlrpc_request_free(), the filling of the async args and the return
 * statements are not visible in this view. */
3483 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3484 __u64 max_age, struct ptlrpc_request_set *rqset)
3486 struct ptlrpc_request *req;
3487 struct osc_async_args *aa;
3491 /* We could possibly pass max_age in the request (as an absolute
3492 * timestamp or a "seconds.usec ago") so the target can avoid doing
3493 * extra calls into the filesystem if that isn't necessary (e.g.
3494 * during mount that would help a bit). Having relative timestamps
3495 * is not so great if request processing is slow, while absolute
3496 * timestamps are not ideal because they need time synchronization. */
3497 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3501 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3503 ptlrpc_request_free(req);
3506 ptlrpc_request_set_replen(req);
3507 req->rq_request_portal = OST_CREATE_PORTAL;
3508 ptlrpc_at_set_req_timeout(req);
3510 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3511 /* procfs requests do not want statfs to wait, to avoid deadlock */
3512 req->rq_no_resend = 1;
3513 req->rq_no_delay = 1;
3516 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3517 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3518 aa = ptlrpc_req_async_args(req);
3521 ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_STATFS.
 *
 * A reference on the client import is taken under cl_sem (read side) so that
 * the import cannot go away concurrently with client_disconnect_export. The
 * request is allocated on that import, after which the import reference is
 * dropped; it is then packed, directed at OST_CREATE_PORTAL, given an
 * adaptive timeout and, with OBD_STATFS_NODELAY in @flags, marked
 * no-resend/no-delay. ptlrpc_queue_wait() sends it and waits for the reply; a
 * missing obd_statfs reply yields -EPROTO. The request is released with
 * ptlrpc_req_finished(). @max_age is not used by the visible code.
 * NOTE(review): the NULL-import and allocation-failure handling, the copy of
 * the reply into @osfs, the "out" label and the return statements are not
 * visible in this view. */
3525 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3526 __u64 max_age, __u32 flags)
3528 struct obd_statfs *msfs;
3529 struct ptlrpc_request *req;
3530 struct obd_import *imp = NULL;
3534 /* Since the request might also come from lprocfs, we need to
3535 * sync this with client_disconnect_export (Bug15684). */
3536 down_read(&obd->u.cli.cl_sem);
3537 if (obd->u.cli.cl_import)
3538 imp = class_import_get(obd->u.cli.cl_import);
3539 up_read(&obd->u.cli.cl_sem);
3543 /* We could possibly pass max_age in the request (as an absolute
3544 * timestamp or a "seconds.usec ago") so the target can avoid doing
3545 * extra calls into the filesystem if that isn't necessary (e.g.
3546 * during mount that would help a bit). Having relative timestamps
3547 * is not so great if request processing is slow, while absolute
3548 * timestamps are not ideal because they need time synchronization. */
3549 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3551 class_import_put(imp);
3556 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3558 ptlrpc_request_free(req);
3561 ptlrpc_request_set_replen(req);
3562 req->rq_request_portal = OST_CREATE_PORTAL;
3563 ptlrpc_at_set_req_timeout(req);
3565 if (flags & OBD_STATFS_NODELAY) {
3566 /* procfs requests do not want statfs to wait, to avoid deadlock */
3567 req->rq_no_resend = 1;
3568 req->rq_no_delay = 1;
3571 rc = ptlrpc_queue_wait(req);
3575 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3577 GOTO(out, rc = -EPROTO);
3584 ptlrpc_req_finished(req);
3588 /* Retrieve object striping information.
3590 * @lump is a pointer to a user-space struct whose header supplies
3591 * lmm_magic and lmm_stripe_count (the number of stripe slots requested).
3592 * lmm_magic must be LOV_USER_MAGIC_V1 or LOV_USER_MAGIC_V3 (we only use 1 slot here).
3593 */
/*
 * osc_getstripe() - report the striping of a single OSC object to user space.
 *
 * Copies a lov_user_md_v1-sized header from @lump, accepts only
 * LOV_USER_MAGIC_V1 or LOV_USER_MAGIC_V3, fills a kernel copy (lumk) with the
 * object id and group taken from @lsm and a stripe count of 1, then copies
 * lum_size bytes back to @lump.
 *
 * @lsm:  in-core stripe descriptor supplying lsm_object_id / lsm_object_gr
 * @lump: user-space buffer, read for the header and overwritten with the
 *        result
 *
 * NOTE(review): the embedded line numbers of this listing have gaps, so the
 * error returns, the "else" arms and the condition guarding OBD_FREE() are
 * not shown here.
 */
3594 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3596 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3597 struct lov_user_md_v3 lum, *lumk;
3598 struct lov_user_ost_data_v1 *lmm_objects;
3599 int rc = 0, lum_size;
3605 /* we only need the header part from user space to get lmm_magic and
3606 * lmm_stripe_count, (the header part is common to v1 and v3) */
3607 lum_size = sizeof(struct lov_user_md_v1);
3608 if (copy_from_user(&lum, lump, lum_size))
/* Only the v1 and v3 user magics are accepted. */
3611 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3612 (lum.lmm_magic != LOV_USER_MAGIC_V3))
3615 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3616 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3617 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3618 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3620 /* we can use lov_mds_md_size() to compute lum_size
3621 * because lov_user_md_vX and lov_mds_md_vX have the same size */
/* The caller asked for at least one stripe slot: allocate a reply sized for
 * the requested stripe count and record the object id in slot 0. */
3622 if (lum.lmm_stripe_count > 0) {
3623 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3624 OBD_ALLOC(lumk, lum_size);
/* Reach lmm_objects through the struct type that matches the magic. */
3628 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3629 lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3631 lmm_objects = &(lumk->lmm_objects[0]);
3632 lmm_objects->l_object_id = lsm->lsm_object_id;
/* Otherwise the reply size is the one computed for zero stripes. */
3634 lum_size = lov_mds_md_size(0, lum.lmm_magic);
/* Header fields common to both paths: this OSC always reports one stripe. */
3638 lumk->lmm_object_id = lsm->lsm_object_id;
3639 lumk->lmm_object_gr = lsm->lsm_object_gr;
3640 lumk->lmm_stripe_count = 1;
3642 if (copy_to_user(lump, lumk, lum_size))
3646 OBD_FREE(lumk, lum_size);
/*
 * osc_iocontrol() - ioctl dispatcher for an OSC export.
 *
 * A reference on THIS_MODULE is taken with try_module_get() at entry and
 * dropped with module_put() at the end. The case labels visible below are:
 *   OBD_IOC_LOV_GET_CONFIG   - describe this OSC as a one-target LOV
 *   LL_IOC_LOV_SETSTRIPE     - obd_alloc_memmd(exp, karg)
 *   LL_IOC_LOV_GETSTRIPE     - osc_getstripe(karg, uarg)
 *   OBD_IOC_CLIENT_RECOVER   - ptlrpc_recover_import() on the client import
 *   IOC_OSC_SET_ACTIVE       - ptlrpc_set_import_active() on the client import
 *   OBD_IOC_POLL_QUOTACHECK  - lquota_poll_check()
 *   OBD_IOC_PING_TARGET      - ptlrpc_obd_ping()
 * The last branch logs "unrecognised ioctl" at D_INODE and sets -ENOTTY.
 *
 * @cmd:  ioctl command
 * @exp:  export the ioctl was issued on; exp->exp_obd is the OSC device
 * @len:  length of the ioctl data (reused as the in/out length for
 *        obd_ioctl_getdata() in OBD_IOC_LOV_GET_CONFIG)
 * @karg: kernel-side argument, interpreted per command
 * @uarg: user-side argument, interpreted per command
 *
 * NOTE(review): the embedded line numbers of this listing have gaps, so the
 * "switch", "default:", "break" and return lines are not shown here.
 */
3652 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3653 void *karg, void *uarg)
3655 struct obd_device *obd = exp->exp_obd;
3656 struct obd_ioctl_data *data = karg;
3660 if (!try_module_get(THIS_MODULE)) {
/* NOTE(review): this message has no trailing "\n", unlike the other
 * CERROR() calls in this file. */
3661 CERROR("Can't get module. Is it alive?");
3665 case OBD_IOC_LOV_GET_CONFIG: {
3667 struct lov_desc *desc;
3668 struct obd_uuid uuid;
/* Fetch the ioctl data from user space into buf; every exit path after
 * this point releases it with obd_ioctl_freedata(). */
3672 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3673 GOTO(out, err = -EINVAL);
3675 data = (struct obd_ioctl_data *)buf;
/* inlbuf1 must be able to hold a lov_desc ... */
3677 if (sizeof(*desc) > data->ioc_inllen1) {
3678 obd_ioctl_freedata(buf, len);
3679 GOTO(out, err = -EINVAL);
/* ... and inlbuf2 must be able to hold an obd_uuid. */
3682 if (data->ioc_inllen2 < sizeof(uuid)) {
3683 obd_ioctl_freedata(buf, len);
3684 GOTO(out, err = -EINVAL);
/* Describe this OSC as a LOV with exactly one (active) target. */
3687 desc = (struct lov_desc *)data->ioc_inlbuf1;
3688 desc->ld_tgt_count = 1;
3689 desc->ld_active_tgt_count = 1;
3690 desc->ld_default_stripe_count = 1;
3691 desc->ld_default_stripe_size = 0;
3692 desc->ld_default_stripe_offset = 0;
3693 desc->ld_pattern = 0;
3694 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
/* The same device uuid is also returned in inlbuf2. */
3696 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3698 err = copy_to_user((void *)uarg, buf, len);
3701 obd_ioctl_freedata(buf, len);
3704 case LL_IOC_LOV_SETSTRIPE:
3705 err = obd_alloc_memmd(exp, karg);
3709 case LL_IOC_LOV_GETSTRIPE:
3710 err = osc_getstripe(karg, uarg);
3712 case OBD_IOC_CLIENT_RECOVER:
3713 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3718 case IOC_OSC_SET_ACTIVE:
3719 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3722 case OBD_IOC_POLL_QUOTACHECK:
3723 err = lquota_poll_check(quota_interface, exp,
3724 (struct if_quotacheck *)karg);
3726 case OBD_IOC_PING_TARGET:
3727 err = ptlrpc_obd_ping(obd);
3730 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3731 cmd, cfs_curproc_comm());
3732 GOTO(out, err = -ENOTTY);
/* Balance the try_module_get() done at entry. */
3735 module_put(THIS_MODULE);
/*
 * osc_get_info() - answer a get_info query identified by @key.
 *
 * Both @vallen and @val must be non-NULL. Branches visible below:
 *   KEY_LOCK_TO_STRIPE - handled locally; *vallen is set to sizeof(__u32)
 *   KEY_LAST_ID        - OST_GET_INFO RPC (RQF_OST_GET_INFO_LAST_ID); the
 *                        RMF_OBD_ID reply is stored through @val as obd_id
 *   KEY_FIEMAP         - OST_GET_INFO RPC (RQF_OST_GET_INFO_FIEMAP); @val is
 *                        sent as the request value and *vallen bytes of the
 *                        reply are copied back into @val
 *
 * @exp:    export whose client import carries the RPCs
 * @keylen: length of @key in bytes
 * @key:    key buffer, copied verbatim into the request
 * @vallen: in/out length of @val
 * @val:    value buffer
 * @lsm:    not referenced by any line visible here
 *
 * NOTE(review): the embedded line numbers of this listing have gaps, so the
 * NULL checks after allocation, the labels and the returns are not shown.
 */
3739 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3740 void *key, __u32 *vallen, void *val,
3741 struct lov_stripe_md *lsm)
3744 if (!vallen || !val)
3747 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3748 __u32 *stripe = val;
3749 *vallen = sizeof(*stripe);
3752 } else if (KEY_IS(KEY_LAST_ID)) {
3753 struct ptlrpc_request *req;
3758 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3759 &RQF_OST_GET_INFO_LAST_ID);
/* The key field is variable-sized: set its size before packing. */
3763 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3764 RCL_CLIENT, keylen);
3765 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3767 ptlrpc_request_free(req);
3771 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3772 memcpy(tmp, key, keylen);
/* This request is marked both no-delay and no-resend. */
3774 req->rq_no_delay = req->rq_no_resend = 1;
3775 ptlrpc_request_set_replen(req);
3776 rc = ptlrpc_queue_wait(req);
3780 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3782 GOTO(out, rc = -EPROTO);
/* Hand the id from the reply back through the caller's buffer. */
3784 *((obd_id *)val) = *reply;
3786 ptlrpc_req_finished(req);
3788 } else if (KEY_IS(KEY_FIEMAP)) {
3789 struct ptlrpc_request *req;
3790 struct ll_user_fiemap *reply;
3794 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3795 &RQF_OST_GET_INFO_FIEMAP);
/* The fiemap value buffer is sized *vallen in both directions. */
3799 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3800 RCL_CLIENT, keylen);
3801 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3802 RCL_CLIENT, *vallen);
3803 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3804 RCL_SERVER, *vallen);
3806 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3808 ptlrpc_request_free(req);
/* Copy the caller's key and value into the request. */
3812 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3813 memcpy(tmp, key, keylen);
3814 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3815 memcpy(tmp, val, *vallen);
3817 ptlrpc_request_set_replen(req);
3818 rc = ptlrpc_queue_wait(req);
3822 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3824 GOTO(out1, rc = -EPROTO);
/* Overwrite the caller's buffer with the reply. */
3826 memcpy(val, reply, *vallen);
3828 ptlrpc_req_finished(req);
/*
 * osc_setinfo_mds_connect_import() - import-side work for an MDS connection.
 *
 * Looks up the LLOG_MDS_OST_ORIG_CTXT llog context of the import's device,
 * connects it with llog_initiator_connect() and drops the context reference,
 * then sets imp_server_timeout and imp_pingable under imp_lock.
 *
 * NOTE(review): the embedded line numbers of this listing have gaps, so the
 * check on the context pointer, the handling of rc and the return are not
 * shown here.
 */
3836 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
3838 struct llog_ctxt *ctxt;
3842 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3844 rc = llog_initiator_connect(ctxt);
/* Release the reference obtained from llog_get_context(). */
3845 llog_ctxt_put(ctxt);
3847 /* XXX return an error? skip setting below flags? */
/* Both import flags are changed while holding imp_lock. */
3850 spin_lock(&imp->imp_lock);
3851 imp->imp_server_timeout = 1;
3852 imp->imp_pingable = 1;
3853 spin_unlock(&imp->imp_lock);
3854 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
/*
 * osc_setinfo_mds_conn_interpret() - reply callback for KEY_MDS_CONN.
 *
 * Installed as rq_interpret_reply by osc_set_info_async() for KEY_MDS_CONN
 * requests; forwards to osc_setinfo_mds_connect_import() on the request's
 * import.
 *
 * NOTE(review): the rest of the parameter list and any code before the
 * RETURN() are not shown in this listing (embedded line numbers 3861-3866
 * are missing).
 */
3859 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3860 struct ptlrpc_request *req,
3867 RETURN(osc_setinfo_mds_connect_import(req->rq_import));
/*
 * osc_set_info_async() - apply a set_info request identified by @key.
 *
 * Keys handled locally, without an RPC (visible branches):
 *   KEY_NEXT_ID      - @val is an obd_id; oscc_next_id is set to *val + 1
 *   KEY_UNLINKED     - clears OSCC_FLAG_NOSPC under oscc_lock
 *   KEY_INIT_RECOV   - @val is an int stored in imp_initial_recov
 *   KEY_CHECKSUM     - @val is an int; cl_checksum becomes 1 if non-zero,
 *                      otherwise 0
 *   KEY_SPTLRPC_CONF - sptlrpc_conf_client_adapt(obd)
 *   KEY_FLUSH_CTX    - sptlrpc_import_flush_my_ctx(imp)
 *
 * Every other key is packed into an OST_SET_INFO request with @key and @val
 * copied verbatim:
 *   - KEY_GRANT_SHRINK uses RQF_OST_SET_GRANT_INFO, installs
 *     osc_shrink_grant_interpret and is handed to ptlrpcd_add_req();
 *   - all other keys use RQF_OBD_SET_INFO, are added to @set and
 *     ptlrpc_check_set() is called on it;
 *   - KEY_MDS_CONN additionally stores *(__u32 *)val as the creator's group
 *     (oscc_oa.o_gr), marks the request no-delay/no-resend and installs
 *     osc_setinfo_mds_conn_interpret.
 *
 * @exp:    export the request was issued on
 * @keylen: length of @key in bytes
 * @key:    key buffer
 * @vallen: length of @val in bytes
 * @val:    value buffer
 * @set:    request set for the RPC path; tested together with
 *          KEY_GRANT_SHRINK before the request is built
 *
 * NOTE(review): the embedded line numbers of this listing have gaps, so the
 * returns of the local branches, the NULL checks and the "else" lines are
 * not shown here.
 */
3870 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3871 void *key, obd_count vallen, void *val,
3872 struct ptlrpc_request_set *set)
3874 struct ptlrpc_request *req;
3875 struct obd_device *obd = exp->exp_obd;
3876 struct obd_import *imp = class_exp2cliimp(exp);
3881 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3883 if (KEY_IS(KEY_NEXT_ID)) {
/* The value must be exactly one obd_id. */
3884 if (vallen != sizeof(obd_id))
3888 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3889 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3890 exp->exp_obd->obd_name,
3891 obd->u.cli.cl_oscc.oscc_next_id);
3896 if (KEY_IS(KEY_UNLINKED)) {
3897 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
/* The creator flags are only modified under oscc_lock. */
3898 spin_lock(&oscc->oscc_lock);
3899 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3900 spin_unlock(&oscc->oscc_lock);
3904 if (KEY_IS(KEY_INIT_RECOV)) {
3905 if (vallen != sizeof(int))
3907 spin_lock(&imp->imp_lock);
3908 imp->imp_initial_recov = *(int *)val;
3909 spin_unlock(&imp->imp_lock);
3910 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3911 exp->exp_obd->obd_name,
3912 imp->imp_initial_recov);
3916 if (KEY_IS(KEY_CHECKSUM)) {
3917 if (vallen != sizeof(int))
/* Normalize any non-zero value to 1. */
3919 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3923 if (KEY_IS(KEY_SPTLRPC_CONF)) {
3924 sptlrpc_conf_client_adapt(obd);
3928 if (KEY_IS(KEY_FLUSH_CTX)) {
3929 sptlrpc_import_flush_my_ctx(imp);
/* From here on a request is built; only KEY_GRANT_SHRINK may come without
 * a request set (it is queued with ptlrpcd_add_req() instead). */
3933 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3936 /* We pass all other commands directly to OST. Since nobody calls osc
3937 methods directly and everybody is supposed to go through LOV, we
3938 assume lov checked invalid values for us.
3939 The only recognised values so far are evict_by_nid and mds_conn.
3940 Even if something bad goes through, we'd get a -EINVAL from OST
3943 if (KEY_IS(KEY_GRANT_SHRINK))
3944 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
3946 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
3951 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3952 RCL_CLIENT, keylen);
3953 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3954 RCL_CLIENT, vallen);
3955 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3957 ptlrpc_request_free(req);
3961 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3962 memcpy(tmp, key, keylen);
3963 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
3964 memcpy(tmp, val, vallen);
3966 if (KEY_IS(KEY_MDS_CONN)) {
3967 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3969 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3970 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3971 LASSERT_MDS_GROUP(oscc->oscc_oa.o_gr);
3972 req->rq_no_delay = req->rq_no_resend = 1;
3973 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3974 } else if (KEY_IS(KEY_GRANT_SHRINK)) {
3975 struct osc_grant_args *aa;
3978 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3979 aa = ptlrpc_req_async_args(req);
3982 ptlrpc_req_finished(req);
3985 *oa = ((struct ost_body *)val)->oa;
3987 req->rq_interpret_reply = osc_shrink_grant_interpret;
3990 ptlrpc_request_set_replen(req);
3991 if (!KEY_IS(KEY_GRANT_SHRINK)) {
3992 LASSERT(set != NULL);
3993 ptlrpc_set_add_req(set, req);
3994 ptlrpc_check_set(NULL, set);
3996 ptlrpcd_add_req(req, PSCOPE_OTHER);
/*
 * llog operations passed to llog_setup() for LLOG_SIZE_REPL_CTXT in
 * __osc_llog_init(); only lop_cancel is provided here.
 */
4002 static struct llog_operations osc_size_repl_logops = {
4003 lop_cancel: llog_obd_repl_cancel
/*
 * llog operations for LLOG_MDS_OST_ORIG_CTXT. Left zero-initialized here and
 * filled in by osc_init(): copied from llog_lvfs_ops, then lop_setup,
 * lop_cleanup, lop_add and lop_connect are overridden.
 */
4006 static struct llog_operations osc_mds_ost_orig_logops;
/*
 * __osc_llog_init() - set up the two llog contexts used by the OSC.
 *
 * Calls llog_setup() for LLOG_MDS_OST_ORIG_CTXT (with the catalog logid from
 * @catid and osc_mds_ost_orig_logops) and then for LLOG_SIZE_REPL_CTXT (with
 * a NULL logid and osc_size_repl_logops). Failures are reported with
 * CERROR(), including the device names, @catid and its logid.
 *
 * @obd:   OSC device the contexts are attached to
 * @olg:   llog group. NOTE(review): the visible calls pass &obd->obd_olg
 *         rather than @olg; osc_llog_init() asserts the two are equal.
 * @tgt:   device passed to llog_setup() as the target
 * @catid: catalog id supplying lci_logid
 *
 * NOTE(review): the embedded line numbers of this listing have gaps, so the
 * rc checks, what is done with the context looked up on the failure path,
 * and the return are not shown here.
 */
4008 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4009 struct obd_device *tgt, struct llog_catid *catid)
4014 rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
4015 &catid->lci_logid, &osc_mds_ost_orig_logops);
4017 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4021 rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
4022 NULL, &osc_size_repl_logops);
/* The first context is looked up again here, next to the failure message
 * for the second llog_setup(). */
4024 struct llog_ctxt *ctxt =
4025 llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4028 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4033 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
4034 obd->obd_name, tgt->obd_name, catid, rc);
4035 CERROR("logid "LPX64":0x%x\n",
4036 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
/*
 * osc_llog_init() - o_llog_init method: initialize llog for this OSC.
 *
 * While holding olg_cat_processing, reads one catalog id (at position
 * *index) from @disk_obd with llog_get_cat_list(), sets up the llog contexts
 * through __osc_llog_init(), and writes the catalog id back with
 * llog_put_cat_list().
 *
 * @obd:      OSC device
 * @olg:      llog group; must be &obd->obd_olg (asserted)
 * @disk_obd: device holding the catalog list named CATLIST
 * @index:    pointer to the catalog index to read and write
 *
 * NOTE(review): the embedded line numbers of this listing have gaps, so the
 * rc checks, the jump targets and the return are not shown here.
 */
4041 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4042 struct obd_device *disk_obd, int *index)
4044 struct llog_catid catid;
4045 static char name[32] = CATLIST;
4049 LASSERT(olg == &obd->obd_olg);
/* Catalog read, llog setup and catalog write-back all happen with
 * olg_cat_processing held. */
4051 mutex_down(&olg->olg_cat_processing);
4052 rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
4054 CERROR("rc: %d\n", rc);
4058 CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
4059 obd->obd_name, *index, catid.lci_logid.lgl_oid,
4060 catid.lci_logid.lgl_ogr, catid.lci_logid.lgl_ogen);
4062 rc = __osc_llog_init(obd, olg, disk_obd, &catid);
4064 CERROR("rc: %d\n", rc);
4068 rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
4070 CERROR("rc: %d\n", rc);
4075 mutex_up(&olg->olg_cat_processing);
/*
 * osc_llog_finish() - o_llog_finish method: tear down the OSC llog contexts.
 *
 * Looks up LLOG_MDS_OST_ORIG_CTXT and LLOG_SIZE_REPL_CTXT on @obd and calls
 * llog_cleanup() on each, keeping the two results in rc and rc2.
 *
 * @count: not referenced by any line visible here
 *
 * NOTE(review): the embedded line numbers of this listing have gaps, so the
 * checks on the context pointers and how rc and rc2 are combined into the
 * return value are not shown here.
 */
4080 static int osc_llog_finish(struct obd_device *obd, int count)
4082 struct llog_ctxt *ctxt;
4083 int rc = 0, rc2 = 0;
4086 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4088 rc = llog_cleanup(ctxt);
4090 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4092 rc2 = llog_cleanup(ctxt);
/*
 * osc_reconnect() - o_reconnect method: refresh the grant request on
 * reconnect.
 *
 * When @data is non-NULL and has OBD_CONNECT_GRANT set, fills
 * data->ocd_grant from the client's grant state and resets cl_lost_grant,
 * both under cl_loi_list_lock, then logs the values.
 *
 * @env, @exp, @cluuid: not referenced by any line visible here
 * @obd:  client device whose u.cli grant counters are used
 * @data: connect data to update; may be NULL
 *
 * NOTE(review): the last parameter (embedded line 4103), the declaration of
 * lost_grant and the return are not shown in this listing.
 */
4099 static int osc_reconnect(const struct lu_env *env,
4100 struct obd_export *exp, struct obd_device *obd,
4101 struct obd_uuid *cluuid,
4102 struct obd_connect_data *data,
4105 struct client_obd *cli = &obd->u.cli;
4107 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4110 client_obd_list_lock(&cli->cl_loi_list_lock);
/* GNU "a ?: b": use cl_avail_grant when it is non-zero, otherwise fall back
 * to (2 * cl_max_pages_per_rpc) << CFS_PAGE_SHIFT ("*" binds tighter than
 * "<<"). */
4111 data->ocd_grant = cli->cl_avail_grant ?:
4112 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
/* Snapshot the lost grant for the log message, then reset it. */
4113 lost_grant = cli->cl_lost_grant;
4114 cli->cl_lost_grant = 0;
4115 client_obd_list_unlock(&cli->cl_loi_list_lock);
4117 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4118 "cl_lost_grant: %ld\n", data->ocd_grant,
4119 cli->cl_avail_grant, lost_grant);
4120 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4121 " ocd_grant: %d\n", data->ocd_connect_flags,
4122 data->ocd_version, data->ocd_grant);
/*
 * osc_disconnect() - o_disconnect method.
 *
 * Steps visible below:
 *   1. look up LLOG_SIZE_REPL_CTXT; when cl_conn_count is 1, llog_sync() it
 *      before dropping the context reference (a CDEBUG notes the case where
 *      no context was found);
 *   2. client_disconnect_export(exp);
 *   3. if cl_import is NULL afterwards, remove the client from the grant
 *      shrink list with osc_del_shrink_grant(). The long comment in the body
 *      explains why this is done after, not before, the disconnect.
 *
 * NOTE(review): the embedded line numbers of this listing have gaps, so the
 * check on the context pointer, the "else" line and the return are not
 * shown; two comment openers/terminators in the body are also missing from
 * the listing.
 */
4128 static int osc_disconnect(struct obd_export *exp)
4130 struct obd_device *obd = class_exp2obd(exp);
4131 struct llog_ctxt *ctxt;
4134 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4136 if (obd->u.cli.cl_conn_count == 1) {
4137 /* Flush any remaining cancel messages out to the
4139 llog_sync(ctxt, exp);
4141 llog_ctxt_put(ctxt);
4143 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4147 rc = client_disconnect_export(exp);
4149 * Initially we put del_shrink_grant before disconnect_export, but it
4150 * causes the following problem if setup (connect) and cleanup
4151 * (disconnect) are tangled together.
4152 * connect p1 disconnect p2
4153 * ptlrpc_connect_import
4154 * ............... class_manual_cleanup
4157 * ptlrpc_connect_interrupt
4159 * add this client to shrink list
4161 * Bang! pinger trigger the shrink.
4162 * So the osc should be disconnected from the shrink list, after we
4163 * are sure the import has been destroyed. BUG18662
4165 if (obd->u.cli.cl_import == NULL)
4166 osc_del_shrink_grant(&obd->u.cli);
/*
 * osc_import_event() - o_import_event method: react to import state changes.
 *
 * Cases visible below:
 *   IMP_EVENT_DISCON     - on imports with imp_server_timeout set, flag the
 *                          object creator OSCC_FLAG_RECOVERING; zero
 *                          cl_avail_grant and cl_lost_grant
 *   IMP_EVENT_INACTIVE   - notify the observer with OBD_NOTIFY_INACTIVE
 *   IMP_EVENT_INVALIDATE - run osc_check_rpcs() under cl_loi_list_lock and
 *                          clean up the namespace with LDLM_FL_LOCAL_ONLY
 *   IMP_EVENT_ACTIVE     - on imports with imp_server_timeout set, clear
 *                          OSCC_FLAG_NOSPC; notify with OBD_NOTIFY_ACTIVE
 *   IMP_EVENT_OCD        - apply connect data (grant, request portal) and
 *                          notify with OBD_NOTIFY_OCD
 * An unrecognised event is reported with CERROR().
 *
 * imp_server_timeout is the flag set by osc_setinfo_mds_connect_import().
 *
 * @obd:   OSC device; must own @imp (asserted)
 * @imp:   import the event refers to
 * @event: event being delivered
 *
 * NOTE(review): the embedded line numbers of this listing have gaps, so the
 * "switch", "break", "default:" lines, the assignment of cli and the return
 * are not shown here.
 */
4170 static int osc_import_event(struct obd_device *obd,
4171 struct obd_import *imp,
4172 enum obd_import_event event)
4174 struct client_obd *cli;
4178 LASSERT(imp->imp_obd == obd);
4181 case IMP_EVENT_DISCON: {
4182 /* Only do this on the MDS OSC's */
4183 if (imp->imp_server_timeout) {
4184 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4186 spin_lock(&oscc->oscc_lock);
4187 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4188 spin_unlock(&oscc->oscc_lock);
/* Both grant counters are reset on disconnect, under cl_loi_list_lock. */
4191 client_obd_list_lock(&cli->cl_loi_list_lock);
4192 cli->cl_avail_grant = 0;
4193 cli->cl_lost_grant = 0;
4194 client_obd_list_unlock(&cli->cl_loi_list_lock);
4197 case IMP_EVENT_INACTIVE: {
4198 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4201 case IMP_EVENT_INVALIDATE: {
4202 struct ldlm_namespace *ns = obd->obd_namespace;
/* A cl environment is needed for osc_check_rpcs(); it is released with
 * cl_env_put() once the namespace cleanup is done. */
4206 env = cl_env_get(&refcheck);
4210 client_obd_list_lock(&cli->cl_loi_list_lock);
4211 /* all pages go to failing rpcs due to the invalid
4213 osc_check_rpcs(env, cli);
4214 client_obd_list_unlock(&cli->cl_loi_list_lock);
4216 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4217 cl_env_put(env, &refcheck);
4222 case IMP_EVENT_ACTIVE: {
4223 /* Only do this on the MDS OSC's */
4224 if (imp->imp_server_timeout) {
4225 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4227 spin_lock(&oscc->oscc_lock);
4228 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
4229 spin_unlock(&oscc->oscc_lock);
4231 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4234 case IMP_EVENT_OCD: {
4235 struct obd_connect_data *ocd = &imp->imp_connect_data;
/* Initialize the grant from the connect data when the connect flags
 * include OBD_CONNECT_GRANT. */
4237 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4238 osc_init_grant(&obd->u.cli, ocd);
/* With OBD_CONNECT_REQPORTAL, requests go to OST_REQUEST_PORTAL. */
4241 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4242 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
4244 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4248 CERROR("Unknown import event %d\n", event);
/*
 * osc_setup() - o_setup method: bring up an OSC device.
 *
 * Steps visible below: take a ptlrpcd reference, run the generic
 * client_obd_setup(), set the grant-shrink interval, register the lprocfs
 * entries, create the request pool on the import, and initialize the
 * grant-shrink list head and cl_grant_sem.
 *
 * @obd:  device being set up
 * @lcfg: configuration record forwarded to client_obd_setup()
 *
 * NOTE(review): the embedded line numbers of this listing have gaps, so the
 * rc checks, the failure path after client_obd_setup(), one argument of
 * ptlrpc_init_rq_pool() (embedded line 4287) and the return are not shown.
 */
4254 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4260 rc = ptlrpcd_addref();
4264 rc = client_obd_setup(obd, lcfg);
4268 struct lprocfs_static_vars lvars = { 0 };
4269 struct client_obd *cli = &obd->u.cli;
4271 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4272 lprocfs_osc_init_vars(&lvars);
/* The extra lprocfs attachments are made only when the base lprocfs setup
 * returned 0. */
4273 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4274 lproc_osc_attach_seqstat(obd);
4275 sptlrpc_lprocfs_cliobd_attach(obd);
4276 ptlrpc_lprocfs_register_obd(obd);
4280 /* We need to allocate a few requests more, because
4281 brw_interpret tries to create new requests before freeing
4282 previous ones. Ideally we want to have 2x max_rpcs_in_flight
4283 reserved, but I afraid that might be too much wasted RAM
4284 in fact, so 2 is just my guess and still should work. */
4285 cli->cl_import->imp_rq_pool =
4286 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4288 ptlrpc_add_rqs_to_pool);
4290 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4291 sema_init(&cli->cl_grant_sem, 1);
/*
 * osc_precleanup() - o_precleanup method, called per cleanup @stage.
 *
 * OBD_CLEANUP_EARLY:   deactivate the import and clear imp_pingable (under
 *                      imp_lock).
 * OBD_CLEANUP_EXPORTS: if the device still has a client import, invalidate
 *                      it, free its request pool, destroy it and clear
 *                      cl_import; then finish llog with obd_llog_finish().
 *
 * NOTE(review): the embedded line numbers of this listing have gaps, so the
 * "switch", "break" lines, the rc check before the CERROR() and the return
 * are not shown here.
 */
4297 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4303 case OBD_CLEANUP_EARLY: {
4304 struct obd_import *imp;
4305 imp = obd->u.cli.cl_import;
4306 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4307 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4308 ptlrpc_deactivate_import(imp);
4309 spin_lock(&imp->imp_lock);
4310 imp->imp_pingable = 0;
4311 spin_unlock(&imp->imp_lock);
4314 case OBD_CLEANUP_EXPORTS: {
4315 /* If we set up but never connected, the
4316 client import will not have been cleaned. */
/* NOTE(review): cl_import is tested before cl_sem is taken and set to NULL
 * after cl_sem is released, while osc_statfs() reads it under
 * down_read(cl_sem) - confirm this ordering is intended. */
4317 if (obd->u.cli.cl_import) {
4318 struct obd_import *imp;
4319 down_write(&obd->u.cli.cl_sem);
4320 imp = obd->u.cli.cl_import;
4321 CDEBUG(D_CONFIG, "%s: client import never connected\n",
4323 ptlrpc_invalidate_import(imp);
/* Free the request pool created in osc_setup(), if there is one. */
4324 if (imp->imp_rq_pool) {
4325 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4326 imp->imp_rq_pool = NULL;
4328 class_destroy_import(imp);
4329 up_write(&obd->u.cli.cl_sem);
4330 obd->u.cli.cl_import = NULL;
4332 rc = obd_llog_finish(obd, 0);
4334 CERROR("failed to cleanup llogging subsystems\n");
/*
 * osc_cleanup() - o_cleanup method: final teardown of an OSC device.
 *
 * Unregisters the lprocfs entries, releases the quota cache through
 * lquota_cleanup() and runs the generic client_obd_cleanup().
 *
 * NOTE(review): the embedded line numbers of this listing have gaps (for
 * example 4353-4357), so anything done after client_obd_cleanup() and the
 * return are not shown here.
 */
4341 int osc_cleanup(struct obd_device *obd)
4346 ptlrpc_lprocfs_unregister_obd(obd);
4347 lprocfs_obd_cleanup(obd);
4349 /* free memory of osc quota cache */
4350 lquota_cleanup(quota_interface, obd);
4352 rc = client_obd_cleanup(obd);
/*
 * osc_process_config_base() - apply a configuration record to an OSC device.
 *
 * Loads the OSC lprocfs variable tables and, depending on
 * lcfg->lcfg_command, hands the record to class_process_proc_param() with
 * the PARAM_OSC prefix and the per-device variable table.
 *
 * NOTE(review): the case labels, the remaining arguments of
 * class_process_proc_param() and the return are not shown in this listing.
 */
4358 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4360 struct lprocfs_static_vars lvars = { 0 };
4363 lprocfs_osc_init_vars(&lvars);
4365 switch (lcfg->lcfg_command) {
4367 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
/*
 * osc_process_config() - o_process_config method.
 *
 * Thin adapter to osc_process_config_base(): @buf is passed on as the
 * configuration record; @len is not used.
 */
4377 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4379 return osc_process_config_base(obd, buf);
/*
 * OBD method table for the OSC device type; registered with
 * class_register_type() in osc_init().
 *
 * Connection add/delete/connect use the generic client_* helpers, all other
 * entries point at osc_* functions of this module.
 *
 * NOTE(review): embedded lines 4404 and 4406 are missing from this listing,
 * so two initializers are not shown here.
 */
4382 struct obd_ops osc_obd_ops = {
4383 .o_owner = THIS_MODULE,
4384 .o_setup = osc_setup,
4385 .o_precleanup = osc_precleanup,
4386 .o_cleanup = osc_cleanup,
4387 .o_add_conn = client_import_add_conn,
4388 .o_del_conn = client_import_del_conn,
4389 .o_connect = client_connect_import,
4390 .o_reconnect = osc_reconnect,
4391 .o_disconnect = osc_disconnect,
4392 .o_statfs = osc_statfs,
4393 .o_statfs_async = osc_statfs_async,
4394 .o_packmd = osc_packmd,
4395 .o_unpackmd = osc_unpackmd,
4396 .o_precreate = osc_precreate,
4397 .o_create = osc_create,
4398 .o_create_async = osc_create_async,
4399 .o_destroy = osc_destroy,
4400 .o_getattr = osc_getattr,
4401 .o_getattr_async = osc_getattr_async,
4402 .o_setattr = osc_setattr,
4403 .o_setattr_async = osc_setattr_async,
4405 .o_punch = osc_punch,
4407 .o_enqueue = osc_enqueue,
4408 .o_change_cbdata = osc_change_cbdata,
4409 .o_cancel = osc_cancel,
4410 .o_cancel_unused = osc_cancel_unused,
4411 .o_iocontrol = osc_iocontrol,
4412 .o_get_info = osc_get_info,
4413 .o_set_info_async = osc_set_info_async,
4414 .o_import_event = osc_import_event,
4415 .o_llog_init = osc_llog_init,
4416 .o_llog_finish = osc_llog_finish,
4417 .o_process_config = osc_process_config,
/* Objects defined elsewhere and used by osc_init()/osc_exit(): the kmem
 * cache descriptors, and the osc_ast_guard spinlock with its lockdep class. */
4420 extern struct lu_kmem_descr osc_caches[];
4421 extern spinlock_t osc_ast_guard;
4422 extern struct lock_class_key osc_ast_guard_class;
/*
 * osc_init() - module initialization.
 *
 * Steps visible below: create the kmem caches, load the lprocfs variable
 * tables, try to load the "lquota" module and hook up its quota operations,
 * register the OSC device type (undoing the quota symbol reference and the
 * caches on the failure path), initialize osc_ast_guard, and build
 * osc_mds_ost_orig_logops from llog_lvfs_ops with origin-side overrides.
 *
 * NOTE(review): the embedded line numbers of this listing have gaps, so the
 * rc checks and the returns are not shown here.
 */
4424 int __init osc_init(void)
4426 struct lprocfs_static_vars lvars = { 0 };
4430 /* print an address of _any_ initialized kernel symbol from this
4431 * module, to allow debugging with gdb that doesn't support data
4432 * symbols from modules.*/
4433 CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4435 rc = lu_kmem_init(osc_caches);
4437 lprocfs_osc_init_vars(&lvars);
/* The result of request_module() is not checked, and quota_interface is
 * tested for NULL before PORTAL_SYMBOL_PUT() below. */
4439 request_module("lquota");
4440 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4441 lquota_init(quota_interface);
4442 init_obd_quota_ops(quota_interface, &osc_obd_ops);
4444 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4445 LUSTRE_OSC_NAME, &osc_device_type);
/* Undo the quota symbol reference and the kmem caches. */
4447 if (quota_interface)
4448 PORTAL_SYMBOL_PUT(osc_quota_interface);
4449 lu_kmem_fini(osc_caches);
4453 spin_lock_init(&osc_ast_guard);
4454 lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
/* Start from the lvfs llog operations and override the origin-side hooks. */
4456 osc_mds_ost_orig_logops = llog_lvfs_ops;
4457 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4458 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4459 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4460 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
/*
 * osc_exit() - module teardown; undoes the work of osc_init().
 *
 * Finalizes the lu device type, shuts down quota support and drops the
 * quota symbol reference if one is held, unregisters the OSC device type and
 * destroys the kmem caches.
 */
4466 static void /*__exit*/ osc_exit(void)
4468 lu_device_type_fini(&osc_device_type);
4470 lquota_exit(quota_interface);
4471 if (quota_interface)
4472 PORTAL_SYMBOL_PUT(osc_quota_interface);
4474 class_unregister_type(LUSTRE_OSC_NAME);
4475 lu_kmem_fini(osc_caches);
/* Module metadata, and registration of osc_init()/osc_exit() as the module's
 * entry and exit points through cfs_module(). */
4478 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4479 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4480 MODULE_LICENSE("GPL");
4482 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);