1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_OSC
43 # include <libcfs/libcfs.h>
44 #else /* __KERNEL__ */
45 # include <liblustre.h>
48 # include <lustre_dlm.h>
49 #include <libcfs/kp30.h>
50 #include <lustre_net.h>
51 #include <lustre/lustre_user.h>
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include <lustre_cache.h>
65 #include "osc_internal.h"
67 static quota_interface_t *quota_interface = NULL;
68 extern quota_interface_t osc_quota_interface;
70 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
71 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
74 static quota_interface_t *quota_interface;
75 extern quota_interface_t osc_quota_interface;
78 atomic_t osc_resend_time;
80 /* Pack OSC object metadata for disk storage (LE byte order). */
81 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
82 struct lov_stripe_md *lsm)
87 lmm_size = sizeof(**lmmp);
92 OBD_FREE(*lmmp, lmm_size);
98 OBD_ALLOC(*lmmp, lmm_size);
104 LASSERT(lsm->lsm_object_id);
105 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
111 /* Unpack OSC object metadata from disk storage (LE byte order). */
112 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
113 struct lov_mds_md *lmm, int lmm_bytes)
119 if (lmm_bytes < sizeof (*lmm)) {
120 CERROR("lov_mds_md too small: %d, need %d\n",
121 lmm_bytes, (int)sizeof(*lmm));
124 /* XXX LOV_MAGIC etc check? */
126 if (lmm->lmm_object_id == 0) {
127 CERROR("lov_mds_md: zero lmm_object_id\n");
132 lsm_size = lov_stripe_md_size(1);
136 if (*lsmp != NULL && lmm == NULL) {
137 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
138 OBD_FREE(*lsmp, lsm_size);
144 OBD_ALLOC(*lsmp, lsm_size);
147 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
148 if ((*lsmp)->lsm_oinfo[0] == NULL) {
149 OBD_FREE(*lsmp, lsm_size);
152 loi_init((*lsmp)->lsm_oinfo[0]);
156 /* XXX zero *lsmp? */
157 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
158 LASSERT((*lsmp)->lsm_object_id);
161 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
166 static int osc_getattr_interpret(struct ptlrpc_request *req,
169 struct ost_body *body;
170 struct osc_async_args *aa = data;
176 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
177 lustre_swab_ost_body);
179 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
180 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
182 /* This should really be sent by the OST */
183 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
184 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
186 CERROR("can't unpack ost_body\n");
188 aa->aa_oi->oi_oa->o_valid = 0;
191 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
195 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
196 struct ptlrpc_request_set *set)
198 struct ptlrpc_request *req;
199 struct ost_body *body;
200 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
201 struct osc_async_args *aa;
204 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
205 OST_GETATTR, 2, size,NULL);
209 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
210 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
212 ptlrpc_req_set_repsize(req, 2, size);
213 req->rq_interpret_reply = osc_getattr_interpret;
215 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
216 aa = ptlrpc_req_async_args(req);
219 ptlrpc_set_add_req(set, req);
223 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
225 struct ptlrpc_request *req;
226 struct ost_body *body;
227 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
231 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
232 OST_GETATTR, 2, size, NULL);
236 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
237 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
239 ptlrpc_req_set_repsize(req, 2, size);
241 rc = ptlrpc_queue_wait(req);
243 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
247 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
248 lustre_swab_ost_body);
250 CERROR ("can't unpack ost_body\n");
251 GOTO (out, rc = -EPROTO);
254 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
255 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
257 /* This should really be sent by the OST */
258 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
259 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
263 ptlrpc_req_finished(req);
267 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
268 struct obd_trans_info *oti)
270 struct ptlrpc_request *req;
271 struct ost_body *body;
272 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
276 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
277 OST_SETATTR, 2, size, NULL);
281 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
282 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
284 ptlrpc_req_set_repsize(req, 2, size);
286 rc = ptlrpc_queue_wait(req);
290 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
291 lustre_swab_ost_body);
293 GOTO(out, rc = -EPROTO);
295 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
299 ptlrpc_req_finished(req);
303 static int osc_setattr_interpret(struct ptlrpc_request *req,
306 struct ost_body *body;
307 struct osc_async_args *aa = data;
313 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
314 lustre_swab_ost_body);
316 CERROR("can't unpack ost_body\n");
317 GOTO(out, rc = -EPROTO);
320 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
322 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
326 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
327 struct obd_trans_info *oti,
328 struct ptlrpc_request_set *rqset)
330 struct ptlrpc_request *req;
331 struct ost_body *body;
332 __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
334 struct osc_async_args *aa;
337 if (osc_exp_is_2_0_server(exp)) {
341 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
342 OST_SETATTR, bufcount, size, NULL);
346 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
348 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
350 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
353 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
354 ptlrpc_req_set_repsize(req, 2, size);
355 /* do mds to ost setattr asynchronouly */
357 /* Do not wait for response. */
358 ptlrpcd_add_req(req);
360 req->rq_interpret_reply = osc_setattr_interpret;
362 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
363 aa = ptlrpc_req_async_args(req);
366 ptlrpc_set_add_req(rqset, req);
372 int osc_real_create(struct obd_export *exp, struct obdo *oa,
373 struct lov_stripe_md **ea, struct obd_trans_info *oti)
375 struct ptlrpc_request *req;
376 struct ost_body *body;
377 struct lov_stripe_md *lsm;
378 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
387 rc = obd_alloc_memmd(exp, &lsm);
392 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
393 OST_CREATE, 2, size, NULL);
395 GOTO(out, rc = -ENOMEM);
397 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
398 lustre_set_wire_obdo(&body->oa, oa);
400 ptlrpc_req_set_repsize(req, 2, size);
401 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
402 oa->o_flags == OBD_FL_DELORPHAN) {
404 "delorphan from OST integration");
405 /* Don't resend the delorphan req */
406 req->rq_no_resend = req->rq_no_delay = 1;
409 rc = ptlrpc_queue_wait(req);
413 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
414 lustre_swab_ost_body);
416 CERROR ("can't unpack ost_body\n");
417 GOTO (out_req, rc = -EPROTO);
420 lustre_get_wire_obdo(oa, &body->oa);
422 /* This should really be sent by the OST */
423 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
424 oa->o_valid |= OBD_MD_FLBLKSZ;
426 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
427 * have valid lsm_oinfo data structs, so don't go touching that.
428 * This needs to be fixed in a big way.
430 lsm->lsm_object_id = oa->o_id;
434 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
436 if (oa->o_valid & OBD_MD_FLCOOKIE) {
437 if (!oti->oti_logcookies)
438 oti_alloc_cookies(oti, 1);
439 *oti->oti_logcookies = oa->o_lcookie;
443 CDEBUG(D_HA, "transno: "LPD64"\n",
444 lustre_msg_get_transno(req->rq_repmsg));
446 ptlrpc_req_finished(req);
449 obd_free_memmd(exp, &lsm);
453 static int osc_punch_interpret(struct ptlrpc_request *req,
456 struct ost_body *body;
457 struct osc_async_args *aa = data;
463 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
464 lustre_swab_ost_body);
466 CERROR ("can't unpack ost_body\n");
467 GOTO(out, rc = -EPROTO);
470 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
472 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
476 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
477 struct obd_trans_info *oti,
478 struct ptlrpc_request_set *rqset)
480 struct ptlrpc_request *req;
481 struct osc_async_args *aa;
482 struct ost_body *body;
483 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
491 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
492 OST_PUNCH, 2, size, NULL);
496 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
497 ptlrpc_at_set_req_timeout(req);
499 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
500 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
502 /* overload the size and blocks fields in the oa with start/end */
503 body->oa.o_size = oinfo->oi_policy.l_extent.start;
504 body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
505 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
507 ptlrpc_req_set_repsize(req, 2, size);
509 req->rq_interpret_reply = osc_punch_interpret;
510 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
511 aa = ptlrpc_req_async_args(req);
513 ptlrpc_set_add_req(rqset, req);
518 static int osc_sync_interpret(struct ptlrpc_request *req,
521 struct ost_body *body;
522 struct osc_async_args *aa = data;
528 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
529 lustre_swab_ost_body);
531 CERROR ("can't unpack ost_body\n");
532 GOTO(out, rc = -EPROTO);
535 *aa->aa_oi->oi_oa = body->oa;
537 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
541 static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
542 obd_size start, obd_size end,
543 struct ptlrpc_request_set *set)
545 struct ptlrpc_request *req;
546 struct ost_body *body;
547 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
548 struct osc_async_args *aa;
556 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
557 OST_SYNC, 2, size, NULL);
561 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
562 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
564 /* overload the size and blocks fields in the oa with start/end */
565 body->oa.o_size = start;
566 body->oa.o_blocks = end;
567 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
569 ptlrpc_req_set_repsize(req, 2, size);
570 req->rq_interpret_reply = osc_sync_interpret;
572 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
573 aa = ptlrpc_req_async_args(req);
576 ptlrpc_set_add_req(set, req);
580 /* Find and cancel locally locks matched by @mode in the resource found by
581 * @objid. Found locks are added into @cancel list. Returns the amount of
582 * locks added to @cancels list. */
583 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
584 struct list_head *cancels, ldlm_mode_t mode,
587 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
588 struct ldlm_res_id res_id;
589 struct ldlm_resource *res;
593 osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
594 res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
598 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
599 lock_flags, 0, NULL);
600 ldlm_resource_putref(res);
604 static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
607 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
609 atomic_dec(&cli->cl_destroy_in_flight);
610 cfs_waitq_signal(&cli->cl_destroy_waitq);
614 static int osc_can_send_destroy(struct client_obd *cli)
616 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
617 cli->cl_max_rpcs_in_flight) {
618 /* The destroy request can be sent */
621 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
622 cli->cl_max_rpcs_in_flight) {
624 * The counter has been modified between the two atomic
627 cfs_waitq_signal(&cli->cl_destroy_waitq);
632 /* Destroy requests can be async always on the client, and we don't even really
633 * care about the return code since the client cannot do anything at all about
635 * When the MDS is unlinking a filename, it saves the file objects into a
636 * recovery llog, and these object records are cancelled when the OST reports
637 * they were destroyed and sync'd to disk (i.e. transaction committed).
638 * If the client dies, or the OST is down when the object should be destroyed,
639 * the records are not cancelled, and when the OST reconnects to the MDS next,
640 * it will retrieve the llog unlink logs and then sends the log cancellation
641 * cookies to the MDS after committing destroy transactions. */
642 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
643 struct lov_stripe_md *ea, struct obd_trans_info *oti,
644 struct obd_export *md_export)
646 CFS_LIST_HEAD(cancels);
647 struct ptlrpc_request *req;
648 struct ost_body *body;
649 __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body),
650 sizeof(struct ldlm_request) };
651 int count, bufcount = 2;
652 struct client_obd *cli = &exp->exp_obd->u.cli;
660 LASSERT(oa->o_id != 0);
662 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
663 LDLM_FL_DISCARD_DATA);
664 if (exp_connect_cancelset(exp))
666 req = ldlm_prep_elc_req(exp, LUSTRE_OST_VERSION, OST_DESTROY, bufcount,
667 size, REQ_REC_OFF + 1, 0, &cancels, count);
671 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
672 ptlrpc_at_set_req_timeout(req);
674 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
676 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
677 oa->o_lcookie = *oti->oti_logcookies;
680 lustre_set_wire_obdo(&body->oa, oa);
681 ptlrpc_req_set_repsize(req, 2, size);
683 /* don't throttle destroy RPCs for the MDT */
684 if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
685 req->rq_interpret_reply = osc_destroy_interpret;
686 if (!osc_can_send_destroy(cli)) {
687 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
691 * Wait until the number of on-going destroy RPCs drops
692 * under max_rpc_in_flight
694 l_wait_event_exclusive(cli->cl_destroy_waitq,
695 osc_can_send_destroy(cli), &lwi);
699 /* Do not wait for response */
700 ptlrpcd_add_req(req);
704 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
707 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
709 LASSERT(!(oa->o_valid & bits));
712 client_obd_list_lock(&cli->cl_loi_list_lock);
713 oa->o_dirty = cli->cl_dirty;
714 if (cli->cl_dirty > cli->cl_dirty_max) {
715 CERROR("dirty %lu > dirty_max %lu\n",
716 cli->cl_dirty, cli->cl_dirty_max);
718 } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages + 1) {
719 /* The atomic_read() allowing the atomic_inc() are not covered
720 * by a lock thus they may safely race and trip this CERROR()
721 * unless we add in a small fudge factor (+1). */
722 CERROR("dirty %d > system dirty_max %d\n",
723 atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
725 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
726 CERROR("dirty %lu - dirty_max %lu too big???\n",
727 cli->cl_dirty, cli->cl_dirty_max);
730 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
731 (cli->cl_max_rpcs_in_flight + 1);
732 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
734 oa->o_grant = cli->cl_avail_grant;
735 oa->o_dropped = cli->cl_lost_grant;
736 cli->cl_lost_grant = 0;
737 client_obd_list_unlock(&cli->cl_loi_list_lock);
738 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
739 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
743 static void osc_update_next_shrink(struct client_obd *cli)
745 cli->cl_next_shrink_grant =
746 cfs_time_shift(cli->cl_grant_shrink_interval);
747 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
748 cli->cl_next_shrink_grant);
751 /* caller must hold loi_list_lock */
752 static void osc_consume_write_grant(struct client_obd *cli,struct brw_page *pga)
754 atomic_inc(&obd_dirty_pages);
755 cli->cl_dirty += CFS_PAGE_SIZE;
756 cli->cl_avail_grant -= CFS_PAGE_SIZE;
757 pga->flag |= OBD_BRW_FROM_GRANT;
758 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
759 CFS_PAGE_SIZE, pga, pga->pg);
760 LASSERTF(cli->cl_avail_grant >= 0, "invalid avail grant is %ld \n",
761 cli->cl_avail_grant);
762 osc_update_next_shrink(cli);
765 /* the companion to osc_consume_write_grant, called when a brw has completed.
766 * must be called with the loi lock held. */
767 static void osc_release_write_grant(struct client_obd *cli,
768 struct brw_page *pga, int sent)
770 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
773 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
778 pga->flag &= ~OBD_BRW_FROM_GRANT;
779 atomic_dec(&obd_dirty_pages);
780 cli->cl_dirty -= CFS_PAGE_SIZE;
782 cli->cl_lost_grant += CFS_PAGE_SIZE;
783 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
784 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
785 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
786 /* For short writes we shouldn't count parts of pages that
787 * span a whole block on the OST side, or our accounting goes
788 * wrong. Should match the code in filter_grant_check. */
789 int offset = pga->off & ~CFS_PAGE_MASK;
790 int count = pga->count + (offset & (blocksize - 1));
791 int end = (offset + pga->count) & (blocksize - 1);
793 count += blocksize - end;
795 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
796 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
797 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
798 cli->cl_avail_grant, cli->cl_dirty);
804 static unsigned long rpcs_in_flight(struct client_obd *cli)
806 return cli->cl_r_in_flight + cli->cl_w_in_flight;
809 /* caller must hold loi_list_lock */
810 void osc_wake_cache_waiters(struct client_obd *cli)
812 struct list_head *l, *tmp;
813 struct osc_cache_waiter *ocw;
816 list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
817 /* if we can't dirty more, we must wait until some is written */
818 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
819 ((atomic_read(&obd_dirty_pages)+1)>(obd_max_dirty_pages))) {
820 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
821 "osc max %ld, sys max %d\n", cli->cl_dirty,
822 cli->cl_dirty_max, obd_max_dirty_pages);
826 /* if still dirty cache but no grant wait for pending RPCs that
827 * may yet return us some grant before doing sync writes */
828 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
829 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
830 cli->cl_w_in_flight);
834 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
835 list_del_init(&ocw->ocw_entry);
836 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
837 /* no more RPCs in flight to return grant, do sync IO */
838 ocw->ocw_rc = -EDQUOT;
839 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
841 osc_consume_write_grant(cli,
842 &ocw->ocw_oap->oap_brw_page);
845 cfs_waitq_signal(&ocw->ocw_waitq);
851 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
853 client_obd_list_lock(&cli->cl_loi_list_lock);
854 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
855 if (body->oa.o_valid & OBD_MD_FLGRANT)
856 cli->cl_avail_grant += body->oa.o_grant;
857 /* waiters are woken in brw_interpret */
858 client_obd_list_unlock(&cli->cl_loi_list_lock);
861 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
862 void *key, obd_count vallen, void *val,
863 struct ptlrpc_request_set *set);
865 static int osc_shrink_grant_interpret(struct ptlrpc_request *req,
868 struct osc_grant_args *aa = data;
869 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
870 struct obdo *oa = aa->aa_oa;
871 struct ost_body *body;
874 client_obd_list_lock(&cli->cl_loi_list_lock);
875 cli->cl_avail_grant += oa->o_grant;
876 client_obd_list_unlock(&cli->cl_loi_list_lock);
879 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*oa),
880 lustre_swab_ost_body);
881 osc_update_grant(cli, body);
887 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
889 client_obd_list_lock(&cli->cl_loi_list_lock);
890 oa->o_grant = cli->cl_avail_grant / 4;
891 cli->cl_avail_grant -= oa->o_grant;
892 client_obd_list_unlock(&cli->cl_loi_list_lock);
893 oa->o_flags |= OBD_FL_SHRINK_GRANT;
894 osc_update_next_shrink(cli);
897 /* Shrink the current grant, either from some large amount to enough for a
898 * full set of in-flight RPCs, or if we have already shrunk to that limit
899 * then to enough for a single RPC. This avoids keeping more grant than
900 * needed, and avoids shrinking the grant piecemeal. */
901 static int osc_shrink_grant(struct client_obd *cli)
903 long target = (cli->cl_max_rpcs_in_flight + 1) *
904 cli->cl_max_pages_per_rpc;
906 client_obd_list_lock(&cli->cl_loi_list_lock);
907 if (cli->cl_avail_grant <= target)
908 target = cli->cl_max_pages_per_rpc;
909 client_obd_list_unlock(&cli->cl_loi_list_lock);
911 return osc_shrink_grant_to_target(cli, target);
914 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
917 struct ost_body *body;
920 client_obd_list_lock(&cli->cl_loi_list_lock);
921 /* Don't shrink if we are already above or below the desired limit
922 * We don't want to shrink below a single RPC, as that will negatively
923 * impact block allocation and long-term performance. */
924 if (target < cli->cl_max_pages_per_rpc)
925 target = cli->cl_max_pages_per_rpc;
927 if (target >= cli->cl_avail_grant) {
928 client_obd_list_unlock(&cli->cl_loi_list_lock);
931 client_obd_list_unlock(&cli->cl_loi_list_lock);
937 osc_announce_cached(cli, &body->oa, 0);
939 client_obd_list_lock(&cli->cl_loi_list_lock);
940 body->oa.o_grant = cli->cl_avail_grant - target;
941 cli->cl_avail_grant = target;
942 client_obd_list_unlock(&cli->cl_loi_list_lock);
943 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
944 osc_update_next_shrink(cli);
946 rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
947 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
948 sizeof(*body), body, NULL);
950 client_obd_list_lock(&cli->cl_loi_list_lock);
951 cli->cl_avail_grant += body->oa.o_grant;
952 client_obd_list_unlock(&cli->cl_loi_list_lock);
958 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
959 static int osc_should_shrink_grant(struct client_obd *client)
961 cfs_time_t time = cfs_time_current();
962 cfs_time_t next_shrink = client->cl_next_shrink_grant;
963 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
964 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
965 client->cl_avail_grant > GRANT_SHRINK_LIMIT)
968 osc_update_next_shrink(client);
973 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
975 struct client_obd *client;
977 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
978 if (osc_should_shrink_grant(client))
979 osc_shrink_grant(client);
984 static int osc_add_shrink_grant(struct client_obd *client)
988 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
990 osc_grant_shrink_grant_cb, NULL,
991 &client->cl_grant_shrink_list);
993 CERROR("add grant client %s error %d\n",
994 client->cl_import->imp_obd->obd_name, rc);
997 CDEBUG(D_CACHE, "add grant client %s \n",
998 client->cl_import->imp_obd->obd_name);
999 osc_update_next_shrink(client);
1003 static int osc_del_shrink_grant(struct client_obd *client)
1005 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1009 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1012 * ocd_grant is the total grant amount we're expect to hold: if we'v
1013 * been evicted, it's the new avail_grant amount, cl_dirty will drop
1014 * to 0 as inflight rpcs fail out; otherwise, it's avail_grant + dirty.
1016 * race is tolerable here: if we're evicted, but imp_state already
1017 * left EVICTED state, then cl_diry must be 0 already.
1019 client_obd_list_lock(&cli->cl_loi_list_lock);
1020 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1021 cli->cl_avail_grant = ocd->ocd_grant;
1023 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1024 client_obd_list_unlock(&cli->cl_loi_list_lock);
1026 CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1027 cli->cl_avail_grant, cli->cl_lost_grant);
1028 LASSERT(cli->cl_avail_grant >= 0);
1030 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1031 list_empty(&cli->cl_grant_shrink_list))
1032 osc_add_shrink_grant(cli);
1035 /* We assume that the reason this OSC got a short read is because it read
1036 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1037 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1038 * this stripe never got written at or beyond this stripe offset yet. */
1039 static void handle_short_read(int nob_read, obd_count page_count,
1040 struct brw_page **pga, int pshift)
1045 /* skip bytes read OK */
1046 while (nob_read > 0) {
1047 LASSERT (page_count > 0);
1049 if (pga[i]->count > nob_read) {
1050 /* EOF inside this page */
1051 ptr = cfs_kmap(pga[i]->pg) +
1052 (OSC_FILE2MEM_OFF(pga[i]->off,pshift)&~CFS_PAGE_MASK);
1053 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1054 cfs_kunmap(pga[i]->pg);
1060 nob_read -= pga[i]->count;
1065 /* zero remaining pages */
1066 while (page_count-- > 0) {
1067 ptr = cfs_kmap(pga[i]->pg) +
1068 (OSC_FILE2MEM_OFF(pga[i]->off, pshift) & ~CFS_PAGE_MASK);
1069 memset(ptr, 0, pga[i]->count);
1070 cfs_kunmap(pga[i]->pg);
1075 static int check_write_rcs(struct ptlrpc_request *req,
1076 int requested_nob, int niocount,
1077 obd_count page_count, struct brw_page **pga)
1081 /* return error if any niobuf was in error */
1082 remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
1083 sizeof(*remote_rcs) * niocount, NULL);
1084 if (remote_rcs == NULL) {
1085 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
1088 if (lustre_rep_need_swab(req))
1089 for (i = 0; i < niocount; i++)
1090 __swab32s(&remote_rcs[i]);
1092 for (i = 0; i < niocount; i++) {
1093 if (remote_rcs[i] < 0)
1094 return(remote_rcs[i]);
1096 if (remote_rcs[i] != 0) {
1097 CERROR("rc[%d] invalid (%d) req %p\n",
1098 i, remote_rcs[i], req);
1103 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1104 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1105 req->rq_bulk->bd_nob_transferred, requested_nob);
1112 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1114 if (p1->flag != p2->flag) {
1115 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_ASYNC);
1117 /* warn if we try to combine flags that we don't know to be
1118 * safe to combine */
1119 if ((p1->flag & mask) != (p2->flag & mask))
1120 CERROR("is it ok to have flags 0x%x and 0x%x in the "
1121 "same brw?\n", p1->flag, p2->flag);
1125 return (p1->off + p1->count == p2->off);
1128 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1129 struct brw_page **pga, int opc,
1130 cksum_type_t cksum_type, int pshift)
1135 LASSERT (pg_count > 0);
1136 cksum = init_checksum(cksum_type);
1137 while (nob > 0 && pg_count > 0) {
1138 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1139 int off = OSC_FILE2MEM_OFF(pga[i]->off, pshift) & ~CFS_PAGE_MASK;
1140 int count = pga[i]->count > nob ? nob : pga[i]->count;
1142 /* corrupt the data before we compute the checksum, to
1143 * simulate an OST->client data error */
1144 if (i == 0 && opc == OST_READ &&
1145 OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1146 memcpy(ptr + off, "bad1", min(4, nob));
1147 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1148 cfs_kunmap(pga[i]->pg);
1149 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1152 nob -= pga[i]->count;
1156 /* For sending we only compute the wrong checksum instead
1157 * of corrupting the data so it is still correct on a redo */
1158 if (opc == OST_WRITE && OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
1164 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1165 struct lov_stripe_md *lsm, obd_count page_count,
1166 struct brw_page **pga,
1167 struct ptlrpc_request **reqp, int pshift)
1169 struct ptlrpc_request *req;
1170 struct ptlrpc_bulk_desc *desc;
1171 struct ost_body *body;
1172 struct obd_ioobj *ioobj;
1173 struct niobuf_remote *niobuf;
1174 __u32 size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
1175 int niocount, i, requested_nob, opc, rc;
1176 struct ptlrpc_request_pool *pool;
1177 struct osc_brw_async_args *aa;
1178 struct brw_page *pg_prev;
1181 OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
1182 OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */
1184 opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
1185 pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_import->imp_rq_pool :NULL;
1187 for (niocount = i = 1; i < page_count; i++) {
1188 if (!can_merge_pages(pga[i - 1], pga[i]))
1192 size[REQ_REC_OFF + 1] = sizeof(*ioobj);
1193 size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
1195 req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 4, size,
1200 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1201 ptlrpc_at_set_req_timeout(req);
1203 if (opc == OST_WRITE)
1204 desc = ptlrpc_prep_bulk_imp (req, page_count,
1205 BULK_GET_SOURCE, OST_BULK_PORTAL);
1207 desc = ptlrpc_prep_bulk_imp (req, page_count,
1208 BULK_PUT_SINK, OST_BULK_PORTAL);
1210 GOTO(out, rc = -ENOMEM);
1211 /* NB request now owns desc and will free it when it gets freed */
1213 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1214 ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
1215 niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1216 niocount * sizeof(*niobuf));
1218 lustre_set_wire_obdo(&body->oa, oa);
1219 obdo_to_ioobj(oa, ioobj);
1220 ioobj->ioo_bufcnt = niocount;
1222 LASSERT (page_count > 0);
1224 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1225 struct brw_page *pg = pga[i];
1227 LASSERT(pg->count > 0);
1228 LASSERTF((OSC_FILE2MEM_OFF(pg->off, pshift) & ~CFS_PAGE_MASK) +
1229 pg->count <= CFS_PAGE_SIZE,
1230 "i: %d pg: %p off: "LPU64", count: %u, shift: %d\n",
1231 i, pg, pg->off, pg->count, pshift);
1233 LASSERTF(i == 0 || pg->off > pg_prev->off,
1234 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1235 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1237 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1238 pg_prev->pg, page_private(pg_prev->pg),
1239 pg_prev->pg->index, pg_prev->off);
1241 LASSERTF(i == 0 || pg->off > pg_prev->off,
1242 "i %d p_c %u\n", i, page_count);
1244 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1245 (pg->flag & OBD_BRW_SRVLOCK));
1247 ptlrpc_prep_bulk_page(desc, pg->pg,
1248 OSC_FILE2MEM_OFF(pg->off,pshift)&~CFS_PAGE_MASK,
1250 requested_nob += pg->count;
1252 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1254 niobuf->len += pg->count;
1256 niobuf->offset = pg->off;
1257 niobuf->len = pg->count;
1258 niobuf->flags = pg->flag;
1263 LASSERTF((void *)(niobuf - niocount) ==
1264 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1265 niocount * sizeof(*niobuf)),
1266 "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
1267 REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
1268 (void *)(niobuf - niocount));
1270 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1271 if (osc_should_shrink_grant(cli))
1272 osc_shrink_grant_local(cli, &body->oa);
1274 /* size[REQ_REC_OFF] still sizeof (*body) */
1275 if (opc == OST_WRITE) {
1276 if (cli->cl_checksum) {
1277 /* store cl_cksum_type in a local variable since
1278 * it can be changed via lprocfs */
1279 cksum_type_t cksum_type = cli->cl_cksum_type;
1281 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1282 oa->o_flags &= OBD_FL_LOCAL_MASK;
1283 body->oa.o_flags = 0;
1285 body->oa.o_flags |= cksum_type_pack(cksum_type);
1286 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1287 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1290 cksum_type, pshift);
1291 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1293 /* save this in 'oa', too, for later checking */
1294 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1295 oa->o_flags |= cksum_type_pack(cksum_type);
1297 /* clear out the checksum flag, in case this is a
1298 * resend but cl_checksum is no longer set. b=11238 */
1299 oa->o_valid &= ~OBD_MD_FLCKSUM;
1301 oa->o_cksum = body->oa.o_cksum;
1302 /* 1 RC per niobuf */
1303 size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
1304 ptlrpc_req_set_repsize(req, 3, size);
1306 if (cli->cl_checksum) {
1307 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1308 body->oa.o_flags = 0;
1309 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1310 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1312 /* 1 RC for the whole I/O */
1313 ptlrpc_req_set_repsize(req, 2, size);
1316 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1317 aa = ptlrpc_req_async_args(req);
1319 aa->aa_requested_nob = requested_nob;
1320 aa->aa_nio_count = niocount;
1321 aa->aa_page_count = page_count;
1325 aa->aa_pshift = pshift;
1326 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1332 ptlrpc_req_finished (req);
/* Diagnose a bulk-write checksum mismatch reported by the OST.
 * Compares the client-computed checksum against the server's, and if they
 * differ, re-checksums the pages locally to classify where the corruption
 * occurred, then logs a detailed report.  NOTE(review): this numbered
 * listing is missing interleaved source lines (returns, else branches,
 * closing braces), so the control flow shown here is partial. */
1336 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1337 __u32 client_cksum, __u32 server_cksum, int nob,
1338 obd_count page_count, struct brw_page **pga,
1339 cksum_type_t client_cksum_type, int pshift)
1343 cksum_type_t cksum_type;
/* Fast path: checksums agree, nothing further to do. */
1345 if (server_cksum == client_cksum) {
1346 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* Pick the algorithm the server used (carried in o_flags); old servers
 * that don't send OBD_MD_FLFLAGS are assumed to use CRC32. */
1350 if (oa->o_valid & OBD_MD_FLFLAGS)
1351 cksum_type = cksum_type_unpack(oa->o_flags);
1353 cksum_type = OBD_CKSUM_CRC32;
/* Re-checksum the same pages now; comparing the fresh value against both
 * the original client checksum and the server's lets us classify the
 * failure mode in the messages below. */
1355 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1356 cksum_type, pshift);
1358 if (cksum_type != client_cksum_type)
1359 msg = "the server did not use the checksum type specified in "
1360 "the original request - likely a protocol problem";
1361 else if (new_cksum == server_cksum)
1362 msg = "changed on the client after we checksummed it - "
1363 "likely false positive due to mmap IO (bug 11742)";
1364 else if (new_cksum == client_cksum)
1365 msg = "changed in transit before arrival at OST";
1367 msg = "changed in transit AND doesn't match the original - "
1368 "likely false positive due to mmap IO (bug 11742)";
/* Console + debug-log report; fid/generation/group fields are only
 * meaningful when the corresponding o_valid bits are set. */
1370 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1371 LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1372 "["LPU64"-"LPU64"]\n",
1373 msg, libcfs_nid2str(peer->nid),
1374 oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1375 oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1378 oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1380 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1381 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1382 "client csum now %x\n", client_cksum, client_cksum_type,
1383 server_cksum, cksum_type, new_cksum);
1388 /* Note rc enters this function as number of bytes transferred */
/* Post-process a completed bulk read/write RPC: unpack and validate the
 * reply body, update quota/grant state, and verify checksums for both
 * directions.  For writes, a checksum mismatch is delegated to
 * check_write_checksum(); for reads, the checksum is recomputed locally
 * over the transferred bytes.  NOTE(review): stripped listing -- several
 * returns/else lines are missing from view. */
1389 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1391 struct osc_brw_async_args *aa = ptlrpc_req_async_args(req);
1392 const lnet_process_id_t *peer =
1393 &req->rq_import->imp_connection->c_peer;
1394 struct client_obd *cli = aa->aa_cli;
1395 struct ost_body *body;
1396 __u32 client_cksum = 0;
/* -EDQUOT still carries a usable reply (quota flags below); any other
 * negative rc is a hard failure. */
1399 if (rc < 0 && rc != -EDQUOT)
1402 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1403 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1404 lustre_swab_ost_body);
1406 CERROR ("Can't unpack body\n");
1410 /* set/clear over quota flag for a uid/gid */
1411 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1412 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1413 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1414 body->oa.o_gid, body->oa.o_valid,
1420 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1421 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1423 osc_update_grant(cli, body);
1425 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
/* Writes must not return a positive byte count. */
1427 CERROR ("Unexpected +ve rc %d\n", rc);
1430 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
/* If we sent a checksum and the server echoed one back, have
 * check_write_checksum() verify and diagnose any mismatch. */
1432 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1433 check_write_checksum(&body->oa, peer, client_cksum,
1434 body->oa.o_cksum, aa->aa_requested_nob,
1435 aa->aa_page_count, aa->aa_ppga,
1436 cksum_type_unpack(aa->aa_oa->o_flags),
1440 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1441 aa->aa_page_count, aa->aa_ppga);
1445 /* The rest of this function executes only for OST_READs */
/* Sanity-check the reported transfer size against what we asked for
 * and what the bulk descriptor says actually moved. */
1446 if (rc > aa->aa_requested_nob) {
1447 CERROR("Unexpected rc %d (%d requested)\n", rc,
1448 aa->aa_requested_nob);
1452 if (rc != req->rq_bulk->bd_nob_transferred) {
1453 CERROR ("Unexpected rc %d (%d transferred)\n",
1454 rc, req->rq_bulk->bd_nob_transferred);
/* Short read: zero-fill the tail pages beyond the returned bytes. */
1458 if (rc < aa->aa_requested_nob)
1459 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga, aa->aa_pshift);
1461 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1462 static int cksum_counter;
1463 __u32 server_cksum = body->oa.o_cksum;
1466 cksum_type_t cksum_type;
1468 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1469 cksum_type = cksum_type_unpack(body->oa.o_flags);
1471 cksum_type = OBD_CKSUM_CRC32;
1472 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1473 aa->aa_ppga, OST_READ,
1474 cksum_type, aa->aa_pshift);
/* Note whether the data came via a router, for the error report. */
1476 if (peer->nid == req->rq_bulk->bd_sender) {
1480 router = libcfs_nid2str(req->rq_bulk->bd_sender);
/* ~0 with data present means the server set the checksum bit but
 * never filled in a checksum -- protocol bug, not data corruption. */
1483 if (server_cksum == ~0 && rc > 0) {
1484 CERROR("Protocol error: server %s set the 'checksum' "
1485 "bit, but didn't send a checksum. Not fatal, "
1486 "but please notify on http://bugzilla.lustre.org/\n",
1487 libcfs_nid2str(peer->nid));
1488 } else if (server_cksum != client_cksum) {
1489 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1490 "%s%s%s inum "LPU64"/"LPU64" object "
1491 LPU64"/"LPU64" extent "
1492 "["LPU64"-"LPU64"]\n",
1493 req->rq_import->imp_obd->obd_name,
1494 libcfs_nid2str(peer->nid),
1496 body->oa.o_valid & OBD_MD_FLFID ?
1497 body->oa.o_fid : (__u64)0,
1498 body->oa.o_valid & OBD_MD_FLFID ?
1499 body->oa.o_generation :(__u64)0,
1501 body->oa.o_valid & OBD_MD_FLGROUP ?
1502 body->oa.o_gr : (__u64)0,
1503 aa->aa_ppga[0]->off,
1504 aa->aa_ppga[aa->aa_page_count-1]->off +
1505 aa->aa_ppga[aa->aa_page_count-1]->count -
1507 CERROR("client %x, server %x, cksum_type %x\n",
1508 client_cksum, server_cksum, cksum_type);
1510 aa->aa_oa->o_cksum = client_cksum;
1514 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1517 } else if (unlikely(client_cksum)) {
/* We asked for a checksum but the server didn't send one; rate-limit
 * the complaint to powers of two via the missed counter. */
1518 static int cksum_missed;
1521 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1522 CERROR("Checksum %u requested from %s but not sent\n",
1523 cksum_missed, libcfs_nid2str(peer->nid));
/* Copy the (possibly updated) attributes from the reply back into
 * the caller's obdo. */
1529 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
/* Synchronous bulk I/O: build a brw request, queue it and wait, then
 * finish it.  On recoverable errors the whole sequence is retried (with a
 * backoff wait), bounded by osc_should_resend().  NOTE(review): the loop
 * structure and final return are in lines stripped from this listing. */
1534 static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
1535 struct lov_stripe_md *lsm,
1536 obd_count page_count, struct brw_page **pga)
1538 struct ptlrpc_request *request;
1542 struct l_wait_info lwi;
1545 init_waitqueue_head(&waitq);
1548 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1549 page_count, pga, &request, 0);
1553 rc = ptlrpc_queue_wait(request);
/* Bulk timeout with resend flagged: drop this request and rebuild. */
1555 if (rc == -ETIMEDOUT && request->rq_resend) {
1556 DEBUG_REQ(D_HA, request, "BULK TIMEOUT");
1557 ptlrpc_req_finished(request);
1561 rc = osc_brw_fini_request(request, rc);
1563 ptlrpc_req_finished(request);
1564 if (osc_recoverable_error(rc)) {
1566 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1567 CERROR("too many resend retries, returning error\n");
/* Back off proportionally to the resend count before retrying. */
1571 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1572 l_wait_event(waitq, 0, &lwi);
/* Rebuild and requeue a failed async brw request.  The new request takes
 * over the page array and oap list from the old one; each oap's request
 * reference is swapped to the new request.  Returns non-zero when the
 * retry budget is exhausted or the rebuild fails. */
1579 int osc_brw_redo_request(struct ptlrpc_request *request,
1580 struct osc_brw_async_args *aa)
1582 struct ptlrpc_request *new_req;
1583 struct ptlrpc_request_set *set = request->rq_set;
1584 struct osc_brw_async_args *new_aa;
1585 struct osc_async_page *oap;
1589 if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1590 CERROR("too many resend retries, returning error\n");
1594 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
/* Rebuild the request from the saved async args; direction is derived
 * from the original request's opcode. */
1596 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1597 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1598 aa->aa_cli, aa->aa_oa,
1599 NULL /* lsm unused by osc currently */,
1600 aa->aa_page_count, aa->aa_ppga, &new_req,
1605 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
/* If any page was interrupted while we were rebuilding, abandon the
 * redo -- the interruption path will complete the pages with -EINTR. */
1607 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1608 if (oap->oap_request != NULL) {
1609 LASSERTF(request == oap->oap_request,
1610 "request %p != oap_request %p\n",
1611 request, oap->oap_request);
1612 if (oap->oap_interrupted) {
1613 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1614 ptlrpc_req_finished(new_req);
1619 /* New request takes over pga and oaps from old request.
1620 * Note that copying a list_head doesn't work, need to move it... */
1622 new_req->rq_interpret_reply = request->rq_interpret_reply;
1623 new_req->rq_async_args = request->rq_async_args;
/* Delay the resend proportionally to how many times we've retried. */
1624 new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;
1626 new_aa = ptlrpc_req_async_args(new_req);
1628 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1629 list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1630 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
/* Re-point each oap's request reference at the new request. */
1632 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1633 if (oap->oap_request) {
1634 ptlrpc_req_finished(oap->oap_request);
1635 oap->oap_request = ptlrpc_request_addref(new_req);
1639 /* use ptlrpc_set_add_req is safe because interpret functions work
1640 * in check_set context. only one way exist with access to request
1641 * from different thread got -EINTR - this way protected with
1642 * cl_loi_list_lock */
1643 ptlrpc_set_add_req(set, new_req);
1645 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1647 DEBUG_REQ(D_INFO, new_req, "new request");
/* Asynchronous bulk I/O: build a brw request, record lprocfs stats, and
 * add it to the caller's request set with brw_interpret as the reply
 * handler.  Write grants are consumed up front and released again if the
 * request could not be built.  NOTE(review): the prep-failure branch
 * boundary falls on stripped lines. */
1651 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1652 struct lov_stripe_md *lsm, obd_count page_count,
1653 struct brw_page **pga, struct ptlrpc_request_set *set,
1656 struct ptlrpc_request *request;
1657 struct client_obd *cli = &exp->exp_obd->u.cli;
1659 struct osc_brw_async_args *aa;
1662 /* Consume write credits even if doing a sync write -
1663 * otherwise we may run out of space on OST due to grant. */
1664 /* FIXME: unaligned writes must use write grants too */
1665 if (cmd == OBD_BRW_WRITE && pshift == 0) {
1666 client_obd_list_lock(&cli->cl_loi_list_lock);
1667 for (i = 0; i < page_count; i++) {
1668 if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1669 osc_consume_write_grant(cli, pga[i]);
1671 client_obd_list_unlock(&cli->cl_loi_list_lock);
1674 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1675 page_count, pga, &request, pshift);
1677 CLASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
1680 aa = ptlrpc_req_async_args(request);
/* Per-direction histograms: pages per RPC and RPCs in flight. */
1681 if (cmd == OBD_BRW_READ) {
1682 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1683 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1685 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1686 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1687 cli->cl_w_in_flight);
1689 ptlrpc_lprocfs_brw(request, aa->aa_requested_nob);
1691 LASSERT(list_empty(&aa->aa_oaps));
1693 request->rq_interpret_reply = brw_interpret;
1694 ptlrpc_set_add_req(set, request);
1695 client_obd_list_lock(&cli->cl_loi_list_lock);
1696 if (cmd == OBD_BRW_READ)
1697 cli->cl_r_in_flight++;
1699 cli->cl_w_in_flight++;
1700 client_obd_list_unlock(&cli->cl_loi_list_lock);
1701 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DIO_PAUSE, 3);
/* Prep failed for a write: give back the grants we consumed above and
 * wake anyone waiting on cache space. */
1702 } else if (cmd == OBD_BRW_WRITE) {
1703 client_obd_list_lock(&cli->cl_loi_list_lock);
1704 for (i = 0; i < page_count; i++)
1705 osc_release_write_grant(cli, pga[i], 0);
1706 osc_wake_cache_waiters(cli);
1707 client_obd_list_unlock(&cli->cl_loi_list_lock);
/*
1714 * ugh, we want disk allocation on the target to happen in offset order. we'll
1715 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1716 * fine for our small page arrays and doesn't require allocation. its an
1717 * insertion sort that swaps elements that are strides apart, shrinking the
1718 * stride down until its '1' and the array is sorted.
 */
/* Sort @array of @num brw_page pointers by ascending file offset (->off)
 * using an in-place shellsort with the 3h+1 gap sequence. */
1720 static void sort_brw_pages(struct brw_page **array, int num)
1723 struct brw_page *tmp;
/* Grow the stride to the largest 3h+1 value below num. */
1727 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
/* Gapped insertion sort at the current stride. */
1732 for (i = stride ; i < num ; i++) {
1735 while (j >= stride && array[j-stride]->off > tmp->off) {
1736 array[j] = array[j - stride];
1741 } while (stride > 1);
/* Return how many leading pages of the (offset-sorted) @pg array can go
 * into one RPC without fragmenting a CFS_PAGE: the run ends at the first
 * page that does not end on a page boundary, or at the first page that
 * does not start on one.  @pshift adjusts file offsets to memory offsets
 * for unaligned direct I/O. */
1744 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages,
1751 LASSERT (pages > 0);
1752 offset = OSC_FILE2MEM_OFF(pg[i]->off, pshift) & ~CFS_PAGE_MASK;
1756 if (pages == 0) /* that's all */
1759 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1760 return count; /* doesn't end on page boundary */
1763 offset = OSC_FILE2MEM_OFF(pg[i]->off, pshift) & ~CFS_PAGE_MASK;
1764 if (offset != 0) /* doesn't start on page boundary */
/* Build an array of pointers into the caller's contiguous brw_page array,
 * so the pages can be sorted/partitioned without copying the pages
 * themselves.  Returns NULL on allocation failure; caller frees with
 * osc_release_ppga(). */
1771 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1773 struct brw_page **ppga;
1776 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1780 for (i = 0; i < count; i++)
/* Free a pointer array allocated by osc_build_ppga() (pointers only --
 * the underlying brw_pages belong to the caller). */
1785 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1787 LASSERT(ppga != NULL);
1788 OBD_FREE(ppga, sizeof(*ppga) * count);
/* Synchronous brw entry point: sorts the pages by offset, then issues one
 * or more osc_brw_internal() RPCs, each capped at cl_max_pages_per_rpc
 * and trimmed to an unfragmented run.  Because the server may clobber the
 * obdo on each RPC, a saved copy is restored between chunks.
 * OBD_BRW_CHECK only probes whether I/O could succeed (import valid). */
1791 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1792 obd_count page_count, struct brw_page *pga,
1793 struct obd_trans_info *oti)
1795 struct obdo *saved_oa = NULL;
1796 struct brw_page **ppga, **orig;
1797 struct obd_import *imp = class_exp2cliimp(exp);
1798 struct client_obd *cli;
1799 int rc, page_count_orig;
1802 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1803 cli = &imp->imp_obd->u.cli;
1805 if (cmd & OBD_BRW_CHECK) {
1806 /* The caller just wants to know if there's a chance that this
1807 * I/O can succeed */
1809 if (imp->imp_invalid)
1814 /* test_brw with a failed create can trip this, maybe others. */
1815 LASSERT(cli->cl_max_pages_per_rpc);
1819 orig = ppga = osc_build_ppga(pga, page_count);
/* Remember the full count: page_count is consumed as chunks are sent,
 * but the release below must cover the whole pointer array. */
1822 page_count_orig = page_count;
1824 sort_brw_pages(ppga, page_count);
1825 while (page_count) {
1826 obd_count pages_per_brw;
1828 if (page_count > cli->cl_max_pages_per_rpc)
1829 pages_per_brw = cli->cl_max_pages_per_rpc;
1831 pages_per_brw = page_count;
1833 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw, 0);
1835 if (saved_oa != NULL) {
1836 /* restore previously saved oa */
1837 *oinfo->oi_oa = *saved_oa;
1838 } else if (page_count > pages_per_brw) {
1839 /* save a copy of oa (brw will clobber it) */
1840 OBDO_ALLOC(saved_oa);
1841 if (saved_oa == NULL)
1842 GOTO(out, rc = -ENOMEM);
1843 *saved_oa = *oinfo->oi_oa;
1846 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1847 pages_per_brw, ppga);
/* Advance past the chunk just sent. */
1852 page_count -= pages_per_brw;
1853 ppga += pages_per_brw;
1857 osc_release_ppga(orig, page_count_orig);
1859 if (saved_oa != NULL)
1860 OBDO_FREE(saved_oa);
/* Asynchronous brw entry point: like osc_brw() but queues chunks onto a
 * request set via async_internal().  When the I/O is split into multiple
 * RPCs, each chunk gets its own copy of the pointer array and a temporary
 * obdo (flagged OBD_FL_TEMPORARY) whose ownership passes to
 * async_internal() on success.  @pshift supports unaligned direct I/O.
 * NOTE(review): the obdo allocation and single-RPC branch fall on lines
 * stripped from this listing. */
1865 static int osc_brw_async(int cmd, struct obd_export *exp,
1866 struct obd_info *oinfo, obd_count page_count,
1867 struct brw_page *pga, struct obd_trans_info *oti,
1868 struct ptlrpc_request_set *set, int pshift)
1870 struct brw_page **ppga, **orig;
1871 int page_count_orig;
1875 if (cmd & OBD_BRW_CHECK) {
1876 /* The caller just wants to know if there's a chance that this
1877 * I/O can succeed */
1878 struct obd_import *imp = class_exp2cliimp(exp);
1880 if (imp == NULL || imp->imp_invalid)
1885 orig = ppga = osc_build_ppga(pga, page_count);
1888 page_count_orig = page_count;
1890 sort_brw_pages(ppga, page_count);
1891 while (page_count) {
1892 struct brw_page **copy;
1894 obd_count pages_per_brw;
1896 /* one page less under unaligned direct i/o */
1897 pages_per_brw = min_t(obd_count, page_count,
1898 class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc -
1901 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw,
1904 /* use ppga only if single RPC is going to fly */
1905 if (pages_per_brw != page_count_orig || ppga != orig) {
1906 OBD_ALLOC(copy, pages_per_brw * sizeof(*copy));
1908 GOTO(out, rc = -ENOMEM);
1909 memcpy(copy, ppga, pages_per_brw * sizeof(*copy));
/* Obdo allocation failed: release the page-array copy too. */
1913 OBD_FREE(copy, pages_per_brw * sizeof(*copy));
1914 GOTO(out, rc = -ENOMEM);
1916 memcpy(oa, oinfo->oi_oa, sizeof(*oa));
/* Mark the copy so brw_interpret knows to free it. */
1917 oa->o_flags |= OBD_FL_TEMPORARY;
1921 LASSERT(!(oa->o_flags & OBD_FL_TEMPORARY));
1924 rc = async_internal(cmd, exp, oa, oinfo->oi_md, pages_per_brw,
/* Failure path: undo the per-chunk copies we made above. */
1929 OBD_FREE(copy, pages_per_brw * sizeof(*copy));
1931 if (oa->o_flags & OBD_FL_TEMPORARY)
1937 /* we passed it to async_internal() which is
1938 * now responsible for releasing memory */
1942 page_count -= pages_per_brw;
1943 ppga += pages_per_brw;
1947 osc_release_ppga(orig, page_count_orig);
1951 static void osc_check_rpcs(struct client_obd *cli);
1953 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1954 * the dirty accounting. Writeback completes or truncate happens before
1955 * writing starts. Must be called with the loi lock held. */
1956 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
/* Returns the page's write grant to the client; @sent indicates whether
 * the page actually went over the wire. */
1959 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1962 /* This maintains the lists of pending pages to read/write for a given object
1963 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1964 * to quickly find objects that are ready to send an RPC. */
/* Predicate: should the pending pages in @lop be formed into an RPC now?
 * True when the import is invalid (drain), an urgent page is queued, a
 * cache waiter exists (writes), or enough pages have accumulated. */
1965 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1971 if (lop->lop_num_pending == 0)
1974 /* if we have an invalid import we want to drain the queued pages
1975 * by forcing them through rpcs that immediately fail and complete
1976 * the pages. recovery relies on this to empty the queued pages
1977 * before canceling the locks and evicting down the llite pages */
1978 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1981 /* stream rpcs in queue order as long as as there is an urgent page
1982 * queued. this is our cheap solution for good batching in the case
1983 * where writepage marks some random page in the middle of the file
1984 * as urgent because of, say, memory pressure */
1985 if (!list_empty(&lop->lop_urgent)) {
1986 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1990 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1991 optimal = cli->cl_max_pages_per_rpc;
1992 if (cmd & OBD_BRW_WRITE) {
1993 /* trigger a write rpc stream as long as there are dirtiers
1994 * waiting for space. as they're waiting, they're not going to
1995 * create more pages to coallesce with what's waiting.. */
1996 if (!list_empty(&cli->cl_cache_waiters)) {
1997 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
2001 /* +16 to avoid triggering rpcs that would want to include pages
2002 * that are being queued but which can't be made ready until
2003 * the queuer finishes with the page. this is a wart for
2004 * llite::commit_write() */
2007 if (lop->lop_num_pending >= optimal)
/* Predicate: does the head of @lop's urgent list carry ASYNC_HP, i.e.
 * should a high-priority RPC be issued for this object? */
2013 static int lop_makes_hprpc(struct loi_oap_pages *lop)
2015 struct osc_async_page *oap;
2018 if (list_empty(&lop->lop_urgent))
/* Only the first urgent entry is inspected -- HP pages are queued at
 * the list head by osc_oap_to_pending(). */
2021 oap = list_entry(lop->lop_urgent.next,
2022 struct osc_async_page, oap_urgent_item);
2024 if (oap->oap_async_flags & ASYNC_HP) {
2025 CDEBUG(D_CACHE, "hp request forcing RPC\n");
/* Idempotently add @item to @list or remove it, so that its membership
 * matches the boolean @should_be_on. */
2032 static void on_list(struct list_head *item, struct list_head *list,
2035 if (list_empty(item) && should_be_on)
2036 list_add_tail(item, list);
2037 else if (!list_empty(item) && !should_be_on)
2038 list_del_init(item);
2041 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2042 * can find pages to build into rpcs quickly */
/* An loi sits on at most one of hp-ready/ready (HP wins), plus the
 * write/read lists whenever it has pending pages of that kind. */
2043 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
2045 if (lop_makes_hprpc(&loi->loi_write_lop) ||
2046 lop_makes_hprpc(&loi->loi_read_lop)) {
2048 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2049 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2051 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
2052 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2053 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
2054 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
2057 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2058 loi->loi_write_lop.lop_num_pending);
2060 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2061 loi->loi_read_lop.lop_num_pending);
/* Adjust the pending-page counters by @delta (positive on queue, negative
 * on dequeue) on both the per-object lop and the per-client read/write
 * totals, selected by @cmd. */
2064 static void lop_update_pending(struct client_obd *cli,
2065 struct loi_oap_pages *lop, int cmd, int delta)
2067 lop->lop_num_pending += delta;
2068 if (cmd & OBD_BRW_WRITE)
2069 cli->cl_pending_w_pages += delta;
2071 cli->cl_pending_r_pages += delta;
2074 /* this is called when a sync waiter receives an interruption. Its job is to
2075 * get the caller woken as soon as possible. If its page hasn't been put in an
2076 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
2077 * desiring interruption which will forcefully complete the rpc once the rpc
 * gets sent (per the comment above; the closing clause is on a stripped
 * line). */
2079 static void osc_occ_interrupted(struct oig_callback_context *occ)
2081 struct osc_async_page *oap;
2082 struct loi_oap_pages *lop;
2083 struct lov_oinfo *loi;
2086 /* XXX member_of() */
/* Recover the oap from its embedded occ member. */
2087 oap = list_entry(occ, struct osc_async_page, oap_occ);
2089 client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
2091 oap->oap_interrupted = 1;
2093 /* ok, it's been put in an rpc. only one oap gets a request reference */
2094 if (oap->oap_request != NULL) {
2095 ptlrpc_mark_interrupted(oap->oap_request);
2096 ptlrpcd_wake(oap->oap_request);
2100 /* we don't get interruption callbacks until osc_trigger_group_io()
2101 * has been called and put the sync oaps in the pending/urgent lists.*/
2102 if (!list_empty(&oap->oap_pending_item)) {
/* Not yet in an RPC: dequeue immediately, fix accounting, and
 * complete the group-I/O waiter with -EINTR. */
2103 list_del_init(&oap->oap_pending_item);
2104 list_del_init(&oap->oap_urgent_item);
2107 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2108 &loi->loi_write_lop : &loi->loi_read_lop;
2109 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2110 loi_list_maint(oap->oap_cli, oap->oap_loi);
2112 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
2113 oap->oap_oig = NULL;
2117 client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
2120 /* this is trying to propogate async writeback errors back up to the
2121 * application. As an async write fails we record the error code for later if
2122 * the app does an fsync. As long as errors persist we force future rpcs to be
2123 * sync so that the app can get a sync error and break the cycle of queueing
2124 * pages for which writeback will fail. */
/* On failure (the rc check is on a stripped line): latch force_sync and
 * record the next xid as the point after which success clears it.  On
 * success at or past that xid, drop force_sync again. */
2125 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2132 ar->ar_force_sync = 1;
2133 ar->ar_min_xid = ptlrpc_sample_next_xid();
2138 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2139 ar->ar_force_sync = 0;
/* Queue @oap onto its object's pending list (write or read lop per
 * oap_cmd).  HP pages go to the head of the urgent list so
 * lop_makes_hprpc() sees them first; URGENT pages go to its tail; all
 * pages join the pending list tail and bump the pending counters. */
2142 static void osc_oap_to_pending(struct osc_async_page *oap)
2144 struct loi_oap_pages *lop;
2146 if (oap->oap_cmd & OBD_BRW_WRITE)
2147 lop = &oap->oap_loi->loi_write_lop;
2149 lop = &oap->oap_loi->loi_read_lop;
2151 if (oap->oap_async_flags & ASYNC_HP)
2152 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2153 else if (oap->oap_async_flags & ASYNC_URGENT)
2154 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2155 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2156 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2159 /* this must be called holding the loi list lock to give coverage to exit_cache,
2160 * async_flag maintenance, and oap_request */
/* Complete one async page: drop its request reference, clear its flags,
 * propagate write errors via osc_process_ar(), refresh the loi's cached
 * lvb attributes from @oa, and hand completion to either group I/O
 * (oig_complete_one) or the caller's ap_completion hook.  NOTE(review):
 * the branch structure around the oig path falls on stripped lines. */
2161 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
2162 struct osc_async_page *oap, int sent, int rc)
2167 if (oap->oap_request != NULL) {
/* Capture the xid before releasing the request; osc_process_ar()
 * needs it for the force-sync bookkeeping below. */
2168 xid = ptlrpc_req_xid(oap->oap_request);
2169 ptlrpc_req_finished(oap->oap_request);
2170 oap->oap_request = NULL;
2173 spin_lock(&oap->oap_lock);
2174 oap->oap_async_flags = 0;
2175 spin_unlock(&oap->oap_lock);
2176 oap->oap_interrupted = 0;
2178 if (oap->oap_cmd & OBD_BRW_WRITE) {
2179 osc_process_ar(&cli->cl_ar, xid, rc);
2180 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
/* On success, refresh the cached size/time attributes the OST
 * returned. */
2183 if (rc == 0 && oa != NULL) {
2184 if (oa->o_valid & OBD_MD_FLBLOCKS)
2185 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2186 if (oa->o_valid & OBD_MD_FLMTIME)
2187 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2188 if (oa->o_valid & OBD_MD_FLATIME)
2189 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2190 if (oa->o_valid & OBD_MD_FLCTIME)
2191 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
/* Group-I/O page: return grant and signal the group waiter. */
2195 osc_exit_cache(cli, oap, sent);
2196 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
2197 oap->oap_oig = NULL;
/* Normal page: notify the upper layer (llite) of completion. */
2202 rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
2203 oap->oap_cmd, oa, rc);
2205 /* ll_ap_completion (from llite) drops PG_locked. so, a new
2206 * I/O on the page could start, but OSC calls it under lock
2207 * and thus we can add oap back to pending safely */
2209 /* upper layer wants to leave the page on pending queue */
2210 osc_oap_to_pending(oap);
2212 osc_exit_cache(cli, oap, sent);
/* Reply interpreter for async brw RPCs (set by async_internal and by the
 * rpc-building path).  Finishes the request, retries recoverable errors
 * via osc_brw_redo_request(), then under the loi list lock decrements the
 * in-flight counter, completes the oaps (or releases grants for the
 * async_internal path), wakes cache waiters and kicks osc_check_rpcs(). */
2216 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
2218 struct osc_brw_async_args *aa = data;
2219 struct client_obd *cli;
2222 rc = osc_brw_fini_request(request, rc);
2223 CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
/* Recoverable error: requeue a rebuilt request and (per the stripped
 * lines) presumably return early if the redo succeeded. */
2225 if (osc_recoverable_error(rc)) {
2226 rc = osc_brw_redo_request(request, aa);
2232 client_obd_list_lock(&cli->cl_loi_list_lock);
2233 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2234 * is called so we know whether to go to sync BRWs or wait for more
2235 * RPCs to complete */
2236 if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
2237 cli->cl_w_in_flight--;
2239 cli->cl_r_in_flight--;
2241 if (!list_empty(&aa->aa_oaps)) { /* from osc_send_oap_rpc() */
2242 struct osc_async_page *oap, *tmp;
2243 /* the caller may re-use the oap after the completion call so
2244 * we need to clean it up a little */
2245 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2246 list_del_init(&oap->oap_rpc_item);
2247 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
2249 OBDO_FREE(aa->aa_oa);
2250 } else { /* from async_internal() */
/* No oaps to complete: just give back the write grants consumed
 * when the request was queued. */
2252 for (i = 0; i < aa->aa_page_count; i++)
2253 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
/* Free the temporary obdo osc_brw_async() allocated per chunk. */
2255 if (aa->aa_oa->o_flags & OBD_FL_TEMPORARY)
2256 OBDO_FREE(aa->aa_oa);
2258 osc_wake_cache_waiters(cli);
2259 osc_check_rpcs(cli);
2260 client_obd_list_unlock(&cli->cl_loi_list_lock);
2262 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
/* Build a brw ptlrpc request from a list of osc_async_pages: allocate a
 * page-pointer array and an obdo, fill the obdo via the caller ops,
 * attach the lock handle, sort the pages, and hand off to
 * osc_brw_prep_request().  On success the request's async args take over
 * the rpc_list.  Returns the request or an ERR_PTR.  NOTE(review): the
 * obdo allocation and some cleanup lines are stripped from this listing. */
2267 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
2268 struct list_head *rpc_list,
2269 int page_count, int cmd)
2271 struct ptlrpc_request *req;
2272 struct brw_page **pga = NULL;
2273 struct osc_brw_async_args *aa;
2274 struct obdo *oa = NULL;
2275 struct obd_async_page_ops *ops = NULL;
2276 void *caller_data = NULL;
2277 struct osc_async_page *oap;
2278 struct ldlm_lock *lock = NULL;
2283 LASSERT(!list_empty(rpc_list));
2285 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2287 RETURN(ERR_PTR(-ENOMEM));
2291 GOTO(out, req = ERR_PTR(-ENOMEM));
/* Collect page pointers; ops/caller_data/lock are taken from the oaps
 * (presumably the first one -- the guarding line is stripped). */
2294 list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2296 ops = oap->oap_caller_ops;
2297 caller_data = oap->oap_caller_data;
2298 lock = oap->oap_ldlm_lock;
2300 pga[i] = &oap->oap_brw_page;
2301 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2302 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2303 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2307 /* always get the data for the obdo for the rpc */
2308 LASSERT(ops != NULL);
2309 ops->ap_fill_obdo(caller_data, cmd, oa);
2311 oa->o_handle = lock->l_remote_handle;
2312 oa->o_valid |= OBD_MD_FLHANDLE;
2315 sort_brw_pages(pga, page_count);
2316 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, pga, &req, 0);
2318 CERROR("prep_req failed: %d\n", rc);
2319 GOTO(out, req = ERR_PTR(rc));
/* From here on operate on the obdo embedded in the request message,
 * not the scratch copy. */
2321 oa = &((struct ost_body *)lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
2322 sizeof(struct ost_body)))->oa;
2324 /* Need to update the timestamps after the request is built in case
2325 * we race with setattr (locally or in queue at OST). If OST gets
2326 * later setattr before earlier BRW (as determined by the request xid),
2327 * the OST will not use BRW timestamps. Sadly, there is no obvious
2328 * way to do this in a single call. bug 10150 */
2329 if (pga[0]->flag & OBD_BRW_SRVLOCK) {
2330 /* in case of lockless read/write do not use inode's
2331 * timestamps because concurrent stat might fill the
2332 * inode with out-of-date times, send current
 * timestamps instead (continuation on a stripped line). */
2334 if (cmd & OBD_BRW_WRITE) {
2335 oa->o_mtime = oa->o_ctime = LTIME_S(CURRENT_TIME);
2336 oa->o_valid |= OBD_MD_FLMTIME | OBD_MD_FLCTIME;
2337 valid = OBD_MD_FLATIME;
2339 oa->o_atime = LTIME_S(CURRENT_TIME);
2340 oa->o_valid |= OBD_MD_FLATIME;
2341 valid = OBD_MD_FLMTIME | OBD_MD_FLCTIME;
2344 valid = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2346 ops->ap_update_obdo(caller_data, cmd, oa, valid);
2348 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2349 aa = ptlrpc_req_async_args(req);
/* Move the oaps into the request's async args; a list_head cannot be
 * copied by assignment, hence the splice + re-init. */
2350 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2351 list_splice(rpc_list, &aa->aa_oaps);
2352 CFS_INIT_LIST_HEAD(rpc_list);
2359 OBD_FREE(pga, sizeof(*pga) * page_count);
2364 /* the loi lock is held across this function but it's allowed to release
2365 * and reacquire it during its work */
2367 * prepare pages for ASYNC io and put pages in send queue.
2371 * \param cmd - OBD_BRW_* macroses
2372 * \param lop - pending pages
2374 * \return zero if pages successfully add to send queue.
2375 * \return not zere if error occurring.
2377 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2378 int cmd, struct loi_oap_pages *lop)
2380 struct ptlrpc_request *req;
2381 obd_count page_count = 0;
2382 struct osc_async_page *oap = NULL, *tmp;
2383 struct osc_brw_async_args *aa;
2384 struct obd_async_page_ops *ops;
2385 CFS_LIST_HEAD(rpc_list);
2386 unsigned int ending_offset;
2387 unsigned starting_offset = 0;
2391 /* If there are HP OAPs we need to handle at least 1 of them,
2392 * move it the beginning of the pending list for that. */
2393 if (!list_empty(&lop->lop_urgent)) {
2394 oap = list_entry(lop->lop_urgent.next,
2395 struct osc_async_page, oap_urgent_item);
2396 if (oap->oap_async_flags & ASYNC_HP)
2397 list_move(&oap->oap_pending_item, &lop->lop_pending);
2400 /* first we find the pages we're allowed to work with */
2401 list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item){
2402 ops = oap->oap_caller_ops;
2404 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2405 "magic 0x%x\n", oap, oap->oap_magic);
2407 if (page_count != 0 &&
2408 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2409 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2410 " oap %p, page %p, srvlock %u\n",
2411 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2414 /* in llite being 'ready' equates to the page being locked
2415 * until completion unlocks it. commit_write submits a page
2416 * as not ready because its unlock will happen unconditionally
2417 * as the call returns. if we race with commit_write giving
2418 * us that page we dont' want to create a hole in the page
2419 * stream, so we stop and leave the rpc to be fired by
2420 * another dirtier or kupdated interval (the not ready page
2421 * will still be on the dirty list). we could call in
2422 * at the end of ll_file_write to process the queue again. */
2423 if (!(oap->oap_async_flags & ASYNC_READY)) {
2424 int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2426 CDEBUG(D_INODE, "oap %p page %p returned %d "
2427 "instead of ready\n", oap,
2431 /* llite is telling us that the page is still
2432 * in commit_write and that we should try
2433 * and put it in an rpc again later. we
2434 * break out of the loop so we don't create
2435 * a hole in the sequence of pages in the rpc
2440 /* the io isn't needed.. tell the checks
2441 * below to complete the rpc with EINTR */
2442 spin_lock(&oap->oap_lock);
2443 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2444 spin_unlock(&oap->oap_lock);
2445 oap->oap_count = -EINTR;
2448 spin_lock(&oap->oap_lock);
2449 oap->oap_async_flags |= ASYNC_READY;
2450 spin_unlock(&oap->oap_lock);
2453 LASSERTF(0, "oap %p page %p returned %d "
2454 "from make_ready\n", oap,
2462 * Page submitted for IO has to be locked. Either by
2463 * ->ap_make_ready() or by higher layers.
2465 #if defined(__KERNEL__) && defined(__linux__)
2466 if(!(PageLocked(oap->oap_page) &&
2467 (CheckWriteback(oap->oap_page, cmd) || oap->oap_oig !=NULL))) {
2468 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2469 oap->oap_page, (long)oap->oap_page->flags, oap->oap_async_flags);
2473 /* If there is a gap at the start of this page, it can't merge
2474 * with any previous page, so we'll hand the network a
2475 * "fragmented" page array that it can't transfer in 1 RDMA */
2476 if (page_count != 0 && oap->oap_page_off != 0)
2479 /* take the page out of our book-keeping */
2480 list_del_init(&oap->oap_pending_item);
2481 lop_update_pending(cli, lop, cmd, -1);
2482 list_del_init(&oap->oap_urgent_item);
2484 if (page_count == 0)
2485 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2486 (PTLRPC_MAX_BRW_SIZE - 1);
2488 /* ask the caller for the size of the io as the rpc leaves. */
2489 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2491 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2492 if (oap->oap_count <= 0) {
2493 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2495 osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2499 /* now put the page back in our accounting */
2500 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2501 if (page_count == 0)
2502 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2503 if (++page_count >= cli->cl_max_pages_per_rpc)
2506 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2507 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2508 * have the same alignment as the initial writes that allocated
2509 * extents on the server. */
2510 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2511 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2512 if (ending_offset == 0)
2515 /* If there is a gap at the end of this page, it can't merge
2516 * with any subsequent pages, so we'll hand the network a
2517 * "fragmented" page array that it can't transfer in 1 RDMA */
2518 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2522 osc_wake_cache_waiters(cli);
2524 if (page_count == 0)
2527 loi_list_maint(cli, loi);
2529 client_obd_list_unlock(&cli->cl_loi_list_lock);
2531 req = osc_build_req(cli, &rpc_list, page_count, cmd);
2533 /* this should happen rarely and is pretty bad, it makes the
2534 * pending list not follow the dirty order */
2535 client_obd_list_lock(&cli->cl_loi_list_lock);
2536 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2537 list_del_init(&oap->oap_rpc_item);
2539 /* queued sync pages can be torn down while the pages
2540 * were between the pending list and the rpc */
2541 if (oap->oap_interrupted) {
2542 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2543 osc_ap_completion(cli, NULL, oap, 0,
2547 osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2549 loi_list_maint(cli, loi);
2550 RETURN(PTR_ERR(req));
2553 aa = ptlrpc_req_async_args(req);
2554 if (cmd == OBD_BRW_READ) {
2555 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2556 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2557 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2558 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2560 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2561 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2562 cli->cl_w_in_flight);
2563 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2564 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2566 ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2568 client_obd_list_lock(&cli->cl_loi_list_lock);
2570 if (cmd == OBD_BRW_READ)
2571 cli->cl_r_in_flight++;
2573 cli->cl_w_in_flight++;
2575 /* queued sync pages can be torn down while the pages
2576 * were between the pending list and the rpc */
2578 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2579 /* only one oap gets a request reference */
2582 if (oap->oap_interrupted && !req->rq_intr) {
2583 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2585 ptlrpc_mark_interrupted(req);
2589 tmp->oap_request = ptlrpc_request_addref(req);
2591 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2592 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2594 req->rq_interpret_reply = brw_interpret;
2595 ptlrpcd_add_req(req);
2599 #define LOI_DEBUG(LOI, STR, args...) \
2600 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2601 !list_empty(&(LOI)->loi_ready_item) || \
2602 !list_empty(&(LOI)->loi_hp_ready_item), \
2603 (LOI)->loi_write_lop.lop_num_pending, \
2604 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2605 (LOI)->loi_read_lop.lop_num_pending, \
2606 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2609 /* This is called by osc_check_rpcs() to find which objects have pages that
2610 * we could be sending. These lists are maintained by lop_makes_rpc(). */
2611 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2614 /* First return objects that have blocked locks so that they
2615 * will be flushed quickly and other clients can get the lock,
2616 * then objects which have pages ready to be stuffed into RPCs */
2617 if (!list_empty(&cli->cl_loi_hp_ready_list))
2618 RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
2619 struct lov_oinfo, loi_hp_ready_item));
2620 if (!list_empty(&cli->cl_loi_ready_list))
2621 RETURN(list_entry(cli->cl_loi_ready_list.next,
2622 struct lov_oinfo, loi_ready_item));
2624 /* then if we have cache waiters, return all objects with queued
2625 * writes. This is especially important when many small files
2626 * have filled up the cache and not been fired into rpcs because
2627 * they don't pass the nr_pending/object threshhold */
2628 if (!list_empty(&cli->cl_cache_waiters) &&
2629 !list_empty(&cli->cl_loi_write_list))
2630 RETURN(list_entry(cli->cl_loi_write_list.next,
2631 struct lov_oinfo, loi_write_item));
2633 /* then return all queued objects when we have an invalid import
2634 * so that they get flushed */
2635 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2636 if (!list_empty(&cli->cl_loi_write_list))
2637 RETURN(list_entry(cli->cl_loi_write_list.next,
2638 struct lov_oinfo, loi_write_item));
2639 if (!list_empty(&cli->cl_loi_read_list))
2640 RETURN(list_entry(cli->cl_loi_read_list.next,
2641 struct lov_oinfo, loi_read_item));
2646 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2648 struct osc_async_page *oap;
2651 if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
2652 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2653 struct osc_async_page, oap_urgent_item);
2654 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2657 if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
2658 oap = list_entry(loi->loi_read_lop.lop_urgent.next,
2659 struct osc_async_page, oap_urgent_item);
2660 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2663 return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2666 /* called with the loi list lock held */
2667 static void osc_check_rpcs(struct client_obd *cli)
2669 struct lov_oinfo *loi;
2670 int rc = 0, race_counter = 0;
2673 while ((loi = osc_next_loi(cli)) != NULL) {
2674 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2676 if (osc_max_rpc_in_flight(cli, loi))
2679 /* attempt some read/write balancing by alternating between
2680 * reads and writes in an object. The makes_rpc checks here
2681 * would be redundant if we were getting read/write work items
2682 * instead of objects. we don't want send_oap_rpc to drain a
2683 * partial read pending queue when we're given this object to
2684 * do io on writes while there are cache waiters */
2685 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2686 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2687 &loi->loi_write_lop);
2695 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2696 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2697 &loi->loi_read_lop);
2706 /* attempt some inter-object balancing by issueing rpcs
2707 * for each object in turn */
2708 if (!list_empty(&loi->loi_hp_ready_item))
2709 list_del_init(&loi->loi_hp_ready_item);
2710 if (!list_empty(&loi->loi_ready_item))
2711 list_del_init(&loi->loi_ready_item);
2712 if (!list_empty(&loi->loi_write_item))
2713 list_del_init(&loi->loi_write_item);
2714 if (!list_empty(&loi->loi_read_item))
2715 list_del_init(&loi->loi_read_item);
2717 loi_list_maint(cli, loi);
2719 /* send_oap_rpc fails with 0 when make_ready tells it to
2720 * back off. llite's make_ready does this when it tries
2721 * to lock a page queued for write that is already locked.
2722 * we want to try sending rpcs from many objects, but we
2723 * don't want to spin failing with 0. */
2724 if (race_counter == 10)
2730 /* we're trying to queue a page in the osc so we're subject to the
2731 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2732 * If the osc's queued pages are already at that limit, then we want to sleep
2733 * until there is space in the osc's queue for us. We also may be waiting for
2734 * write credits from the OST if there are RPCs in flight that may return some
2735 * before we fall back to sync writes.
2737 * We need this to know whether our allocation was granted in the presence of signals */
2738 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2742 client_obd_list_lock(&cli->cl_loi_list_lock);
2743 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2744 client_obd_list_unlock(&cli->cl_loi_list_lock);
2748 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2749 * grant or cache space. */
2750 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2751 struct osc_async_page *oap)
2753 struct osc_cache_waiter ocw;
2754 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2757 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2758 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2759 cli->cl_dirty_max, obd_max_dirty_pages,
2760 cli->cl_lost_grant, cli->cl_avail_grant);
2762 /* force the caller to try sync io. this can jump the list
2763 * of queued writes and create a discontiguous rpc stream */
2764 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2765 loi->loi_ar.ar_force_sync)
2768 /* Hopefully normal case - cache space and write credits available */
2769 if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2770 (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2771 (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2772 /* account for ourselves */
2773 osc_consume_write_grant(cli, &oap->oap_brw_page);
2777 /* It is safe to block as a cache waiter as long as there is grant
2778 * space available or the hope of additional grant being returned
2779 * when an in flight write completes. Using the write back cache
2780 * if possible is preferable to sending the data synchronously
2781 * because write pages can then be merged in to large requests.
2782 * The addition of this cache waiter will causing pending write
2783 * pages to be sent immediately. */
2784 if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
2785 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2786 cfs_waitq_init(&ocw.ocw_waitq);
2790 loi_list_maint(cli, loi);
2791 osc_check_rpcs(cli);
2792 client_obd_list_unlock(&cli->cl_loi_list_lock);
2794 CDEBUG(D_CACHE, "sleeping for cache space\n");
2795 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2797 client_obd_list_lock(&cli->cl_loi_list_lock);
2798 if (!list_empty(&ocw.ocw_entry)) {
2799 list_del(&ocw.ocw_entry);
2808 static int osc_get_lock(struct obd_export *exp, struct lov_stripe_md *lsm,
2809 void **res, int rw, obd_off start, obd_off end,
2810 struct lustre_handle *lockh, int flags)
2812 struct ldlm_lock *lock = NULL;
2813 int rc, release = 0;
2817 if (lockh && lustre_handle_is_used(lockh)) {
2818 /* if a valid lockh is passed, just check that the corresponding
2819 * lock covers the extent */
2820 lock = ldlm_handle2lock(lockh);
2823 struct osc_async_page *oap = *res;
2824 spin_lock(&oap->oap_lock);
2825 lock = oap->oap_ldlm_lock;
2827 LDLM_LOCK_GET(lock);
2828 spin_unlock(&oap->oap_lock);
2830 /* lock can be NULL in case race obd_get_lock vs lock cancel
2831 * so we should be don't try match this */
2832 if (unlikely(!lock))
2835 rc = ldlm_lock_fast_match(lock, rw, start, end, lockh);
2836 if (release == 1 && rc == 1)
2837 /* if a valid lockh was passed, we just need to check
2838 * that the lock covers the page, no reference should be
2840 ldlm_lock_decref(lockh,
2841 rw == OBD_BRW_WRITE ? LCK_PW : LCK_PR);
2842 LDLM_LOCK_PUT(lock);
2846 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2847 struct lov_oinfo *loi, cfs_page_t *page,
2848 obd_off offset, struct obd_async_page_ops *ops,
2849 void *data, void **res, int flags,
2850 struct lustre_handle *lockh)
2852 struct osc_async_page *oap;
2853 struct ldlm_res_id oid = {{0}};
2859 return size_round(sizeof(*oap));
2862 oap->oap_magic = OAP_MAGIC;
2863 oap->oap_cli = &exp->exp_obd->u.cli;
2866 oap->oap_caller_ops = ops;
2867 oap->oap_caller_data = data;
2869 oap->oap_page = page;
2870 oap->oap_obj_off = offset;
2872 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2873 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2874 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2875 CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2877 oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2879 spin_lock_init(&oap->oap_lock);
2881 /* If the page was marked as notcacheable - don't add to any locks */
2882 if (!(flags & OBD_PAGE_NO_CACHE)) {
2883 osc_build_res_name(loi->loi_id, loi->loi_gr, &oid);
2884 /* This is the only place where we can call cache_add_extent
2885 without oap_lock, because this page is locked now, and
2886 the lock we are adding it to is referenced, so cannot lose
2887 any pages either. */
2888 rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh);
2893 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2897 struct osc_async_page *oap_from_cookie(void *cookie)
2899 struct osc_async_page *oap = cookie;
2900 if (oap->oap_magic != OAP_MAGIC)
2901 return ERR_PTR(-EINVAL);
2905 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2906 struct lov_oinfo *loi, void *cookie,
2907 int cmd, obd_off off, int count,
2908 obd_flag brw_flags, enum async_flags async_flags)
2910 struct client_obd *cli = &exp->exp_obd->u.cli;
2911 struct osc_async_page *oap;
2915 oap = oap_from_cookie(cookie);
2917 RETURN(PTR_ERR(oap));
2919 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2922 if (!list_empty(&oap->oap_pending_item) ||
2923 !list_empty(&oap->oap_urgent_item) ||
2924 !list_empty(&oap->oap_rpc_item))
2927 /* check if the file's owner/group is over quota */
2928 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2929 struct obd_async_page_ops *ops;
2936 ops = oap->oap_caller_ops;
2937 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2938 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2948 loi = lsm->lsm_oinfo[0];
2950 client_obd_list_lock(&cli->cl_loi_list_lock);
2953 oap->oap_page_off = off;
2954 oap->oap_count = count;
2955 oap->oap_brw_flags = brw_flags;
2956 /* Give a hint to OST that requests are coming from kswapd - bug19529 */
2957 if (libcfs_memory_pressure_get())
2958 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2959 spin_lock(&oap->oap_lock);
2960 oap->oap_async_flags = async_flags;
2961 spin_unlock(&oap->oap_lock);
2963 if (cmd & OBD_BRW_WRITE) {
2964 rc = osc_enter_cache(cli, loi, oap);
2966 client_obd_list_unlock(&cli->cl_loi_list_lock);
2971 osc_oap_to_pending(oap);
2972 loi_list_maint(cli, loi);
2974 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2977 osc_check_rpcs(cli);
2978 client_obd_list_unlock(&cli->cl_loi_list_lock);
2983 /* aka (~was & now & flag), but this is more clear :) */
2984 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
2986 static int osc_set_async_flags(struct obd_export *exp,
2987 struct lov_stripe_md *lsm,
2988 struct lov_oinfo *loi, void *cookie,
2989 obd_flag async_flags)
2991 struct client_obd *cli = &exp->exp_obd->u.cli;
2992 struct loi_oap_pages *lop;
2993 struct osc_async_page *oap;
2997 oap = oap_from_cookie(cookie);
2999 RETURN(PTR_ERR(oap));
3002 * bug 7311: OST-side locking is only supported for liblustre for now
3003 * (and liblustre never calls obd_set_async_flags(). I hope.), generic
3004 * implementation has to handle case where OST-locked page was picked
3005 * up by, e.g., ->writepage().
3007 LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
3008 LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
3011 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
3015 loi = lsm->lsm_oinfo[0];
3017 if (oap->oap_cmd & OBD_BRW_WRITE) {
3018 lop = &loi->loi_write_lop;
3020 lop = &loi->loi_read_lop;
3023 client_obd_list_lock(&cli->cl_loi_list_lock);
3024 /* oap_lock provides atomic semantics of oap_async_flags access */
3025 spin_lock(&oap->oap_lock);
3026 if (list_empty(&oap->oap_pending_item))
3027 GOTO(out, rc = -EINVAL);
3029 if ((oap->oap_async_flags & async_flags) == async_flags)
3032 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3033 oap->oap_async_flags |= ASYNC_READY;
3035 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3036 list_empty(&oap->oap_rpc_item)) {
3037 if (oap->oap_async_flags & ASYNC_HP)
3038 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
3040 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
3041 oap->oap_async_flags |= ASYNC_URGENT;
3042 loi_list_maint(cli, loi);
3045 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3046 oap->oap_async_flags);
3048 spin_unlock(&oap->oap_lock);
3049 osc_check_rpcs(cli);
3050 client_obd_list_unlock(&cli->cl_loi_list_lock);
3054 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
3055 struct lov_oinfo *loi,
3056 struct obd_io_group *oig, void *cookie,
3057 int cmd, obd_off off, int count,
3059 obd_flag async_flags)
3061 struct client_obd *cli = &exp->exp_obd->u.cli;
3062 struct osc_async_page *oap;
3063 struct loi_oap_pages *lop;
3067 oap = oap_from_cookie(cookie);
3069 RETURN(PTR_ERR(oap));
3071 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
3074 if (!list_empty(&oap->oap_pending_item) ||
3075 !list_empty(&oap->oap_urgent_item) ||
3076 !list_empty(&oap->oap_rpc_item))
3080 loi = lsm->lsm_oinfo[0];
3082 client_obd_list_lock(&cli->cl_loi_list_lock);
3085 oap->oap_page_off = off;
3086 oap->oap_count = count;
3087 oap->oap_brw_flags = brw_flags;
3088 /* Give a hint to OST that requests are coming from kswapd - bug19529 */
3089 if (libcfs_memory_pressure_get())
3090 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
3091 spin_lock(&oap->oap_lock);
3092 oap->oap_async_flags = async_flags;
3093 spin_unlock(&oap->oap_lock);
3095 if (cmd & OBD_BRW_WRITE)
3096 lop = &loi->loi_write_lop;
3098 lop = &loi->loi_read_lop;
3100 list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
3101 if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
3103 rc = oig_add_one(oig, &oap->oap_occ);
3106 LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
3107 oap, oap->oap_page, rc);
3109 client_obd_list_unlock(&cli->cl_loi_list_lock);
3114 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
3115 struct loi_oap_pages *lop, int cmd)
3117 struct list_head *pos, *tmp;
3118 struct osc_async_page *oap;
3120 list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
3121 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
3122 list_del(&oap->oap_pending_item);
3123 osc_oap_to_pending(oap);
3125 loi_list_maint(cli, loi);
3128 static int osc_trigger_group_io(struct obd_export *exp,
3129 struct lov_stripe_md *lsm,
3130 struct lov_oinfo *loi,
3131 struct obd_io_group *oig)
3133 struct client_obd *cli = &exp->exp_obd->u.cli;
3137 loi = lsm->lsm_oinfo[0];
3139 client_obd_list_lock(&cli->cl_loi_list_lock);
3141 osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
3142 osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
3144 osc_check_rpcs(cli);
3145 client_obd_list_unlock(&cli->cl_loi_list_lock);
3150 static int osc_teardown_async_page(struct obd_export *exp,
3151 struct lov_stripe_md *lsm,
3152 struct lov_oinfo *loi, void *cookie)
3154 struct client_obd *cli = &exp->exp_obd->u.cli;
3155 struct loi_oap_pages *lop;
3156 struct osc_async_page *oap;
3160 oap = oap_from_cookie(cookie);
3162 RETURN(PTR_ERR(oap));
3165 loi = lsm->lsm_oinfo[0];
3167 if (oap->oap_cmd & OBD_BRW_WRITE) {
3168 lop = &loi->loi_write_lop;
3170 lop = &loi->loi_read_lop;
3173 client_obd_list_lock(&cli->cl_loi_list_lock);
3175 if (!list_empty(&oap->oap_rpc_item))
3176 GOTO(out, rc = -EBUSY);
3178 osc_exit_cache(cli, oap, 0);
3179 osc_wake_cache_waiters(cli);
3181 if (!list_empty(&oap->oap_urgent_item)) {
3182 list_del_init(&oap->oap_urgent_item);
3183 spin_lock(&oap->oap_lock);
3184 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3185 spin_unlock(&oap->oap_lock);
3188 if (!list_empty(&oap->oap_pending_item)) {
3189 list_del_init(&oap->oap_pending_item);
3190 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3192 loi_list_maint(cli, loi);
3193 cache_remove_extent(cli->cl_cache, oap);
3195 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3197 client_obd_list_unlock(&cli->cl_loi_list_lock);
3201 int osc_extent_blocking_cb(struct ldlm_lock *lock,
3202 struct ldlm_lock_desc *new, void *data,
3205 struct lustre_handle lockh = { 0 };
3209 if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
3210 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
3215 case LDLM_CB_BLOCKING:
3216 ldlm_lock2handle(lock, &lockh);
3217 rc = ldlm_cli_cancel(&lockh);
3219 CERROR("ldlm_cli_cancel failed: %d\n", rc);
3221 case LDLM_CB_CANCELING: {
3223 ldlm_lock2handle(lock, &lockh);
3224 /* This lock wasn't granted, don't try to do anything */
3225 if (lock->l_req_mode != lock->l_granted_mode)
3228 cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache,
3231 if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb)
3232 lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb(
3233 lock, new, data,flag);
3242 EXPORT_SYMBOL(osc_extent_blocking_cb);
3244 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
3247 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3250 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
3253 lock_res_and_lock(lock);
3254 #if defined (__KERNEL__) && defined (__linux__)
3255 /* Liang XXX: Darwin and Winnt checking should be added */
3256 if (lock->l_ast_data && lock->l_ast_data != data) {
3257 struct inode *new_inode = data;
3258 struct inode *old_inode = lock->l_ast_data;
3259 if (!(old_inode->i_state & I_FREEING))
3260 LDLM_ERROR(lock, "inconsistent l_ast_data found");
3261 LASSERTF(old_inode->i_state & I_FREEING,
3262 "Found existing inode %p/%lu/%u state %lu in lock: "
3263 "setting data to %p/%lu/%u\n", old_inode,
3264 old_inode->i_ino, old_inode->i_generation,
3266 new_inode, new_inode->i_ino, new_inode->i_generation);
3269 lock->l_ast_data = data;
3270 lock->l_flags |= (flags & LDLM_FL_NO_LRU);
3271 unlock_res_and_lock(lock);
3272 LDLM_LOCK_PUT(lock);
3275 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3276 ldlm_iterator_t replace, void *data)
3278 struct ldlm_res_id res_id;
3279 struct obd_device *obd = class_exp2obd(exp);
3281 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3282 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3286 static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
3287 struct obd_info *oinfo, int intent, int rc)
3292 /* The request was created before ldlm_cli_enqueue call. */
3293 if (rc == ELDLM_LOCK_ABORTED) {
3294 struct ldlm_reply *rep;
3296 /* swabbed by ldlm_cli_enqueue() */
3297 LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
3298 rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
3300 LASSERT(rep != NULL);
3301 if (rep->lock_policy_res1)
3302 rc = rep->lock_policy_res1;
3306 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3307 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3308 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
3309 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
3310 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
3314 cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh);
3316 /* Call the update callback. */
3317 rc = oinfo->oi_cb_up(oinfo, rc);
3321 static int osc_enqueue_interpret(struct ptlrpc_request *req,
3324 struct osc_enqueue_args *aa = data;
3325 int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
3326 struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
3327 struct ldlm_lock *lock;
3329 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3331 lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
3333 /* Complete obtaining the lock procedure. */
3334 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3336 &aa->oa_oi->oi_flags,
3337 &lsm->lsm_oinfo[0]->loi_lvb,
3338 sizeof(lsm->lsm_oinfo[0]->loi_lvb),
3339 lustre_swab_ost_lvb,
3340 aa->oa_oi->oi_lockh, rc);
3342 /* Complete osc stuff. */
3343 rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc);
3345 /* Release the lock for async request. */
3346 if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
3347 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
3349 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3350 aa->oa_oi->oi_lockh, req, aa);
3351 LDLM_LOCK_PUT(lock);
3355 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3356 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3357 * other synchronous requests, however keeping some locks and trying to obtain
3358 * others may take a considerable amount of time in a case of ost failure; and
3359 * when other sync requests do not get released lock from a client, the client
3360 * is excluded from the cluster -- such scenarious make the life difficult, so
3361 * release locks just after they are obtained. */
3362 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3363 struct ldlm_enqueue_info *einfo,
3364 struct ptlrpc_request_set *rqset)
3366 struct ldlm_res_id res_id;
3367 struct obd_device *obd = exp->exp_obd;
3368 struct ldlm_reply *rep;
3369 struct ptlrpc_request *req = NULL;
3370 int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
3375 osc_build_res_name(oinfo->oi_md->lsm_object_id,
3376 oinfo->oi_md->lsm_object_gr, &res_id);
3377 /* Filesystem lock extents are extended to page boundaries so that
3378 * dealing with the page cache is a little smoother. */
3379 oinfo->oi_policy.l_extent.start -=
3380 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
3381 oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
3383 if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
3386 /* Next, search for already existing extent locks that will cover us */
3387 /* If we're trying to read, we also search for an existing PW lock. The
3388 * VFS and page cache already protect us locally, so lots of readers/
3389 * writers can share a single PW lock.
3391 * There are problems with conversion deadlocks, so instead of
3392 * converting a read lock to a write lock, we'll just enqueue a new
3395 * At some point we should cancel the read lock instead of making them
3396 * send us a blocking callback, but there are problems with canceling
3397 * locks out from other users right now, too. */
3398 mode = einfo->ei_mode;
3399 if (einfo->ei_mode == LCK_PR)
3401 mode = ldlm_lock_match(obd->obd_namespace,
3402 oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
3403 einfo->ei_type, &oinfo->oi_policy, mode,
3406 /* addref the lock only if not async requests and PW lock is
3407 * matched whereas we asked for PR. */
3408 if (!rqset && einfo->ei_mode != mode)
3409 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
3410 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
3413 /* I would like to be able to ASSERT here that rss <=
3414 * kms, but I can't, for reasons which are explained in
3418 /* We already have a lock, and it's referenced */
3419 oinfo->oi_cb_up(oinfo, ELDLM_OK);
3421 /* For async requests, decref the lock. */
3422 if (einfo->ei_mode != mode)
3423 ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
3425 ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
3433 [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
3434 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
3435 [DLM_LOCKREQ_OFF + 1] = 0 };
3437 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
3441 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
3442 size[DLM_REPLY_REC_OFF] =
3443 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
3444 ptlrpc_req_set_repsize(req, 3, size);
3447 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3448 oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
3450 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id,
3451 &oinfo->oi_policy, &oinfo->oi_flags,
3452 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3453 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
3454 lustre_swab_ost_lvb, oinfo->oi_lockh,
3458 struct osc_enqueue_args *aa;
3459 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3460 aa = ptlrpc_req_async_args(req);
3465 req->rq_interpret_reply = osc_enqueue_interpret;
3466 ptlrpc_set_add_req(rqset, req);
3467 } else if (intent) {
3468 ptlrpc_req_finished(req);
3473 rc = osc_enqueue_fini(obd, req, oinfo, intent, rc);
3475 ptlrpc_req_finished(req);
/*
 * osc_match(): look for an already-granted DLM extent lock on this
 * stripe's resource that covers the requested extent, without sending
 * a new enqueue RPC.
 *
 * NOTE(review): this excerpt is missing several original lines
 * (declarations, return paths, closing braces); comments below cover
 * only the statements that are visible here.
 */
3480 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3481                       __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3482                       int *flags, void *data, struct lustre_handle *lockh,
3485         struct ldlm_res_id res_id;
3486         struct obd_device *obd = exp->exp_obd;
3487         int lflags = *flags;
/* Build the per-object DLM resource name from object id/group. */
3491         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
/* Fault-injection point: pretend the match failed with -EIO. */
3493         OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
3495         /* Filesystem lock extents are extended to page boundaries so that
3496          * dealing with the page cache is a little smoother */
3497         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3498         policy->l_extent.end |= ~CFS_PAGE_MASK;
3500         /* Next, search for already existing extent locks that will cover us */
3501         /* If we're trying to read, we also search for an existing PW lock. The
3502          * VFS and page cache already protect us locally, so lots of readers/
3503          * writers can share a single PW lock. */
/* Only locks whose LVB is valid (LDLM_FL_LVB_READY) may be matched. */
3507         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3508                              &res_id, type, policy, rc, lockh);
3510                 osc_set_data_with_check(lockh, data, lflags);
/* Matched a PW lock while only PR was asked for: swap the reference
 * (addref PR, drop the PW ref taken by the match) unless this was a
 * TEST_LOCK probe that takes no reference at all. */
3511                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3512                         ldlm_lock_addref(lockh, LCK_PR);
3513                         ldlm_lock_decref(lockh, LCK_PW);
3515                 if (n_matches != NULL)
/*
 * osc_cancel(): release one reference on a DLM lock previously taken by
 * this client.  NOTE(review): lines are missing from this excerpt (the
 * else keyword and return path are not visible).
 */
3522 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3523                       __u32 mode, struct lustre_handle *lockh, int flags,
/* GROUP locks are dropped and cancelled immediately rather than being
 * left cached for reuse. */
3528         if (unlikely(mode == LCK_GROUP))
3529                 ldlm_lock_decref_and_cancel(lockh, mode);
3531                 ldlm_lock_decref(lockh, mode);
/*
 * osc_cancel_unused(): cancel unused cached locks, either for one object
 * (lsm != NULL, resp points at its resource id) or namespace-wide
 * (resp stays NULL).  NOTE(review): the conditional guarding the
 * osc_build_res_name() call is not visible in this excerpt.
 */
3536 static int osc_cancel_unused(struct obd_export *exp,
3537                              struct lov_stripe_md *lsm, int flags, void *opaque)
3539         struct obd_device *obd = class_exp2obd(exp);
3540         struct ldlm_res_id res_id, *resp = NULL;
3543                 resp = osc_build_res_name(lsm->lsm_object_id,
3544                                           lsm->lsm_object_gr, &res_id);
3547         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
/*
 * osc_join_lru(): ask the LDLM client to add (join != 0) or remove the
 * locks of one object to/from the namespace LRU.  Mirrors the structure
 * of osc_cancel_unused() above.  NOTE(review): the conditional before
 * osc_build_res_name() is not visible in this excerpt.
 */
3551 static int osc_join_lru(struct obd_export *exp,
3552                         struct lov_stripe_md *lsm, int join)
3554         struct obd_device *obd = class_exp2obd(exp);
3555         struct ldlm_res_id res_id, *resp = NULL;
3558                 resp = osc_build_res_name(lsm->lsm_object_id,
3559                                           lsm->lsm_object_gr, &res_id);
3562         return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
/*
 * osc_statfs_interpret(): reply callback for the async OST_STATFS RPC
 * sent by osc_statfs_async().  Unpacks the obd_statfs reply, refreshes
 * the object-creator (oscc) state flags derived from it, copies the
 * stats to the caller's buffer and invokes the caller's completion
 * callback.  NOTE(review): several original lines (error-path GOTOs,
 * braces) are missing from this excerpt.
 */
3566 static int osc_statfs_interpret(struct ptlrpc_request *req,
3569         struct osc_async_args *aa = data;
3570         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3571         struct obd_statfs *msfs;
3576         /* The request has in fact never been sent
3577          * due to issues at a higher level (LOV).
3578          * Exit immediately since the caller is
3579          * aware of the problem and takes care
3580          * of the clean up */
/* NODELAY statfs (e.g. from procfs) must not block on a down import;
 * treat connection errors as non-fatal here. */
3583         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3584             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3590         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3591                                   lustre_swab_obd_statfs);
3593                 CERROR("Can't unpack obd_statfs\n");
3594                 GOTO(out, rc = -EPROTO);
3597         /* Reinitialize the RDONLY and DEGRADED flags at the client
3598          * on each statfs, so they don't stay set permanently. */
3599         spin_lock(&cli->cl_oscc.oscc_lock);
3601         if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3602                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3603         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3604                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3606         if (unlikely(msfs->os_state & OS_STATE_READONLY))
3607                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3608         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3609                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3611         /* Add a bit of hysteresis so this flag isn't continually flapping,
3612          * and ensure that new files don't get extremely fragmented due to
3613          * only a small amount of available space in the filesystem.
3614          * We want to set the NOSPC flag when there is less than ~0.1% free
3615          * and clear it when there is at least ~0.2% free space, so:
3616          *                   avail < ~0.1% max          max = avail + used
3617          *            1025 * avail < avail + used       used = blocks - free
3618          *            1024 * avail < used
3619          *            1024 * avail < blocks - free
3620          *                   avail < ((blocks - free) >> 10)
3622          * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
3623          * lose that amount of space so in those cases we report no space left
3624          * if their is less than 1 GB left. */
3625         used = min_t(__u64, (msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
/* Set NOSPC when nearly out of inodes (<32) or blocks; clear it only
 * after crossing the higher hysteresis thresholds (>64 inodes, 2x). */
3626         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3627                      ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3628                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3629         else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3630                  (msfs->os_ffree > 64) && (msfs->os_bavail > (used << 1))))
3631                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_NOSPC;
3633         spin_unlock(&cli->cl_oscc.oscc_lock);
3635         memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
/* Hand the final status to the caller's completion callback. */
3637         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * osc_statfs_async(): queue a non-blocking OST_STATFS RPC on @rqset;
 * osc_statfs_interpret() finishes the job when the reply arrives.
 * NOTE(review): the error check after ptlrpc_prep_req() and the aa
 * initialization lines are missing from this excerpt.
 */
3641 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3642                             __u64 max_age, struct ptlrpc_request_set *rqset)
3644         struct ptlrpc_request *req;
3645         struct osc_async_args *aa;
3646         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
3649         /* We could possibly pass max_age in the request (as an absolute
3650          * timestamp or a "seconds.usec ago") so the target can avoid doing
3651          * extra calls into the filesystem if that isn't necessary (e.g.
3652          * during mount that would help a bit). Having relative timestamps
3653          * is not so great if request processing is slow, while absolute
3654          * timestamps are not ideal because they need time synchronization. */
3655         req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3656                               OST_STATFS, 1, NULL, NULL);
3660         ptlrpc_req_set_repsize(req, 2, size);
/* statfs is served from the same portal as create requests. */
3661         req->rq_request_portal = OST_CREATE_PORTAL;
3662         ptlrpc_at_set_req_timeout(req);
3663         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3664                 /* procfs requests not want stat in wait for avoid deadlock */
3665                 req->rq_no_resend = 1;
3666                 req->rq_no_delay = 1;
3669         req->rq_interpret_reply = osc_statfs_interpret;
3670         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3671         aa = ptlrpc_req_async_args(req);
3674         ptlrpc_set_add_req(rqset, req);
/*
 * osc_statfs(): synchronous statfs -- send OST_STATFS and wait for the
 * reply, copying the unpacked obd_statfs into @osfs.  Unlike the async
 * variant this takes an import reference under cl_sem to survive a
 * concurrent disconnect (see bug 15684 comment below).
 * NOTE(review): several error-path lines are missing from this excerpt.
 */
3678 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3679                       __u64 max_age, __u32 flags)
3681         struct obd_statfs *msfs;
3682         struct ptlrpc_request *req;
3683         struct obd_import *imp = NULL;
3684         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
3688         /*Since the request might also come from lprocfs, so we need
3689          *sync this with client_disconnect_export Bug15684*/
3690         down_read(&obd->u.cli.cl_sem);
3691         if (obd->u.cli.cl_import)
3692                 imp = class_import_get(obd->u.cli.cl_import);
3693         up_read(&obd->u.cli.cl_sem);
3697         /* We could possibly pass max_age in the request (as an absolute
3698          * timestamp or a "seconds.usec ago") so the target can avoid doing
3699          * extra calls into the filesystem if that isn't necessary (e.g.
3700          * during mount that would help a bit). Having relative timestamps
3701          * is not so great if request processing is slow, while absolute
3702          * timestamps are not ideal because they need time synchronization. */
3703         req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION,
3704                               OST_STATFS, 1, NULL, NULL);
/* The import reference is only needed while preparing the request. */
3706         class_import_put(imp);
3710         ptlrpc_req_set_repsize(req, 2, size);
3711         req->rq_request_portal = OST_CREATE_PORTAL;
3712         ptlrpc_at_set_req_timeout(req);
3714         if (flags & OBD_STATFS_NODELAY) {
3715                 /* procfs requests not want stat in wait for avoid deadlock */
3716                 req->rq_no_resend = 1;
3717                 req->rq_no_delay = 1;
3720         rc = ptlrpc_queue_wait(req);
3724         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3725                                   lustre_swab_obd_statfs);
3727                 CERROR("Can't unpack obd_statfs\n");
3728                 GOTO(out, rc = -EPROTO);
3731         memcpy(osfs, msfs, sizeof(*osfs));
3735         ptlrpc_req_finished(req);
3739 /* Retrieve object striping information.
3741  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3742  * the maximum number of OST indices which will fit in the user buffer.
3743  * lmm_magic must be LOV_MAGIC_V1 or LOV_MAGIC_V3 (we only use 1 slot here).
/*
 * NOTE(review): several original lines (copy_from_user error returns,
 * allocation-failure checks, closing braces) are missing from this
 * excerpt; comments cover only the visible statements.
 */
3745 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3747         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3748         struct lov_user_md_v3 lum, *lumk;
3749         int rc = 0, lum_size;
3750         struct lov_user_ost_data_v1 *lmm_objects;
3756         /* we only need the header part from user space to get lmm_magic and
3757          * lmm_stripe_count, (the header part is common to v1 and v3) */
3758         lum_size = sizeof(struct lov_user_md_v1);
3759         memset(&lum, 0x00, sizeof(lum));
3760         if (copy_from_user(&lum, lump, lum_size))
/* Reject anything other than the two supported user md magics. */
3763         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3764             (lum.lmm_magic != LOV_USER_MAGIC_V3))
3767         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3768         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3769         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3770         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3772         /* we can use lov_mds_md_size() to compute lum_size
3773          * because lov_user_md_vX and lov_mds_md_vX have the same size */
3774         if (lum.lmm_stripe_count > 0) {
3775                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3776                 OBD_ALLOC(lumk, lum_size);
/* The objects array sits at a different offset in v1 vs v3 layouts. */
3779                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3780                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3782                         lmm_objects = &(lumk->lmm_objects[0]);
3783                 lmm_objects->l_object_id = lsm->lsm_object_id;
/* Caller asked for the header only: size without any object slots. */
3785                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3789         lumk->lmm_magic = lum.lmm_magic;
3790         lumk->lmm_stripe_count = 1;
3791         lumk->lmm_object_id = lsm->lsm_object_id;
3793         if ((lsm->lsm_magic == LOV_USER_MAGIC_V1_SWABBED) ||
3794             (lsm->lsm_magic == LOV_USER_MAGIC_V3_SWABBED)) {
3795                 /* lsm not in host order, so count also need be in same order */
3796                 __swab32s(&lumk->lmm_magic);
3797                 __swab16s(&lumk->lmm_stripe_count);
3798                 lustre_swab_lov_user_md((struct lov_user_md_v1*)lumk);
3799                 if (lum.lmm_stripe_count > 0)
3800                         lustre_swab_lov_user_md_objects(
3801                                 (struct lov_user_md_v1*)lumk);
3804         if (copy_to_user(lump, lumk, lum_size))
3808         OBD_FREE(lumk, lum_size);
/*
 * osc_iocontrol(): ioctl dispatcher for the OSC device.  Takes a module
 * reference for the duration of the call so the module cannot be
 * unloaded mid-ioctl; module_put() at the end releases it.
 * NOTE(review): the switch(cmd) line, several break statements and
 * closing braces are missing from this excerpt.
 */
3814 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3815                          void *karg, void *uarg)
3817         struct obd_device *obd = exp->exp_obd;
3818         struct obd_ioctl_data *data = karg;
3822         if (!try_module_get(THIS_MODULE)) {
3823                 CERROR("Can't get module. Is it alive?");
3827         case OBD_IOC_LOV_GET_CONFIG: {
3829                 struct lov_desc *desc;
3830                 struct obd_uuid uuid;
/* Copy the ioctl payload (headers + inline buffers) from user space. */
3834                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3835                         GOTO(out, err = -EINVAL);
3837                 data = (struct obd_ioctl_data *)buf;
/* Validate that both user-supplied inline buffers are big enough. */
3839                 if (sizeof(*desc) > data->ioc_inllen1) {
3840                         obd_ioctl_freedata(buf, len);
3841                         GOTO(out, err = -EINVAL);
3844                 if (data->ioc_inllen2 < sizeof(uuid)) {
3845                         obd_ioctl_freedata(buf, len);
3846                         GOTO(out, err = -EINVAL);
/* An OSC presents itself as a single-target, single-stripe LOV. */
3849                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3850                 desc->ld_tgt_count = 1;
3851                 desc->ld_active_tgt_count = 1;
3852                 desc->ld_default_stripe_count = 1;
3853                 desc->ld_default_stripe_size = 0;
3854                 desc->ld_default_stripe_offset = 0;
3855                 desc->ld_pattern = 0;
3856                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3858                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3860                 err = copy_to_user((void *)uarg, buf, len);
3863                 obd_ioctl_freedata(buf, len);
3866         case LL_IOC_LOV_SETSTRIPE:
3867                 err = obd_alloc_memmd(exp, karg);
3871         case LL_IOC_LOV_GETSTRIPE:
3872                 err = osc_getstripe(karg, uarg);
3874         case OBD_IOC_CLIENT_RECOVER:
3875                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3880         case IOC_OSC_SET_ACTIVE:
3881                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3884         case OBD_IOC_POLL_QUOTACHECK:
3885                 err = lquota_poll_check(quota_interface, exp,
3886                                         (struct if_quotacheck *)karg);
3888         case OBD_IOC_DESTROY: {
/* Destroying objects is an administrative operation. */
3891                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
3892                         GOTO (out, err = -EPERM);
3893                 oa = &data->ioc_obdo1;
3896                         GOTO(out, err = -EINVAL);
3898                 oa->o_valid |= OBD_MD_FLGROUP;
3900                 err = osc_destroy(exp, oa, NULL, NULL, NULL);
3903         case OBD_IOC_PING_TARGET:
3904                 err = ptlrpc_obd_ping(obd);
3907                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3908                        cmd, cfs_curproc_comm());
3909                 GOTO(out, err = -ENOTTY);
3912         module_put(THIS_MODULE);
/*
 * osc_get_info(): obd get_info handler.  Serves some keys locally
 * (lock-to-stripe, rpcsize) and forwards others (last_id, fiemap) to
 * the OST via an OST_GET_INFO RPC.  NOTE(review): error checks after
 * ptlrpc_prep_req()/ptlrpc_queue_wait() and some braces are missing
 * from this excerpt.
 */
3916 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3917                         void *key, __u32 *vallen, void *val, struct lov_stripe_md *lsm)
3920         if (!vallen || !val)
3923         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
/* OSC is a single stripe; the answer is trivially stripe 0. */
3924                 __u32 *stripe = val;
3925                 *vallen = sizeof(*stripe);
3928         } else if (KEY_IS(KEY_OFF_RPCSIZE)) {
3929                 struct client_obd *cli = &exp->exp_obd->u.cli;
3930                 __u64 *rpcsize = val;
3931                 LASSERT(*vallen == sizeof(__u64));
3932                 *rpcsize = (__u64)cli->cl_max_pages_per_rpc;
3934         } else if (KEY_IS(KEY_LAST_ID)) {
3935                 struct ptlrpc_request *req;
3937                 char *bufs[2] = { NULL, key };
3938                 __u32 size[2] = { sizeof(struct ptlrpc_body), keylen };
3941                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3942                                       OST_GET_INFO, 2, size, bufs);
3946                 size[REPLY_REC_OFF] = *vallen;
3947                 ptlrpc_req_set_repsize(req, 2, size);
3948                 rc = ptlrpc_queue_wait(req);
3952                 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3953                                            lustre_swab_ost_last_id);
3954                 if (reply == NULL) {
3955                         CERROR("Can't unpack OST last ID\n");
3956                         GOTO(out, rc = -EPROTO);
3958                 *((obd_id *)val) = *reply;
3960                 ptlrpc_req_finished(req);
3962         } else if (KEY_IS(KEY_FIEMAP)) {
3963                 struct ptlrpc_request *req;
3964                 struct ll_user_fiemap *reply;
3965                 char *bufs[2] = { NULL, key };
3966                 __u32 size[2] = { sizeof(struct ptlrpc_body), keylen };
3969                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3970                                       OST_GET_INFO, 2, size, bufs);
3974                 size[REPLY_REC_OFF] = *vallen;
3975                 ptlrpc_req_set_repsize(req, 2, size);
3977                 rc = ptlrpc_queue_wait(req);
3980                 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, *vallen,
3981                                            lustre_swab_fiemap);
3982                 if (reply == NULL) {
3983                         CERROR("Can't unpack FIEMAP reply.\n");
3984                         GOTO(out1, rc = -EPROTO);
3987                 memcpy(val, reply, *vallen);
3990                 ptlrpc_req_finished(req);
/*
 * osc_setinfo_mds_conn_interpret(): reply callback for the KEY_MDS_CONN
 * set_info RPC.  Once the OST acknowledges the MDS connection, connect
 * the llog initiator context and mark the import as a server-type
 * import that must be pinged.  NOTE(review): the early rc check and
 * some braces are missing from this excerpt.
 */
3998 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
4001         struct llog_ctxt *ctxt;
4002         struct obd_import *imp = req->rq_import;
4008         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
4011                 rc = llog_initiator_connect(ctxt);
4013                         CERROR("cannot establish connection for "
4014                                "ctxt %p: %d\n", ctxt, rc);
4017         llog_ctxt_put(ctxt);
/* imp_lock protects the import state flags being toggled here. */
4018         spin_lock(&imp->imp_lock);
4019         imp->imp_server_timeout = 1;
4020         imp->imp_pingable = 1;
4021         spin_unlock(&imp->imp_lock);
4022         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
/*
 * osc_set_info_async(): obd set_info handler.  A few keys (NEXT_ID,
 * INIT_RECOV, CHECKSUM) are handled purely locally; everything else is
 * forwarded to the OST as an OST_SET_INFO RPC, with special interpret
 * callbacks for MDS_CONN and GRANT_SHRINK.  NOTE(review): several
 * return statements, error checks and braces are missing from this
 * excerpt.
 */
4027 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
4028                               void *key, obd_count vallen, void *val,
4029                               struct ptlrpc_request_set *set)
4031         struct ptlrpc_request *req;
4032         struct obd_device *obd = exp->exp_obd;
4033         struct obd_import *imp = class_exp2cliimp(exp);
4034         __u32 size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
4035         char *bufs[3] = { NULL, key, val };
4038         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
4040         if (KEY_IS(KEY_NEXT_ID)) {
4042                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4044                 if (vallen != sizeof(obd_id))
4047                 /* avoid race between allocate new object and set next id
4048                  * from ll_sync thread */
4049                 spin_lock(&oscc->oscc_lock);
/* Only move oscc_next_id forward; never shrink it. */
4050                 new_val = *((obd_id*)val) + 1;
4051                 if (new_val > oscc->oscc_next_id)
4052                         oscc->oscc_next_id = new_val;
4053                 spin_unlock(&oscc->oscc_lock);
4055                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
4056                        exp->exp_obd->obd_name,
4057                        oscc->oscc_next_id);
4062         if (KEY_IS(KEY_INIT_RECOV)) {
4063                 if (vallen != sizeof(int))
4065                 spin_lock(&imp->imp_lock);
4066                 imp->imp_initial_recov = *(int *)val;
4067                 spin_unlock(&imp->imp_lock);
4068                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
4069                        exp->exp_obd->obd_name,
4070                        imp->imp_initial_recov);
4074         if (KEY_IS(KEY_CHECKSUM)) {
4075                 if (vallen != sizeof(int))
/* Normalize any non-zero value to 1. */
4077                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
/* Remaining keys need an RPC; all except GRANT_SHRINK require a set. */
4081         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
4084         /* We pass all other commands directly to OST. Since nobody calls osc
4085            methods directly and everybody is supposed to go through LOV, we
4086            assume lov checked invalid values for us.
4087            The only recognised values so far are evict_by_nid and mds_conn.
4088            Even if something bad goes through, we'd get a -EINVAL from OST
4091         req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
4096         if (KEY_IS(KEY_MDS_CONN))
4097                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4098         else if (KEY_IS(KEY_GRANT_SHRINK))
4099                 req->rq_interpret_reply = osc_shrink_grant_interpret;
4101         if (KEY_IS(KEY_GRANT_SHRINK)) {
4102                 struct osc_grant_args *aa;
4105                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4106                 aa = ptlrpc_req_async_args(req);
4109                         ptlrpc_req_finished(req);
4112                 *oa = ((struct ost_body *)val)->oa;
/* GRANT_SHRINK replies go through ptlrpcd; others join caller's set. */
4116                 ptlrpc_req_set_repsize(req, 2, size);
4117                 ptlrpcd_add_req(req);
4119                 ptlrpc_req_set_repsize(req, 1, NULL);
4120                 ptlrpc_set_add_req(set, req);
4121                 ptlrpc_check_set(set);
/* llog ops for the size-replication context: only cancel is needed on
 * the client side.  NOTE(review): the closing "};" of this initializer
 * is missing from this excerpt. */
4128 static struct llog_operations osc_size_repl_logops = {
4129         lop_cancel: llog_obd_repl_cancel
/* Origin-side llog ops; filled in from llog_lvfs_ops in osc_init(). */
4132 static struct llog_operations osc_mds_ost_orig_logops;
/*
 * osc_llog_init(): set up the two llog contexts used by the (MDS-side)
 * OSC: the MDS->OST originator catalog and the size-replication
 * context.  The catalog list access is serialized by the disk obd's
 * obd_llog_cat_process mutex.  NOTE(review): the function's parameter
 * list tail, several error-path lines and braces are missing from this
 * excerpt.
 */
4133 static int osc_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
4136         struct llog_catid catid;
4137         static char name[32] = CATLIST;
4143         mutex_down(&disk_obd->obd_llog_cat_process);
4145         rc = llog_get_cat_list(disk_obd, disk_obd, name, *index, 1, &catid);
4147                 CERROR("rc: %d\n", rc);
4148                 GOTO(out_unlock, rc);
4151         CDEBUG(D_INFO, "%s: Init llog for %s/%d - catid "LPX64"/"LPX64":%x\n",
4152                obd->obd_name, uuid->uuid, idx, catid.lci_logid.lgl_oid,
4153                catid.lci_logid.lgl_ogr, catid.lci_logid.lgl_ogen);
4156         rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, disk_obd, 1,
4157                         &catid.lci_logid, &osc_mds_ost_orig_logops);
4159                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4163         rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, disk_obd, 1, NULL,
4164                         &osc_size_repl_logops);
4166                 struct llog_ctxt *ctxt =
4167                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4170                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4174                 CERROR("osc '%s' tgt '%s' rc=%d\n",
4175                        obd->obd_name, disk_obd->obd_name, rc);
4176                 CERROR("logid "LPX64":0x%x\n",
4177                        catid.lci_logid.lgl_oid, catid.lci_logid.lgl_ogen);
/* Write the (possibly updated) catalog id back to the catalog list. */
4179                 rc = llog_put_cat_list(disk_obd, disk_obd, name, *index, 1,
4182                         CERROR("rc: %d\n", rc);
4185         mutex_up(&disk_obd->obd_llog_cat_process);
/*
 * osc_llog_finish(): tear down both llog contexts created by
 * osc_llog_init().  NOTE(review): the NULL-checks around the cleanup
 * calls and the combined return are missing from this excerpt.
 */
4190 static int osc_llog_finish(struct obd_device *obd, int count)
4192         struct llog_ctxt *ctxt;
4193         int rc = 0, rc2 = 0;
4196         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4198                 rc = llog_cleanup(ctxt);
4200         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4202                 rc2 = llog_cleanup(ctxt);
/*
 * osc_reconnect(): recompute the grant to request in the connect data
 * when reconnecting to the OST.  Grant state is read and reset under
 * cl_loi_list_lock.  NOTE(review): declarations (e.g. lost_grant) and
 * the return are missing from this excerpt.
 */
4209 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
4210                          struct obd_uuid *cluuid,
4211                          struct obd_connect_data *data,
4214         struct client_obd *cli = &obd->u.cli;
4216         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4219                 client_obd_list_lock(&cli->cl_loi_list_lock);
/* Ask for current grant + dirty, or a 2-RPC default if that is zero.
 * NOTE(review): '?:' binds after '+' here, so the fallback applies only
 * when the whole sum is 0 -- looks intentional but worth confirming. */
4220                 data->ocd_grant = cli->cl_avail_grant + cli->cl_dirty ?:
4221                                   2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4222                 lost_grant = cli->cl_lost_grant;
4223                 cli->cl_lost_grant = 0;
4224                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4226                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4227                        "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
4228                        cli->cl_dirty, cli->cl_avail_grant, lost_grant);
4229                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4230                        " ocd_grant: %d\n", data->ocd_connect_flags,
4231                        data->ocd_version, data->ocd_grant);
/*
 * osc_disconnect(): flush size-replication llog cancels on the final
 * disconnect, run the generic client disconnect, then remove this
 * client from the grant-shrink list (ordering explained in the BUG18662
 * comment below).  NOTE(review): some braces and the return are missing
 * from this excerpt.
 */
4237 static int osc_disconnect(struct obd_export *exp)
4239         struct obd_device *obd = class_exp2obd(exp);
4240         struct llog_ctxt  *ctxt;
4243         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
/* Only the last connection flushes the pending llog cancels. */
4245                 if (obd->u.cli.cl_conn_count == 1) {
4246                         /* Flush any remaining cancel messages out to the
4248                         llog_sync(ctxt, exp);
4250                 llog_ctxt_put(ctxt);
4252                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4256         rc = client_disconnect_export(exp);
4258          * Initially we put del_shrink_grant before disconnect_export, but it
4259          * causes the following problem if setup (connect) and cleanup
4260          * (disconnect) are tangled together.
4261          *      connect p1                     disconnect p2
4262          *   ptlrpc_connect_import
4263          *     ...............               class_manual_cleanup
4266          *   ptlrpc_connect_interrupt
4268          *     add this client to shrink list
4270          *   Bang! pinger trigger the shrink.
4271          * So the osc should be disconnected from the shrink list, after we
4272          * are sure the import has been destroyed. BUG18662
4274         if (obd->u.cli.cl_import == NULL)
4275                 osc_del_shrink_grant(&obd->u.cli);
/*
 * osc_import_event(): react to import state transitions (disconnect,
 * invalidate, activate, connect-data negotiated, ...), keeping grant
 * accounting, the object creator flags and the upper layers (via
 * obd_notify_observer) in sync.  NOTE(review): the switch(event) line,
 * cli assignment, break statements and braces are missing from this
 * excerpt.
 */
4279 static int osc_import_event(struct obd_device *obd,
4280                             struct obd_import *imp,
4281                             enum obd_import_event event)
4283         struct client_obd *cli;
4287         LASSERT(imp->imp_obd == obd);
4290         case IMP_EVENT_DISCON: {
4291                 /* Only do this on the MDS OSC's */
4292                 if (imp->imp_server_timeout) {
4293                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4295                         spin_lock(&oscc->oscc_lock);
4296                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4297                         spin_unlock(&oscc->oscc_lock);
/* All grant state is void once the connection is gone. */
4300                 client_obd_list_lock(&cli->cl_loi_list_lock);
4301                 cli->cl_avail_grant = 0;
4302                 cli->cl_lost_grant = 0;
4303                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4304                 ptlrpc_import_setasync(imp, -1);
4308         case IMP_EVENT_INACTIVE: {
4309                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4312         case IMP_EVENT_INVALIDATE: {
4313                 struct ldlm_namespace *ns = obd->obd_namespace;
4317                 client_obd_list_lock(&cli->cl_loi_list_lock);
4318                 /* all pages go to failing rpcs due to the invalid import */
4319                 osc_check_rpcs(cli);
4320                 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Drop every cached lock locally; the server side is gone. */
4322                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4326         case IMP_EVENT_ACTIVE: {
4327                 /* Only do this on the MDS OSC's */
4328                 if (imp->imp_server_timeout) {
4329                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4331                         spin_lock(&oscc->oscc_lock);
4332                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
4333                         spin_unlock(&oscc->oscc_lock);
4335                 CDEBUG(D_INFO, "notify server \n");
4336                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4339         case IMP_EVENT_OCD: {
4340                 struct obd_connect_data *ocd = &imp->imp_connect_data;
4342                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4343                         osc_init_grant(&obd->u.cli, ocd);
4346                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4347                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
4349                 ptlrpc_import_setasync(imp, 1);
4350                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4354                 CERROR("Unknown import event %d\n", event);
4360 /* determine whether the lock can be canceled before replaying the lock
4361  * during recovery, see bug16774 for detailed information
4364  *    zero   - the lock can't be canceled
4365  *    other  - ok to cancel
/* NOTE(review): the RETURN statements between the checks are missing
 * from this excerpt; visible logic: keep GROUP and non-extent locks,
 * allow cancel of unused PR/CR extent locks. */
4367 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
4369         check_res_locked(lock->l_resource);
4370         if (lock->l_granted_mode == LCK_GROUP ||
4371             lock->l_resource->lr_type != LDLM_EXTENT)
4374         /* cancel all unused extent locks with granted mode LCK_PR or LCK_CR */
4375         if (lock->l_granted_mode == LCK_PR ||
4376             lock->l_granted_mode == LCK_CR)
/*
 * osc_setup(): OBD setup method.  Takes a ptlrpcd reference, runs the
 * generic client setup, registers lprocfs entries, creates the request
 * pool / page cache / grant-shrink state, and installs the recovery
 * lock-cancel policy.  NOTE(review): error handling after
 * client_obd_setup() and several braces/returns are missing from this
 * excerpt.
 */
4382 int osc_setup(struct obd_device *obd, obd_count len, void *buf)
4388         rc = ptlrpcd_addref();
4392         rc = client_obd_setup(obd, len, buf);
4396                 struct lprocfs_static_vars lvars = { 0 };
4397                 struct client_obd *cli = &obd->u.cli;
4399                 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4400                 lprocfs_osc_init_vars(&lvars);
4401                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4402                         lproc_osc_attach_seqstat(obd);
4403                         ptlrpc_lprocfs_register_obd(obd);
4407                 /* We need to allocate a few requests more, because
4408                    brw_interpret tries to create new requests before freeing
4409                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
4410                    reserved, but I afraid that might be too much wasted RAM
4411                    in fact, so 2 is just my guess and still should work. */
4412                 cli->cl_import->imp_rq_pool =
4413                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4415                                             ptlrpc_add_rqs_to_pool);
4416                 cli->cl_cache = cache_create(obd);
4417                 if (!cli->cl_cache) {
4421                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4422                 sema_init(&cli->cl_grant_sem, 1);
/* Let LDLM cancel unused PR/CR extent locks instead of replaying them. */
4424                 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
/*
 * osc_precleanup(): staged OBD teardown.  EARLY deactivates the import;
 * EXPORTS invalidates and destroys a never-connected import (freeing
 * its request pool) and finishes the llog contexts.  NOTE(review): the
 * switch(stage) line, break statements and the return are missing from
 * this excerpt.
 */
4430 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4436         case OBD_CLEANUP_EARLY: {
4437                 struct obd_import *imp;
4438                 imp = obd->u.cli.cl_import;
4439                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4440                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4441                 ptlrpc_deactivate_import(imp);
4444         case OBD_CLEANUP_EXPORTS: {
4445                 /* If we set up but never connected, the
4446                    client import will not have been cleaned. */
/* cl_sem serializes against concurrent connect/disconnect. */
4447                 down_write(&obd->u.cli.cl_sem);
4448                 if (obd->u.cli.cl_import) {
4449                         struct obd_import *imp;
4450                         imp = obd->u.cli.cl_import;
4451                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
4453                         ptlrpc_invalidate_import(imp);
4454                         if (imp->imp_rq_pool) {
4455                                 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4456                                 imp->imp_rq_pool = NULL;
4458                         class_destroy_import(imp);
4459                         obd->u.cli.cl_import = NULL;
4461                 up_write(&obd->u.cli.cl_sem);
4463                 rc = obd_llog_finish(obd, 0);
4465                         CERROR("failed to cleanup llogging subsystems\n");
4468         case OBD_CLEANUP_SELF_EXP:
4470         case OBD_CLEANUP_OBD:
/*
 * osc_cleanup(): final OBD teardown -- unregister lprocfs, clean the
 * quota cache, destroy the page cache and run the generic client
 * cleanup.  NOTE(review): the ptlrpcd_decref()/return tail is missing
 * from this excerpt.
 */
4476 int osc_cleanup(struct obd_device *obd)
4481         ptlrpc_lprocfs_unregister_obd(obd);
4482         lprocfs_obd_cleanup(obd);
4484         /* free memory of osc quota cache */
4485         lquota_cleanup(quota_interface, obd);
4487         cache_destroy(obd->u.cli.cl_cache);
4488         rc = client_obd_cleanup(obd);
/*
 * osc_register_page_removal_cb(): register a page-removal callback
 * (plus extent-pin callback) with this client's page cache.  The
 * "this server" early-out visible below skips registration on
 * server-side instances.  NOTE(review): the guard's condition/return
 * lines are missing from this excerpt.
 */
4494 static int osc_register_page_removal_cb(struct obd_device *obd,
4495                                         obd_page_removal_cb_t func,
4496                                         obd_pin_extent_cb pin_cb)
4500         /* this server - not need init */
4504         return cache_add_extent_removal_cb(obd->u.cli.cl_cache, func,
/* Remove a previously registered page-removal callback from the
 * client's page cache. */
4508 static int osc_unregister_page_removal_cb(struct obd_device *obd,
4509                                           obd_page_removal_cb_t func)
4512         return cache_del_extent_removal_cb(obd->u.cli.cl_cache, func);
/*
 * osc_register_lock_cancel_cb(): install the (single) extent-lock
 * cancel callback; asserts none is registered yet.  NOTE(review): the
 * server-side early-out's condition and the return are missing from
 * this excerpt.
 */
4515 static int osc_register_lock_cancel_cb(struct obd_device *obd,
4516                                        obd_lock_cancel_cb cb)
4519         LASSERT(obd->u.cli.cl_ext_lock_cancel_cb == NULL);
4521         /* this server - not need init */
4525         obd->u.cli.cl_ext_lock_cancel_cb = cb;
/*
 * osc_unregister_lock_cancel_cb(): clear the lock-cancel callback,
 * complaining if the caller tries to unregister one that was never
 * installed.  NOTE(review): the error return between the CERROR and the
 * assignment is missing from this excerpt.
 */
4529 static int osc_unregister_lock_cancel_cb(struct obd_device *obd,
4530                                          obd_lock_cancel_cb cb)
4534         if (obd->u.cli.cl_ext_lock_cancel_cb != cb) {
4535                 CERROR("Unregistering cancel cb %p, while only %p was "
4537                        obd->u.cli.cl_ext_lock_cancel_cb);
4541         obd->u.cli.cl_ext_lock_cancel_cb = NULL;
/*
 * osc_process_config(): handle on-line configuration records by mapping
 * lustre.conf parameters onto the OSC's proc variables.
 */
4545 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4547         struct lustre_cfg *lcfg = buf;
4548         struct lprocfs_static_vars lvars = { 0 };
4551         lprocfs_osc_init_vars(&lvars);
4553         rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
/* Method table exporting the OSC implementation through the generic
 * obd_ops interface; registered with class_register_type() in
 * osc_init().  NOTE(review): the closing "};" is missing from this
 * excerpt. */
4557 struct obd_ops osc_obd_ops = {
4558         .o_owner                = THIS_MODULE,
4559         .o_setup                = osc_setup,
4560         .o_precleanup           = osc_precleanup,
4561         .o_cleanup              = osc_cleanup,
4562         .o_add_conn             = client_import_add_conn,
4563         .o_del_conn             = client_import_del_conn,
4564         .o_connect              = client_connect_import,
4565         .o_reconnect            = osc_reconnect,
4566         .o_disconnect           = osc_disconnect,
4567         .o_statfs               = osc_statfs,
4568         .o_statfs_async         = osc_statfs_async,
4569         .o_packmd               = osc_packmd,
4570         .o_unpackmd             = osc_unpackmd,
4571         .o_precreate            = osc_precreate,
4572         .o_create               = osc_create,
4573         .o_create_async         = osc_create_async,
4574         .o_destroy              = osc_destroy,
4575         .o_getattr              = osc_getattr,
4576         .o_getattr_async        = osc_getattr_async,
4577         .o_setattr              = osc_setattr,
4578         .o_setattr_async        = osc_setattr_async,
4580         .o_brw_async            = osc_brw_async,
4581         .o_prep_async_page      = osc_prep_async_page,
4582         .o_get_lock             = osc_get_lock,
4583         .o_queue_async_io       = osc_queue_async_io,
4584         .o_set_async_flags      = osc_set_async_flags,
4585         .o_queue_group_io       = osc_queue_group_io,
4586         .o_trigger_group_io     = osc_trigger_group_io,
4587         .o_teardown_async_page  = osc_teardown_async_page,
4588         .o_punch                = osc_punch,
4590         .o_enqueue              = osc_enqueue,
4591         .o_match                = osc_match,
4592         .o_change_cbdata        = osc_change_cbdata,
4593         .o_cancel               = osc_cancel,
4594         .o_cancel_unused        = osc_cancel_unused,
4595         .o_join_lru             = osc_join_lru,
4596         .o_iocontrol            = osc_iocontrol,
4597         .o_get_info             = osc_get_info,
4598         .o_set_info_async       = osc_set_info_async,
4599         .o_import_event         = osc_import_event,
4600         .o_llog_init            = osc_llog_init,
4601         .o_llog_finish          = osc_llog_finish,
4602         .o_process_config       = osc_process_config,
4603         .o_register_page_removal_cb     = osc_register_page_removal_cb,
4604         .o_unregister_page_removal_cb   = osc_unregister_page_removal_cb,
4605         .o_register_lock_cancel_cb      = osc_register_lock_cancel_cb,
4606         .o_unregister_lock_cancel_cb    = osc_unregister_lock_cancel_cb,
/*
 * osc_init(): module entry point -- bind the quota interface, register
 * the OSC obd type, and build the originator llog ops table from
 * llog_lvfs_ops with client-side overrides.  NOTE(review): the error
 * check after class_register_type() and the return are missing from
 * this excerpt.
 */
4608 int __init osc_init(void)
4610         struct lprocfs_static_vars lvars = { 0 };
4614         lprocfs_osc_init_vars(&lvars);
/* Quota support is optional; PORTAL_SYMBOL_GET may yield NULL. */
4616         request_module("lquota");
4617         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4618         lquota_init(quota_interface);
4619         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4621         rc = class_register_type(&osc_obd_ops, lvars.module_vars,
/* Registration failed: drop the quota symbol reference we took. */
4624                 if (quota_interface)
4625                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4629         osc_mds_ost_orig_logops = llog_lvfs_ops;
4630         osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4631         osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4632         osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4633         osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
/*
 * osc_exit(): module exit -- release the quota interface and unregister
 * the OSC obd type (mirror of osc_init()).
 */
4639 static void /*__exit*/ osc_exit(void)
4641         lquota_exit(quota_interface);
4642         if (quota_interface)
4643                 PORTAL_SYMBOL_PUT(osc_quota_interface);
4645         class_unregister_type(LUSTRE_OSC_NAME);
/* Kernel module metadata and entry/exit registration. */
4648 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4649 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4650 MODULE_LICENSE("GPL");
4652 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);