1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_OSC
42 #include <libcfs/libcfs.h>
45 # include <liblustre.h>
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include "osc_internal.h"
/* Quota hook: set at module setup from osc_quota_interface (defined in the
 * OSC quota code); NULL until then, so callers must tolerate a NULL table. */
66 static quota_interface_t *quota_interface = NULL;
67 extern quota_interface_t osc_quota_interface;
/* Forward declarations for helpers defined later in this file. */
69 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
70 static int brw_interpret(const struct lu_env *env,
71 struct ptlrpc_request *req, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
74 /* Pack OSC object metadata for disk storage (LE byte order). */
/* NOTE(review): several lines of this function (NULL handling of lmmp/lsm,
 * error returns) are elided in this excerpt; only the visible lines are
 * documented here. */
75 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
76 struct lov_stripe_md *lsm)
/* OSC describes exactly one stripe, so one lov_mds_md suffices. */
81 lmm_size = sizeof(**lmmp);
/* Free path: caller passed an existing buffer to release. */
86 OBD_FREE(*lmmp, lmm_size);
/* Allocate the output buffer when the caller did not supply one. */
92 OBD_ALLOC(*lmmp, lmm_size);
/* Object id/group must be valid before packing them to disk format. */
98 LASSERT(lsm->lsm_object_id);
99 LASSERT(lsm->lsm_object_gr);
/* On-disk representation is little-endian. */
100 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
101 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
107 /* Unpack OSC object metadata from disk storage (LE byte order). */
/* NOTE(review): interior lines (returns, some branches) are elided in this
 * excerpt; comments below cover only what is visible. */
108 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
109 struct lov_mds_md *lmm, int lmm_bytes)
/* Reject buffers too small to hold a lov_mds_md at all. */
115 if (lmm_bytes < sizeof (*lmm)) {
116 CERROR("lov_mds_md too small: %d, need %d\n",
117 lmm_bytes, (int)sizeof(*lmm));
120 /* XXX LOV_MAGIC etc check? */
/* Object id 0 is never a valid on-disk object. */
122 if (lmm->lmm_object_id == 0) {
123 CERROR("lov_mds_md: zero lmm_object_id\n");
/* Single-stripe md size (OSC never sees multi-stripe layouts). */
128 lsm_size = lov_stripe_md_size(1);
/* lmm == NULL with an existing *lsmp means "free the unpacked md". */
132 if (*lsmp != NULL && lmm == NULL) {
133 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
134 OBD_FREE(*lsmp, lsm_size);
/* Allocate the md plus its single oinfo; roll back the md allocation
 * if the oinfo allocation fails. */
140 OBD_ALLOC(*lsmp, lsm_size);
143 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
144 if ((*lsmp)->lsm_oinfo[0] == NULL) {
145 OBD_FREE(*lsmp, lsm_size);
148 loi_init((*lsmp)->lsm_oinfo[0]);
152 /* XXX zero *lsmp? */
/* Convert from the little-endian on-disk format. */
153 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
154 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
155 LASSERT((*lsmp)->lsm_object_id);
156 LASSERT((*lsmp)->lsm_object_gr);
/* No striping, so the object may grow to the per-stripe maximum. */
159 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/* Copy a client capability into the request capsule and mark its presence
 * in the obdo valid bits.  NOTE(review): the early return for capa == NULL
 * and the actual capa copy are elided from this excerpt. */
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165 struct ost_body *body, void *capa)
167 struct obd_capa *oc = (struct obd_capa *)capa;
168 struct lustre_capa *c;
/* Wire buffer reserved for the capability in the request. */
173 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
/* Tell the server a capability accompanies this obdo. */
176 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177 DEBUG_CAPA(D_SEC, c, "pack");
/* Fill the request's OST_BODY field from oinfo: copy the caller's obdo
 * and attach the capability (if any) via osc_pack_capa(). */
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181 struct obd_info *oinfo)
183 struct ost_body *body;
185 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
/* Struct copy of the whole obdo into the wire body. */
188 body->oa = *oinfo->oi_oa;
189 osc_pack_capa(req, body, oinfo->oi_capa);
/* Shrink the capability request field to zero bytes when no capability is
 * being sent; when one is sent the capsule's default size is already
 * correct (see comment below). */
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193 const struct req_msg_field *field,
197 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
199 /* it is already calculated as sizeof struct obd_capa */
/* Reply callback for async getattr: unpack the ost_body from the reply,
 * copy the returned attributes into the caller's obdo, then invoke the
 * caller's oi_cb_up completion.  NOTE(review): the rc/error branches
 * between the visible lines are elided in this excerpt. */
203 static int osc_getattr_interpret(const struct lu_env *env,
204 struct ptlrpc_request *req,
205 struct osc_async_args *aa, int rc)
207 struct ost_body *body;
/* Swab the reply body in place (handles cross-endian peers). */
213 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
214 lustre_swab_ost_body);
216 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
217 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
219 /* This should really be sent by the OST */
220 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
221 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
/* Unpack failure path: invalidate the obdo so callers see no attrs. */
223 CDEBUG(D_INFO, "can't unpack ost_body\n");
225 aa->aa_oi->oi_oa->o_valid = 0;
/* Propagate completion (and rc) to the upper layer. */
228 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an OST_GETATTR RPC on @set; completion is delivered through
 * osc_getattr_interpret -> oinfo->oi_cb_up rather than by waiting here. */
232 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
233 struct ptlrpc_request_set *set)
235 struct ptlrpc_request *req;
236 struct osc_async_args *aa;
240 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
/* Size the capa field before packing (0 if no capability). */
244 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
245 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
/* Pack failure: release the unbuilt request. */
247 ptlrpc_request_free(req);
251 osc_pack_req_body(req, oinfo);
253 ptlrpc_request_set_replen(req);
254 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
/* Async args live inside the request; make sure they fit. */
256 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
257 aa = ptlrpc_req_async_args(req);
260 ptlrpc_set_add_req(set, req);
/* Synchronous OST_GETATTR: send the RPC, wait, and copy the returned
 * attributes into oinfo->oi_oa.  NOTE(review): error-branch lines between
 * the visible statements are elided in this excerpt. */
264 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
266 struct ptlrpc_request *req;
267 struct ost_body *body;
271 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
275 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
276 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
278 ptlrpc_request_free(req);
282 osc_pack_req_body(req, oinfo);
284 ptlrpc_request_set_replen(req);
/* Block until the reply arrives (or the request fails). */
286 rc = ptlrpc_queue_wait(req);
290 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
/* Missing/short reply body is a protocol error. */
292 GOTO(out, rc = -EPROTO);
294 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
295 *oinfo->oi_oa = body->oa;
297 /* This should really be sent by the OST */
298 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
299 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
/* Common exit: drop our request reference. */
303 ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR: push the attributes in oinfo->oi_oa to the OST
 * and copy the server's resulting obdo back on success. */
307 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
308 struct obd_trans_info *oti)
310 struct ptlrpc_request *req;
311 struct ost_body *body;
/* If a group is claimed valid it must be non-zero. */
315 LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
316 oinfo->oi_oa->o_gr > 0);
318 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
322 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
323 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
325 ptlrpc_request_free(req);
329 osc_pack_req_body(req, oinfo);
331 ptlrpc_request_set_replen(req);
/* Block until the reply arrives. */
333 rc = ptlrpc_queue_wait(req);
337 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
/* Missing/short reply body is a protocol error. */
339 GOTO(out, rc = -EPROTO);
/* Server may have adjusted attributes; reflect them back to the caller. */
341 *oinfo->oi_oa = body->oa;
345 ptlrpc_req_finished(req);
/* Reply callback for async setattr: copy the server's obdo back and invoke
 * the caller's oi_cb_up completion with the final rc. */
349 static int osc_setattr_interpret(const struct lu_env *env,
350 struct ptlrpc_request *req,
351 struct osc_async_args *aa, int rc)
353 struct ost_body *body;
359 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
/* Missing/short reply body is a protocol error. */
361 GOTO(out, rc = -EPROTO);
363 *aa->aa_oi->oi_oa = body->oa;
365 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Asynchronous OST_SETATTR.  With rqset == NULL the request goes straight
 * to ptlrpcd fire-and-forget (MDS-originated setattr); otherwise it is
 * added to @rqset with osc_setattr_interpret as the completion. */
369 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
370 struct obd_trans_info *oti,
371 struct ptlrpc_request_set *rqset)
373 struct ptlrpc_request *req;
374 struct osc_async_args *aa;
378 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
382 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
383 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
385 ptlrpc_request_free(req);
389 osc_pack_req_body(req, oinfo);
391 ptlrpc_request_set_replen(req);
/* Carry the llog cookie along when the caller has one to cancel. */
393 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
395 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
398 /* do mds to ost setattr asynchronously */
400 /* Do not wait for response. */
401 ptlrpcd_add_req(req, PSCOPE_OTHER);
403 req->rq_interpret_reply =
404 (ptlrpc_interpterer_t)osc_setattr_interpret;
/* Async args live inside the request; make sure they fit. */
406 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
407 aa = ptlrpc_req_async_args(req);
410 ptlrpc_set_add_req(rqset, req);
/* Create an object on the OST synchronously.  Allocates a stripe md if the
 * caller did not pass one, sends OST_CREATE, and on success records the new
 * object id/group in the lsm and captures transno/llog cookie for the
 * caller's transaction info.  NOTE(review): several error-path lines are
 * elided in this excerpt. */
416 int osc_real_create(struct obd_export *exp, struct obdo *oa,
417 struct lov_stripe_md **ea, struct obd_trans_info *oti)
419 struct ptlrpc_request *req;
420 struct ost_body *body;
421 struct lov_stripe_md *lsm;
/* Caller may pass *ea == NULL; build a fresh md in that case. */
430 rc = obd_alloc_memmd(exp, &lsm);
435 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
437 GOTO(out, rc = -ENOMEM);
439 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
441 ptlrpc_request_free(req);
445 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
449 ptlrpc_request_set_replen(req);
/* Orphan-recovery creates must not be replayed/delayed. */
451 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
452 oa->o_flags == OBD_FL_DELORPHAN) {
454 "delorphan from OST integration");
455 /* Don't resend the delorphan req */
456 req->rq_no_resend = req->rq_no_delay = 1;
459 rc = ptlrpc_queue_wait(req);
463 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
465 GOTO(out_req, rc = -EPROTO);
469 /* This should really be sent by the OST */
470 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
471 oa->o_valid |= OBD_MD_FLBLKSZ;
473 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
474 * have valid lsm_oinfo data structs, so don't go touching that.
475 * This needs to be fixed in a big way.
477 lsm->lsm_object_id = oa->o_id;
478 lsm->lsm_object_gr = oa->o_gr;
/* Give the caller the transno so the create can be replayed if needed. */
482 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
/* Save the unlink llog cookie for later cancellation. */
484 if (oa->o_valid & OBD_MD_FLCOOKIE) {
485 if (!oti->oti_logcookies)
486 oti_alloc_cookies(oti, 1);
487 *oti->oti_logcookies = oa->o_lcookie;
491 CDEBUG(D_HA, "transno: "LPD64"\n",
492 lustre_msg_get_transno(req->rq_repmsg));
494 ptlrpc_req_finished(req);
/* Error path: free the md we allocated above. */
497 obd_free_memmd(exp, &lsm);
/* Reply callback for truncate (punch): copy the server's obdo back into the
 * caller's obdo and invoke the upcall registered in osc_punch_base. */
501 static int osc_punch_interpret(const struct lu_env *env,
502 struct ptlrpc_request *req,
503 struct osc_punch_args *aa, int rc)
505 struct ost_body *body;
511 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
/* Missing/short reply body is a protocol error. */
513 GOTO(out, rc = -EPROTO);
515 *aa->pa_oa = body->oa;
517 rc = aa->pa_upcall(aa->pa_cookie, rc);
/* Build and queue an OST_PUNCH (truncate) request.  @upcall/@cookie are
 * invoked from osc_punch_interpret when the reply arrives.  The request is
 * handed to ptlrpcd when rqset == PTLRPCD_SET, otherwise added to @rqset. */
521 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
522 struct obd_capa *capa,
523 obd_enqueue_update_f upcall, void *cookie,
524 struct ptlrpc_request_set *rqset)
526 struct ptlrpc_request *req;
527 struct osc_punch_args *aa;
528 struct ost_body *body;
532 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
536 osc_set_capa_size(req, &RMF_CAPA1, capa);
537 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
539 ptlrpc_request_free(req);
/* Punch is I/O-class work; route and time it out like bulk I/O. */
542 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
543 ptlrpc_at_set_req_timeout(req);
545 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
548 osc_pack_capa(req, body, capa);
550 ptlrpc_request_set_replen(req);
553 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
/* Async args live inside the request; make sure they fit. */
554 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
555 aa = ptlrpc_req_async_args(req);
557 aa->pa_upcall = upcall;
558 aa->pa_cookie = cookie;
559 if (rqset == PTLRPCD_SET)
560 ptlrpcd_add_req(req, PSCOPE_OTHER);
562 ptlrpc_set_add_req(rqset, req);
/* obd_ops punch entry point: encode the truncate extent into the obdo
 * (start in o_size, end in o_blocks -- a deliberate field overload) and
 * delegate to osc_punch_base with oinfo's upcall. */
567 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
568 struct obd_trans_info *oti,
569 struct ptlrpc_request_set *rqset)
571 oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
572 oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
573 oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
574 return osc_punch_base(exp, oinfo->oi_oa, oinfo->oi_capa,
575 oinfo->oi_cb_up, oinfo, rqset);
/* Synchronous OST_SYNC over the byte range [start, end]; like punch, the
 * range rides in the obdo's o_size/o_blocks fields. */
578 static int osc_sync(struct obd_export *exp, struct obdo *oa,
579 struct lov_stripe_md *md, obd_size start, obd_size end,
582 struct ptlrpc_request *req;
583 struct ost_body *body;
/* A NULL obdo means there is nothing to identify the object with. */
588 CDEBUG(D_INFO, "oa NULL\n");
592 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
596 osc_set_capa_size(req, &RMF_CAPA1, capa);
597 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
599 ptlrpc_request_free(req);
603 /* overload the size and blocks fields in the oa with start/end */
604 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
607 body->oa.o_size = start;
608 body->oa.o_blocks = end;
609 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
610 osc_pack_capa(req, body, capa);
612 ptlrpc_request_set_replen(req);
/* Block until the reply arrives. */
614 rc = ptlrpc_queue_wait(req);
618 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
/* Missing/short reply body is a protocol error. */
620 GOTO(out, rc = -EPROTO);
626 ptlrpc_req_finished(req);
630 /* Find and cancel locally locks matched by @mode in the resource found by
631 * @objid. Found locks are added into @cancel list. Returns the amount of
632 * locks added to @cancels list. */
633 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
634 struct list_head *cancels, ldlm_mode_t mode,
637 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
638 struct ldlm_res_id res_id;
639 struct ldlm_resource *res;
/* Resource name is derived from the object id/group in @oa. */
643 osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
644 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
/* Hold a debug ref across the cancel scan, then drop both refs. */
648 LDLM_RESOURCE_ADDREF(res);
649 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
650 lock_flags, 0, NULL);
651 LDLM_RESOURCE_DELREF(res);
652 ldlm_resource_putref(res);
/* Destroy-RPC completion: release our slot in the in-flight destroy count
 * and wake anyone throttled in osc_destroy waiting for a free slot. */
656 static int osc_destroy_interpret(const struct lu_env *env,
657 struct ptlrpc_request *req, void *data,
660 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
662 atomic_dec(&cli->cl_destroy_in_flight);
663 cfs_waitq_signal(&cli->cl_destroy_waitq);
/* Try to reserve a destroy-RPC slot (bounded by cl_max_rpcs_in_flight).
 * Optimistically increments the counter; on failure it decrements again
 * and, if the count meanwhile dropped below the limit, re-wakes waiters to
 * avoid a lost wakeup between the two atomic ops. */
667 static int osc_can_send_destroy(struct client_obd *cli)
669 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
670 cli->cl_max_rpcs_in_flight) {
671 /* The destroy request can be sent */
674 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
675 cli->cl_max_rpcs_in_flight) {
677 * The counter has been modified between the two atomic
680 cfs_waitq_signal(&cli->cl_destroy_waitq);
685 /* Destroy requests can be async always on the client, and we don't even really
686 * care about the return code since the client cannot do anything at all about
688 * When the MDS is unlinking a filename, it saves the file objects into a
689 * recovery llog, and these object records are cancelled when the OST reports
690 * they were destroyed and sync'd to disk (i.e. transaction committed).
691 * If the client dies, or the OST is down when the object should be destroyed,
692 * the records are not cancelled, and when the OST reconnects to the MDS next,
693 * it will retrieve the llog unlink logs and then sends the log cancellation
694 * cookies to the MDS after committing destroy transactions. */
695 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
696 struct lov_stripe_md *ea, struct obd_trans_info *oti,
697 struct obd_export *md_export, void *capa)
699 struct client_obd *cli = &exp->exp_obd->u.cli;
700 struct ptlrpc_request *req;
701 struct ost_body *body;
702 CFS_LIST_HEAD(cancels);
/* Cannot destroy without an obdo identifying the object. */
707 CDEBUG(D_INFO, "oa NULL\n");
/* Cancel local PW locks first; their data is being discarded anyway. */
711 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
712 LDLM_FL_DISCARD_DATA);
714 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* Alloc failure: give back the locks gathered for early cancel. */
716 ldlm_lock_list_put(&cancels, l_bl_ast, count);
720 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
/* Piggy-back the gathered lock cancels on the destroy request. */
721 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
724 ptlrpc_request_free(req);
/* Destroy is I/O-class work; route and time it out accordingly. */
728 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
729 req->rq_interpret_reply = osc_destroy_interpret;
730 ptlrpc_at_set_req_timeout(req);
/* Forward the llog cookie so the OST can cancel the unlink record. */
732 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
733 oa->o_lcookie = *oti->oti_logcookies;
734 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
738 osc_pack_capa(req, body, (struct obd_capa *)capa);
739 ptlrpc_request_set_replen(req);
/* Throttle: never exceed max_rpcs_in_flight concurrent destroys. */
741 if (!osc_can_send_destroy(cli)) {
742 struct l_wait_info lwi = { 0 };
745 * Wait until the number of on-going destroy RPCs drops
746 * under max_rpc_in_flight
748 l_wait_event_exclusive(cli->cl_destroy_waitq,
749 osc_can_send_destroy(cli), &lwi);
752 /* Do not wait for response */
753 ptlrpcd_add_req(req, PSCOPE_OTHER);
/* Report this client's cache state (dirty bytes, undirty headroom, grant,
 * lost grant) to the OST inside @oa, under cl_loi_list_lock.  The CERROR
 * branches catch accounting that has gone inconsistent. */
757 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
760 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
/* Caller must not have pre-set the bits we are about to fill. */
762 LASSERT(!(oa->o_valid & bits));
765 client_obd_list_lock(&cli->cl_loi_list_lock);
766 oa->o_dirty = cli->cl_dirty;
/* Sanity: per-OSC dirty accounting must stay under its limit. */
767 if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
768 CERROR("dirty %lu - %lu > dirty_max %lu\n",
769 cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
/* Sanity: system-wide dirty accounting must stay under its limit. */
771 } else if (atomic_read(&obd_dirty_pages) -
772 atomic_read(&obd_dirty_transit_pages) > obd_max_dirty_pages){
773 CERROR("dirty %d - %d > system dirty_max %d\n",
774 atomic_read(&obd_dirty_pages),
775 atomic_read(&obd_dirty_transit_pages),
776 obd_max_dirty_pages);
/* Sanity: headroom implausibly large suggests wrapped arithmetic. */
778 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
779 CERROR("dirty %lu - dirty_max %lu too big???\n",
780 cli->cl_dirty, cli->cl_dirty_max);
/* Normal case: advertise how much more we could dirty, bounded by what
 * could be in flight at once. */
783 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
784 (cli->cl_max_rpcs_in_flight + 1);
785 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
787 oa->o_grant = cli->cl_avail_grant;
/* Lost grant is reported once and then reset. */
788 oa->o_dropped = cli->cl_lost_grant;
789 cli->cl_lost_grant = 0;
790 client_obd_list_unlock(&cli->cl_loi_list_lock);
791 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
792 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
795 /* caller must hold loi_list_lock */
/* Charge one page of write grant to this brw page: bump the dirty
 * accounting, deduct a page of available grant, and mark the page as
 * grant-backed so osc_release_write_grant can undo it later. */
796 static void osc_consume_write_grant(struct client_obd *cli,
797 struct brw_page *pga)
/* A page must not be charged twice. */
799 LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
800 atomic_inc(&obd_dirty_pages);
801 cli->cl_dirty += CFS_PAGE_SIZE;
802 cli->cl_avail_grant -= CFS_PAGE_SIZE;
803 pga->flag |= OBD_BRW_FROM_GRANT;
804 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
805 CFS_PAGE_SIZE, pga, pga->pg);
/* Callers must only consume grant they actually have. */
806 LASSERT(cli->cl_avail_grant >= 0);
809 /* the companion to osc_consume_write_grant, called when a brw has completed.
810 * must be called with the loi lock held. */
/* Undoes the dirty/grant accounting done by osc_consume_write_grant and
 * tracks "lost" grant for pages that were never sent or were short writes
 * spanning partial server-side blocks. */
811 static void osc_release_write_grant(struct client_obd *cli,
812 struct brw_page *pga, int sent)
/* Server block size, defaulting to 4096 if the OST never reported one. */
814 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
/* Nothing to release if this page never consumed grant. */
817 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
822 pga->flag &= ~OBD_BRW_FROM_GRANT;
823 atomic_dec(&obd_dirty_pages);
824 cli->cl_dirty -= CFS_PAGE_SIZE;
/* Transit (nocache) pages keep separate dirty accounting. */
825 if (pga->flag & OBD_BRW_NOCACHE) {
826 pga->flag &= ~OBD_BRW_NOCACHE;
827 atomic_dec(&obd_dirty_transit_pages);
828 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
/* Page dropped without being sent: the whole page of grant is lost. */
831 cli->cl_lost_grant += CFS_PAGE_SIZE;
832 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
833 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
834 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
835 /* For short writes we shouldn't count parts of pages that
836 * span a whole block on the OST side, or our accounting goes
837 * wrong. Should match the code in filter_grant_check. */
838 int offset = pga->off & ~CFS_PAGE_MASK;
839 int count = pga->count + (offset & (blocksize - 1));
840 int end = (offset + pga->count) & (blocksize - 1);
/* Round the tail up to a full server block. */
842 count += blocksize - end;
844 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
845 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
846 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
847 cli->cl_avail_grant, cli->cl_dirty);
/* Total BRW RPCs (reads + writes) currently in flight for this client. */
853 static unsigned long rpcs_in_flight(struct client_obd *cli)
855 return cli->cl_r_in_flight + cli->cl_w_in_flight;
858 /* caller must hold loi_list_lock */
/* Walk cl_cache_waiters and wake waiters that can now make progress:
 * either grant a page of write credit, or wake them with -EDQUOT so they
 * fall back to sync I/O.  Stops early while dirty limits are exhausted or
 * in-flight writes may still return grant. */
859 void osc_wake_cache_waiters(struct client_obd *cli)
861 struct list_head *l, *tmp;
862 struct osc_cache_waiter *ocw;
865 list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
866 /* if we can't dirty more, we must wait until some is written */
867 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
868 (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
869 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
870 "osc max %ld, sys max %d\n", cli->cl_dirty,
871 cli->cl_dirty_max, obd_max_dirty_pages);
875 /* if still dirty cache but no grant wait for pending RPCs that
876 * may yet return us some grant before doing sync writes */
877 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
878 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
879 cli->cl_w_in_flight);
/* This waiter can be serviced now -- detach it from the list. */
883 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
884 list_del_init(&ocw->ocw_entry);
885 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
886 /* no more RPCs in flight to return grant, do sync IO */
887 ocw->ocw_rc = -EDQUOT;
888 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
/* Grant available: charge it to the waiter's page before waking. */
890 osc_consume_write_grant(cli,
891 &ocw->ocw_oap->oap_brw_page);
894 cfs_waitq_signal(&ocw->ocw_waitq);
/* Seed cl_avail_grant from the grant the OST advertised in its connect
 * reply; taken under cl_loi_list_lock like all grant accounting. */
900 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
902 client_obd_list_lock(&cli->cl_loi_list_lock);
903 cli->cl_avail_grant = ocd->ocd_grant;
904 client_obd_list_unlock(&cli->cl_loi_list_lock);
906 CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
907 cli->cl_avail_grant, cli->cl_lost_grant);
/* A negative initial grant would indicate a broken connect reply. */
908 LASSERT(cli->cl_avail_grant >= 0);
/* Fold grant returned in a BRW reply body into cl_avail_grant (only when
 * the server marked o_grant valid).  Waking cache waiters is deferred to
 * brw_interpret, as noted below. */
911 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
913 client_obd_list_lock(&cli->cl_loi_list_lock);
914 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
915 if (body->oa.o_valid & OBD_MD_FLGRANT)
916 cli->cl_avail_grant += body->oa.o_grant;
917 /* waiters are woken in brw_interpret */
918 client_obd_list_unlock(&cli->cl_loi_list_lock);
921 /* We assume that the reason this OSC got a short read is because it read
922 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
923 * via the LOV, and it _knows_ it's reading inside the file, it's just that
924 * this stripe never got written at or beyond this stripe offset yet. */
/* Zero-fill the unread tail of @pga after a short read of nob_read bytes. */
925 static void handle_short_read(int nob_read, obd_count page_count,
926 struct brw_page **pga)
931 /* skip bytes read OK */
932 while (nob_read > 0) {
933 LASSERT (page_count > 0);
935 if (pga[i]->count > nob_read) {
936 /* EOF inside this page */
/* Map the page and zero everything past the bytes read. */
937 ptr = cfs_kmap(pga[i]->pg) +
938 (pga[i]->off & ~CFS_PAGE_MASK);
939 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
940 cfs_kunmap(pga[i]->pg);
/* This page was fully read; advance to the next one. */
946 nob_read -= pga[i]->count;
951 /* zero remaining pages */
952 while (page_count-- > 0) {
953 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
954 memset(ptr, 0, pga[i]->count);
955 cfs_kunmap(pga[i]->pg);
/* Validate the per-niobuf return codes in a BRW_WRITE reply: the reply
 * must carry one rc per niobuf, each must be zero, and the bulk transfer
 * must have moved exactly the requested byte count. */
960 static int check_write_rcs(struct ptlrpc_request *req,
961 int requested_nob, int niocount,
962 obd_count page_count, struct brw_page **pga)
966 /* return error if any niobuf was in error */
967 remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
968 sizeof(*remote_rcs) * niocount, NULL);
969 if (remote_rcs == NULL) {
970 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
/* Byte-swap the rc vector if the peer has opposite endianness. */
973 if (lustre_msg_swabbed(req->rq_repmsg))
974 for (i = 0; i < niocount; i++)
975 __swab32s(&remote_rcs[i]);
977 for (i = 0; i < niocount; i++) {
/* Negative rc: propagate the server's per-niobuf error. */
978 if (remote_rcs[i] < 0)
979 return(remote_rcs[i]);
/* Positive rc is not a valid write result -- protocol error. */
981 if (remote_rcs[i] != 0) {
982 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
983 i, remote_rcs[i], req);
/* The bulk must account for every byte we asked to write. */
988 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
989 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
990 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Two brw pages can share one remote niobuf iff they are byte-contiguous
 * and their flags agree (ignoring the client-local grant/nocache bits,
 * which don't go over the wire). */
997 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
999 if (p1->flag != p2->flag) {
1000 unsigned mask = ~(OBD_BRW_FROM_GRANT|OBD_BRW_NOCACHE);
1002 /* warn if we try to combine flags that we don't know to be
1003 * safe to combine */
1004 if ((p1->flag & mask) != (p2->flag & mask))
1005 CERROR("is it ok to have flags 0x%x and 0x%x in the "
1006 "same brw?\n", p1->flag, p2->flag);
/* Contiguity check: p2 must start exactly where p1 ends. */
1010 return (p1->off + p1->count == p2->off);
/* Compute the bulk checksum over the first @nob bytes of @pga using
 * @cksum_type.  OBD_FAIL hooks deliberately corrupt data (reads) or the
 * checksum (writes) to exercise the checksum-failure paths in testing. */
1013 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1014 struct brw_page **pga, int opc,
1015 cksum_type_t cksum_type)
1020 LASSERT (pg_count > 0);
1021 cksum = init_checksum(cksum_type);
1022 while (nob > 0 && pg_count > 0) {
1023 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1024 int off = pga[i]->off & ~CFS_PAGE_MASK;
/* Only checksum the bytes actually covered by @nob. */
1025 int count = pga[i]->count > nob ? nob : pga[i]->count;
1027 /* corrupt the data before we compute the checksum, to
1028 * simulate an OST->client data error */
1029 if (i == 0 && opc == OST_READ &&
1030 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1031 memcpy(ptr + off, "bad1", min(4, nob));
1032 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1033 cfs_kunmap(pga[i]->pg);
1034 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1037 nob -= pga[i]->count;
1041 /* For sending we only compute the wrong checksum instead
1042 * of corrupting the data so it is still correct on a redo */
1043 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/* Build a complete OST_READ/OST_WRITE BRW request: allocate the request
 * (from the write pool for writes), count mergeable niobufs, attach the
 * bulk descriptor and every page, fill body/ioobj/niobuf wire structs,
 * announce cache state, and optionally checksum the data.  On success
 * *reqp holds the ready-to-send request.  NOTE(review): some error checks
 * between visible lines are elided in this excerpt. */
1049 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1050 struct lov_stripe_md *lsm, obd_count page_count,
1051 struct brw_page **pga,
1052 struct ptlrpc_request **reqp,
1053 struct obd_capa *ocapa, int reserve)
1055 struct ptlrpc_request *req;
1056 struct ptlrpc_bulk_desc *desc;
1057 struct ost_body *body;
1058 struct obd_ioobj *ioobj;
1059 struct niobuf_remote *niobuf;
1060 int niocount, i, requested_nob, opc, rc;
1061 struct osc_brw_async_args *aa;
1062 struct req_capsule *pill;
1063 struct brw_page *pg_prev;
/* Failure-injection hooks for testing request-prep error paths. */
1066 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1067 RETURN(-ENOMEM); /* Recoverable */
1068 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1069 RETURN(-EINVAL); /* Fatal */
/* Writes draw from the pre-allocated request pool so cached dirty data
 * can always be flushed even under memory pressure. */
1071 if ((cmd & OBD_BRW_WRITE) != 0) {
1073 req = ptlrpc_request_alloc_pool(cli->cl_import,
1074 cli->cl_import->imp_rq_pool,
1078 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
/* Count remote niobufs: adjacent mergeable pages share one niobuf. */
1083 for (niocount = i = 1; i < page_count; i++) {
1084 if (!can_merge_pages(pga[i - 1], pga[i]))
1088 pill = &req->rq_pill;
1089 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1090 niocount * sizeof(*niobuf));
1091 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1093 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1095 ptlrpc_request_free(req);
/* BRW is I/O-class work; route and time it out like bulk I/O. */
1098 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1099 ptlrpc_at_set_req_timeout(req);
/* Bulk direction follows the opcode: server pulls for writes, pushes
 * for reads. */
1101 if (opc == OST_WRITE)
1102 desc = ptlrpc_prep_bulk_imp(req, page_count,
1103 BULK_GET_SOURCE, OST_BULK_PORTAL);
1105 desc = ptlrpc_prep_bulk_imp(req, page_count,
1106 BULK_PUT_SINK, OST_BULK_PORTAL);
1109 GOTO(out, rc = -ENOMEM);
1110 /* NB request now owns desc and will free it when it gets freed */
1112 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1113 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1114 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1115 LASSERT(body && ioobj && niobuf);
1119 obdo_to_ioobj(oa, ioobj);
1120 ioobj->ioo_bufcnt = niocount;
1121 osc_pack_capa(req, body, ocapa);
1122 LASSERT (page_count > 0);
/* Attach each page to the bulk descriptor and build the (merged)
 * remote niobuf array in parallel. */
1124 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1125 struct brw_page *pg = pga[i];
1127 LASSERT(pg->count > 0);
/* A brw page must fit entirely within one page. */
1128 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1129 "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1130 pg->off, pg->count);
/* Pages must arrive in strictly ascending offset order. */
1132 LASSERTF(i == 0 || pg->off > pg_prev->off,
1133 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1134 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1136 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1137 pg_prev->pg, page_private(pg_prev->pg),
1138 pg_prev->pg->index, pg_prev->off);
1140 LASSERTF(i == 0 || pg->off > pg_prev->off,
1141 "i %d p_c %u\n", i, page_count);
/* SRVLOCK must be uniform across the whole RPC. */
1143 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1144 (pg->flag & OBD_BRW_SRVLOCK));
1146 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1148 requested_nob += pg->count;
/* Contiguous with the previous page: extend the current niobuf
 * instead of starting a new one. */
1150 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1152 niobuf->len += pg->count;
1154 niobuf->offset = pg->off;
1155 niobuf->len = pg->count;
1156 niobuf->flags = pg->flag;
/* Sanity: we must have filled exactly niocount niobufs. */
1161 LASSERTF((void *)(niobuf - niocount) ==
1162 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1163 niocount * sizeof(*niobuf)),
1164 "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
1165 REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
1166 (void *)(niobuf - niocount));
1168 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1170 /* size[REQ_REC_OFF] still sizeof (*body) */
1171 if (opc == OST_WRITE) {
/* Client-side checksums only when the sec layer isn't already
 * hashing the bulk (BULK_HASH_ALG_NULL). */
1172 if (unlikely(cli->cl_checksum) &&
1173 req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
1174 /* store cl_cksum_type in a local variable since
1175 * it can be changed via lprocfs */
1176 cksum_type_t cksum_type = cli->cl_cksum_type;
1178 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1179 oa->o_flags = body->oa.o_flags = 0;
1180 body->oa.o_flags |= cksum_type_pack(cksum_type);
1181 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1182 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1186 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1188 /* save this in 'oa', too, for later checking */
1189 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1190 oa->o_flags |= cksum_type_pack(cksum_type);
1192 /* clear out the checksum flag, in case this is a
1193 * resend but cl_checksum is no longer set. b=11238 */
1194 oa->o_valid &= ~OBD_MD_FLCKSUM;
1196 oa->o_cksum = body->oa.o_cksum;
1197 /* 1 RC per niobuf */
1198 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
1199 sizeof(__u32) * niocount);
/* Read path: request a server checksum when enabled and the sec
 * layer isn't hashing the bulk. */
1201 if (unlikely(cli->cl_checksum) &&
1202 req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
1203 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1204 body->oa.o_flags = 0;
1205 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1206 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1208 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
1209 /* 1 RC for the whole I/O */
1211 ptlrpc_request_set_replen(req);
/* Stash everything brw_interpret/fini will need in the async args. */
1213 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1214 aa = ptlrpc_req_async_args(req);
1216 aa->aa_requested_nob = requested_nob;
1217 aa->aa_nio_count = niocount;
1218 aa->aa_page_count = page_count;
1222 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
/* Pin the capability for the life of the request when asked to. */
1223 if (ocapa && reserve)
1224 aa->aa_ocapa = capa_get(ocapa);
/* Error path: release the partially built request. */
1230 ptlrpc_req_finished(req);
/* Diagnose a client/server write-checksum mismatch: recompute the checksum
 * locally with the server's checksum type and report (via console log)
 * whether the data changed client-side, in transit, or both.  Returns 0
 * only when server and client checksums already agree. */
1234 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1235 __u32 client_cksum, __u32 server_cksum, int nob,
1236 obd_count page_count, struct brw_page **pga,
1237 cksum_type_t client_cksum_type)
1241 cksum_type_t cksum_type;
1243 if (server_cksum == client_cksum) {
1244 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* Recompute with the type the server actually used (falls back to
 * CRC32 when the server didn't report one in o_flags). */
1248 if (oa->o_valid & OBD_MD_FLFLAGS)
1249 cksum_type = cksum_type_unpack(oa->o_flags);
1251 cksum_type = OBD_CKSUM_CRC32;
1253 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
/* Classify the mismatch by comparing the recomputed checksum against
 * both the original client and the server values. */
1256 if (cksum_type != client_cksum_type)
1257 msg = "the server did not use the checksum type specified in "
1258 "the original request - likely a protocol problem";
1259 else if (new_cksum == server_cksum)
1260 msg = "changed on the client after we checksummed it - "
1261 "likely false positive due to mmap IO (bug 11742)";
1262 else if (new_cksum == client_cksum)
1263 msg = "changed in transit before arrival at OST";
1265 msg = "changed in transit AND doesn't match the original - "
1266 "likely false positive due to mmap IO (bug 11742)";
1268 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1269 LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1270 "["LPU64"-"LPU64"]\n",
1271 msg, libcfs_nid2str(peer->nid),
1272 oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1273 oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1276 oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1278 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1279 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1280 "client csum now %x\n", client_cksum, client_cksum_type,
1281 server_cksum, cksum_type, new_cksum);
1285 /* Note rc enters this function as number of bytes transferred */
/* Post-process a completed bulk read/write RPC: unpack and validate the
 * reply body, update quota and grant state, and verify checksums.  For
 * OST_WRITE replies the server's per-nio return codes and the write
 * checksum are checked; for OST_READ the transferred byte count is
 * validated, short reads are zero-filled, and the read checksum (if the
 * server sent one) is recomputed and compared.
 * NOTE(review): several lines (error returns, closing braces) are elided
 * from this view. */
1286 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1288 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1289 const lnet_process_id_t *peer =
1290 &req->rq_import->imp_connection->c_peer;
1291 struct client_obd *cli = aa->aa_cli;
1292 struct ost_body *body;
1293 __u32 client_cksum = 0;
/* -EDQUOT still carries a valid reply body (quota flags below). */
1296 if (rc < 0 && rc != -EDQUOT)
1299 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1300 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1301 lustre_swab_ost_body);
1303 CDEBUG(D_INFO, "Can't unpack body\n");
1307 /* set/clear over quota flag for a uid/gid */
1308 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1309 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1310 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1311 body->oa.o_gid, body->oa.o_valid,
/* Remember the checksum we sent; *aa->aa_oa is overwritten at the end. */
1317 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1318 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1320 osc_update_grant(cli, body);
1322 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
/* Writes must not report a positive byte count in rc. */
1324 CERROR("Unexpected +ve rc %d\n", rc);
1327 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1329 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1330 check_write_checksum(&body->oa, peer, client_cksum,
1331 body->oa.o_cksum, aa->aa_requested_nob,
1332 aa->aa_page_count, aa->aa_ppga,
1333 cksum_type_unpack(aa->aa_oa->o_flags)))
1336 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1339 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1340 aa->aa_page_count, aa->aa_ppga);
1344 /* The rest of this function executes only for OST_READs */
1345 if (rc > aa->aa_requested_nob) {
1346 CERROR("Unexpected rc %d (%d requested)\n", rc,
1347 aa->aa_requested_nob);
1351 if (rc != req->rq_bulk->bd_nob_transferred) {
1352 CERROR ("Unexpected rc %d (%d transferred)\n",
1353 rc, req->rq_bulk->bd_nob_transferred);
/* Short read: zero the tail pages that the server did not fill. */
1357 if (rc < aa->aa_requested_nob)
1358 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1360 if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
1362 GOTO(out, rc = -EAGAIN);
1364 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1365 static int cksum_counter;
1366 __u32 server_cksum = body->oa.o_cksum;
1369 cksum_type_t cksum_type;
1371 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1372 cksum_type = cksum_type_unpack(body->oa.o_flags);
1374 cksum_type = OBD_CKSUM_CRC32;
1375 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1376 aa->aa_ppga, OST_READ,
/* Note whether the bulk came directly from the peer or via a router,
 * so the error message can name the router. */
1379 if (peer->nid == req->rq_bulk->bd_sender) {
1383 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1386 if (server_cksum == ~0 && rc > 0) {
1387 CERROR("Protocol error: server %s set the 'checksum' "
1388 "bit, but didn't send a checksum. Not fatal, "
1389 "but please notify on http://bugzilla.lustre.org/\n",
1390 libcfs_nid2str(peer->nid));
1391 } else if (server_cksum != client_cksum) {
1392 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1393 "%s%s%s inum "LPU64"/"LPU64" object "
1394 LPU64"/"LPU64" extent "
1395 "["LPU64"-"LPU64"]\n",
1396 req->rq_import->imp_obd->obd_name,
1397 libcfs_nid2str(peer->nid),
1399 body->oa.o_valid & OBD_MD_FLFID ?
1400 body->oa.o_fid : (__u64)0,
1401 body->oa.o_valid & OBD_MD_FLFID ?
1402 body->oa.o_generation :(__u64)0,
1404 body->oa.o_valid & OBD_MD_FLGROUP ?
1405 body->oa.o_gr : (__u64)0,
1406 aa->aa_ppga[0]->off,
1407 aa->aa_ppga[aa->aa_page_count-1]->off +
1408 aa->aa_ppga[aa->aa_page_count-1]->count -
1410 CERROR("client %x, server %x, cksum_type %x\n",
1411 client_cksum, server_cksum, cksum_type);
1413 aa->aa_oa->o_cksum = client_cksum;
1417 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1420 } else if (unlikely(client_cksum)) {
1421 static int cksum_missed;
/* Log only when cksum_missed is a power of two, to rate-limit. */
1424 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1425 CERROR("Checksum %u requested from %s but not sent\n",
1426 cksum_missed, libcfs_nid2str(peer->nid));
/* Copy the server's obdo back to the caller's (clobbers saved cksum). */
1432 *aa->aa_oa = body->oa;
/* Synchronous bulk read/write: build the BRW request, queue it and wait,
 * then post-process the reply.  On a recoverable error the request is
 * rebuilt and resent, sleeping an increasing number of seconds between
 * attempts, until osc_should_resend() says to give up.
 * NOTE(review): the resend loop's backward jump is elided from this view. */
1437 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1438 struct lov_stripe_md *lsm,
1439 obd_count page_count, struct brw_page **pga,
1440 struct obd_capa *ocapa)
1442 struct ptlrpc_request *req;
1446 struct l_wait_info lwi;
1450 cfs_waitq_init(&waitq);
1453 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1454 page_count, pga, &req, ocapa, 0);
1458 rc = ptlrpc_queue_wait(req);
/* Bulk timeout with resend flag set: drop this request and retry. */
1460 if (rc == -ETIMEDOUT && req->rq_resend) {
1461 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1462 ptlrpc_req_finished(req);
1466 rc = osc_brw_fini_request(req, rc);
1468 ptlrpc_req_finished(req);
1469 if (osc_recoverable_error(rc)) {
1471 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1472 CERROR("too many resend retries, returning error\n");
/* Back off: sleep 'resends' seconds (interruptible) before retrying. */
1476 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1477 l_wait_event(waitq, 0, &lwi);
/* Rebuild and resend an async BRW request that failed with a recoverable
 * error.  A fresh request is prepared from the old one's async args; the
 * oap list and ocapa are moved from the old aa to the new, each oap's
 * request reference is switched to the new request, and the new request
 * is added to the original request's set.  All list surgery happens under
 * cl_loi_list_lock.  Returns non-zero on failure (e.g. too many retries
 * or an interrupted oap). */
1485 int osc_brw_redo_request(struct ptlrpc_request *request,
1486 struct osc_brw_async_args *aa)
1488 struct ptlrpc_request *new_req;
1489 struct ptlrpc_request_set *set = request->rq_set;
1490 struct osc_brw_async_args *new_aa;
1491 struct osc_async_page *oap;
1495 if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1496 CERROR("too many resend retries, returning error\n");
1500 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1502 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1503 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1504 aa->aa_cli, aa->aa_oa,
1505 NULL /* lsm unused by osc currently */,
1506 aa->aa_page_count, aa->aa_ppga,
1507 &new_req, aa->aa_ocapa, 0);
1511 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
/* Sanity-check each oap still points at the old request; if any oap
 * was interrupted, abandon the redo and drop the new request. */
1513 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1514 if (oap->oap_request != NULL) {
1515 LASSERTF(request == oap->oap_request,
1516 "request %p != oap_request %p\n",
1517 request, oap->oap_request);
1518 if (oap->oap_interrupted) {
1519 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1520 ptlrpc_req_finished(new_req);
1525 /* New request takes over pga and oaps from old request.
1526 * Note that copying a list_head doesn't work, need to move it... */
1528 new_req->rq_interpret_reply = request->rq_interpret_reply;
1529 new_req->rq_async_args = request->rq_async_args;
/* Delay the resend by aa_resends seconds via the send timestamp. */
1530 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1532 new_aa = ptlrpc_req_async_args(new_req);
1534 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1535 list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1536 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
/* Re-point each oap's request reference at the new request. */
1538 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1539 if (oap->oap_request) {
1540 ptlrpc_req_finished(oap->oap_request);
1541 oap->oap_request = ptlrpc_request_addref(new_req);
1545 new_aa->aa_ocapa = aa->aa_ocapa;
1546 aa->aa_ocapa = NULL;
1548 /* use ptlrpc_set_add_req is safe because interpret functions work
1549 * in check_set context. only one way exist with access to request
1550 * from different thread got -EINTR - this way protected with
1551 * cl_loi_list_lock */
1552 ptlrpc_set_add_req(set, new_req);
1554 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1556 DEBUG_REQ(D_INFO, new_req, "new request");
1561 * ugh, we want disk allocation on the target to happen in offset order. we'll
1562 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1563 * fine for our small page arrays and doesn't require allocation. its an
1564 * insertion sort that swaps elements that are strides apart, shrinking the
1565 * stride down until its '1' and the array is sorted.
/* Shellsort of the brw_page pointer array, keyed on ->off (file offset),
 * ascending.  Stride sequence is Knuth's 3h+1. */
1567 static void sort_brw_pages(struct brw_page **array, int num)
1570 struct brw_page *tmp;
/* Grow the stride to the largest 3h+1 value below num. */
1574 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1579 for (i = stride ; i < num ; i++) {
/* Standard insertion step over elements 'stride' apart. */
1582 while (j >= stride && array[j - stride]->off > tmp->off) {
1583 array[j] = array[j - stride];
1588 } while (stride > 1);
/* Return the length of the initial run of pages (from a sorted pga) that
 * the network can transfer as a single unfragmented RDMA: the run may only
 * continue while each page ends on a page boundary and the next one starts
 * on a page boundary.  NOTE(review): loop structure partially elided here. */
1591 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1597 LASSERT (pages > 0);
1598 offset = pg[i]->off & ~CFS_PAGE_MASK;
1602 if (pages == 0) /* that's all */
1605 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1606 return count; /* doesn't end on page boundary */
1609 offset = pg[i]->off & ~CFS_PAGE_MASK;
1610 if (offset != 0) /* doesn't start on page boundary */
/* Allocate an array of brw_page pointers, one per entry of the contiguous
 * pga array, so callers can sort/partition without moving the pages.
 * Returns NULL on allocation failure; freed by osc_release_ppga(). */
1617 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1619 struct brw_page **ppga;
1622 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1626 for (i = 0; i < count; i++)
/* Free a pointer array built by osc_build_ppga() (count must match). */
1631 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1633 LASSERT(ppga != NULL);
1634 OBD_FREE(ppga, sizeof(*ppga) * count);
/* Synchronous obd_brw entry point.  Splits the page array into chunks of
 * at most cl_max_pages_per_rpc unfragmented pages (sorted by offset) and
 * issues one osc_brw_internal() per chunk.  Because the server clobbers
 * the obdo on each RPC, a copy is saved before the first multi-chunk RPC
 * and restored before each subsequent one. */
1637 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1638 obd_count page_count, struct brw_page *pga,
1639 struct obd_trans_info *oti)
1641 struct obdo *saved_oa = NULL;
1642 struct brw_page **ppga, **orig;
1643 struct obd_import *imp = class_exp2cliimp(exp);
1644 struct client_obd *cli = &imp->imp_obd->u.cli;
1645 int rc, page_count_orig;
1648 if (cmd & OBD_BRW_CHECK) {
1649 /* The caller just wants to know if there's a chance that this
1650 * I/O can succeed */
1652 if (imp == NULL || imp->imp_invalid)
1657 /* test_brw with a failed create can trip this, maybe others. */
1658 LASSERT(cli->cl_max_pages_per_rpc);
1662 orig = ppga = osc_build_ppga(pga, page_count);
/* Keep the original pointer/count for the final osc_release_ppga(). */
1665 page_count_orig = page_count;
1667 sort_brw_pages(ppga, page_count);
1668 while (page_count) {
1669 obd_count pages_per_brw;
1671 if (page_count > cli->cl_max_pages_per_rpc)
1672 pages_per_brw = cli->cl_max_pages_per_rpc;
1674 pages_per_brw = page_count;
/* Shrink the chunk further so it transfers in one RDMA. */
1676 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1678 if (saved_oa != NULL) {
1679 /* restore previously saved oa */
1680 *oinfo->oi_oa = *saved_oa;
1681 } else if (page_count > pages_per_brw) {
1682 /* save a copy of oa (brw will clobber it) */
1683 OBDO_ALLOC(saved_oa);
1684 if (saved_oa == NULL)
1685 GOTO(out, rc = -ENOMEM);
1686 *saved_oa = *oinfo->oi_oa;
1689 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1690 pages_per_brw, ppga, oinfo->oi_capa);
1695 page_count -= pages_per_brw;
1696 ppga += pages_per_brw;
1700 osc_release_ppga(orig, page_count_orig);
1702 if (saved_oa != NULL)
1703 OBDO_FREE(saved_oa);
1708 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1709 * the dirty accounting. Writeback completes or truncate happens before
1710 * writing starts. Must be called with the loi lock held. */
1711 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
/* Returns the page's write grant to the client's accounting; @sent tells
 * the grant code whether the page actually went over the wire. */
1714 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1718 /* This maintains the lists of pending pages to read/write for a given object
1719 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1720 * to quickly find objects that are ready to send an RPC. */
/* Predicate: should an RPC be fired for this pending-page list now?
 * True when: import is invalid (drain), an urgent page is queued, cache
 * waiters exist (writes), or enough pages have accumulated to fill an
 * 'optimal' RPC.  NOTE(review): several return statements are elided
 * from this view. */
1721 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1727 if (lop->lop_num_pending == 0)
1730 /* if we have an invalid import we want to drain the queued pages
1731 * by forcing them through rpcs that immediately fail and complete
1732 * the pages. recovery relies on this to empty the queued pages
1733 * before canceling the locks and evicting down the llite pages */
1734 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1737 /* stream rpcs in queue order as long as as there is an urgent page
1738 * queued. this is our cheap solution for good batching in the case
1739 * where writepage marks some random page in the middle of the file
1740 * as urgent because of, say, memory pressure */
1741 if (!list_empty(&lop->lop_urgent)) {
1742 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1745 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1746 optimal = cli->cl_max_pages_per_rpc;
1747 if (cmd & OBD_BRW_WRITE) {
1748 /* trigger a write rpc stream as long as there are dirtiers
1749 * waiting for space. as they're waiting, they're not going to
1750 * create more pages to coallesce with what's waiting.. */
1751 if (!list_empty(&cli->cl_cache_waiters)) {
1752 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1755 /* +16 to avoid triggering rpcs that would want to include pages
1756 * that are being queued but which can't be made ready until
1757 * the queuer finishes with the page. this is a wart for
1758 * llite::commit_write() */
1761 if (lop->lop_num_pending >= optimal)
/* Idempotently add @item to the tail of @list or remove it, so that its
 * membership matches the boolean @should_be_on. */
1767 static void on_list(struct list_head *item, struct list_head *list,
1770 if (list_empty(item) && should_be_on)
1771 list_add_tail(item, list);
1772 else if (!list_empty(item) && !should_be_on)
1773 list_del_init(item);
1776 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1777 * can find pages to build into rpcs quickly */
/* Keeps the loi on cl_loi_ready_list iff an RPC can be made now, and on the
 * per-direction lists iff it has pending pages in that direction. */
1778 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1780 on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1781 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1782 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1784 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1785 loi->loi_write_lop.lop_num_pending);
1787 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1788 loi->loi_read_lop.lop_num_pending);
/* Adjust the per-lop pending-page counter by @delta and mirror the change
 * into the client-wide pending read or write page counter. */
1791 static void lop_update_pending(struct client_obd *cli,
1792 struct loi_oap_pages *lop, int cmd, int delta)
1794 lop->lop_num_pending += delta;
1795 if (cmd & OBD_BRW_WRITE)
1796 cli->cl_pending_w_pages += delta;
1798 cli->cl_pending_r_pages += delta;
1802 * this is called when a sync waiter receives an interruption. Its job is to
1803 * get the caller woken as soon as possible. If its page hasn't been put in an
1804 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
1805 * desiring interruption which will forcefully complete the rpc once the rpc
1808 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
1810 struct loi_oap_pages *lop;
1811 struct lov_oinfo *loi;
1815 LASSERT(!oap->oap_interrupted);
1816 oap->oap_interrupted = 1;
1818 /* ok, it's been put in an rpc. only one oap gets a request reference */
1819 if (oap->oap_request != NULL) {
/* Mark the in-flight rpc interrupted, wake ptlrpcd, and drop the
 * oap's reference — completion will finish it with -EINTR. */
1820 ptlrpc_mark_interrupted(oap->oap_request);
1821 ptlrpcd_wake(oap->oap_request);
1822 ptlrpc_req_finished(oap->oap_request);
1823 oap->oap_request = NULL;
1827 * page completion may be called only if ->cpo_prep() method was
1828 * executed by osc_io_submit(), that also adds page the to pending list
1830 if (!list_empty(&oap->oap_pending_item)) {
/* Not yet in an rpc: dequeue now and complete with -EINTR. */
1831 list_del_init(&oap->oap_pending_item);
1832 list_del_init(&oap->oap_urgent_item);
1835 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1836 &loi->loi_write_lop : &loi->loi_read_lop;
1837 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1838 loi_list_maint(oap->oap_cli, oap->oap_loi);
1839 rc = oap->oap_caller_ops->ap_completion(env,
1840 oap->oap_caller_data,
1841 oap->oap_cmd, NULL, -EINTR);
1847 /* this is trying to propogate async writeback errors back up to the
1848 * application. As an async write fails we record the error code for later if
1849 * the app does an fsync. As long as errors persist we force future rpcs to be
1850 * sync so that the app can get a sync error and break the cycle of queueing
1851 * pages for which writeback will fail. */
1852 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
/* On failure: force sync mode and remember the next xid; sync mode is
 * lifted only once a request at/after that xid succeeds.  NOTE(review):
 * the failure-path condition lines are elided from this view. */
1859 ar->ar_force_sync = 1;
1860 ar->ar_min_xid = ptlrpc_sample_next_xid();
1865 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1866 ar->ar_force_sync = 0;
/* Queue an oap on its object's pending list (read or write, per oap_cmd);
 * urgent pages also go on the urgent list.  Updates pending counters. */
1869 void osc_oap_to_pending(struct osc_async_page *oap)
1871 struct loi_oap_pages *lop;
1873 if (oap->oap_cmd & OBD_BRW_WRITE)
1874 lop = &oap->oap_loi->loi_write_lop;
1876 lop = &oap->oap_loi->loi_read_lop;
1878 if (oap->oap_async_flags & ASYNC_URGENT)
1879 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1880 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1881 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1884 /* this must be called holding the loi list lock to give coverage to exit_cache,
1885 * async_flag maintenance, and oap_request */
/* Final bookkeeping for one async page: drop its request reference, record
 * async write errors (per-client and per-object), refresh the object's
 * cached lvb times/blocks from the returned obdo on success, invoke the
 * caller's ap_completion, and release the page's cache grant. */
1886 static void osc_ap_completion(const struct lu_env *env,
1887 struct client_obd *cli, struct obdo *oa,
1888 struct osc_async_page *oap, int sent, int rc)
1893 if (oap->oap_request != NULL) {
1894 xid = ptlrpc_req_xid(oap->oap_request);
1895 ptlrpc_req_finished(oap->oap_request);
1896 oap->oap_request = NULL;
1899 oap->oap_async_flags = 0;
1900 oap->oap_interrupted = 0;
1902 if (oap->oap_cmd & OBD_BRW_WRITE) {
/* Propagate async write errors for later fsync reporting. */
1903 osc_process_ar(&cli->cl_ar, xid, rc);
1904 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
1907 if (rc == 0 && oa != NULL) {
1908 if (oa->o_valid & OBD_MD_FLBLOCKS)
1909 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
1910 if (oa->o_valid & OBD_MD_FLMTIME)
1911 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
1912 if (oa->o_valid & OBD_MD_FLATIME)
1913 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
1914 if (oa->o_valid & OBD_MD_FLCTIME)
1915 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
1918 rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
1919 oap->oap_cmd, oa, rc);
1921 /* ll_ap_completion (from llite) drops PG_locked. so, a new
1922 * I/O on the page could start, but OSC calls it under lock
1923 * and thus we can add oap back to pending safely */
1925 /* upper layer wants to leave the page on pending queue */
1926 osc_oap_to_pending(oap);
1928 osc_exit_cache(cli, oap, sent);
/* Interpret callback for async BRW RPCs (set via rq_interpret_reply).
 * Post-processes the reply; on a recoverable error hands the request to
 * osc_brw_redo_request() instead of completing.  Otherwise, under
 * cl_loi_list_lock: decrements the in-flight counter, completes every
 * attached oap (or releases write grants for the oap-less async path),
 * wakes cache waiters and kicks osc_check_rpcs() to keep the pipeline
 * full, then completes the cl_req and frees the page array. */
1932 static int brw_interpret(const struct lu_env *env,
1933 struct ptlrpc_request *req, void *data, int rc)
1935 struct osc_brw_async_args *aa = data;
1936 struct client_obd *cli;
1940 rc = osc_brw_fini_request(req, rc);
1941 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1942 if (osc_recoverable_error(rc)) {
1943 rc = osc_brw_redo_request(req, aa);
1949 capa_put(aa->aa_ocapa);
1950 aa->aa_ocapa = NULL;
1955 client_obd_list_lock(&cli->cl_loi_list_lock);
1957 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1958 * is called so we know whether to go to sync BRWs or wait for more
1959 * RPCs to complete */
1960 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1961 cli->cl_w_in_flight--;
1963 cli->cl_r_in_flight--;
/* Empty oap list means the rpc came from async_internal(), not the
 * oap machinery in osc_send_oap_rpc(). */
1965 async = list_empty(&aa->aa_oaps);
1966 if (!async) { /* from osc_send_oap_rpc() */
1967 struct osc_async_page *oap, *tmp;
1968 /* the caller may re-use the oap after the completion call so
1969 * we need to clean it up a little */
1970 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
1971 list_del_init(&oap->oap_rpc_item);
1972 osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
1974 OBDO_FREE(aa->aa_oa);
1975 } else { /* from async_internal() */
1977 for (i = 0; i < aa->aa_page_count; i++)
1978 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1980 osc_wake_cache_waiters(cli);
1981 osc_check_rpcs(env, cli);
1982 client_obd_list_unlock(&cli->cl_loi_list_lock);
1984 cl_req_completion(env, aa->aa_clerq, rc);
1985 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
/* Build a BRW ptlrpc request from a list of ready oaps.  Allocates the
 * brw_page pointer array and an obdo, registers every page with a fresh
 * cl_req, fills the obdo attributes (and lock handle) via cl_req_attr_set,
 * sorts the pages by offset and calls osc_brw_prep_request().  Timestamps
 * are set after the request is built (xid assigned) to order against
 * setattr — see the bug 10150 comment below.  On success the oap list is
 * spliced into the request's async args.  On failure every queued oap is
 * completed with an error and an ERR_PTR is returned.
 * NOTE(review): parts of the error path are elided from this view. */
1989 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
1990 struct client_obd *cli,
1991 struct list_head *rpc_list,
1992 int page_count, int cmd)
1994 struct ptlrpc_request *req;
1995 struct brw_page **pga = NULL;
1996 struct osc_brw_async_args *aa;
1997 struct obdo *oa = NULL;
1998 const struct obd_async_page_ops *ops = NULL;
1999 void *caller_data = NULL;
2000 struct osc_async_page *oap;
2001 struct osc_async_page *tmp;
2002 struct ost_body *body;
2003 struct cl_req *clerq = NULL;
2004 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2005 struct ldlm_lock *lock = NULL;
2006 struct cl_req_attr crattr;
2010 LASSERT(!list_empty(rpc_list));
2012 memset(&crattr, 0, sizeof crattr);
2013 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2015 GOTO(out, req = ERR_PTR(-ENOMEM));
2019 GOTO(out, req = ERR_PTR(-ENOMEM));
/* First pass: allocate the cl_req (from the first page), collect each
 * oap's brw_page into pga[], and attach every page to the cl_req. */
2022 list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2023 struct cl_page *page = osc_oap2cl_page(oap);
2025 ops = oap->oap_caller_ops;
2026 caller_data = oap->oap_caller_data;
2028 clerq = cl_req_alloc(env, page, crt,
2029 1 /* only 1-object rpcs for
2032 GOTO(out, req = (void *)clerq);
2033 lock = oap->oap_ldlm_lock;
2035 pga[i] = &oap->oap_brw_page;
2036 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2037 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2038 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2040 cl_req_page_add(env, clerq, page);
2043 /* always get the data for the obdo for the rpc */
2044 LASSERT(ops != NULL);
2046 crattr.cra_capa = NULL;
2047 cl_req_attr_set(env, clerq, &crattr, ~0ULL);
/* Pass the DLM lock handle so the server can match the lock. */
2049 oa->o_handle = lock->l_remote_handle;
2050 oa->o_valid |= OBD_MD_FLHANDLE;
2053 rc = cl_req_prep(env, clerq);
2055 CERROR("cl_req_prep failed: %d\n", rc);
2056 GOTO(out, req = ERR_PTR(rc));
2059 sort_brw_pages(pga, page_count);
2060 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2061 pga, &req, crattr.cra_capa, 1);
2063 CERROR("prep_req failed: %d\n", rc);
2064 GOTO(out, req = ERR_PTR(rc));
2067 /* Need to update the timestamps after the request is built in case
2068 * we race with setattr (locally or in queue at OST). If OST gets
2069 * later setattr before earlier BRW (as determined by the request xid),
2070 * the OST will not use BRW timestamps. Sadly, there is no obvious
2071 * way to do this in a single call. bug 10150 */
2072 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2073 cl_req_attr_set(env, clerq, &crattr,
2074 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2076 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2077 aa = ptlrpc_req_async_args(req);
2078 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2079 list_splice(rpc_list, &aa->aa_oaps);
2080 CFS_INIT_LIST_HEAD(rpc_list);
2081 aa->aa_clerq = clerq;
2083 capa_put(crattr.cra_capa);
2088 OBD_FREE(pga, sizeof(*pga) * page_count);
2089 /* this should happen rarely and is pretty bad, it makes the
2090 * pending list not follow the dirty order */
2091 client_obd_list_lock(&cli->cl_loi_list_lock);
2092 list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2093 list_del_init(&oap->oap_rpc_item);
2095 /* queued sync pages can be torn down while the pages
2096 * were between the pending list and the rpc */
2097 if (oap->oap_interrupted) {
2098 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2099 osc_ap_completion(env, cli, NULL, oap, 0,
2103 osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2105 if (clerq && !IS_ERR(clerq))
2106 cl_req_completion(env, clerq, PTR_ERR(req));
2112 * prepare pages for ASYNC io and put pages in send queue.
2116 * \param cmd - OBD_BRW_* macroses
2117 * \param lop - pending pages
2119 * \return zero if pages successfully add to send queue.
2120 * \return not zere if error occurring.
/* Walk the object's pending list, move up to cl_max_pages_per_rpc ready
 * pages onto a private rpc_list (calling ap_make_ready / ap_refresh_count
 * as needed, stopping at SRVLOCK-flag changes, fragmentation gaps and
 * PTLRPC_MAX_BRW_SIZE boundaries), then build the request outside the
 * list lock, account it in the lproc stats and in-flight counters, attach
 * a request reference to one oap, and hand the request to ptlrpcd.
 * Called with cl_loi_list_lock held; drops and re-takes it around the
 * request build.  NOTE(review): numerous lines (breaks, braces, unlock/
 * relock pairs) are elided from this view. */
2123 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2124 struct lov_oinfo *loi,
2125 int cmd, struct loi_oap_pages *lop)
2127 struct ptlrpc_request *req;
2128 obd_count page_count = 0;
2129 struct osc_async_page *oap = NULL, *tmp;
2130 struct osc_brw_async_args *aa;
2131 const struct obd_async_page_ops *ops;
2132 CFS_LIST_HEAD(rpc_list);
2133 unsigned int ending_offset;
2134 unsigned starting_offset = 0;
2136 struct cl_object *clob = NULL;
2139 /* first we find the pages we're allowed to work with */
2140 list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2142 ops = oap->oap_caller_ops;
2144 LASSERT(oap->oap_magic == OAP_MAGIC);
2147 /* pin object in memory, so that completion call-backs
2148 * can be safely called under client_obd_list lock. */
2149 clob = osc_oap2cl_page(oap)->cp_obj;
2150 cl_object_get(clob);
/* All pages in one rpc must agree on the SRVLOCK flag. */
2153 if (page_count != 0 &&
2154 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2155 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2156 " oap %p, page %p, srvlock %u\n",
2157 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2160 /* in llite being 'ready' equates to the page being locked
2161 * until completion unlocks it. commit_write submits a page
2162 * as not ready because its unlock will happen unconditionally
2163 * as the call returns. if we race with commit_write giving
2164 * us that page we dont' want to create a hole in the page
2165 * stream, so we stop and leave the rpc to be fired by
2166 * another dirtier or kupdated interval (the not ready page
2167 * will still be on the dirty list). we could call in
2168 * at the end of ll_file_write to process the queue again. */
2169 if (!(oap->oap_async_flags & ASYNC_READY)) {
2170 int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2173 CDEBUG(D_INODE, "oap %p page %p returned %d "
2174 "instead of ready\n", oap,
2178 /* llite is telling us that the page is still
2179 * in commit_write and that we should try
2180 * and put it in an rpc again later. we
2181 * break out of the loop so we don't create
2182 * a hole in the sequence of pages in the rpc
2187 /* the io isn't needed.. tell the checks
2188 * below to complete the rpc with EINTR */
2189 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2190 oap->oap_count = -EINTR;
2193 oap->oap_async_flags |= ASYNC_READY;
2196 LASSERTF(0, "oap %p page %p returned %d "
2197 "from make_ready\n", oap,
2205 * Page submitted for IO has to be locked. Either by
2206 * ->ap_make_ready() or by higher layers.
2208 #if defined(__KERNEL__) && defined(__linux__)
2210 struct cl_page *page;
2212 page = osc_oap2cl_page(oap);
2214 if (page->cp_type == CPT_CACHEABLE &&
2215 !(PageLocked(oap->oap_page) &&
2216 (CheckWriteback(oap->oap_page, cmd)))) {
2217 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2219 (long)oap->oap_page->flags,
2220 oap->oap_async_flags);
2225 /* If there is a gap at the start of this page, it can't merge
2226 * with any previous page, so we'll hand the network a
2227 * "fragmented" page array that it can't transfer in 1 RDMA */
2228 if (page_count != 0 && oap->oap_page_off != 0)
2231 /* take the page out of our book-keeping */
2232 list_del_init(&oap->oap_pending_item);
2233 lop_update_pending(cli, lop, cmd, -1);
2234 list_del_init(&oap->oap_urgent_item);
2236 if (page_count == 0)
2237 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2238 (PTLRPC_MAX_BRW_SIZE - 1);
2240 /* ask the caller for the size of the io as the rpc leaves. */
2241 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2243 ops->ap_refresh_count(env, oap->oap_caller_data,
2245 LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2247 if (oap->oap_count <= 0) {
2248 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2250 osc_ap_completion(env, cli, NULL,
2251 oap, 0, oap->oap_count);
2255 /* now put the page back in our accounting */
2256 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2257 if (page_count == 0)
2258 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2259 if (++page_count >= cli->cl_max_pages_per_rpc)
2262 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2263 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2264 * have the same alignment as the initial writes that allocated
2265 * extents on the server. */
2266 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2267 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2268 if (ending_offset == 0)
2271 /* If there is a gap at the end of this page, it can't merge
2272 * with any subsequent pages, so we'll hand the network a
2273 * "fragmented" page array that it can't transfer in 1 RDMA */
2274 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2278 osc_wake_cache_waiters(cli);
2280 loi_list_maint(cli, loi);
2282 client_obd_list_unlock(&cli->cl_loi_list_lock);
2285 cl_object_put(env, clob);
2287 if (page_count == 0) {
2288 client_obd_list_lock(&cli->cl_loi_list_lock);
/* Build the rpc without holding the list lock. */
2292 req = osc_build_req(env, cli, &rpc_list, page_count, cmd);
2294 LASSERT(list_empty(&rpc_list));
2295 loi_list_maint(cli, loi);
2296 RETURN(PTR_ERR(req));
2299 aa = ptlrpc_req_async_args(req);
/* lproc histograms: pages per rpc, rpcs in flight, starting offset. */
2301 if (cmd == OBD_BRW_READ) {
2302 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2303 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2304 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2305 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2307 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2308 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2309 cli->cl_w_in_flight);
2310 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2311 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2313 ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2315 client_obd_list_lock(&cli->cl_loi_list_lock);
2317 if (cmd == OBD_BRW_READ)
2318 cli->cl_r_in_flight++;
2320 cli->cl_w_in_flight++;
2322 /* queued sync pages can be torn down while the pages
2323 * were between the pending list and the rpc */
2325 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2326 /* only one oap gets a request reference */
2329 if (oap->oap_interrupted && !req->rq_intr) {
2330 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2332 ptlrpc_mark_interrupted(req);
2336 tmp->oap_request = ptlrpc_request_addref(req);
2338 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2339 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2341 req->rq_interpret_reply = brw_interpret;
2342 ptlrpcd_add_req(req, PSCOPE_BRW);
/* Debug helper: dump a lov_oinfo's readiness flag and per-direction
 * pending counts / urgent-list state in one CDEBUG line. */
2346 #define LOI_DEBUG(LOI, STR, args...) \
2347 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2348 !list_empty(&(LOI)->loi_cli_item), \
2349 (LOI)->loi_write_lop.lop_num_pending, \
2350 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2351 (LOI)->loi_read_lop.lop_num_pending, \
2352 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2355 /* This is called by osc_check_rpcs() to find which objects have pages that
2356 * we could be sending. These lists are maintained by lop_makes_rpc(). */
/* Selection priority: (1) objects already known ready, (2) any object with
 * queued writes when dirtiers are waiting on cache space, (3) all queued
 * objects when the import is invalid (flush), else NULL (elided here). */
2357 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2360 /* first return all objects which we already know to have
2361 * pages ready to be stuffed into rpcs */
2362 if (!list_empty(&cli->cl_loi_ready_list))
2363 RETURN(list_entry(cli->cl_loi_ready_list.next,
2364 struct lov_oinfo, loi_cli_item));
2366 /* then if we have cache waiters, return all objects with queued
2367 * writes. This is especially important when many small files
2368 * have filled up the cache and not been fired into rpcs because
2369 * they don't pass the nr_pending/object threshhold */
2370 if (!list_empty(&cli->cl_cache_waiters) &&
2371 !list_empty(&cli->cl_loi_write_list))
2372 RETURN(list_entry(cli->cl_loi_write_list.next,
2373 struct lov_oinfo, loi_write_item));
2375 /* then return all queued objects when we have an invalid import
2376 * so that they get flushed */
2377 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2378 if (!list_empty(&cli->cl_loi_write_list))
2379 RETURN(list_entry(cli->cl_loi_write_list.next,
2380 struct lov_oinfo, loi_write_item));
2381 if (!list_empty(&cli->cl_loi_read_list))
2382 RETURN(list_entry(cli->cl_loi_read_list.next,
2383 struct lov_oinfo, loi_read_item));
2388 /* called with the loi list lock held */
/* Main RPC dispatch loop: while objects are ready and the in-flight limit
 * is not reached, fire write then read RPCs for each object (alternating
 * for read/write balance), rotate the object off the cli lists for
 * inter-object fairness, and stop after repeated make_ready back-offs
 * (race_counter) to avoid spinning. */
2389 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2391 struct lov_oinfo *loi;
2392 int rc = 0, race_counter = 0;
2395 while ((loi = osc_next_loi(cli)) != NULL) {
2396 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2398 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2401 /* attempt some read/write balancing by alternating between
2402 * reads and writes in an object. The makes_rpc checks here
2403 * would be redundant if we were getting read/write work items
2404 * instead of objects. we don't want send_oap_rpc to drain a
2405 * partial read pending queue when we're given this object to
2406 * do io on writes while there are cache waiters */
2407 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2408 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2409 &loi->loi_write_lop);
2417 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2418 rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2419 &loi->loi_read_lop);
2428 /* attempt some inter-object balancing by issueing rpcs
2429 * for each object in turn */
2430 if (!list_empty(&loi->loi_cli_item))
2431 list_del_init(&loi->loi_cli_item);
2432 if (!list_empty(&loi->loi_write_item))
2433 list_del_init(&loi->loi_write_item);
2434 if (!list_empty(&loi->loi_read_item))
2435 list_del_init(&loi->loi_read_item);
2437 loi_list_maint(cli, loi);
2439 /* send_oap_rpc fails with 0 when make_ready tells it to
2440 * back off. llite's make_ready does this when it tries
2441 * to lock a page queued for write that is already locked.
2442 * we want to try sending rpcs from many objects, but we
2443 * don't want to spin failing with 0. */
2444 if (race_counter == 10)
/* we're trying to queue a page in the osc so we're subject to the
 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
 * If the osc's queued pages are already at that limit, then we want to sleep
 * until there is space in the osc's queue for us. We also may be waiting for
 * write credits from the OST if there are RPCs in flight that may return some
 * before we fall back to sync writes.
 * We need this know our allocation was granted in the presence of signals */
/* Wait condition for l_wait_event() in osc_enter_cache(): true once this
 * waiter has been removed from cl_cache_waiters (grant/cache space was
 * handed to it) or there are no RPCs left in flight to wait for. */
static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
        /* take the loi list lock so the list check is coherent with the
         * code that dequeues waiters */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
/*
 * Non-blocking version of osc_enter_cache() that consumes grant only when it
 * is available: checks cl_avail_grant for one page and, on success, charges
 * the page against the client's write grant. With 'transient' set the page
 * is additionally counted as dirty-transit and marked OBD_BRW_NOCACHE.
 * NOTE(review): branch structure around has_grant/transient is partially
 * missing from this view — confirm against the full file.
 */
int osc_enter_cache_try(const struct lu_env *env,
                        struct client_obd *cli, struct lov_oinfo *loi,
                        struct osc_async_page *oap, int transient)
        /* need at least one page worth of grant to proceed */
        has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
        osc_consume_write_grant(cli, &oap->oap_brw_page);
        cli->cl_dirty_transit += CFS_PAGE_SIZE;
        atomic_inc(&obd_dirty_transit_pages);
        oap->oap_brw_flags |= OBD_BRW_NOCACHE;
/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
 * grant or cache space. */
/* Blocking admission control for a dirty page: returns once the page may
 * be cached (grant and dirty limits permit), or forces the caller back to
 * synchronous I/O when caching is disallowed. May sleep on a
 * osc_cache_waiter while write RPCs in flight can return grant. */
static int osc_enter_cache(const struct lu_env *env,
                           struct client_obd *cli, struct lov_oinfo *loi,
                           struct osc_async_page *oap)
        struct osc_cache_waiter ocw;
        struct l_wait_info lwi = { 0 };
        CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
               "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
               cli->cl_dirty_max, obd_max_dirty_pages,
               cli->cl_lost_grant, cli->cl_avail_grant);
        /* force the caller to try sync io. this can jump the list
         * of queued writes and create a discontiguous rpc stream */
        if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
            loi->loi_ar.ar_force_sync)
        /* Hopefully normal case - cache space and write credits available */
        if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
            atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
            osc_enter_cache_try(env, cli, loi, oap, 0))
        /* Make sure that there are write rpcs in flight to wait for. This
         * is a little silly as this object may not have any pending but
         * other objects sure might. */
        if (cli->cl_w_in_flight) {
                /* queue ourselves as a cache waiter; a completing write RPC
                 * will wake us via osc_wake_cache_waiters() */
                list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
                cfs_waitq_init(&ocw.ocw_waitq);
                /* kick RPC generation before sleeping so grant can return */
                loi_list_maint(cli, loi);
                osc_check_rpcs(env, cli);
                /* drop the list lock across the sleep, per the contract above */
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                CDEBUG(D_CACHE, "sleeping for cache space\n");
                l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* still on the waiter list => woken without being granted;
                 * remove ourselves */
                if (!list_empty(&ocw.ocw_entry)) {
                        list_del(&ocw.ocw_entry);
/* Initialize an osc_async_page descriptor for 'page' at 'offset' within the
 * object, recording the caller's ops/data for later completion callbacks.
 * When called with a NULL result slot it appears to report the rounded
 * descriptor size (see the early size_round() return) — NOTE(review): the
 * guarding condition is not visible in this view; confirm in the full file. */
int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
                        struct lov_oinfo *loi, cfs_page_t *page,
                        obd_off offset, const struct obd_async_page_ops *ops,
                        void *data, void **res, int nocache,
                        struct lustre_handle *lockh)
        struct osc_async_page *oap;
        return size_round(sizeof(*oap));
        oap->oap_magic = OAP_MAGIC;
        oap->oap_cli = &exp->exp_obd->u.cli;
        oap->oap_caller_ops = ops;
        oap->oap_caller_data = data;
        oap->oap_page = page;
        oap->oap_obj_off = offset;
        /* local root-capable clients bypass quota enforcement on this page */
        if (!client_is_remote(exp) &&
            cfs_capable(CFS_CAP_SYS_RESOURCE))
                oap->oap_brw_flags = OBD_BRW_NOQUOTA;
        /* object offset must be page-aligned */
        LASSERT(!(offset & ~CFS_PAGE_MASK));
        CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
        CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
        CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
        CFS_INIT_LIST_HEAD(&oap->oap_page_list);
        spin_lock_init(&oap->oap_lock);
        CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
/* Validate and convert an opaque cookie back into its osc_async_page;
 * returns ERR_PTR(-EINVAL) if the magic does not match. */
struct osc_async_page *oap_from_cookie(void *cookie)
        struct osc_async_page *oap = cookie;
        if (oap->oap_magic != OAP_MAGIC)
                return ERR_PTR(-EINVAL);
/* Queue an async page for I/O: validates the cookie, rejects pages already
 * on a pending/urgent/rpc list, performs a quota check for non-NOQUOTA
 * writes, enters the dirty cache for writes, then places the page on the
 * object's pending list and kicks RPC generation. */
int osc_queue_async_io(const struct lu_env *env,
                       struct obd_export *exp, struct lov_stripe_md *lsm,
                       struct lov_oinfo *loi, void *cookie,
                       int cmd, obd_off off, int count,
                       obd_flag brw_flags, enum async_flags async_flags)
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        oap = oap_from_cookie(cookie);
                RETURN(PTR_ERR(oap));
        /* refuse new I/O against an invalid import */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
        /* the page must not already be queued anywhere */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
        /* check if the file's owner/group is over quota */
        if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
                struct cl_object *obj;
                struct cl_attr attr; /* XXX put attr into thread info */
                obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
                cl_object_attr_lock(obj);
                rc = cl_object_attr_get(env, obj, &attr);
                cl_object_attr_unlock(obj);
                if (rc == 0 && lquota_chkdq(quota_interface, cli, attr.cat_uid,
                                            attr.cat_gid) == NO_QUOTA)
        /* NOTE(review): presumably a fallback when loi was not supplied;
         * the guarding condition is not visible here */
        loi = lsm->lsm_oinfo[0];
        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* the I/O span must fit within one page */
        LASSERT(off + count <= CFS_PAGE_SIZE);
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;
        if (cmd & OBD_BRW_WRITE) {
                /* may drop/retake the list lock while waiting for grant */
                rc = osc_enter_cache(env, cli, loi, oap);
                        client_obd_list_unlock(&cli->cl_loi_list_lock);
        osc_oap_to_pending(oap);
        loi_list_maint(cli, loi);
        LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
        osc_check_rpcs(env, cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);
/* aka (~was & now & flag), but this is more clear :) */
/* true iff 'flag' is being newly set in 'now' relative to 'was' */
#define SETTING(was, now, flag) (!(was & flag) && (now & flag))
/* Apply newly-requested async flags (ASYNC_READY / ASYNC_URGENT) to a
 * pending page; an urgent page not yet on an RPC is moved onto the
 * lop_urgent list and the object's list state refreshed. */
int osc_set_async_flags_base(struct client_obd *cli,
                             struct lov_oinfo *loi, struct osc_async_page *oap,
                             obd_flag async_flags)
        struct loi_oap_pages *lop;
        /* no flag changes allowed against an invalid import */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
        /* pick read or write page accounting for this oap */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
                lop = &loi->loi_read_lop;
        if (list_empty(&oap->oap_pending_item))
        /* nothing to do if all requested flags are already set */
        if ((oap->oap_async_flags & async_flags) == async_flags)
        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                oap->oap_async_flags |= ASYNC_READY;
        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
                if (list_empty(&oap->oap_rpc_item)) {
                        list_add(&oap->oap_urgent_item, &lop->lop_urgent);
                        loi_list_maint(cli, loi);
        LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
                  oap->oap_async_flags);
/* Remove an async page from the OSC's queues before it reaches an RPC:
 * returns -EBUSY if it is already on an RPC item list, otherwise releases
 * its cache/grant accounting, wakes cache waiters, and unlinks it from the
 * urgent and pending lists (adjusting pending counts). */
int osc_teardown_async_page(struct obd_export *exp,
                            struct lov_stripe_md *lsm,
                            struct lov_oinfo *loi, void *cookie)
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        oap = oap_from_cookie(cookie);
                RETURN(PTR_ERR(oap));
        /* NOTE(review): presumably a fallback when loi was not supplied;
         * guarding condition not visible in this view */
        loi = lsm->lsm_oinfo[0];
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
                lop = &loi->loi_read_lop;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* too late to tear down once the page is part of an RPC */
        if (!list_empty(&oap->oap_rpc_item))
                GOTO(out, rc = -EBUSY);
        /* give back cache/grant charged in osc_enter_cache() */
        osc_exit_cache(cli, oap, 0);
        osc_wake_cache_waiters(cli);
        if (!list_empty(&oap->oap_urgent_item)) {
                list_del_init(&oap->oap_urgent_item);
                oap->oap_async_flags &= ~ASYNC_URGENT;
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, oap->oap_cmd, -1);
        loi_list_maint(cli, loi);
        LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
        client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Attach the enqueue caller's cbdata to an LDLM lock's l_ast_data after
 * sanity-checking that the lock's callbacks and resource type match the
 * enqueue info. Asserts l_ast_data is either unset or already this data. */
static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
                                         struct ldlm_enqueue_info *einfo,
        void *data = einfo->ei_cbdata;
        LASSERT(lock != NULL);
        LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
        LASSERT(lock->l_resource->lr_type == einfo->ei_type);
        LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
        LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
        /* l_ast_data is protected by both the res/lock locks and the
         * file-scope osc_ast_guard spinlock */
        lock_res_and_lock(lock);
        spin_lock(&osc_ast_guard);
        LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
        lock->l_ast_data = data;
        spin_unlock(&osc_ast_guard);
        unlock_res_and_lock(lock);
/* Handle-based wrapper around osc_set_lock_data_with_check(); logs an
 * error (likely eviction) if the handle no longer resolves to a lock. */
static void osc_set_data_with_check(struct lustre_handle *lockh,
                                    struct ldlm_enqueue_info *einfo,
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);
        osc_set_lock_data_with_check(lock, einfo, flags);
        LDLM_LOCK_PUT(lock);
        CERROR("lockh %p, data %p - client evicted?\n",
               lockh, einfo->ei_cbdata);
/* Iterate all LDLM locks on the object's resource, applying 'replace' with
 * 'data' to each (used to swap the callback data on cached locks). */
static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
                             ldlm_iterator_t replace, void *data)
        struct ldlm_res_id res_id;
        struct obd_device *obd = class_exp2obd(exp);
        osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
        ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
/* Post-enqueue bookkeeping shared by sync and async paths: translate an
 * intent ELDLM_LOCK_ABORTED reply into the server's policy result, mark
 * the LVB ready on success, then invoke the caller's update callback. */
static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
                            obd_enqueue_update_f upcall, void *cookie,
        int intent = *flags & LDLM_FL_HAS_INTENT;
        /* The request was created before ldlm_cli_enqueue call. */
        if (rc == ELDLM_LOCK_ABORTED) {
                struct ldlm_reply *rep;
                rep = req_capsule_server_get(&req->rq_pill,
                LASSERT(rep != NULL);
                /* prefer the server's policy result code if it set one */
                if (rep->lock_policy_res1)
                        rc = rep->lock_policy_res1;
        if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
                *flags |= LDLM_FL_LVB_READY;
                CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
                       lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
        /* Call the update callback. */
        rc = (*upcall)(cookie, rc);
/* Reply interpreter for an async enqueue: finishes the LDLM enqueue,
 * runs osc_enqueue_fini()'s upcall, and balances the extra lock reference
 * taken here to keep blocking ASTs ordered after the upcall. */
static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
        struct ldlm_lock *lock;
        struct lustre_handle handle;
        /* Make a local copy of a lock handle and a mode, because aa->oa_*
         * might be freed anytime after lock upcall has been called. */
        lustre_handle_copy(&handle, aa->oa_lockh);
        mode = aa->oa_ei->ei_mode;
        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
        lock = ldlm_handle2lock(&handle);
        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
         * osc_enqueue_fini(). */
        ldlm_lock_addref(&handle, mode);
        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   mode, aa->oa_flags, aa->oa_lvb,
                                   sizeof(*aa->oa_lvb), lustre_swab_ost_lvb,
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_lvb,
                              aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
        /* Release the lock for async request. */
        if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
                /*
                 * Releases a reference taken by ldlm_cli_enqueue(), if it is
                 * not already released by
                 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
                 */
                ldlm_lock_decref(&handle, mode);
        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_lockh, req, aa);
        /* drop the extra reference taken above, and the handle2lock ref */
        ldlm_lock_decref(&handle, mode);
        LDLM_LOCK_PUT(lock);
/* After a granted or glimpsed enqueue, copy the returned LVB into the
 * lov_oinfo and update the known-minimum-size (KMS), capped at one byte
 * past the granted lock extent; then allow the lock to be matched. */
void osc_update_enqueue(struct lustre_handle *lov_lockhp,
                        struct lov_oinfo *loi, int flags,
                        struct ost_lvb *lvb, __u32 mode, int rc)
        if (rc == ELDLM_OK) {
                struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
                LASSERT(lock != NULL);
                loi->loi_lvb = *lvb;
                tmp = loi->loi_lvb.lvb_size;
                /* Extend KMS up to the end of this lock and no further
                 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
                if (tmp > lock->l_policy_data.l_extent.end)
                        tmp = lock->l_policy_data.l_extent.end + 1;
                if (tmp >= loi->loi_kms) {
                        LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
                                   ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
                        loi_kms_set(loi, tmp);
                        LDLM_DEBUG(lock, "lock acquired, setting rss="
                                   LPU64"; leaving kms="LPU64", end="LPU64,
                                   loi->loi_lvb.lvb_size, loi->loi_kms,
                                   lock->l_policy_data.l_extent.end);
                ldlm_lock_allow_match(lock);
                LDLM_LOCK_PUT(lock);
        } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
                /* glimpse: take the size but do not grow kms */
                loi->loi_lvb = *lvb;
                CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
                       " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
EXPORT_SYMBOL(osc_update_enqueue);
/* Sentinel request set: callers pass this to mean "hand the request to
 * ptlrpcd" rather than a real set (compared by pointer below). */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is excluded from the cluster -- such scenarious make the life difficult, so
 * release locks just after they are obtained. */
/* Core extent-lock enqueue: first tries to match an existing cached lock
 * (upgrading PR requests to match PW locks), otherwise builds and sends an
 * LDLM enqueue request, synchronously or via the given request set. */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     int *flags, ldlm_policy_data_t *policy,
                     struct ost_lvb *lvb, int kms_valid,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ldlm_enqueue_info *einfo,
                     struct lustre_handle *lockh,
                     struct ptlrpc_request_set *rqset, int async)
        struct obd_device *obd = exp->exp_obd;
        struct ptlrpc_request *req = NULL;
        int intent = *flags & LDLM_FL_HAS_INTENT;
        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother. */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;
        /*
         * kms is not valid when either object is completely fresh (so that no
         * locks are cached), or object was evicted. In the latter case cached
         * lock cannot be used, because it would prime inode state with
         * potentially stale LVB.
         */
        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock. The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
        mode = ldlm_lock_match(obd->obd_namespace,
                               *flags | LDLM_FL_LVB_READY, res_id,
                               einfo->ei_type, policy, mode, lockh, 0);
                struct ldlm_lock *matched = ldlm_handle2lock(lockh);
                /* reuse the matched lock only if its ast_data is free or
                 * already ours */
                if (matched->l_ast_data == NULL ||
                    matched->l_ast_data == einfo->ei_cbdata) {
                        /* addref the lock only if not async requests and PW
                         * lock is matched whereas we asked for PR. */
                        if (!rqset && einfo->ei_mode != mode)
                                ldlm_lock_addref(lockh, LCK_PR);
                        osc_set_lock_data_with_check(matched, einfo, *flags);
                        /* I would like to be able to ASSERT here that
                         * rss <= kms, but I can't, for reasons which
                         * are explained in lov_enqueue() */
                        /* We already have a lock, and it's referenced */
                        (*upcall)(cookie, ELDLM_OK);
                        /* For async requests, decref the lock. */
                        if (einfo->ei_mode != mode)
                                ldlm_lock_decref(lockh, LCK_PW);
                                ldlm_lock_decref(lockh, einfo->ei_mode);
                        ldlm_lock_decref(lockh, mode);
                        LDLM_LOCK_PUT(matched);
        /* no usable cached lock: build a fresh enqueue request */
        CFS_LIST_HEAD(cancels);
        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                   &RQF_LDLM_ENQUEUE_LVB);
        rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
        ptlrpc_request_set_replen(req);
        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        *flags &= ~LDLM_FL_BLOCK_GRANTED;
        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
                              sizeof(*lvb), lustre_swab_ost_lvb, lockh, async);
                /* async path: stash args and let the interpreter finish */
                struct osc_enqueue_args *aa;
                CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                aa->oa_flags = flags;
                aa->oa_upcall = upcall;
                aa->oa_cookie = cookie;
                aa->oa_lockh = lockh;
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_enqueue_interpret;
                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PSCOPE_OTHER);
                        ptlrpc_set_add_req(rqset, req);
        } else if (intent) {
                ptlrpc_req_finished(req);
        /* sync path: finish bookkeeping and free the request here */
        rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
        ptlrpc_req_finished(req);
/* obd_ops enqueue entry point: builds the resource name from the stripe
 * metadata and delegates to osc_enqueue_base(); a non-NULL rqset selects
 * the asynchronous path. */
static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                       struct ldlm_enqueue_info *einfo,
                       struct ptlrpc_request_set *rqset)
        struct ldlm_res_id res_id;
        osc_build_res_name(oinfo->oi_md->lsm_object_id,
                           oinfo->oi_md->lsm_object_gr, &res_id);
        rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
                              &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
                              oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
                              oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
                              rqset, rqset != NULL);
/* Match an existing cached extent lock for [start,end] (page-aligned), a
 * PW lock may satisfy a PR request; on such a cross-mode match the PR ref
 * is added and the PW ref dropped so the caller holds the mode it asked for. */
int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                   __u32 type, ldlm_policy_data_t *policy, __u32 mode,
                   int *flags, void *data, struct lustre_handle *lockh,
        struct obd_device *obd = exp->exp_obd;
        int lflags = *flags;
        /* fault-injection hook for testing a failed match */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;
        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock. The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock. */
        rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
                             res_id, type, policy, rc, lockh, unref);
        osc_set_data_with_check(lockh, data, lflags);
        /* matched a PW lock for a PR request: swap the reference modes */
        if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
                ldlm_lock_addref(lockh, LCK_PR);
                ldlm_lock_decref(lockh, LCK_PW);
/* Drop a lock reference; GROUP locks are also cancelled on the spot since
 * they are not kept in the LRU. */
int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
        if (unlikely(mode == LCK_GROUP))
                ldlm_lock_decref_and_cancel(lockh, mode);
        ldlm_lock_decref(lockh, mode);
/* obd_ops cancel entry point: thin wrapper over osc_cancel_base(). */
static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
                      __u32 mode, struct lustre_handle *lockh)
        RETURN(osc_cancel_base(lockh, mode));
/* Cancel all unused cached locks on this object's resource (or the whole
 * namespace when resp stays NULL — the lsm guard is not visible here). */
static int osc_cancel_unused(struct obd_export *exp,
                             struct lov_stripe_md *lsm, int flags,
        struct obd_device *obd = class_exp2obd(exp);
        struct ldlm_res_id res_id, *resp = NULL;
        resp = osc_build_res_name(lsm->lsm_object_id,
                                  lsm->lsm_object_gr, &res_id);
        return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
/* Reply interpreter for osc_statfs_async(): copy the server's obd_statfs
 * into the caller's buffer (-EPROTO if missing) and run the up-callback. */
static int osc_statfs_interpret(const struct lu_env *env,
                                struct ptlrpc_request *req,
                                struct osc_async_args *aa, int rc)
        struct obd_statfs *msfs;
        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
                GOTO(out, rc = -EPROTO);
        *aa->aa_oi->oi_osfs = *msfs;
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an OST_STATFS request on the given set; the reply is handled by
 * osc_statfs_interpret(). NODELAY requests are marked no-resend/no-delay
 * so procfs readers cannot deadlock behind recovery. */
static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
                            __u64 max_age, struct ptlrpc_request_set *rqset)
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit). Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
                ptlrpc_request_free(req);
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);
        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs requests not want stat in wait for avoid deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_STATFS: takes an import reference under cl_sem (see
 * bug 15684 note below), sends the request, and copies the reply into
 * 'osfs'. Returns -EPROTO if the reply lacks the statfs body. */
static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                      __u64 max_age, __u32 flags)
        struct obd_statfs *msfs;
        struct ptlrpc_request *req;
        struct obd_import *imp = NULL;
        /*Since the request might also come from lprocfs, so we need
         *sync this with client_disconnect_export Bug15684*/
        down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        up_read(&obd->u.cli.cl_sem);
        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit). Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
        /* the import reference is only needed while allocating the request */
        class_import_put(imp);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
                ptlrpc_request_free(req);
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);
        if (flags & OBD_STATFS_NODELAY) {
                /* procfs requests not want stat in wait for avoid deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        rc = ptlrpc_queue_wait(req);
        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
                GOTO(out, rc = -EPROTO);
        ptlrpc_req_finished(req);
/* Retrieve object striping information.
 *
 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
 * the maximum number of OST indices which will fit in the user buffer.
 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
 */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
        /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
        struct lov_user_md_v3 lum, *lumk;
        struct lov_user_ost_data_v1 *lmm_objects;
        int rc = 0, lum_size;
        /* we only need the header part from user space to get lmm_magic and
         * lmm_stripe_count, (the header part is common to v1 and v3) */
        lum_size = sizeof(struct lov_user_md_v1);
        if (copy_from_user(&lum, lump, lum_size))
        /* only V1 and V3 user metadata formats are understood */
        if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
            (lum.lmm_magic != LOV_USER_MAGIC_V3))
        /* lov_user_md_vX and lov_mds_md_vX must have the same size */
        LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
        LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
        LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
        /* we can use lov_mds_md_size() to compute lum_size
         * because lov_user_md_vX and lov_mds_md_vX have the same size */
        if (lum.lmm_stripe_count > 0) {
                lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
                OBD_ALLOC(lumk, lum_size);
                /* V1 and V3 place lmm_objects at different offsets */
                if (lum.lmm_magic == LOV_USER_MAGIC_V1)
                        lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
                        lmm_objects = &(lumk->lmm_objects[0]);
                lmm_objects->l_object_id = lsm->lsm_object_id;
                /* header only: no room for object entries */
                lum_size = lov_mds_md_size(0, lum.lmm_magic);
        /* an OSC always presents exactly one stripe */
        lumk->lmm_object_id = lsm->lsm_object_id;
        lumk->lmm_object_gr = lsm->lsm_object_gr;
        lumk->lmm_stripe_count = 1;
        if (copy_to_user(lump, lumk, lum_size))
        OBD_FREE(lumk, lum_size);
/* ioctl dispatcher for the OSC device. Pins the module for the duration of
 * the call; each case delegates to the matching subsystem (LOV config
 * synthesis, stripe get/set, import recovery/activation, quota check). */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        if (!try_module_get(THIS_MODULE)) {
                CERROR("Can't get module. Is it alive?");
        case OBD_IOC_LOV_GET_CONFIG: {
                struct lov_desc *desc;
                struct obd_uuid uuid;
                /* copy the ioctl payload in from user space */
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);
                data = (struct obd_ioctl_data *)buf;
                /* validate user buffer sizes before writing into them */
                if (sizeof(*desc) > data->ioc_inllen1) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                if (data->ioc_inllen2 < sizeof(uuid)) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                /* an OSC reports itself as a single-target "LOV" */
                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
                memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
                err = copy_to_user((void *)uarg, buf, len);
                obd_ioctl_freedata(buf, len);
        case LL_IOC_LOV_SETSTRIPE:
                err = obd_alloc_memmd(exp, karg);
        case LL_IOC_LOV_GETSTRIPE:
                err = osc_getstripe(karg, uarg);
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
        case OBD_IOC_POLL_QUOTACHECK:
                err = lquota_poll_check(quota_interface, exp,
                                        (struct if_quotacheck *)karg);
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, cfs_curproc_comm());
                GOTO(out, err = -ENOTTY);
        module_put(THIS_MODULE);
/* obd_ops get_info: answers KEY_LOCK_TO_STRIPE locally, and forwards
 * KEY_LAST_ID / KEY_FIEMAP to the OST via OST_GET_INFO requests, copying
 * the reply back into 'val'. */
static int osc_get_info(struct obd_export *exp, obd_count keylen,
                        void *key, __u32 *vallen, void *val,
                        struct lov_stripe_md *lsm)
        if (!vallen || !val)
        if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
                /* single-striped at the OSC level: stripe index is constant */
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
        } else if (KEY_IS(KEY_LAST_ID)) {
                struct ptlrpc_request *req;
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_LAST_ID);
                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                                     RCL_CLIENT, keylen);
                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                        ptlrpc_request_free(req);
                tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
                memcpy(tmp, key, keylen);
                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
                        GOTO(out, rc = -EPROTO);
                *((obd_id *)val) = *reply;
                ptlrpc_req_finished(req);
        } else if (KEY_IS(KEY_FIEMAP)) {
                struct ptlrpc_request *req;
                struct ll_user_fiemap *reply;
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_FIEMAP);
                /* fiemap sends the caller's buffer both ways: request
                 * parameters out, extent data back */
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
                                     RCL_CLIENT, keylen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_CLIENT, *vallen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_SERVER, *vallen);
                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                        ptlrpc_request_free(req);
                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
                memcpy(tmp, key, keylen);
                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                memcpy(tmp, val, *vallen);
                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                        GOTO(out1, rc = -EPROTO);
                memcpy(val, reply, *vallen);
                ptlrpc_req_finished(req);
/* Reply interpreter for KEY_MDS_CONN set_info: connects the MDS->OST
 * originator llog context and marks the import as a pingable server
 * connection with server-side timeouts. */
static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
                                          struct ptlrpc_request *req,
        struct llog_ctxt *ctxt;
        struct obd_import *imp = req->rq_import;
        ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
        rc = llog_initiator_connect(ctxt);
                CERROR("cannot establish connection for "
                       "ctxt %p: %d\n", ctxt, rc);
        llog_ctxt_put(ctxt);
        /* imp_lock protects the import flags being flipped below */
        spin_lock(&imp->imp_lock);
        imp->imp_server_timeout = 1;
        imp->imp_pingable = 1;
        spin_unlock(&imp->imp_lock);
        CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
/* obd_ops set_info_async: handles a set of local keys (NEXT_ID, UNLINKED,
 * INIT_RECOV, CHECKSUM, FLUSH_CTX) without any RPC, and forwards all other
 * keys to the OST via an OST_SET_INFO request added to 'set'. */
static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set)
        struct ptlrpc_request *req;
        struct obd_device *obd = exp->exp_obd;
        struct obd_import *imp = class_exp2cliimp(exp);
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
        if (KEY_IS(KEY_NEXT_ID)) {
                if (vallen != sizeof(obd_id))
                /* remember the next object id the creator should use */
                obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
                CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
                       exp->exp_obd->obd_name,
                       obd->u.cli.cl_oscc.oscc_next_id);
        if (KEY_IS(KEY_UNLINKED)) {
                /* space was freed: clear the creator's no-space flag */
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;
                spin_lock(&oscc->oscc_lock);
                oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                spin_unlock(&oscc->oscc_lock);
        if (KEY_IS(KEY_INIT_RECOV)) {
                if (vallen != sizeof(int))
                spin_lock(&imp->imp_lock);
                imp->imp_initial_recov = *(int *)val;
                spin_unlock(&imp->imp_lock);
                CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
                       exp->exp_obd->obd_name,
                       imp->imp_initial_recov);
        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
         */
        req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                             RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
                ptlrpc_request_free(req);
        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);
        if (KEY_IS(KEY_MDS_CONN)) {
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;
                /* record the object group the MDS creates in */
                oscc->oscc_oa.o_gr = (*(__u32 *)val);
                oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
                LASSERT(oscc->oscc_oa.o_gr > 0);
                req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
        ptlrpc_request_set_replen(req);
        ptlrpc_set_add_req(set, req);
        ptlrpc_check_set(NULL, set);
/* Replicator-side llog ops: only cancellation is handled locally. */
static struct llog_operations osc_size_repl_logops = {
        lop_cancel: llog_obd_repl_cancel
/* Originator ops are cloned lazily from llog_lvfs_ops in osc_llog_init(). */
static struct llog_operations osc_mds_ost_orig_logops;
/* Set up the two llog contexts this OSC uses: the MDS->OST originator
 * catalog (lazily initializing osc_mds_ost_orig_logops from llog_lvfs_ops
 * under obd_dev_lock) and the size-replicator context. */
static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
                         struct obd_device *tgt, int count,
                         struct llog_catid *catid, struct obd_uuid *uuid)
        LASSERT(olg == &obd->obd_olg);
        /* one-time initialization of the shared originator ops table */
        spin_lock(&obd->obd_dev_lock);
        if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
                osc_mds_ost_orig_logops = llog_lvfs_ops;
                osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
                osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
                osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
                osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
        spin_unlock(&obd->obd_dev_lock);
        rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
                        &catid->lci_logid, &osc_mds_ost_orig_logops);
                CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
        rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
                        NULL, &osc_size_repl_logops);
                struct llog_ctxt *ctxt =
                        llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
                CERROR("failed LLOG_SIZE_REPL_CTXT\n");
        CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
               obd->obd_name, tgt->obd_name, count, catid, rc);
        CERROR("logid "LPX64":0x%x\n",
               catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
/* Tear down the two llog contexts created by osc_llog_init().
 * Both contexts are cleaned independently; rc holds the originator
 * cleanup status, rc2 the size-replication one.  The return statement
 * is elided in this view — presumably rc, falling back to rc2; confirm
 * against the full file. */
3699 static int osc_llog_finish(struct obd_device *obd, int count)
3701 struct llog_ctxt *ctxt;
3702 int rc = 0, rc2 = 0;
3705 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3707 rc = llog_cleanup(ctxt);
3709 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3711 rc2 = llog_cleanup(ctxt);
/* Reconnect hook: re-advertise our grant state to the OST.
 * If the server supports grants (OBD_CONNECT_GRANT), request either the
 * grant we still hold (cl_avail_grant) or, when that is zero, a fresh
 * 2 * max_pages_per_rpc worth of bytes; lost grant is reported once and
 * then zeroed, all under cl_loi_list_lock. */
3718 static int osc_reconnect(const struct lu_env *env,
3719 struct obd_export *exp, struct obd_device *obd,
3720 struct obd_uuid *cluuid,
3721 struct obd_connect_data *data,
3724 struct client_obd *cli = &obd->u.cli;
3726 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3729 client_obd_list_lock(&cli->cl_loi_list_lock);
/* ?: — keep current grant if non-zero, else ask for 2 full RPCs. */
3730 data->ocd_grant = cli->cl_avail_grant ?:
3731 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3732 lost_grant = cli->cl_lost_grant;
3733 cli->cl_lost_grant = 0;
3734 client_obd_list_unlock(&cli->cl_loi_list_lock);
3736 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3737 "cl_lost_grant: %ld\n", data->ocd_grant,
3738 cli->cl_avail_grant, lost_grant);
3739 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3740 " ocd_grant: %d\n", data->ocd_connect_flags,
3741 data->ocd_version, data->ocd_grant);
/* Disconnect hook: before the last connection drops (cl_conn_count == 1),
 * flush any pending size-replication llog cancels to the OST via
 * llog_sync(), then hand off to the generic client disconnect. */
3747 static int osc_disconnect(struct obd_export *exp)
3749 struct obd_device *obd = class_exp2obd(exp);
3750 struct llog_ctxt *ctxt;
3753 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3755 if (obd->u.cli.cl_conn_count == 1) {
3756 /* Flush any remaining cancel messages out to the
3758 llog_sync(ctxt, exp);
/* Release the context reference taken by llog_get_context(). */
3760 llog_ctxt_put(ctxt);
3762 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3766 rc = client_disconnect_export(exp);
/* Dispatch import state-change events for this OSC.
 *
 * DISCON:      mark the object creator as recovering (MDS-side OSCs only,
 *              detected via imp_server_timeout) and drop all grant state.
 * INACTIVE:    notify the observer (LOV) that the target went inactive.
 * INVALIDATE:  restart pending RPCs so queued pages fail out, then flush
 *              the local lock namespace.
 * ACTIVE:      clear the no-space flag on the creator and notify observer.
 * OCD:         re-apply connect data (grant init, request portal).
 *
 * Unknown events hit the CERROR in the default branch.  Several fall-
 * through/break lines are elided from this view. */
3770 static int osc_import_event(struct obd_device *obd,
3771 struct obd_import *imp,
3772 enum obd_import_event event)
3774 struct client_obd *cli;
3778 LASSERT(imp->imp_obd == obd);
3781 case IMP_EVENT_DISCON: {
3782 /* Only do this on the MDS OSC's */
3783 if (imp->imp_server_timeout) {
3784 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3786 spin_lock(&oscc->oscc_lock);
3787 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3788 spin_unlock(&oscc->oscc_lock);
/* Grant is meaningless across a disconnect; forget it. */
3791 client_obd_list_lock(&cli->cl_loi_list_lock);
3792 cli->cl_avail_grant = 0;
3793 cli->cl_lost_grant = 0;
3794 client_obd_list_unlock(&cli->cl_loi_list_lock);
3797 case IMP_EVENT_INACTIVE: {
3798 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3801 case IMP_EVENT_INVALIDATE: {
3802 struct ldlm_namespace *ns = obd->obd_namespace;
3806 env = cl_env_get(&refcheck);
3810 client_obd_list_lock(&cli->cl_loi_list_lock);
3811 /* all pages go to failing rpcs due to the invalid
3813 osc_check_rpcs(env, cli);
3814 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* LDLM_FL_LOCAL_ONLY: drop locks without talking to the server. */
3816 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3817 cl_env_put(env, &refcheck);
3822 case IMP_EVENT_ACTIVE: {
3823 /* Only do this on the MDS OSC's */
3824 if (imp->imp_server_timeout) {
3825 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3827 spin_lock(&oscc->oscc_lock);
3828 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3829 spin_unlock(&oscc->oscc_lock);
3831 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3834 case IMP_EVENT_OCD: {
3835 struct obd_connect_data *ocd = &imp->imp_connect_data;
3837 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3838 osc_init_grant(&obd->u.cli, ocd);
3841 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3842 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3844 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3848 CERROR("Unknown import event %d\n", event);
/* Device setup: take a ptlrpcd reference, run the generic client setup,
 * attach lprocfs stats, and pre-allocate a small request pool so
 * brw_interpret() can build follow-on BRW requests even under memory
 * pressure.  Error-unwind lines are elided in this view. */
3854 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3860 rc = ptlrpcd_addref();
3864 rc = client_obd_setup(obd, lcfg);
3868 struct lprocfs_static_vars lvars = { 0 };
3869 struct client_obd *cli = &obd->u.cli;
3871 lprocfs_osc_init_vars(&lvars);
3872 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3873 lproc_osc_attach_seqstat(obd);
3874 sptlrpc_lprocfs_cliobd_attach(obd);
3875 ptlrpc_lprocfs_register_obd(obd);
3879 /* We need to allocate a few requests more, because
3880 brw_interpret tries to create new requests before freeing
3881 previous ones. Ideally we want to have 2x max_rpcs_in_flight
3882 reserved, but I afraid that might be too much wasted RAM
3883 in fact, so 2 is just my guess and still should work. */
3884 cli->cl_import->imp_rq_pool =
3885 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3887 ptlrpc_add_rqs_to_pool);
/* Staged pre-cleanup:
 * EARLY:   deactivate the import and stop pinging so nothing new starts
 *          (notably mds_lov_synchronize, per the comment below).
 * EXPORTS: invalidate and destroy a client import that was set up but
 *          never connected, free its request pool, then finish llogs.
 * Break/return lines between stages are elided in this view. */
3893 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3899 case OBD_CLEANUP_EARLY: {
3900 struct obd_import *imp;
3901 imp = obd->u.cli.cl_import;
3902 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3903 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3904 ptlrpc_deactivate_import(imp);
3905 spin_lock(&imp->imp_lock);
3906 imp->imp_pingable = 0;
3907 spin_unlock(&imp->imp_lock);
3910 case OBD_CLEANUP_EXPORTS: {
3911 /* If we set up but never connected, the
3912 client import will not have been cleaned. */
3913 if (obd->u.cli.cl_import) {
3914 struct obd_import *imp;
/* cl_sem guards cl_import teardown against concurrent users. */
3915 down_write(&obd->u.cli.cl_sem);
3916 imp = obd->u.cli.cl_import;
3917 CDEBUG(D_CONFIG, "%s: client import never connected\n",
3919 ptlrpc_invalidate_import(imp);
3920 if (imp->imp_rq_pool) {
3921 ptlrpc_free_rq_pool(imp->imp_rq_pool);
3922 imp->imp_rq_pool = NULL;
3924 class_destroy_import(imp);
3925 up_write(&obd->u.cli.cl_sem);
3926 obd->u.cli.cl_import = NULL;
3928 rc = obd_llog_finish(obd, 0);
3930 CERROR("failed to cleanup llogging subsystems\n");
/* Final device cleanup: drop lprocfs registrations, mark the object
 * creator as exiting (so no further precreates run), release the quota
 * cache, and finish with the generic client cleanup.  The ptlrpcd
 * deref / return lines are elided from this view. */
3937 int osc_cleanup(struct obd_device *obd)
3939 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3943 ptlrpc_lprocfs_unregister_obd(obd);
3944 lprocfs_obd_cleanup(obd);
3946 spin_lock(&oscc->oscc_lock);
3947 oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3948 oscc->oscc_flags |= OSCC_FLAG_EXITING;
3949 spin_unlock(&oscc->oscc_lock);
3951 /* free memory of osc quota cache */
3952 lquota_cleanup(quota_interface, obd);
3954 rc = client_obd_cleanup(obd);
/* Handle a config log record for this OSC: sptlrpc flavor updates go to
 * the sptlrpc layer; other commands (the elided default branch) are
 * treated as proc parameter writes via class_process_proc_param(). */
3960 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3962 struct lprocfs_static_vars lvars = { 0 };
3965 lprocfs_osc_init_vars(&lvars);
3967 switch (lcfg->lcfg_command) {
3968 case LCFG_SPTLRPC_CONF:
3969 rc = sptlrpc_cliobd_process_config(obd, lcfg);
3972 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
/* obd_ops adapter: forward the opaque config buffer (a lustre_cfg) to
 * osc_process_config_base().  @len is unused. */
3982 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3984 return osc_process_config_base(obd, buf);
/* The OSC's obd_ops method table.  Connection management reuses the
 * generic client_* helpers; everything else is implemented in this file
 * or its siblings.  Registered with class_register_type() in osc_init(). */
3987 struct obd_ops osc_obd_ops = {
3988 .o_owner = THIS_MODULE,
3989 .o_setup = osc_setup,
3990 .o_precleanup = osc_precleanup,
3991 .o_cleanup = osc_cleanup,
3992 .o_add_conn = client_import_add_conn,
3993 .o_del_conn = client_import_del_conn,
3994 .o_connect = client_connect_import,
3995 .o_reconnect = osc_reconnect,
3996 .o_disconnect = osc_disconnect,
3997 .o_statfs = osc_statfs,
3998 .o_statfs_async = osc_statfs_async,
3999 .o_packmd = osc_packmd,
4000 .o_unpackmd = osc_unpackmd,
4001 .o_precreate = osc_precreate,
4002 .o_create = osc_create,
4003 .o_destroy = osc_destroy,
4004 .o_getattr = osc_getattr,
4005 .o_getattr_async = osc_getattr_async,
4006 .o_setattr = osc_setattr,
4007 .o_setattr_async = osc_setattr_async,
4009 .o_punch = osc_punch,
4011 .o_enqueue = osc_enqueue,
4012 .o_change_cbdata = osc_change_cbdata,
4013 .o_cancel = osc_cancel,
4014 .o_cancel_unused = osc_cancel_unused,
4015 .o_iocontrol = osc_iocontrol,
4016 .o_get_info = osc_get_info,
4017 .o_set_info_async = osc_set_info_async,
4018 .o_import_event = osc_import_event,
4019 .o_llog_init = osc_llog_init,
4020 .o_llog_finish = osc_llog_finish,
4021 .o_process_config = osc_process_config,
/* Defined in the osc cl_object code; used below for cache setup and for
 * initializing the AST guard lock's lockdep class. */
4024 extern struct lu_kmem_descr osc_caches[];
4025 extern spinlock_t osc_ast_guard;
4026 extern struct lock_class_key osc_ast_guard_class;
/* Module init: create the osc slab caches, hook up the quota interface
 * (best-effort via PORTAL_SYMBOL_GET after request_module), register the
 * OSC obd type, and initialize the AST guard spinlock.  On registration
 * failure the quota symbol and caches are released (the surrounding
 * error-check lines are elided in this view). */
4028 int __init osc_init(void)
4030 struct lprocfs_static_vars lvars = { 0 };
4034 /* print an address of _any_ initialized kernel symbol from this
4035 * module, to allow debugging with gdb that doesn't support data
4036 * symbols from modules.*/
4037 CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4039 rc = lu_kmem_init(osc_caches);
4041 lprocfs_osc_init_vars(&lvars);
4043 request_module("lquota");
4044 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4045 lquota_init(quota_interface);
4046 init_obd_quota_ops(quota_interface, &osc_obd_ops);
4048 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4049 LUSTRE_OSC_NAME, &osc_device_type);
/* Failure path: undo the quota symbol ref and the kmem caches. */
4051 if (quota_interface)
4052 PORTAL_SYMBOL_PUT(osc_quota_interface);
4053 lu_kmem_fini(osc_caches);
4057 spin_lock_init(&osc_ast_guard);
4058 lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
/* Module exit: mirror of osc_init() — tear down the device type, drop
 * the quota interface reference, unregister the obd type and free the
 * slab caches. */
4064 static void /*__exit*/ osc_exit(void)
4066 lu_device_type_fini(&osc_device_type);
4068 lquota_exit(quota_interface);
4069 if (quota_interface)
4070 PORTAL_SYMBOL_PUT(osc_quota_interface);
4072 class_unregister_type(LUSTRE_OSC_NAME);
4073 lu_kmem_fini(osc_caches);
/* Module metadata; cfs_module() expands to the module_init/module_exit
 * registration for osc_init()/osc_exit(). */
4076 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4077 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4078 MODULE_LICENSE("GPL");
4080 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);