1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_OSC
43 # include <libcfs/libcfs.h>
44 #else /* __KERNEL__ */
45 # include <liblustre.h>
48 # include <lustre_dlm.h>
49 #include <libcfs/kp30.h>
50 #include <lustre_net.h>
51 #include <lustre/lustre_user.h>
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include <lustre_cache.h>
65 #include "osc_internal.h"
67 static quota_interface_t *quota_interface = NULL;
68 extern quota_interface_t osc_quota_interface;
70 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
71 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
78 atomic_t osc_resend_time;
80 /* Pack OSC object metadata for disk storage (LE byte order). */
81 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
82 struct lov_stripe_md *lsm)
87 lmm_size = sizeof(**lmmp);
92 OBD_FREE(*lmmp, lmm_size);
98 OBD_ALLOC(*lmmp, lmm_size);
104 LASSERT(lsm->lsm_object_id);
105 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
111 /* Unpack OSC object metadata from disk storage (LE byte order). */
112 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
113 struct lov_mds_md *lmm, int lmm_bytes)
119 if (lmm_bytes < sizeof (*lmm)) {
120 CERROR("lov_mds_md too small: %d, need %d\n",
121 lmm_bytes, (int)sizeof(*lmm));
124 /* XXX LOV_MAGIC etc check? */
126 if (lmm->lmm_object_id == 0) {
127 CERROR("lov_mds_md: zero lmm_object_id\n");
132 lsm_size = lov_stripe_md_size(1);
136 if (*lsmp != NULL && lmm == NULL) {
137 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
138 OBD_FREE(*lsmp, lsm_size);
144 OBD_ALLOC(*lsmp, lsm_size);
147 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
148 if ((*lsmp)->lsm_oinfo[0] == NULL) {
149 OBD_FREE(*lsmp, lsm_size);
152 loi_init((*lsmp)->lsm_oinfo[0]);
156 /* XXX zero *lsmp? */
157 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
158 LASSERT((*lsmp)->lsm_object_id);
161 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
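/* Reply handler for an async OST_GETATTR: unpack the ost_body, copy the
 * returned attributes into the caller's obdo and run the oi_cb_up callback. */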
166 static int osc_getattr_interpret(struct ptlrpc_request *req,
167 struct osc_async_args *aa, int rc)
169 struct ost_body *body;
175 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
176 lustre_swab_ost_body);
178 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
179 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
181 /* This should really be sent by the OST */
182 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
183 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
185 CERROR("can't unpack ost_body\n");
187 aa->aa_oi->oi_oa->o_valid = 0;
190 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
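/* Queue an OST_GETATTR request on @set; the reply is processed later by
 * osc_getattr_interpret(). */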
194 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
195 struct ptlrpc_request_set *set)
197 struct ptlrpc_request *req;
198 struct ost_body *body;
199 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
200 struct osc_async_args *aa;
203 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
204 OST_GETATTR, 2, size, NULL);
208 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
209 memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
211 ptlrpc_req_set_repsize(req, 2, size);
212 req->rq_interpret_reply = osc_getattr_interpret;
214 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
215 aa = ptlrpc_req_async_args(req);
218 ptlrpc_set_add_req(set, req);
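/* Synchronous OST_GETATTR: send the request, wait for the reply and copy the
 * returned attributes back into oinfo->oi_oa. */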
222 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
224 struct ptlrpc_request *req;
225 struct ost_body *body;
226 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
230 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
231 OST_GETATTR, 2, size, NULL);
235 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
236 memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
238 ptlrpc_req_set_repsize(req, 2, size);
240 rc = ptlrpc_queue_wait(req);
242 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
246 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
247 lustre_swab_ost_body);
249 CERROR ("can't unpack ost_body\n");
250 GOTO (out, rc = -EPROTO);
253 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
254 memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));
256 /* This should really be sent by the OST */
257 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
258 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
262 ptlrpc_req_finished(req);
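/* Synchronous OST_SETATTR: pack the caller's obdo into the request, wait for
 * the reply and copy the returned attributes back into oinfo->oi_oa. */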
266 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
267 struct obd_trans_info *oti)
269 struct ptlrpc_request *req;
270 struct ost_body *body;
271 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
275 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
276 OST_SETATTR, 2, size, NULL);
280 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
281 memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
283 ptlrpc_req_set_repsize(req, 2, size);
285 rc = ptlrpc_queue_wait(req);
289 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
290 lustre_swab_ost_body);
292 GOTO(out, rc = -EPROTO);
294 memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));
298 ptlrpc_req_finished(req);
302 static int osc_setattr_interpret(struct ptlrpc_request *req,
303 struct osc_async_args *aa, int rc)
305 struct ost_body *body;
311 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
312 lustre_swab_ost_body);
314 CERROR("can't unpack ost_body\n");
315 GOTO(out, rc = -EPROTO);
318 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
320 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
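/* Asynchronous OST_SETATTR, also used for MDS-to-OST setattr: the request is
 * either handed to ptlrpcd without waiting for a reply, or added to @rqset
 * with osc_setattr_interpret() as the reply handler. */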
324 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
325 struct obd_trans_info *oti,
326 struct ptlrpc_request_set *rqset)
328 struct ptlrpc_request *req;
329 struct ost_body *body;
330 __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
332 struct osc_async_args *aa;
335 if (osc_exp_is_2_0_server(exp)) {
339 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
340 OST_SETATTR, bufcount, size, NULL);
344 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
346 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
348 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
351 memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
352 ptlrpc_req_set_repsize(req, 2, size);
353 /* do mds-to-ost setattr asynchronously */
355 /* Do not wait for response. */
356 ptlrpcd_add_req(req);
358 req->rq_interpret_reply = osc_setattr_interpret;
360 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
361 aa = ptlrpc_req_async_args(req);
364 ptlrpc_set_add_req(rqset, req);
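/* Create an object on the OST: send OST_CREATE synchronously, copy the reply
 * attributes back into @oa, record the new object id in the stripe metadata
 * and save the transno and llog cookie for the caller. */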
370 int osc_real_create(struct obd_export *exp, struct obdo *oa,
371 struct lov_stripe_md **ea, struct obd_trans_info *oti)
373 struct ptlrpc_request *req;
374 struct ost_body *body;
375 struct lov_stripe_md *lsm;
376 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
385 rc = obd_alloc_memmd(exp, &lsm);
390 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
391 OST_CREATE, 2, size, NULL);
393 GOTO(out, rc = -ENOMEM);
395 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
396 memcpy(&body->oa, oa, sizeof(body->oa));
398 ptlrpc_req_set_repsize(req, 2, size);
399 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
400 oa->o_flags == OBD_FL_DELORPHAN) {
402 "delorphan from OST integration");
403 /* Don't resend the delorphan req */
404 req->rq_no_resend = req->rq_no_delay = 1;
407 rc = ptlrpc_queue_wait(req);
411 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
412 lustre_swab_ost_body);
414 CERROR ("can't unpack ost_body\n");
415 GOTO (out_req, rc = -EPROTO);
418 memcpy(oa, &body->oa, sizeof(*oa));
420 /* This should really be sent by the OST */
421 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
422 oa->o_valid |= OBD_MD_FLBLKSZ;
424 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
425 * have valid lsm_oinfo data structs, so don't go touching that.
426 * This needs to be fixed in a big way.
428 lsm->lsm_object_id = oa->o_id;
432 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
434 if (oa->o_valid & OBD_MD_FLCOOKIE) {
435 if (!oti->oti_logcookies)
436 oti_alloc_cookies(oti, 1);
437 *oti->oti_logcookies = oa->o_lcookie;
441 CDEBUG(D_HA, "transno: "LPD64"\n",
442 lustre_msg_get_transno(req->rq_repmsg));
444 ptlrpc_req_finished(req);
447 obd_free_memmd(exp, &lsm);
451 static int osc_punch_interpret(struct ptlrpc_request *req,
452 struct osc_async_args *aa, int rc)
454 struct ost_body *body;
460 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
461 lustre_swab_ost_body);
463 CERROR ("can't unpack ost_body\n");
464 GOTO(out, rc = -EPROTO);
467 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
469 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
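/* Truncate (punch) an object.  The extent to punch is carried in the o_size
 * and o_blocks fields of the obdo (see below); the request is added to
 * @rqset and completed by osc_punch_interpret(). */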
473 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
474 struct obd_trans_info *oti,
475 struct ptlrpc_request_set *rqset)
477 struct ptlrpc_request *req;
478 struct osc_async_args *aa;
479 struct ost_body *body;
480 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
488 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
489 OST_PUNCH, 2, size, NULL);
493 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
494 ptlrpc_at_set_req_timeout(req);
496 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
497 memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
499 /* overload the size and blocks fields in the oa with start/end */
500 body->oa.o_size = oinfo->oi_policy.l_extent.start;
501 body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
502 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
504 ptlrpc_req_set_repsize(req, 2, size);
506 req->rq_interpret_reply = osc_punch_interpret;
507 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
508 aa = ptlrpc_req_async_args(req);
510 ptlrpc_set_add_req(rqset, req);
515 static int osc_sync_interpret(struct ptlrpc_request *req,
516 struct osc_async_args *aa, int rc)
518 struct ost_body *body;
524 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
525 lustre_swab_ost_body);
527 CERROR ("can't unpack ost_body\n");
528 GOTO(out, rc = -EPROTO);
531 *aa->aa_oi->oi_oa = body->oa;
533 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
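/* Ask the OST to flush [start, end] of the object to disk.  As with punch,
 * the range is passed in o_size/o_blocks; osc_sync_interpret() handles the
 * reply. */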
537 static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
538 obd_size start, obd_size end,
539 struct ptlrpc_request_set *set)
541 struct ptlrpc_request *req;
542 struct ost_body *body;
543 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
544 struct osc_async_args *aa;
552 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
553 OST_SYNC, 2, size, NULL);
557 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
558 memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
560 /* overload the size and blocks fields in the oa with start/end */
561 body->oa.o_size = start;
562 body->oa.o_blocks = end;
563 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
565 ptlrpc_req_set_repsize(req, 2, size);
566 req->rq_interpret_reply = osc_sync_interpret;
568 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
569 aa = ptlrpc_req_async_args(req);
572 ptlrpc_set_add_req(set, req);
576 /* Find and locally cancel locks matching @mode in the resource found by
577 * @objid. Found locks are added to the @cancels list. Returns the number of
578 * locks added to the @cancels list. */
579 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
580 struct list_head *cancels, ldlm_mode_t mode,
583 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
584 struct ldlm_res_id res_id;
585 struct ldlm_resource *res;
589 osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
590 res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
594 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
595 lock_flags, 0, NULL);
596 ldlm_resource_putref(res);
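/* OST_DESTROY reply handler: drop the in-flight destroy count and wake any
 * thread throttled in osc_destroy(). */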
600 static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
603 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
605 atomic_dec(&cli->cl_destroy_in_flight);
606 cfs_waitq_signal(&cli->cl_destroy_waitq);
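/* Check whether another destroy RPC may be sent without exceeding
 * cl_max_rpcs_in_flight; the speculative increment is undone on failure. */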
610 static int osc_can_send_destroy(struct client_obd *cli)
612 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
613 cli->cl_max_rpcs_in_flight) {
614 /* The destroy request can be sent */
617 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
618 cli->cl_max_rpcs_in_flight) {
620 * The counter has been modified between the two atomic
623 cfs_waitq_signal(&cli->cl_destroy_waitq);
628 /* Destroy requests can always be async on the client, and we don't even really
629 * care about the return code since the client cannot do anything at all about
631 * When the MDS is unlinking a filename, it saves the file objects into a
632 * recovery llog, and these object records are cancelled when the OST reports
633 * they were destroyed and sync'd to disk (i.e. transaction committed).
634 * If the client dies, or the OST is down when the object should be destroyed,
635 * the records are not cancelled, and when the OST reconnects to the MDS next,
636 * it will retrieve the llog unlink logs and then send the log cancellation
637 * cookies to the MDS after committing destroy transactions. */
638 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
639 struct lov_stripe_md *ea, struct obd_trans_info *oti,
640 struct obd_export *md_export)
642 CFS_LIST_HEAD(cancels);
643 struct ptlrpc_request *req;
644 struct ost_body *body;
645 __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body),
646 sizeof(struct ldlm_request) };
647 int count, bufcount = 2;
648 struct client_obd *cli = &exp->exp_obd->u.cli;
656 LASSERT(oa->o_id != 0);
658 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
659 LDLM_FL_DISCARD_DATA);
660 if (exp_connect_cancelset(exp))
662 req = ldlm_prep_elc_req(exp, LUSTRE_OST_VERSION, OST_DESTROY, bufcount,
663 size, REQ_REC_OFF + 1, 0, &cancels, count);
667 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
668 req->rq_interpret_reply = osc_destroy_interpret;
669 ptlrpc_at_set_req_timeout(req);
671 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
673 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
674 oa->o_lcookie = *oti->oti_logcookies;
677 memcpy(&body->oa, oa, sizeof(*oa));
678 ptlrpc_req_set_repsize(req, 2, size);
680 if (!osc_can_send_destroy(cli)) {
681 struct l_wait_info lwi = { 0 };
684 * Wait until the number of on-going destroy RPCs drops
685 * under max_rpcs_in_flight
687 l_wait_event_exclusive(cli->cl_destroy_waitq,
688 osc_can_send_destroy(cli), &lwi);
691 /* Do not wait for response */
692 ptlrpcd_add_req(req);
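/* Fill in the dirty/undirty and grant accounting that is piggy-backed on
 * every BRW request so the OST can track this client's cache usage. */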
696 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
699 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
701 LASSERT(!(oa->o_valid & bits));
704 client_obd_list_lock(&cli->cl_loi_list_lock);
705 oa->o_dirty = cli->cl_dirty;
706 if (cli->cl_dirty > cli->cl_dirty_max) {
707 CERROR("dirty %lu > dirty_max %lu\n",
708 cli->cl_dirty, cli->cl_dirty_max);
710 } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
711 CERROR("dirty %d > system dirty_max %d\n",
712 atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
714 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
715 CERROR("dirty %lu - dirty_max %lu too big???\n",
716 cli->cl_dirty, cli->cl_dirty_max);
719 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
720 (cli->cl_max_rpcs_in_flight + 1);
721 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
723 oa->o_grant = cli->cl_avail_grant;
724 oa->o_dropped = cli->cl_lost_grant;
725 cli->cl_lost_grant = 0;
726 client_obd_list_unlock(&cli->cl_loi_list_lock);
727 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
728 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
731 /* caller must hold loi_list_lock */
732 static void osc_consume_write_grant(struct client_obd *cli,struct brw_page *pga)
734 atomic_inc(&obd_dirty_pages);
735 cli->cl_dirty += CFS_PAGE_SIZE;
736 cli->cl_avail_grant -= CFS_PAGE_SIZE;
737 pga->flag |= OBD_BRW_FROM_GRANT;
738 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
739 CFS_PAGE_SIZE, pga, pga->pg);
740 LASSERTF(cli->cl_avail_grant >= 0, "invalid avail grant is %ld \n",
741 cli->cl_avail_grant);
744 /* the companion to osc_consume_write_grant, called when a brw has completed.
745 * must be called with the loi lock held. */
746 static void osc_release_write_grant(struct client_obd *cli,
747 struct brw_page *pga, int sent)
749 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
752 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
757 pga->flag &= ~OBD_BRW_FROM_GRANT;
758 atomic_dec(&obd_dirty_pages);
759 cli->cl_dirty -= CFS_PAGE_SIZE;
761 cli->cl_lost_grant += CFS_PAGE_SIZE;
762 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
763 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
764 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
765 /* For short writes we shouldn't count parts of pages that
766 * span a whole block on the OST side, or our accounting goes
767 * wrong. Should match the code in filter_grant_check. */
768 int offset = pga->off & ~CFS_PAGE_MASK;
769 int count = pga->count + (offset & (blocksize - 1));
770 int end = (offset + pga->count) & (blocksize - 1);
772 count += blocksize - end;
774 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
775 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
776 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
777 cli->cl_avail_grant, cli->cl_dirty);
783 static unsigned long rpcs_in_flight(struct client_obd *cli)
785 return cli->cl_r_in_flight + cli->cl_w_in_flight;
788 /* caller must hold loi_list_lock */
789 void osc_wake_cache_waiters(struct client_obd *cli)
791 struct list_head *l, *tmp;
792 struct osc_cache_waiter *ocw;
795 list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
796 /* if we can't dirty more, we must wait until some is written */
797 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
798 ((atomic_read(&obd_dirty_pages)+1)>(obd_max_dirty_pages))) {
799 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
800 "osc max %ld, sys max %d\n", cli->cl_dirty,
801 cli->cl_dirty_max, obd_max_dirty_pages);
805 /* if still dirty cache but no grant wait for pending RPCs that
806 * may yet return us some grant before doing sync writes */
807 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
808 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
809 cli->cl_w_in_flight);
813 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
814 list_del_init(&ocw->ocw_entry);
815 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
816 /* no more RPCs in flight to return grant, do sync IO */
817 ocw->ocw_rc = -EDQUOT;
818 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
820 osc_consume_write_grant(cli,
821 &ocw->ocw_oap->oap_brw_page);
824 cfs_waitq_signal(&ocw->ocw_waitq);
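/* Seed the client's available grant from the value the OST returned at
 * connect time. */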
830 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
832 client_obd_list_lock(&cli->cl_loi_list_lock);
833 cli->cl_avail_grant = ocd->ocd_grant;
834 client_obd_list_unlock(&cli->cl_loi_list_lock);
836 CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
837 cli->cl_avail_grant, cli->cl_lost_grant);
838 LASSERT(cli->cl_avail_grant >= 0);
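/* Add any grant returned by the OST in a BRW reply to our available grant. */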
841 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
843 client_obd_list_lock(&cli->cl_loi_list_lock);
844 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
845 if (body->oa.o_valid & OBD_MD_FLGRANT)
846 cli->cl_avail_grant += body->oa.o_grant;
847 /* waiters are woken in brw_interpret */
848 client_obd_list_unlock(&cli->cl_loi_list_lock);
851 /* We assume that the reason this OSC got a short read is because it read
852 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
853 * via the LOV, and it _knows_ it's reading inside the file, it's just that
854 * this stripe never got written at or beyond this stripe offset yet. */
855 static void handle_short_read(int nob_read, obd_count page_count,
856 struct brw_page **pga)
861 /* skip bytes read OK */
862 while (nob_read > 0) {
863 LASSERT (page_count > 0);
865 if (pga[i]->count > nob_read) {
866 /* EOF inside this page */
867 ptr = cfs_kmap(pga[i]->pg) +
868 (pga[i]->off & ~CFS_PAGE_MASK);
869 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
870 cfs_kunmap(pga[i]->pg);
876 nob_read -= pga[i]->count;
881 /* zero remaining pages */
882 while (page_count-- > 0) {
883 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
884 memset(ptr, 0, pga[i]->count);
885 cfs_kunmap(pga[i]->pg);
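/* Validate the per-niobuf return codes in a BRW write reply and verify that
 * the bulk layer transferred exactly the number of bytes we requested. */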
890 static int check_write_rcs(struct ptlrpc_request *req,
891 int requested_nob, int niocount,
892 obd_count page_count, struct brw_page **pga)
896 /* return error if any niobuf was in error */
897 remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
898 sizeof(*remote_rcs) * niocount, NULL);
899 if (remote_rcs == NULL) {
900 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
903 if (lustre_rep_need_swab(req))
904 for (i = 0; i < niocount; i++)
905 __swab32s(&remote_rcs[i]);
907 for (i = 0; i < niocount; i++) {
908 if (remote_rcs[i] < 0)
909 return(remote_rcs[i]);
911 if (remote_rcs[i] != 0) {
912 CERROR("rc[%d] invalid (%d) req %p\n",
913 i, remote_rcs[i], req);
918 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
919 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
920 req->rq_bulk->bd_nob_transferred, requested_nob);
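/* Two brw_pages can share one remote niobuf only if they are contiguous in
 * file offset and carry compatible flags. */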
927 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
929 if (p1->flag != p2->flag) {
930 unsigned mask = ~OBD_BRW_FROM_GRANT;
932 /* warn if we try to combine flags that we don't know to be
934 if ((p1->flag & mask) != (p2->flag & mask))
935 CERROR("is it ok to have flags 0x%x and 0x%x in the "
936 "same brw?\n", p1->flag, p2->flag);
940 return (p1->off + p1->count == p2->off);
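/* Compute the bulk checksum over up to @nob bytes of the page array,
 * honouring the OBD_FAIL checksum fault-injection hooks. */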
943 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
944 struct brw_page **pga, int opc,
945 cksum_type_t cksum_type)
950 LASSERT (pg_count > 0);
951 cksum = init_checksum(cksum_type);
952 while (nob > 0 && pg_count > 0) {
953 unsigned char *ptr = cfs_kmap(pga[i]->pg);
954 int off = pga[i]->off & ~CFS_PAGE_MASK;
955 int count = pga[i]->count > nob ? nob : pga[i]->count;
957 /* corrupt the data before we compute the checksum, to
958 * simulate an OST->client data error */
959 if (i == 0 && opc == OST_READ &&
960 OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
961 memcpy(ptr + off, "bad1", min(4, nob));
962 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
963 cfs_kunmap(pga[i]->pg);
964 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
967 nob -= pga[i]->count;
971 /* For sending we only compute the wrong checksum instead
972 * of corrupting the data so it is still correct on a redo */
973 if (opc == OST_WRITE && OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
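/* Build a single BRW (OST_READ/OST_WRITE) request: allocate the bulk
 * descriptor, pack the obdo/ioobj/niobuf triple, merge contiguous pages into
 * as few niobufs as possible, and set up checksum and grant information. */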
979 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
980 struct lov_stripe_md *lsm, obd_count page_count,
981 struct brw_page **pga,
982 struct ptlrpc_request **reqp)
984 struct ptlrpc_request *req;
985 struct ptlrpc_bulk_desc *desc;
986 struct ost_body *body;
987 struct obd_ioobj *ioobj;
988 struct niobuf_remote *niobuf;
989 __u32 size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
990 int niocount, i, requested_nob, opc, rc;
991 struct ptlrpc_request_pool *pool;
992 struct osc_brw_async_args *aa;
993 struct brw_page *pg_prev;
996 OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
997 OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */
999 opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
1000 pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_import->imp_rq_pool :NULL;
1002 for (niocount = i = 1; i < page_count; i++) {
1003 if (!can_merge_pages(pga[i - 1], pga[i]))
1007 size[REQ_REC_OFF + 1] = sizeof(*ioobj);
1008 size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
1010 req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 4, size,
1015 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1016 ptlrpc_at_set_req_timeout(req);
1018 if (opc == OST_WRITE)
1019 desc = ptlrpc_prep_bulk_imp (req, page_count,
1020 BULK_GET_SOURCE, OST_BULK_PORTAL);
1022 desc = ptlrpc_prep_bulk_imp (req, page_count,
1023 BULK_PUT_SINK, OST_BULK_PORTAL);
1025 GOTO(out, rc = -ENOMEM);
1026 /* NB request now owns desc and will free it when it gets freed */
1028 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1029 ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
1030 niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1031 niocount * sizeof(*niobuf));
1033 memcpy(&body->oa, oa, sizeof(*oa));
1035 obdo_to_ioobj(oa, ioobj);
1036 ioobj->ioo_bufcnt = niocount;
1038 LASSERT (page_count > 0);
1040 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1041 struct brw_page *pg = pga[i];
1043 LASSERT(pg->count > 0);
1044 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1045 "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1046 pg->off, pg->count);
1048 LASSERTF(i == 0 || pg->off > pg_prev->off,
1049 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1050 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1052 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1053 pg_prev->pg, page_private(pg_prev->pg),
1054 pg_prev->pg->index, pg_prev->off);
1056 LASSERTF(i == 0 || pg->off > pg_prev->off,
1057 "i %d p_c %u\n", i, page_count);
1059 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1060 (pg->flag & OBD_BRW_SRVLOCK));
1062 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1064 requested_nob += pg->count;
1066 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1068 niobuf->len += pg->count;
1070 niobuf->offset = pg->off;
1071 niobuf->len = pg->count;
1072 niobuf->flags = pg->flag;
1077 LASSERTF((void *)(niobuf - niocount) ==
1078 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1079 niocount * sizeof(*niobuf)),
1080 "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
1081 REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
1082 (void *)(niobuf - niocount));
1084 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1086 /* size[REQ_REC_OFF] still sizeof (*body) */
1087 if (opc == OST_WRITE) {
1088 if (cli->cl_checksum) {
1089 /* store cl_cksum_type in a local variable since
1090 * it can be changed via lprocfs */
1091 cksum_type_t cksum_type = cli->cl_cksum_type;
1093 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1094 oa->o_flags = body->oa.o_flags = 0;
1095 body->oa.o_flags |= cksum_type_pack(cksum_type);
1096 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1097 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1101 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1103 /* save this in 'oa', too, for later checking */
1104 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1105 oa->o_flags |= cksum_type_pack(cksum_type);
1107 /* clear out the checksum flag, in case this is a
1108 * resend but cl_checksum is no longer set. b=11238 */
1109 oa->o_valid &= ~OBD_MD_FLCKSUM;
1111 oa->o_cksum = body->oa.o_cksum;
1112 /* 1 RC per niobuf */
1113 size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
1114 ptlrpc_req_set_repsize(req, 3, size);
1116 if (cli->cl_checksum) {
1117 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1118 body->oa.o_flags = 0;
1119 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1120 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1122 /* 1 RC for the whole I/O */
1123 ptlrpc_req_set_repsize(req, 2, size);
1126 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1127 aa = ptlrpc_req_async_args(req);
1129 aa->aa_requested_nob = requested_nob;
1130 aa->aa_nio_count = niocount;
1131 aa->aa_page_count = page_count;
1135 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1141 ptlrpc_req_finished (req);
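/* The OST reported a write checksum mismatch: recompute the checksum locally
 * to decide whether the data changed on the client, changed in transit, or
 * the server used a different checksum type, and log the verdict. */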
1145 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1146 __u32 client_cksum, __u32 server_cksum, int nob,
1147 obd_count page_count, struct brw_page **pga,
1148 cksum_type_t client_cksum_type)
1152 cksum_type_t cksum_type;
1154 if (server_cksum == client_cksum) {
1155 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1159 if (oa->o_valid & OBD_MD_FLFLAGS)
1160 cksum_type = cksum_type_unpack(oa->o_flags);
1162 cksum_type = OBD_CKSUM_CRC32;
1164 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1167 if (cksum_type != client_cksum_type)
1168 msg = "the server did not use the checksum type specified in "
1169 "the original request - likely a protocol problem";
1170 else if (new_cksum == server_cksum)
1171 msg = "changed on the client after we checksummed it - "
1172 "likely false positive due to mmap IO (bug 11742)";
1173 else if (new_cksum == client_cksum)
1174 msg = "changed in transit before arrival at OST";
1176 msg = "changed in transit AND doesn't match the original - "
1177 "likely false positive due to mmap IO (bug 11742)";
1179 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1180 LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1181 "["LPU64"-"LPU64"]\n",
1182 msg, libcfs_nid2str(peer->nid),
1183 oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1184 oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1187 oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1189 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1190 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1191 "client csum now %x\n", client_cksum, client_cksum_type,
1192 server_cksum, cksum_type, new_cksum);
1197 /* Note rc enters this function as number of bytes transferred */
1198 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1200 struct osc_brw_async_args *aa = ptlrpc_req_async_args(req);
1201 const lnet_process_id_t *peer =
1202 &req->rq_import->imp_connection->c_peer;
1203 struct client_obd *cli = aa->aa_cli;
1204 struct ost_body *body;
1205 __u32 client_cksum = 0;
1208 if (rc < 0 && rc != -EDQUOT)
1211 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1212 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1213 lustre_swab_ost_body);
1215 CERROR ("Can't unpack body\n");
1219 /* set/clear over quota flag for a uid/gid */
1220 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1221 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1222 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1223 body->oa.o_gid, body->oa.o_valid,
1229 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1230 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1232 osc_update_grant(cli, body);
1234 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1236 CERROR ("Unexpected +ve rc %d\n", rc);
1239 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1241 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1242 check_write_checksum(&body->oa, peer, client_cksum,
1243 body->oa.o_cksum, aa->aa_requested_nob,
1244 aa->aa_page_count, aa->aa_ppga,
1245 cksum_type_unpack(aa->aa_oa->o_flags)))
1248 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1249 aa->aa_page_count, aa->aa_ppga);
1253 /* The rest of this function executes only for OST_READs */
1254 if (rc > aa->aa_requested_nob) {
1255 CERROR("Unexpected rc %d (%d requested)\n", rc,
1256 aa->aa_requested_nob);
1260 if (rc != req->rq_bulk->bd_nob_transferred) {
1261 CERROR ("Unexpected rc %d (%d transferred)\n",
1262 rc, req->rq_bulk->bd_nob_transferred);
1266 if (rc < aa->aa_requested_nob)
1267 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1269 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1270 static int cksum_counter;
1271 __u32 server_cksum = body->oa.o_cksum;
1274 cksum_type_t cksum_type;
1276 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1277 cksum_type = cksum_type_unpack(body->oa.o_flags);
1279 cksum_type = OBD_CKSUM_CRC32;
1280 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1281 aa->aa_ppga, OST_READ,
1284 if (peer->nid == req->rq_bulk->bd_sender) {
1288 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1291 if (server_cksum == ~0 && rc > 0) {
1292 CERROR("Protocol error: server %s set the 'checksum' "
1293 "bit, but didn't send a checksum. Not fatal, "
1294 "but please notify on http://bugzilla.lustre.org/\n",
1295 libcfs_nid2str(peer->nid));
1296 } else if (server_cksum != client_cksum) {
1297 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1298 "%s%s%s inum "LPU64"/"LPU64" object "
1299 LPU64"/"LPU64" extent "
1300 "["LPU64"-"LPU64"]\n",
1301 req->rq_import->imp_obd->obd_name,
1302 libcfs_nid2str(peer->nid),
1304 body->oa.o_valid & OBD_MD_FLFID ?
1305 body->oa.o_fid : (__u64)0,
1306 body->oa.o_valid & OBD_MD_FLFID ?
1307 body->oa.o_generation :(__u64)0,
1309 body->oa.o_valid & OBD_MD_FLGROUP ?
1310 body->oa.o_gr : (__u64)0,
1311 aa->aa_ppga[0]->off,
1312 aa->aa_ppga[aa->aa_page_count-1]->off +
1313 aa->aa_ppga[aa->aa_page_count-1]->count -
1315 CERROR("client %x, server %x, cksum_type %x\n",
1316 client_cksum, server_cksum, cksum_type);
1318 aa->aa_oa->o_cksum = client_cksum;
1322 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1325 } else if (unlikely(client_cksum)) {
1326 static int cksum_missed;
1329 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1330 CERROR("Checksum %u requested from %s but not sent\n",
1331 cksum_missed, libcfs_nid2str(peer->nid));
1337 memcpy(aa->aa_oa, &body->oa, sizeof(*aa->aa_oa));
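/* Synchronous BRW: build the request, queue it and wait, retrying with a
 * delay for as long as osc_should_resend() allows on recoverable errors. */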
1342 static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
1343 struct lov_stripe_md *lsm,
1344 obd_count page_count, struct brw_page **pga)
1346 struct ptlrpc_request *request;
1350 struct l_wait_info lwi;
1353 init_waitqueue_head(&waitq);
1356 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1357 page_count, pga, &request);
1361 rc = ptlrpc_queue_wait(request);
1363 if (rc == -ETIMEDOUT && request->rq_resend) {
1364 DEBUG_REQ(D_HA, request, "BULK TIMEOUT");
1365 ptlrpc_req_finished(request);
1369 rc = osc_brw_fini_request(request, rc);
1371 ptlrpc_req_finished(request);
1372 if (osc_recoverable_error(rc)) {
1374 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1375 CERROR("too many resend retries, returning error\n");
1379 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1380 l_wait_event(waitq, 0, &lwi);
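/* Rebuild and resend a failed BRW: prepare a new request over the same pages,
 * let it take over the async args and oaps of the old one, and add it back
 * to the original request set. */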
1387 int osc_brw_redo_request(struct ptlrpc_request *request,
1388 struct osc_brw_async_args *aa)
1390 struct ptlrpc_request *new_req;
1391 struct ptlrpc_request_set *set = request->rq_set;
1392 struct osc_brw_async_args *new_aa;
1393 struct osc_async_page *oap;
1397 if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1398 CERROR("too many resend retries, returning error\n");
1402 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1404 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1405 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1406 aa->aa_cli, aa->aa_oa,
1407 NULL /* lsm unused by osc currently */,
1408 aa->aa_page_count, aa->aa_ppga, &new_req);
1412 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1414 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1415 if (oap->oap_request != NULL) {
1416 LASSERTF(request == oap->oap_request,
1417 "request %p != oap_request %p\n",
1418 request, oap->oap_request);
1419 if (oap->oap_interrupted) {
1420 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1421 ptlrpc_req_finished(new_req);
1426 /* New request takes over pga and oaps from old request.
1427 * Note that copying a list_head doesn't work, need to move it... */
1429 new_req->rq_interpret_reply = request->rq_interpret_reply;
1430 new_req->rq_async_args = request->rq_async_args;
1431 new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;
1433 new_aa = ptlrpc_req_async_args(new_req);
1435 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1436 list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1437 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1439 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1440 if (oap->oap_request) {
1441 ptlrpc_req_finished(oap->oap_request);
1442 oap->oap_request = ptlrpc_request_addref(new_req);
1446 /* using ptlrpc_set_add_req() here is safe because interpret functions run
1447 * in check_set context. the only other thread that can access the request
1448 * (after it got -EINTR) does so under
1449 * cl_loi_list_lock */
1450 ptlrpc_set_add_req(set, new_req);
1452 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1454 DEBUG_REQ(D_INFO, new_req, "new request");
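/* Asynchronous BRW used by osc_brw_async(): consume write grant for the
 * pages, build the request, account it in the in-flight and lprocfs
 * histograms, and add it to @set with brw_interpret() as the reply handler. */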
1458 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1459 struct lov_stripe_md *lsm, obd_count page_count,
1460 struct brw_page **pga, struct ptlrpc_request_set *set)
1462 struct ptlrpc_request *request;
1463 struct client_obd *cli = &exp->exp_obd->u.cli;
1465 struct osc_brw_async_args *aa;
1468 /* Consume write credits even if doing a sync write -
1469 * otherwise we may run out of space on OST due to grant. */
1470 if (cmd == OBD_BRW_WRITE) {
1471 client_obd_list_lock(&cli->cl_loi_list_lock);
1472 for (i = 0; i < page_count; i++) {
1473 if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1474 osc_consume_write_grant(cli, pga[i]);
1476 client_obd_list_unlock(&cli->cl_loi_list_lock);
1479 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1480 page_count, pga, &request);
1482 CLASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
1483 aa = ptlrpc_req_async_args(request);
1484 if (cmd == OBD_BRW_READ) {
1485 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1486 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1488 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1489 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1490 cli->cl_w_in_flight);
1492 ptlrpc_lprocfs_brw(request, aa->aa_requested_nob);
1494 LASSERT(list_empty(&aa->aa_oaps));
1497 request->rq_interpret_reply = brw_interpret;
1498 ptlrpc_set_add_req(set, request);
1499 client_obd_list_lock(&cli->cl_loi_list_lock);
1500 if (cmd == OBD_BRW_READ)
1501 cli->cl_r_in_flight++;
1503 cli->cl_w_in_flight++;
1504 client_obd_list_unlock(&cli->cl_loi_list_lock);
1505 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DIO_PAUSE, 3);
1506 } else if (cmd == OBD_BRW_WRITE) {
1507 client_obd_list_lock(&cli->cl_loi_list_lock);
1508 for (i = 0; i < page_count; i++)
1509 osc_release_write_grant(cli, pga[i], 0);
1510 osc_wake_cache_waiters(cli);
1511 client_obd_list_unlock(&cli->cl_loi_list_lock);
1518 * ugh, we want disk allocation on the target to happen in offset order. we'll
1519 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1520 * fine for our small page arrays and doesn't require allocation. it's an
1521 * insertion sort that swaps elements that are strides apart, shrinking the
1522 * stride down until it's 1 and the array is sorted.
1524 static void sort_brw_pages(struct brw_page **array, int num)
1527 struct brw_page *tmp;
1531 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1536 for (i = stride ; i < num ; i++) {
1539 while (j >= stride && array[j-stride]->off > tmp->off) {
1540 array[j] = array[j - stride];
1545 } while (stride > 1);
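/* Return how many of the leading pages form a single unfragmented region,
 * i.e. one the bulk layer can transfer in a single RDMA. */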
1548 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1554 LASSERT (pages > 0);
1555 offset = pg[i]->off & (~CFS_PAGE_MASK);
1559 if (pages == 0) /* that's all */
1562 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1563 return count; /* doesn't end on page boundary */
1566 offset = pg[i]->off & (~CFS_PAGE_MASK);
1567 if (offset != 0) /* doesn't start on page boundary */
1574 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1576 struct brw_page **ppga;
1579 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1583 for (i = 0; i < count; i++)
1588 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1590 LASSERT(ppga != NULL);
1591 OBD_FREE(ppga, sizeof(*ppga) * count);
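/* Synchronous top-level BRW: sort the pages by offset, split them into
 * unfragmented chunks of at most cl_max_pages_per_rpc pages, and issue one
 * osc_brw_internal() call per chunk, restoring the saved obdo in between. */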
1594 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1595 obd_count page_count, struct brw_page *pga,
1596 struct obd_trans_info *oti)
1598 struct obdo *saved_oa = NULL;
1599 struct brw_page **ppga, **orig;
1600 struct obd_import *imp = class_exp2cliimp(exp);
1601 struct client_obd *cli = &imp->imp_obd->u.cli;
1602 int rc, page_count_orig;
1605 if (cmd & OBD_BRW_CHECK) {
1606 /* The caller just wants to know if there's a chance that this
1607 * I/O can succeed */
1609 if (imp == NULL || imp->imp_invalid)
1614 /* test_brw with a failed create can trip this, maybe others. */
1615 LASSERT(cli->cl_max_pages_per_rpc);
1619 orig = ppga = osc_build_ppga(pga, page_count);
1622 page_count_orig = page_count;
1624 sort_brw_pages(ppga, page_count);
1625 while (page_count) {
1626 obd_count pages_per_brw;
1628 if (page_count > cli->cl_max_pages_per_rpc)
1629 pages_per_brw = cli->cl_max_pages_per_rpc;
1631 pages_per_brw = page_count;
1633 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1635 if (saved_oa != NULL) {
1636 /* restore previously saved oa */
1637 *oinfo->oi_oa = *saved_oa;
1638 } else if (page_count > pages_per_brw) {
1639 /* save a copy of oa (brw will clobber it) */
1640 OBDO_ALLOC(saved_oa);
1641 if (saved_oa == NULL)
1642 GOTO(out, rc = -ENOMEM);
1643 *saved_oa = *oinfo->oi_oa;
1646 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1647 pages_per_brw, ppga);
1652 page_count -= pages_per_brw;
1653 ppga += pages_per_brw;
1657 osc_release_ppga(orig, page_count_orig);
1659 if (saved_oa != NULL)
1660 OBDO_FREE(saved_oa);
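/* Asynchronous top-level BRW: like osc_brw(), but each chunk is handed to
 * async_internal() together with its own copy of the page array. */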
1665 static int osc_brw_async(int cmd, struct obd_export *exp,
1666 struct obd_info *oinfo, obd_count page_count,
1667 struct brw_page *pga, struct obd_trans_info *oti,
1668 struct ptlrpc_request_set *set)
1670 struct brw_page **ppga, **orig;
1671 int page_count_orig;
1675 if (cmd & OBD_BRW_CHECK) {
1676 /* The caller just wants to know if there's a chance that this
1677 * I/O can succeed */
1678 struct obd_import *imp = class_exp2cliimp(exp);
1680 if (imp == NULL || imp->imp_invalid)
1685 orig = ppga = osc_build_ppga(pga, page_count);
1688 page_count_orig = page_count;
1690 sort_brw_pages(ppga, page_count);
1691 while (page_count) {
1692 struct brw_page **copy;
1693 obd_count pages_per_brw;
1695 pages_per_brw = min_t(obd_count, page_count,
1696 class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc);
1698 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1700 /* use ppga only if a single RPC is going to fly */
1701 if (pages_per_brw != page_count_orig || ppga != orig) {
1702 OBD_ALLOC(copy, pages_per_brw * sizeof(*copy));
1704 GOTO(out, rc = -ENOMEM);
1705 memcpy(copy, ppga, pages_per_brw * sizeof(*copy));
1709 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1710 pages_per_brw, copy, set);
1714 OBD_FREE(copy, pages_per_brw * sizeof(*copy));
1719 /* we passed it to async_internal() which is
1720 * now responsible for releasing memory */
1724 page_count -= pages_per_brw;
1725 ppga += pages_per_brw;
1729 osc_release_ppga(orig, page_count_orig);
1733 static void osc_check_rpcs(struct client_obd *cli);
1735 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1736 * the dirty accounting. Writeback completes or truncate happens before
1737 * writing starts. Must be called with the loi lock held. */
1738 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1741 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1744 /* This maintains the lists of pending pages to read/write for a given object
1745 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1746 * to quickly find objects that are ready to send an RPC. */
1747 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1753 if (lop->lop_num_pending == 0)
1756 /* if we have an invalid import we want to drain the queued pages
1757 * by forcing them through rpcs that immediately fail and complete
1758 * the pages. recovery relies on this to empty the queued pages
1759 * before canceling the locks and evicting down the llite pages */
1760 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1763 /* stream rpcs in queue order as long as there is an urgent page
1764 * queued. this is our cheap solution for good batching in the case
1765 * where writepage marks some random page in the middle of the file
1766 * as urgent because of, say, memory pressure */
1767 if (!list_empty(&lop->lop_urgent)) {
1768 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1772 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1773 optimal = cli->cl_max_pages_per_rpc;
1774 if (cmd & OBD_BRW_WRITE) {
1775 /* trigger a write rpc stream as long as there are dirtiers
1776 * waiting for space. as they're waiting, they're not going to
1777 * create more pages to coalesce with what's waiting. */
1778 if (!list_empty(&cli->cl_cache_waiters)) {
1779 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1783 /* +16 to avoid triggering rpcs that would want to include pages
1784 * that are being queued but which can't be made ready until
1785 * the queuer finishes with the page. this is a wart for
1786 * llite::commit_write() */
1789 if (lop->lop_num_pending >= optimal)
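/* Returns true if the first urgent page is marked high-priority (ASYNC_HP),
 * in which case an RPC should be fired immediately. */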
1795 static int lop_makes_hprpc(struct loi_oap_pages *lop)
1797 struct osc_async_page *oap;
1800 if (list_empty(&lop->lop_urgent))
1803 oap = list_entry(lop->lop_urgent.next,
1804 struct osc_async_page, oap_urgent_item);
1806 if (oap->oap_async_flags & ASYNC_HP) {
1807 CDEBUG(D_CACHE, "hp request forcing RPC\n");
1814 static void on_list(struct list_head *item, struct list_head *list,
1817 if (list_empty(item) && should_be_on)
1818 list_add_tail(item, list);
1819 else if (!list_empty(item) && !should_be_on)
1820 list_del_init(item);
1823 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1824 * can find pages to build into rpcs quickly */
1825 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1827 if (lop_makes_hprpc(&loi->loi_write_lop) ||
1828 lop_makes_hprpc(&loi->loi_read_lop)) {
1830 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
1831 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
1833 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
1834 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
1835 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
1836 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1839 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1840 loi->loi_write_lop.lop_num_pending);
1842 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1843 loi->loi_read_lop.lop_num_pending);
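/* Adjust the pending-page counters on both the object and the client when
 * pages are queued for, or removed from, a read or write RPC. */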
1846 static void lop_update_pending(struct client_obd *cli,
1847 struct loi_oap_pages *lop, int cmd, int delta)
1849 lop->lop_num_pending += delta;
1850 if (cmd & OBD_BRW_WRITE)
1851 cli->cl_pending_w_pages += delta;
1853 cli->cl_pending_r_pages += delta;
1856 /* this is called when a sync waiter receives an interruption. Its job is to
1857 * get the caller woken as soon as possible. If its page hasn't been put in an
1858 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
1859 * desiring interruption, which will forcefully complete the rpc once the rpc is interrupted */
1861 static void osc_occ_interrupted(struct oig_callback_context *occ)
1863 struct osc_async_page *oap;
1864 struct loi_oap_pages *lop;
1865 struct lov_oinfo *loi;
1868 /* XXX member_of() */
1869 oap = list_entry(occ, struct osc_async_page, oap_occ);
1871 client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1873 oap->oap_interrupted = 1;
1875 /* ok, it's been put in an rpc. only one oap gets a request reference */
1876 if (oap->oap_request != NULL) {
1877 ptlrpc_mark_interrupted(oap->oap_request);
1878 ptlrpcd_wake(oap->oap_request);
1882 /* we don't get interruption callbacks until osc_trigger_group_io()
1883 * has been called and put the sync oaps in the pending/urgent lists.*/
1884 if (!list_empty(&oap->oap_pending_item)) {
1885 list_del_init(&oap->oap_pending_item);
1886 list_del_init(&oap->oap_urgent_item);
1889 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1890 &loi->loi_write_lop : &loi->loi_read_lop;
1891 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1892 loi_list_maint(oap->oap_cli, oap->oap_loi);
1894 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
1895 oap->oap_oig = NULL;
1899 client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
1902 /* this is trying to propagate async writeback errors back up to the
1903 * application. As an async write fails we record the error code for later if
1904 * the app does an fsync. As long as errors persist we force future rpcs to be
1905 * sync so that the app can get a sync error and break the cycle of queueing
1906 * pages for which writeback will fail. */
1907 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1914 ar->ar_force_sync = 1;
1915 ar->ar_min_xid = ptlrpc_sample_next_xid();
1920 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1921 ar->ar_force_sync = 0;
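/* Queue an async page on its object's read or write pending list;
 * high-priority and urgent pages also go on the urgent list (HP at the head). */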
1924 static void osc_oap_to_pending(struct osc_async_page *oap)
1926 struct loi_oap_pages *lop;
1928 if (oap->oap_cmd & OBD_BRW_WRITE)
1929 lop = &oap->oap_loi->loi_write_lop;
1931 lop = &oap->oap_loi->loi_read_lop;
1933 if (oap->oap_async_flags & ASYNC_HP)
1934 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1935 else if (oap->oap_async_flags & ASYNC_URGENT)
1936 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
1937 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1938 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1941 /* this must be called holding the loi list lock to give coverage to exit_cache,
1942 * async_flag maintenance, and oap_request */
1943 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1944 struct osc_async_page *oap, int sent, int rc)
1949 if (oap->oap_request != NULL) {
1950 xid = ptlrpc_req_xid(oap->oap_request);
1951 ptlrpc_req_finished(oap->oap_request);
1952 oap->oap_request = NULL;
1955 oap->oap_async_flags = 0;
1956 oap->oap_interrupted = 0;
1958 if (oap->oap_cmd & OBD_BRW_WRITE) {
1959 osc_process_ar(&cli->cl_ar, xid, rc);
1960 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
1963 if (rc == 0 && oa != NULL) {
1964 if (oa->o_valid & OBD_MD_FLBLOCKS)
1965 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
1966 if (oa->o_valid & OBD_MD_FLMTIME)
1967 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
1968 if (oa->o_valid & OBD_MD_FLATIME)
1969 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
1970 if (oa->o_valid & OBD_MD_FLCTIME)
1971 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
1975 osc_exit_cache(cli, oap, sent);
1976 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
1977 oap->oap_oig = NULL;
1982 rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
1983 oap->oap_cmd, oa, rc);
1985 /* ll_ap_completion (from llite) drops PG_locked. so, a new
1986 * I/O on the page could start, but OSC calls it under lock
1987 * and thus we can add oap back to pending safely */
1989 /* upper layer wants to leave the page on pending queue */
1990 osc_oap_to_pending(oap);
1992 osc_exit_cache(cli, oap, sent);
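/* Completion handler for async BRW RPCs: finish or redo the request, update
 * the in-flight counters, complete the attached oaps (or release grant for
 * pages from async_internal()), then wake cache waiters and check whether
 * another RPC can be sent. */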
1996 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
1998 struct osc_brw_async_args *aa = data;
1999 struct client_obd *cli;
2002 rc = osc_brw_fini_request(request, rc);
2003 CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
2005 if (osc_recoverable_error(rc)) {
2006 rc = osc_brw_redo_request(request, aa);
2012 client_obd_list_lock(&cli->cl_loi_list_lock);
2013 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2014 * is called so we know whether to go to sync BRWs or wait for more
2015 * RPCs to complete */
2016 if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
2017 cli->cl_w_in_flight--;
2019 cli->cl_r_in_flight--;
2021 if (!list_empty(&aa->aa_oaps)) { /* from osc_send_oap_rpc() */
2022 struct osc_async_page *oap, *tmp;
2023 /* the caller may re-use the oap after the completion call so
2024 * we need to clean it up a little */
2025 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2026 list_del_init(&oap->oap_rpc_item);
2027 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
2029 OBDO_FREE(aa->aa_oa);
2030 } else { /* from async_internal() */
2032 for (i = 0; i < aa->aa_page_count; i++)
2033 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2035 osc_wake_cache_waiters(cli);
2036 osc_check_rpcs(cli);
2037 client_obd_list_unlock(&cli->cl_loi_list_lock);
2039 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
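/* Turn a list of ready osc_async_pages into a BRW request: build the page
 * array, fill and update the obdo through the caller's ap_fill_obdo and
 * ap_update_obdo hooks, and move the oaps onto the request's aa_oaps list. */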
2043 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
2044 struct list_head *rpc_list,
2045 int page_count, int cmd)
2047 struct ptlrpc_request *req;
2048 struct brw_page **pga = NULL;
2049 struct osc_brw_async_args *aa;
2050 struct obdo *oa = NULL;
2051 struct obd_async_page_ops *ops = NULL;
2052 void *caller_data = NULL;
2053 struct osc_async_page *oap;
2054 struct ldlm_lock *lock = NULL;
2059 LASSERT(!list_empty(rpc_list));
2061 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2063 RETURN(ERR_PTR(-ENOMEM));
2067 GOTO(out, req = ERR_PTR(-ENOMEM));
2070 list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2072 ops = oap->oap_caller_ops;
2073 caller_data = oap->oap_caller_data;
2074 lock = oap->oap_ldlm_lock;
2076 pga[i] = &oap->oap_brw_page;
2077 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2078 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2079 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2083 /* always get the data for the obdo for the rpc */
2084 LASSERT(ops != NULL);
2085 ops->ap_fill_obdo(caller_data, cmd, oa);
2087 oa->o_handle = lock->l_remote_handle;
2088 oa->o_valid |= OBD_MD_FLHANDLE;
2091 sort_brw_pages(pga, page_count);
2092 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, pga, &req);
2094 CERROR("prep_req failed: %d\n", rc);
2095 GOTO(out, req = ERR_PTR(rc));
2097 oa = &((struct ost_body *)lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
2098 sizeof(struct ost_body)))->oa;
2100 /* Need to update the timestamps after the request is built in case
2101 * we race with setattr (locally or in queue at OST). If OST gets
2102 * later setattr before earlier BRW (as determined by the request xid),
2103 * the OST will not use BRW timestamps. Sadly, there is no obvious
2104 * way to do this in a single call. bug 10150 */
2105 if (pga[0]->flag & OBD_BRW_SRVLOCK) {
2106 /* in case of lockless read/write do not use inode's
2107 * timestamps because concurrent stat might fill the
2108 * inode with out-of-date times, send current
2110 if (cmd & OBD_BRW_WRITE) {
2111 oa->o_mtime = oa->o_ctime = LTIME_S(CURRENT_TIME);
2112 oa->o_valid |= OBD_MD_FLMTIME | OBD_MD_FLCTIME;
2113 valid = OBD_MD_FLATIME;
2115 oa->o_atime = LTIME_S(CURRENT_TIME);
2116 oa->o_valid |= OBD_MD_FLATIME;
2117 valid = OBD_MD_FLMTIME | OBD_MD_FLCTIME;
2120 valid = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2122 ops->ap_update_obdo(caller_data, cmd, oa, valid);
2124 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2125 aa = ptlrpc_req_async_args(req);
2126 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2127 list_splice(rpc_list, &aa->aa_oaps);
2128 CFS_INIT_LIST_HEAD(rpc_list);
2135 OBD_FREE(pga, sizeof(*pga) * page_count);
2140 /* the loi lock is held across this function but it's allowed to release
2141 * and reacquire it during its work */
2143 * prepare pages for ASYNC io and put pages in send queue.
2147 * \param cmd - OBD_BRW_* macros
2148 * \param lop - pending pages
2150 * \return zero if the pages were successfully added to the send queue.
2151 * \return non-zero if an error occurred.
2153 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2154 int cmd, struct loi_oap_pages *lop)
2156 struct ptlrpc_request *req;
2157 obd_count page_count = 0;
2158 struct osc_async_page *oap = NULL, *tmp;
2159 struct osc_brw_async_args *aa;
2160 struct obd_async_page_ops *ops;
2161 CFS_LIST_HEAD(rpc_list);
2162 unsigned int ending_offset;
2163 unsigned starting_offset = 0;
2167 /* If there are HP OAPs we need to handle at least 1 of them,
2168 * move it to the beginning of the pending list for that. */
2169 if (!list_empty(&lop->lop_urgent)) {
2170 oap = list_entry(lop->lop_urgent.next,
2171 struct osc_async_page, oap_urgent_item);
2172 if (oap->oap_async_flags & ASYNC_HP)
2173 list_move(&oap->oap_pending_item, &lop->lop_pending);
2176 /* first we find the pages we're allowed to work with */
2177 list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item){
2178 ops = oap->oap_caller_ops;
2180 LASSERT(oap->oap_magic == OAP_MAGIC);
2182 if (page_count != 0 &&
2183 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2184 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2185 " oap %p, page %p, srvlock %u\n",
2186 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2189 /* in llite being 'ready' equates to the page being locked
2190 * until completion unlocks it. commit_write submits a page
2191 * as not ready because its unlock will happen unconditionally
2192 * as the call returns. if we race with commit_write giving
2193 * us that page we don't want to create a hole in the page
2194 * stream, so we stop and leave the rpc to be fired by
2195 * another dirtier or kupdated interval (the not ready page
2196 * will still be on the dirty list). we could call in
2197 * at the end of ll_file_write to process the queue again. */
2198 if (!(oap->oap_async_flags & ASYNC_READY)) {
2199 int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2201 CDEBUG(D_INODE, "oap %p page %p returned %d "
2202 "instead of ready\n", oap,
2206 /* llite is telling us that the page is still
2207 * in commit_write and that we should try
2208 * and put it in an rpc again later. we
2209 * break out of the loop so we don't create
2210 * a hole in the sequence of pages in the rpc
2215 /* the io isn't needed. tell the checks
2216 * below to complete the rpc with EINTR */
2217 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2218 oap->oap_count = -EINTR;
2221 oap->oap_async_flags |= ASYNC_READY;
2224 LASSERTF(0, "oap %p page %p returned %d "
2225 "from make_ready\n", oap,
2233 * Page submitted for IO has to be locked. Either by
2234 * ->ap_make_ready() or by higher layers.
2236 #if defined(__KERNEL__) && defined(__linux__)
2237 if(!(PageLocked(oap->oap_page) &&
2238 (CheckWriteback(oap->oap_page, cmd) || oap->oap_oig !=NULL))) {
2239 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2240 oap->oap_page, (long)oap->oap_page->flags, oap->oap_async_flags);
2244 /* If there is a gap at the start of this page, it can't merge
2245 * with any previous page, so we'll hand the network a
2246 * "fragmented" page array that it can't transfer in 1 RDMA */
2247 if (page_count != 0 && oap->oap_page_off != 0)
2250 /* take the page out of our book-keeping */
2251 list_del_init(&oap->oap_pending_item);
2252 lop_update_pending(cli, lop, cmd, -1);
2253 list_del_init(&oap->oap_urgent_item);
2255 if (page_count == 0)
2256 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2257 (PTLRPC_MAX_BRW_SIZE - 1);
2259 /* ask the caller for the size of the io as the rpc leaves. */
2260 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2262 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2263 if (oap->oap_count <= 0) {
2264 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2266 osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2270 /* now put the page back in our accounting */
2271 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2272 if (page_count == 0)
2273 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2274 if (++page_count >= cli->cl_max_pages_per_rpc)
2277 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2278 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2279 * have the same alignment as the initial writes that allocated
2280 * extents on the server. */
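/* Worked example (with, say, a 1 MB PTLRPC_MAX_BRW_SIZE): once the byte
 * just past this page's data lands on a 1 MB multiple, the mask below
 * yields ending_offset == 0 and we end the RPC here, so the next one
 * starts on a fresh boundary. */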
2281 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2282 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2283 if (ending_offset == 0)
2286 /* If there is a gap at the end of this page, it can't merge
2287 * with any subsequent pages, so we'll hand the network a
2288 * "fragmented" page array that it can't transfer in 1 RDMA */
2289 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2293 osc_wake_cache_waiters(cli);
2295 if (page_count == 0)
2298 loi_list_maint(cli, loi);
2300 client_obd_list_unlock(&cli->cl_loi_list_lock);
2302 req = osc_build_req(cli, &rpc_list, page_count, cmd);
2304 /* this should happen rarely and is pretty bad; it makes the
2305 * pending list not follow the dirty order */
2306 client_obd_list_lock(&cli->cl_loi_list_lock);
2307 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2308 list_del_init(&oap->oap_rpc_item);
2310 /* queued sync pages can be torn down while the pages
2311 * were between the pending list and the rpc */
2312 if (oap->oap_interrupted) {
2313 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2314 osc_ap_completion(cli, NULL, oap, 0,
2318 osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2320 loi_list_maint(cli, loi);
2321 RETURN(PTR_ERR(req));
2324 aa = ptlrpc_req_async_args(req);
2325 if (cmd == OBD_BRW_READ) {
2326 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2327 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2328 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2329 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2331 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2332 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2333 cli->cl_w_in_flight);
2334 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2335 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2337 ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2339 client_obd_list_lock(&cli->cl_loi_list_lock);
2341 if (cmd == OBD_BRW_READ)
2342 cli->cl_r_in_flight++;
2344 cli->cl_w_in_flight++;
2346 /* queued sync pages can be torn down while the pages
2347 * were between the pending list and the rpc */
2349 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2350 /* only one oap gets a request reference */
2353 if (oap->oap_interrupted && !req->rq_intr) {
2354 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2356 ptlrpc_mark_interrupted(req);
2360 tmp->oap_request = ptlrpc_request_addref(req);
2362 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2363 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2365 req->rq_interpret_reply = brw_interpret;
2366 ptlrpcd_add_req(req);
2370 #define LOI_DEBUG(LOI, STR, args...) \
2371 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2372 !list_empty(&(LOI)->loi_ready_item) || \
2373 !list_empty(&(LOI)->loi_hp_ready_item), \
2374 (LOI)->loi_write_lop.lop_num_pending, \
2375 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2376 (LOI)->loi_read_lop.lop_num_pending, \
2377 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2380 /* This is called by osc_check_rpcs() to find which objects have pages that
2381 * we could be sending. These lists are maintained by lop_makes_rpc(). */
2382 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2385 /* First return objects that have blocked locks so that they
2386 * will be flushed quickly and other clients can get the lock,
2387 * then objects which have pages ready to be stuffed into RPCs */
2388 if (!list_empty(&cli->cl_loi_hp_ready_list))
2389 RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
2390 struct lov_oinfo, loi_hp_ready_item));
2391 if (!list_empty(&cli->cl_loi_ready_list))
2392 RETURN(list_entry(cli->cl_loi_ready_list.next,
2393 struct lov_oinfo, loi_ready_item));
2395 /* then if we have cache waiters, return all objects with queued
2396 * writes. This is especially important when many small files
2397 * have filled up the cache and not been fired into rpcs because
2398 * they don't pass the nr_pending/object threshold */
2399 if (!list_empty(&cli->cl_cache_waiters) &&
2400 !list_empty(&cli->cl_loi_write_list))
2401 RETURN(list_entry(cli->cl_loi_write_list.next,
2402 struct lov_oinfo, loi_write_item));
2404 /* then return all queued objects when we have an invalid import
2405 * so that they get flushed */
2406 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2407 if (!list_empty(&cli->cl_loi_write_list))
2408 RETURN(list_entry(cli->cl_loi_write_list.next,
2409 struct lov_oinfo, loi_write_item));
2410 if (!list_empty(&cli->cl_loi_read_list))
2411 RETURN(list_entry(cli->cl_loi_read_list.next,
2412 struct lov_oinfo, loi_read_item));
2417 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2419 struct osc_async_page *oap;
2422 if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
2423 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2424 struct osc_async_page, oap_urgent_item);
2425 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2428 if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
2429 oap = list_entry(loi->loi_read_lop.lop_urgent.next,
2430 struct osc_async_page, oap_urgent_item);
2431 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
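/* i.e. a high-priority (ASYNC_HP) urgent page buys this object one extra
 * RPC slot beyond cl_max_rpcs_in_flight. */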
2434 return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2437 /* called with the loi list lock held */
2438 static void osc_check_rpcs(struct client_obd *cli)
2440 struct lov_oinfo *loi;
2441 int rc = 0, race_counter = 0;
2444 while ((loi = osc_next_loi(cli)) != NULL) {
2445 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2447 if (osc_max_rpc_in_flight(cli, loi))
2450 /* attempt some read/write balancing by alternating between
2451 * reads and writes in an object. The makes_rpc checks here
2452 * would be redundant if we were getting read/write work items
2453 * instead of objects. we don't want send_oap_rpc to drain a
2454 * partial read pending queue when we're given this object to
2455 * do write io on while there are cache waiters */
2456 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2457 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2458 &loi->loi_write_lop);
2466 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2467 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2468 &loi->loi_read_lop);
2477 /* attempt some inter-object balancing by issuing rpcs
2478 * for each object in turn */
2479 if (!list_empty(&loi->loi_hp_ready_item))
2480 list_del_init(&loi->loi_hp_ready_item);
2481 if (!list_empty(&loi->loi_ready_item))
2482 list_del_init(&loi->loi_ready_item);
2483 if (!list_empty(&loi->loi_write_item))
2484 list_del_init(&loi->loi_write_item);
2485 if (!list_empty(&loi->loi_read_item))
2486 list_del_init(&loi->loi_read_item);
2488 loi_list_maint(cli, loi);
2490 /* send_oap_rpc fails with 0 when make_ready tells it to
2491 * back off. llite's make_ready does this when it tries
2492 * to lock a page queued for write that is already locked.
2493 * we want to try sending rpcs from many objects, but we
2494 * don't want to spin failing with 0. */
2495 if (race_counter == 10)
2501 /* we're trying to queue a page in the osc so we're subject to the
2502 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2503 * If the osc's queued pages are already at that limit, then we want to sleep
2504 * until there is space in the osc's queue for us. We also may be waiting for
2505 * write credits from the OST if there are RPCs in flight that may return some
2506 * before we fall back to sync writes.
2508 * We need this to know whether our allocation was granted in the presence of signals */
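/* In other words, ocw_granted() returns true once osc_wake_cache_waiters()
 * has taken us off cl_cache_waiters (space or grant arrived) or there are
 * no RPCs left in flight that could return grant. */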
2509 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2513 client_obd_list_lock(&cli->cl_loi_list_lock);
2514 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2515 client_obd_list_unlock(&cli->cl_loi_list_lock);
2519 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2520 * grant or cache space. */
2521 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2522 struct osc_async_page *oap)
2524 struct osc_cache_waiter ocw;
2525 struct l_wait_info lwi = { 0 };
2528 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2529 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2530 cli->cl_dirty_max, obd_max_dirty_pages,
2531 cli->cl_lost_grant, cli->cl_avail_grant);
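/* cl_avail_grant is space the OST has promised to set aside for this
 * client; a dirty page is only cached below while at least a page worth
 * of grant remains, so the eventual write cannot fail for lack of space. */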
2533 /* force the caller to try sync io. this can jump the list
2534 * of queued writes and create a discontiguous rpc stream */
2535 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2536 loi->loi_ar.ar_force_sync)
2539 /* Hopefully normal case - cache space and write credits available */
2540 if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2541 (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2542 (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2543 /* account for ourselves */
2544 osc_consume_write_grant(cli, &oap->oap_brw_page);
2548 /* Make sure that there are write rpcs in flight to wait for. This
2549 * is a little silly as this object may not have any pending but
2550 * other objects sure might. */
2551 if (cli->cl_w_in_flight) {
2552 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2553 cfs_waitq_init(&ocw.ocw_waitq);
2557 loi_list_maint(cli, loi);
2558 osc_check_rpcs(cli);
2559 client_obd_list_unlock(&cli->cl_loi_list_lock);
2561 CDEBUG(D_CACHE, "sleeping for cache space\n");
2562 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2564 client_obd_list_lock(&cli->cl_loi_list_lock);
2565 if (!list_empty(&ocw.ocw_entry)) {
2566 list_del(&ocw.ocw_entry);
2575 static int osc_reget_short_lock(struct obd_export *exp,
2576 struct lov_stripe_md *lsm,
2578 obd_off start, obd_off end,
2581 struct osc_async_page *oap = *res;
2586 spin_lock(&oap->oap_lock);
2587 rc = ldlm_lock_fast_match(oap->oap_ldlm_lock, rw,
2588 start, end, cookie);
2589 spin_unlock(&oap->oap_lock);
2594 static int osc_release_short_lock(struct obd_export *exp,
2595 struct lov_stripe_md *lsm, obd_off end,
2596 void *cookie, int rw)
2599 ldlm_lock_fast_release(cookie, rw);
2600 /* no error could have happened at this layer */
2604 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2605 struct lov_oinfo *loi, cfs_page_t *page,
2606 obd_off offset, struct obd_async_page_ops *ops,
2607 void *data, void **res, int nocache,
2608 struct lustre_handle *lockh)
2610 struct osc_async_page *oap;
2611 struct ldlm_res_id oid = {{0}};
2617 return size_round(sizeof(*oap));
2620 oap->oap_magic = OAP_MAGIC;
2621 oap->oap_cli = &exp->exp_obd->u.cli;
2624 oap->oap_caller_ops = ops;
2625 oap->oap_caller_data = data;
2627 oap->oap_page = page;
2628 oap->oap_obj_off = offset;
2630 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2631 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2632 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2633 CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2635 oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2637 spin_lock_init(&oap->oap_lock);
2639 /* If the page was marked as not cacheable, don't add it to any locks */
2641 osc_build_res_name(loi->loi_id, loi->loi_gr, &oid);
2642 /* This is the only place where we can call cache_add_extent
2643 without oap_lock, because this page is locked now, and
2644 the lock we are adding it to is referenced, so it cannot lose
2645 any pages either. */
2646 rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh);
2651 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2655 struct osc_async_page *oap_from_cookie(void *cookie)
2657 struct osc_async_page *oap = cookie;
2658 if (oap->oap_magic != OAP_MAGIC)
2659 return ERR_PTR(-EINVAL);
2663 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2664 struct lov_oinfo *loi, void *cookie,
2665 int cmd, obd_off off, int count,
2666 obd_flag brw_flags, enum async_flags async_flags)
2668 struct client_obd *cli = &exp->exp_obd->u.cli;
2669 struct osc_async_page *oap;
2673 oap = oap_from_cookie(cookie);
2675 RETURN(PTR_ERR(oap));
2677 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2680 if (!list_empty(&oap->oap_pending_item) ||
2681 !list_empty(&oap->oap_urgent_item) ||
2682 !list_empty(&oap->oap_rpc_item))
2685 /* check if the file's owner/group is over quota */
2686 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2687 struct obd_async_page_ops *ops;
2694 ops = oap->oap_caller_ops;
2695 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2696 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2706 loi = lsm->lsm_oinfo[0];
2708 client_obd_list_lock(&cli->cl_loi_list_lock);
2711 oap->oap_page_off = off;
2712 oap->oap_count = count;
2713 oap->oap_brw_flags = brw_flags;
2714 oap->oap_async_flags = async_flags;
2716 if (cmd & OBD_BRW_WRITE) {
2717 rc = osc_enter_cache(cli, loi, oap);
2719 client_obd_list_unlock(&cli->cl_loi_list_lock);
2724 osc_oap_to_pending(oap);
2725 loi_list_maint(cli, loi);
2727 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2730 osc_check_rpcs(cli);
2731 client_obd_list_unlock(&cli->cl_loi_list_lock);
2736 /* aka (~was & now & flag), but this is more clear :) */
2737 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
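/* e.g. SETTING(oap->oap_async_flags, async_flags, ASYNC_READY) is true only
 * when the caller is turning ASYNC_READY on and it was not already set, so
 * each flag transition in osc_set_async_flags() fires exactly once. */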
2739 static int osc_set_async_flags(struct obd_export *exp,
2740 struct lov_stripe_md *lsm,
2741 struct lov_oinfo *loi, void *cookie,
2742 obd_flag async_flags)
2744 struct client_obd *cli = &exp->exp_obd->u.cli;
2745 struct loi_oap_pages *lop;
2746 struct osc_async_page *oap;
2750 oap = oap_from_cookie(cookie);
2752 RETURN(PTR_ERR(oap));
2755 * bug 7311: OST-side locking is only supported for liblustre for now
2756 * (and liblustre never calls obd_set_async_flags(), I hope); a generic
2757 * implementation would have to handle the case where an OST-locked page
2758 * was picked up by, e.g., ->writepage().
2760 LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2761 LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2764 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2768 loi = lsm->lsm_oinfo[0];
2770 if (oap->oap_cmd & OBD_BRW_WRITE) {
2771 lop = &loi->loi_write_lop;
2773 lop = &loi->loi_read_lop;
2776 client_obd_list_lock(&cli->cl_loi_list_lock);
2778 if (list_empty(&oap->oap_pending_item))
2779 GOTO(out, rc = -EINVAL);
2781 if ((oap->oap_async_flags & async_flags) == async_flags)
2784 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2785 oap->oap_async_flags |= ASYNC_READY;
2787 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
2788 list_empty(&oap->oap_rpc_item)) {
2789 if (oap->oap_async_flags & ASYNC_HP)
2790 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2792 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2793 oap->oap_async_flags |= ASYNC_URGENT;
2794 loi_list_maint(cli, loi);
2797 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2798 oap->oap_async_flags);
2800 osc_check_rpcs(cli);
2801 client_obd_list_unlock(&cli->cl_loi_list_lock);
2805 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2806 struct lov_oinfo *loi,
2807 struct obd_io_group *oig, void *cookie,
2808 int cmd, obd_off off, int count,
2810 obd_flag async_flags)
2812 struct client_obd *cli = &exp->exp_obd->u.cli;
2813 struct osc_async_page *oap;
2814 struct loi_oap_pages *lop;
2818 oap = oap_from_cookie(cookie);
2820 RETURN(PTR_ERR(oap));
2822 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2825 if (!list_empty(&oap->oap_pending_item) ||
2826 !list_empty(&oap->oap_urgent_item) ||
2827 !list_empty(&oap->oap_rpc_item))
2831 loi = lsm->lsm_oinfo[0];
2833 client_obd_list_lock(&cli->cl_loi_list_lock);
2836 oap->oap_page_off = off;
2837 oap->oap_count = count;
2838 oap->oap_brw_flags = brw_flags;
2839 oap->oap_async_flags = async_flags;
2841 if (cmd & OBD_BRW_WRITE)
2842 lop = &loi->loi_write_lop;
2844 lop = &loi->loi_read_lop;
2846 list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2847 if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2849 rc = oig_add_one(oig, &oap->oap_occ);
2852 LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2853 oap, oap->oap_page, rc);
2855 client_obd_list_unlock(&cli->cl_loi_list_lock);
2860 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2861 struct loi_oap_pages *lop, int cmd)
2863 struct list_head *pos, *tmp;
2864 struct osc_async_page *oap;
2866 list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2867 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2868 list_del(&oap->oap_pending_item);
2869 osc_oap_to_pending(oap);
2871 loi_list_maint(cli, loi);
2874 static int osc_trigger_group_io(struct obd_export *exp,
2875 struct lov_stripe_md *lsm,
2876 struct lov_oinfo *loi,
2877 struct obd_io_group *oig)
2879 struct client_obd *cli = &exp->exp_obd->u.cli;
2883 loi = lsm->lsm_oinfo[0];
2885 client_obd_list_lock(&cli->cl_loi_list_lock);
2887 osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2888 osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2890 osc_check_rpcs(cli);
2891 client_obd_list_unlock(&cli->cl_loi_list_lock);
2896 static int osc_teardown_async_page(struct obd_export *exp,
2897 struct lov_stripe_md *lsm,
2898 struct lov_oinfo *loi, void *cookie)
2900 struct client_obd *cli = &exp->exp_obd->u.cli;
2901 struct loi_oap_pages *lop;
2902 struct osc_async_page *oap;
2906 oap = oap_from_cookie(cookie);
2908 RETURN(PTR_ERR(oap));
2911 loi = lsm->lsm_oinfo[0];
2913 if (oap->oap_cmd & OBD_BRW_WRITE) {
2914 lop = &loi->loi_write_lop;
2916 lop = &loi->loi_read_lop;
2919 client_obd_list_lock(&cli->cl_loi_list_lock);
2921 if (!list_empty(&oap->oap_rpc_item))
2922 GOTO(out, rc = -EBUSY);
2924 osc_exit_cache(cli, oap, 0);
2925 osc_wake_cache_waiters(cli);
2927 if (!list_empty(&oap->oap_urgent_item)) {
2928 list_del_init(&oap->oap_urgent_item);
2929 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
2932 if (!list_empty(&oap->oap_pending_item)) {
2933 list_del_init(&oap->oap_pending_item);
2934 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2936 loi_list_maint(cli, loi);
2937 cache_remove_extent(cli->cl_cache, oap);
2939 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2941 client_obd_list_unlock(&cli->cl_loi_list_lock);
2945 int osc_extent_blocking_cb(struct ldlm_lock *lock,
2946 struct ldlm_lock_desc *new, void *data,
2949 struct lustre_handle lockh = { 0 };
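/* a "pointer" below 0x1000 can only be a small integer accidentally passed
 * as data, never a valid kernel address, hence the sanity window below. */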
2953 if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
2954 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
2959 case LDLM_CB_BLOCKING:
2960 ldlm_lock2handle(lock, &lockh);
2961 rc = ldlm_cli_cancel(&lockh);
2963 CERROR("ldlm_cli_cancel failed: %d\n", rc);
2965 case LDLM_CB_CANCELING: {
2967 ldlm_lock2handle(lock, &lockh);
2968 /* This lock wasn't granted, don't try to do anything */
2969 if (lock->l_req_mode != lock->l_granted_mode)
2972 cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache,
2975 if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb)
2976 lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb(
2977 lock, new, data,flag);
2986 EXPORT_SYMBOL(osc_extent_blocking_cb);
2988 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2991 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2994 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2997 lock_res_and_lock(lock);
2998 #if defined (__KERNEL__) && defined (__linux__)
2999 /* Liang XXX: Darwin and Winnt checking should be added */
3000 if (lock->l_ast_data && lock->l_ast_data != data) {
3001 struct inode *new_inode = data;
3002 struct inode *old_inode = lock->l_ast_data;
3003 if (!(old_inode->i_state & I_FREEING))
3004 LDLM_ERROR(lock, "inconsistent l_ast_data found");
3005 LASSERTF(old_inode->i_state & I_FREEING,
3006 "Found existing inode %p/%lu/%u state %lu in lock: "
3007 "setting data to %p/%lu/%u\n", old_inode,
3008 old_inode->i_ino, old_inode->i_generation,
3010 new_inode, new_inode->i_ino, new_inode->i_generation);
3013 lock->l_ast_data = data;
3014 lock->l_flags |= (flags & LDLM_FL_NO_LRU);
3015 unlock_res_and_lock(lock);
3016 LDLM_LOCK_PUT(lock);
3019 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3020 ldlm_iterator_t replace, void *data)
3022 struct ldlm_res_id res_id;
3023 struct obd_device *obd = class_exp2obd(exp);
3025 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3026 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3030 static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
3031 struct obd_info *oinfo, int intent, int rc)
3036 /* The request was created before ldlm_cli_enqueue call. */
3037 if (rc == ELDLM_LOCK_ABORTED) {
3038 struct ldlm_reply *rep;
3040 /* swabbed by ldlm_cli_enqueue() */
3041 LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
3042 rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
3044 LASSERT(rep != NULL);
3045 if (rep->lock_policy_res1)
3046 rc = rep->lock_policy_res1;
3050 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3051 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3052 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
3053 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
3054 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
3058 cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh);
3060 /* Call the update callback. */
3061 rc = oinfo->oi_cb_up(oinfo, rc);
3065 static int osc_enqueue_interpret(struct ptlrpc_request *req,
3066 struct osc_enqueue_args *aa, int rc)
3068 int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
3069 struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
3070 struct ldlm_lock *lock;
3072 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3074 lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
3076 /* Complete obtaining the lock procedure. */
3077 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3079 &aa->oa_oi->oi_flags,
3080 &lsm->lsm_oinfo[0]->loi_lvb,
3081 sizeof(lsm->lsm_oinfo[0]->loi_lvb),
3082 lustre_swab_ost_lvb,
3083 aa->oa_oi->oi_lockh, rc);
3085 /* Complete osc stuff. */
3086 rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc);
3088 /* Release the lock for async request. */
3089 if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
3090 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
3092 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3093 aa->oa_oi->oi_lockh, req, aa);
3094 LDLM_LOCK_PUT(lock);
3098 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3099 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3100 * other synchronous requests, however keeping some locks and trying to obtain
3101 * others may take a considerable amount of time in case of OST failure; and
3102 * when other sync requests do not get a lock released from a client, the client
3103 * is excluded from the cluster -- such scenarios make life difficult, so
3104 * release locks just after they are obtained. */
3105 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3106 struct ldlm_enqueue_info *einfo,
3107 struct ptlrpc_request_set *rqset)
3109 struct ldlm_res_id res_id;
3110 struct obd_device *obd = exp->exp_obd;
3111 struct ldlm_reply *rep;
3112 struct ptlrpc_request *req = NULL;
3113 int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
3118 osc_build_res_name(oinfo->oi_md->lsm_object_id,
3119 oinfo->oi_md->lsm_object_gr, &res_id);
3120 /* Filesystem lock extents are extended to page boundaries so that
3121 * dealing with the page cache is a little smoother. */
3122 oinfo->oi_policy.l_extent.start -=
3123 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
3124 oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
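/* Worked example with 4 kB pages (~CFS_PAGE_MASK == 0xfff): a lock request
 * for bytes [5000, 6000] becomes [4096, 8191] -- start rounded down to its
 * page boundary, end rounded up to the last byte of its page. */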
3126 if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
3129 /* Next, search for already existing extent locks that will cover us */
3130 /* If we're trying to read, we also search for an existing PW lock. The
3131 * VFS and page cache already protect us locally, so lots of readers/
3132 * writers can share a single PW lock.
3134 * There are problems with conversion deadlocks, so instead of
3135 * converting a read lock to a write lock, we'll just enqueue a new
3138 * At some point we should cancel the read lock instead of making them
3139 * send us a blocking callback, but there are problems with canceling
3140 * locks out from other users right now, too. */
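/* For example, once this client holds a PW lock taken by a writer, later
 * readers match that same lock here instead of enqueueing a new PR lock. */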
3141 mode = einfo->ei_mode;
3142 if (einfo->ei_mode == LCK_PR)
3144 mode = ldlm_lock_match(obd->obd_namespace,
3145 oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
3146 einfo->ei_type, &oinfo->oi_policy, mode,
3149 /* addref the lock only for non-async requests when a PW lock
3150 * was matched whereas we asked for PR. */
3151 if (!rqset && einfo->ei_mode != mode)
3152 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
3153 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
3156 /* I would like to be able to ASSERT here that rss <=
3157 * kms, but I can't, for reasons which are explained in
3161 /* We already have a lock, and it's referenced */
3162 oinfo->oi_cb_up(oinfo, ELDLM_OK);
3164 /* For async requests, decref the lock. */
3165 if (einfo->ei_mode != mode)
3166 ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
3168 ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
3176 [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
3177 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
3178 [DLM_LOCKREQ_OFF + 1] = 0 };
3180 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
3184 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
3185 size[DLM_REPLY_REC_OFF] =
3186 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
3187 ptlrpc_req_set_repsize(req, 3, size);
3190 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3191 oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
3193 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id,
3194 &oinfo->oi_policy, &oinfo->oi_flags,
3195 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3196 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
3197 lustre_swab_ost_lvb, oinfo->oi_lockh,
3201 struct osc_enqueue_args *aa;
3202 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3203 aa = ptlrpc_req_async_args(req);
3208 req->rq_interpret_reply = osc_enqueue_interpret;
3209 ptlrpc_set_add_req(rqset, req);
3210 } else if (intent) {
3211 ptlrpc_req_finished(req);
3216 rc = osc_enqueue_fini(obd, req, oinfo, intent, rc);
3218 ptlrpc_req_finished(req);
3223 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3224 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3225 int *flags, void *data, struct lustre_handle *lockh)
3227 struct ldlm_res_id res_id;
3228 struct obd_device *obd = exp->exp_obd;
3229 int lflags = *flags;
3233 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3235 OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
3237 /* Filesystem lock extents are extended to page boundaries so that
3238 * dealing with the page cache is a little smoother */
3239 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3240 policy->l_extent.end |= ~CFS_PAGE_MASK;
3242 /* Next, search for already existing extent locks that will cover us */
3243 /* If we're trying to read, we also search for an existing PW lock. The
3244 * VFS and page cache already protect us locally, so lots of readers/
3245 * writers can share a single PW lock. */
3249 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3250 &res_id, type, policy, rc, lockh);
3252 osc_set_data_with_check(lockh, data, lflags);
3253 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3254 ldlm_lock_addref(lockh, LCK_PR);
3255 ldlm_lock_decref(lockh, LCK_PW);
3263 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3264 __u32 mode, struct lustre_handle *lockh)
3268 if (unlikely(mode == LCK_GROUP))
3269 ldlm_lock_decref_and_cancel(lockh, mode);
3271 ldlm_lock_decref(lockh, mode);
3276 static int osc_cancel_unused(struct obd_export *exp,
3277 struct lov_stripe_md *lsm, int flags, void *opaque)
3279 struct obd_device *obd = class_exp2obd(exp);
3280 struct ldlm_res_id res_id, *resp = NULL;
3283 resp = osc_build_res_name(lsm->lsm_object_id,
3284 lsm->lsm_object_gr, &res_id);
3287 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3291 static int osc_join_lru(struct obd_export *exp,
3292 struct lov_stripe_md *lsm, int join)
3294 struct obd_device *obd = class_exp2obd(exp);
3295 struct ldlm_res_id res_id, *resp = NULL;
3298 resp = osc_build_res_name(lsm->lsm_object_id,
3299 lsm->lsm_object_gr, &res_id);
3302 return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
3306 static int osc_statfs_interpret(struct ptlrpc_request *req,
3307 struct osc_async_args *aa, int rc)
3309 struct obd_statfs *msfs;
3315 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3316 lustre_swab_obd_statfs);
3318 CERROR("Can't unpack obd_statfs\n");
3319 GOTO(out, rc = -EPROTO);
3322 memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
3324 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3328 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3329 __u64 max_age, struct ptlrpc_request_set *rqset)
3331 struct ptlrpc_request *req;
3332 struct osc_async_args *aa;
3333 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
3336 /* We could possibly pass max_age in the request (as an absolute
3337 * timestamp or a "seconds.usec ago") so the target can avoid doing
3338 * extra calls into the filesystem if that isn't necessary (e.g.
3339 * during mount that would help a bit). Having relative timestamps
3340 * is not so great if request processing is slow, while absolute
3341 * timestamps are not ideal because they need time synchronization. */
3342 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3343 OST_STATFS, 1, NULL, NULL);
3347 ptlrpc_req_set_repsize(req, 2, size);
3348 req->rq_request_portal = OST_CREATE_PORTAL;
3349 ptlrpc_at_set_req_timeout(req);
3350 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3351 /* procfs requests should not wait for statfs to avoid deadlocks */
3352 req->rq_no_resend = 1;
3353 req->rq_no_delay = 1;
3356 req->rq_interpret_reply = osc_statfs_interpret;
3357 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3358 aa = ptlrpc_req_async_args(req);
3361 ptlrpc_set_add_req(rqset, req);
3365 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3366 __u64 max_age, __u32 flags)
3368 struct obd_statfs *msfs;
3369 struct ptlrpc_request *req;
3370 struct obd_import *imp = NULL;
3371 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
3375 /* Since the request might also come from lprocfs, we need to
3376 * sync this with client_disconnect_export (bug 15684) */
3377 down_read(&obd->u.cli.cl_sem);
3378 if (obd->u.cli.cl_import)
3379 imp = class_import_get(obd->u.cli.cl_import);
3380 up_read(&obd->u.cli.cl_sem);
3384 /* We could possibly pass max_age in the request (as an absolute
3385 * timestamp or a "seconds.usec ago") so the target can avoid doing
3386 * extra calls into the filesystem if that isn't necessary (e.g.
3387 * during mount that would help a bit). Having relative timestamps
3388 * is not so great if request processing is slow, while absolute
3389 * timestamps are not ideal because they need time synchronization. */
3390 req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION,
3391 OST_STATFS, 1, NULL, NULL);
3393 class_import_put(imp);
3397 ptlrpc_req_set_repsize(req, 2, size);
3398 req->rq_request_portal = OST_CREATE_PORTAL;
3399 ptlrpc_at_set_req_timeout(req);
3401 if (flags & OBD_STATFS_NODELAY) {
3402 /* procfs requests should not wait for statfs to avoid deadlocks */
3403 req->rq_no_resend = 1;
3404 req->rq_no_delay = 1;
3407 rc = ptlrpc_queue_wait(req);
3411 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3412 lustre_swab_obd_statfs);
3414 CERROR("Can't unpack obd_statfs\n");
3415 GOTO(out, rc = -EPROTO);
3418 memcpy(osfs, msfs, sizeof(*osfs));
3422 ptlrpc_req_finished(req);
3426 /* Retrieve object striping information.
3428 * @lump is a pointer to an in-core struct with lmm_ost_count indicating
3429 * the maximum number of OST indices which will fit in the user buffer.
3430 * lmm_magic must be LOV_MAGIC_V1 or LOV_MAGIC_V3 (we only use 1 slot here).
3432 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3434 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3435 struct lov_user_md_v3 lum, *lumk;
3436 int rc = 0, lum_size;
3437 struct lov_user_ost_data_v1 *lmm_objects;
3443 /* we only need the header part from user space to get lmm_magic and
3444 * lmm_stripe_count, (the header part is common to v1 and v3) */
3445 lum_size = sizeof(struct lov_user_md_v1);
3446 if (copy_from_user(&lum, lump, lum_size))
3449 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3450 (lum.lmm_magic != LOV_USER_MAGIC_V3))
3453 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3454 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3455 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3456 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3458 /* we can use lov_mds_md_size() to compute lum_size
3459 * because lov_user_md_vX and lov_mds_md_vX have the same size */
3460 if (lum.lmm_stripe_count > 0) {
3461 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3462 OBD_ALLOC(lumk, lum_size);
3465 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3466 lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3468 lmm_objects = &(lumk->lmm_objects[0]);
3469 lmm_objects->l_object_id = lsm->lsm_object_id;
3471 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3475 lumk->lmm_object_id = lsm->lsm_object_id;
3476 lumk->lmm_stripe_count = 1;
3478 if (copy_to_user(lump, lumk, lum_size))
3482 OBD_FREE(lumk, lum_size);
3488 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3489 void *karg, void *uarg)
3491 struct obd_device *obd = exp->exp_obd;
3492 struct obd_ioctl_data *data = karg;
3496 if (!try_module_get(THIS_MODULE)) {
3497 CERROR("Can't get module. Is it alive?");
3501 case OBD_IOC_LOV_GET_CONFIG: {
3503 struct lov_desc *desc;
3504 struct obd_uuid uuid;
3508 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3509 GOTO(out, err = -EINVAL);
3511 data = (struct obd_ioctl_data *)buf;
3513 if (sizeof(*desc) > data->ioc_inllen1) {
3514 obd_ioctl_freedata(buf, len);
3515 GOTO(out, err = -EINVAL);
3518 if (data->ioc_inllen2 < sizeof(uuid)) {
3519 obd_ioctl_freedata(buf, len);
3520 GOTO(out, err = -EINVAL);
3523 desc = (struct lov_desc *)data->ioc_inlbuf1;
3524 desc->ld_tgt_count = 1;
3525 desc->ld_active_tgt_count = 1;
3526 desc->ld_default_stripe_count = 1;
3527 desc->ld_default_stripe_size = 0;
3528 desc->ld_default_stripe_offset = 0;
3529 desc->ld_pattern = 0;
3530 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3532 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3534 err = copy_to_user((void *)uarg, buf, len);
3537 obd_ioctl_freedata(buf, len);
3540 case LL_IOC_LOV_SETSTRIPE:
3541 err = obd_alloc_memmd(exp, karg);
3545 case LL_IOC_LOV_GETSTRIPE:
3546 err = osc_getstripe(karg, uarg);
3548 case OBD_IOC_CLIENT_RECOVER:
3549 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3554 case IOC_OSC_SET_ACTIVE:
3555 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3558 case OBD_IOC_POLL_QUOTACHECK:
3559 err = lquota_poll_check(quota_interface, exp,
3560 (struct if_quotacheck *)karg);
3562 case OBD_IOC_DESTROY: {
3565 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
3566 GOTO (out, err = -EPERM);
3567 oa = &data->ioc_obdo1;
3570 GOTO(out, err = -EINVAL);
3572 oa->o_valid |= OBD_MD_FLGROUP;
3574 err = osc_destroy(exp, oa, NULL, NULL, NULL);
3577 case OBD_IOC_PING_TARGET:
3578 err = ptlrpc_obd_ping(obd);
3581 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3582 cmd, cfs_curproc_comm());
3583 GOTO(out, err = -ENOTTY);
3586 module_put(THIS_MODULE);
3590 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3591 void *key, __u32 *vallen, void *val, struct lov_stripe_md *lsm)
3594 if (!vallen || !val)
3597 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3598 __u32 *stripe = val;
3599 *vallen = sizeof(*stripe);
3602 } else if (KEY_IS(KEY_LAST_ID)) {
3603 struct ptlrpc_request *req;
3605 char *bufs[2] = { NULL, key };
3606 __u32 size[2] = { sizeof(struct ptlrpc_body), keylen };
3609 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3610 OST_GET_INFO, 2, size, bufs);
3614 size[REPLY_REC_OFF] = *vallen;
3615 ptlrpc_req_set_repsize(req, 2, size);
3616 rc = ptlrpc_queue_wait(req);
3620 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3621 lustre_swab_ost_last_id);
3622 if (reply == NULL) {
3623 CERROR("Can't unpack OST last ID\n");
3624 GOTO(out, rc = -EPROTO);
3626 *((obd_id *)val) = *reply;
3628 ptlrpc_req_finished(req);
3630 } else if (KEY_IS(KEY_FIEMAP)) {
3631 struct ptlrpc_request *req;
3632 struct ll_user_fiemap *reply;
3633 char *bufs[2] = { NULL, key };
3634 __u32 size[2] = { sizeof(struct ptlrpc_body), keylen };
3637 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3638 OST_GET_INFO, 2, size, bufs);
3642 size[REPLY_REC_OFF] = *vallen;
3643 ptlrpc_req_set_repsize(req, 2, size);
3645 rc = ptlrpc_queue_wait(req);
3648 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, *vallen,
3649 lustre_swab_fiemap);
3650 if (reply == NULL) {
3651 CERROR("Can't unpack FIEMAP reply.\n");
3652 GOTO(out1, rc = -EPROTO);
3655 memcpy(val, reply, *vallen);
3658 ptlrpc_req_finished(req);
3666 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3669 struct llog_ctxt *ctxt;
3670 struct obd_import *imp = req->rq_import;
3676 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3679 rc = llog_initiator_connect(ctxt);
3681 CERROR("cannot establish connection for "
3682 "ctxt %p: %d\n", ctxt, rc);
3685 llog_ctxt_put(ctxt);
3686 spin_lock(&imp->imp_lock);
3687 imp->imp_server_timeout = 1;
3688 imp->imp_pingable = 1;
3689 spin_unlock(&imp->imp_lock);
3690 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3695 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3696 void *key, obd_count vallen, void *val,
3697 struct ptlrpc_request_set *set)
3699 struct ptlrpc_request *req;
3700 struct obd_device *obd = exp->exp_obd;
3701 struct obd_import *imp = class_exp2cliimp(exp);
3702 __u32 size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
3703 char *bufs[3] = { NULL, key, val };
3706 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3708 if (KEY_IS(KEY_NEXT_ID)) {
3709 if (vallen != sizeof(obd_id))
3711 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3712 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3713 exp->exp_obd->obd_name,
3714 obd->u.cli.cl_oscc.oscc_next_id);
3719 if (KEY_IS(KEY_UNLINKED)) {
3720 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3721 spin_lock(&oscc->oscc_lock);
3722 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3723 spin_unlock(&oscc->oscc_lock);
3727 if (KEY_IS(KEY_INIT_RECOV)) {
3728 if (vallen != sizeof(int))
3730 spin_lock(&imp->imp_lock);
3731 imp->imp_initial_recov = *(int *)val;
3732 spin_unlock(&imp->imp_lock);
3733 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3734 exp->exp_obd->obd_name,
3735 imp->imp_initial_recov);
3739 if (KEY_IS(KEY_CHECKSUM)) {
3740 if (vallen != sizeof(int))
3742 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3749 /* We pass all other commands directly to OST. Since nobody calls osc
3750 methods directly and everybody is supposed to go through LOV, we
3751 assume lov checked invalid values for us.
3752 The only recognised values so far are evict_by_nid and mds_conn.
3753 Even if something bad goes through, we'd get a -EINVAL from OST
3756 req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
3761 if (KEY_IS(KEY_MDS_CONN))
3762 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3764 ptlrpc_req_set_repsize(req, 1, NULL);
3765 ptlrpc_set_add_req(set, req);
3766 ptlrpc_check_set(set);
3772 static struct llog_operations osc_size_repl_logops = {
3773 lop_cancel: llog_obd_repl_cancel
3776 static struct llog_operations osc_mds_ost_orig_logops;
3777 static int osc_llog_init(struct obd_device *obd, struct obd_device *tgt,
3778 int count, struct llog_catid *catid,
3779 struct obd_uuid *uuid)
3784 spin_lock(&obd->obd_dev_lock);
3785 if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3786 osc_mds_ost_orig_logops = llog_lvfs_ops;
3787 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3788 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3789 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3790 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3792 spin_unlock(&obd->obd_dev_lock);
3794 rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3795 &catid->lci_logid, &osc_mds_ost_orig_logops);
3797 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3801 rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3802 &osc_size_repl_logops);
3804 struct llog_ctxt *ctxt =
3805 llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3808 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3812 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3813 obd->obd_name, tgt->obd_name, count, catid, rc);
3814 CERROR("logid "LPX64":0x%x\n",
3815 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3820 static int osc_llog_finish(struct obd_device *obd, int count)
3822 struct llog_ctxt *ctxt;
3823 int rc = 0, rc2 = 0;
3826 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3828 rc = llog_cleanup(ctxt);
3830 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3832 rc2 = llog_cleanup(ctxt);
3839 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3840 struct obd_uuid *cluuid,
3841 struct obd_connect_data *data,
3844 struct client_obd *cli = &obd->u.cli;
3846 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3849 client_obd_list_lock(&cli->cl_loi_list_lock);
3850 data->ocd_grant = cli->cl_avail_grant ?:
3851 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
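/* i.e. on reconnect, re-request our last known available grant or, if none
 * is left, enough for two full-sized RPCs. */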
3852 lost_grant = cli->cl_lost_grant;
3853 cli->cl_lost_grant = 0;
3854 client_obd_list_unlock(&cli->cl_loi_list_lock);
3856 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3857 "cl_lost_grant: %ld\n", data->ocd_grant,
3858 cli->cl_avail_grant, lost_grant);
3859 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3860 " ocd_grant: %d\n", data->ocd_connect_flags,
3861 data->ocd_version, data->ocd_grant);
3867 static int osc_disconnect(struct obd_export *exp)
3869 struct obd_device *obd = class_exp2obd(exp);
3870 struct llog_ctxt *ctxt;
3873 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3875 if (obd->u.cli.cl_conn_count == 1) {
3876 /* Flush any remaining cancel messages out to the
3878 llog_sync(ctxt, exp);
3880 llog_ctxt_put(ctxt);
3882 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3886 rc = client_disconnect_export(exp);
3890 static int osc_import_event(struct obd_device *obd,
3891 struct obd_import *imp,
3892 enum obd_import_event event)
3894 struct client_obd *cli;
3898 LASSERT(imp->imp_obd == obd);
3901 case IMP_EVENT_DISCON: {
3902 /* Only do this on the MDS OSCs */
3903 if (imp->imp_server_timeout) {
3904 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3906 spin_lock(&oscc->oscc_lock);
3907 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3908 spin_unlock(&oscc->oscc_lock);
3911 client_obd_list_lock(&cli->cl_loi_list_lock);
3912 cli->cl_avail_grant = 0;
3913 cli->cl_lost_grant = 0;
3914 client_obd_list_unlock(&cli->cl_loi_list_lock);
3915 ptlrpc_import_setasync(imp, -1);
3919 case IMP_EVENT_INACTIVE: {
3920 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3923 case IMP_EVENT_INVALIDATE: {
3924 struct ldlm_namespace *ns = obd->obd_namespace;
3928 client_obd_list_lock(&cli->cl_loi_list_lock);
3929 /* all pages go to failing rpcs due to the invalid import */
3930 osc_check_rpcs(cli);
3931 client_obd_list_unlock(&cli->cl_loi_list_lock);
3933 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3937 case IMP_EVENT_ACTIVE: {
3938 /* Only do this on the MDS OSCs */
3939 if (imp->imp_server_timeout) {
3940 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3942 spin_lock(&oscc->oscc_lock);
3943 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3944 spin_unlock(&oscc->oscc_lock);
3946 CDEBUG(D_INFO, "notify server \n");
3947 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3950 case IMP_EVENT_OCD: {
3951 struct obd_connect_data *ocd = &imp->imp_connect_data;
3953 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3954 osc_init_grant(&obd->u.cli, ocd);
3957 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3958 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3960 ptlrpc_import_setasync(imp, 1);
3961 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3965 CERROR("Unknown import event %d\n", event);
3971 int osc_setup(struct obd_device *obd, obd_count len, void *buf)
3977 rc = ptlrpcd_addref();
3981 rc = client_obd_setup(obd, len, buf);
3985 struct lprocfs_static_vars lvars = { 0 };
3986 struct client_obd *cli = &obd->u.cli;
3988 lprocfs_osc_init_vars(&lvars);
3989 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3990 lproc_osc_attach_seqstat(obd);
3991 ptlrpc_lprocfs_register_obd(obd);
3995 /* We need to allocate a few more requests, because
3996 brw_interpret tries to create new requests before freeing
3997 previous ones. Ideally we want to have 2x max_rpcs_in_flight
3998 reserved, but I am afraid that might be too much wasted RAM
3999 in fact, so 2 is just my guess and should still work. */
4000 cli->cl_import->imp_rq_pool =
4001 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4003 ptlrpc_add_rqs_to_pool);
4004 cli->cl_cache = cache_create(obd);
4005 if (!cli->cl_cache) {
4014 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4020 case OBD_CLEANUP_EARLY: {
4021 struct obd_import *imp;
4022 imp = obd->u.cli.cl_import;
4023 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4024 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4025 ptlrpc_deactivate_import(imp);
4028 case OBD_CLEANUP_EXPORTS: {
4029 /* If we set up but never connected, the
4030 client import will not have been cleaned. */
4031 if (obd->u.cli.cl_import) {
4032 struct obd_import *imp;
4033 down_write(&obd->u.cli.cl_sem);
4034 imp = obd->u.cli.cl_import;
4035 CDEBUG(D_CONFIG, "%s: client import never connected\n",
4037 ptlrpc_invalidate_import(imp);
4038 if (imp->imp_rq_pool) {
4039 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4040 imp->imp_rq_pool = NULL;
4042 class_destroy_import(imp);
4043 up_write(&obd->u.cli.cl_sem);
4044 obd->u.cli.cl_import = NULL;
4046 rc = obd_llog_finish(obd, 0);
4048 CERROR("failed to cleanup llogging subsystems\n");
4051 case OBD_CLEANUP_SELF_EXP:
4053 case OBD_CLEANUP_OBD:
4059 int osc_cleanup(struct obd_device *obd)
4061 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4065 ptlrpc_lprocfs_unregister_obd(obd);
4066 lprocfs_obd_cleanup(obd);
4068 spin_lock(&oscc->oscc_lock);
4069 oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
4070 oscc->oscc_flags |= OSCC_FLAG_EXITING;
4071 spin_unlock(&oscc->oscc_lock);
4073 /* free memory of osc quota cache */
4074 lquota_cleanup(quota_interface, obd);
4076 cache_destroy(obd->u.cli.cl_cache);
4077 rc = client_obd_cleanup(obd);
4083 static int osc_register_page_removal_cb(struct obd_export *exp,
4084 obd_page_removal_cb_t func,
4085 obd_pin_extent_cb pin_cb)
4087 return cache_add_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func,
4091 static int osc_unregister_page_removal_cb(struct obd_export *exp,
4092 obd_page_removal_cb_t func)
4094 return cache_del_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func);
4097 static int osc_register_lock_cancel_cb(struct obd_export *exp,
4098 obd_lock_cancel_cb cb)
4100 LASSERT(exp->exp_obd->u.cli.cl_ext_lock_cancel_cb == NULL);
4102 exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = cb;
4106 static int osc_unregister_lock_cancel_cb(struct obd_export *exp,
4107 obd_lock_cancel_cb cb)
4109 if (exp->exp_obd->u.cli.cl_ext_lock_cancel_cb != cb) {
4110 CERROR("Unregistering cancel cb %p, while only %p was "
4112 exp->exp_obd->u.cli.cl_ext_lock_cancel_cb);
4116 exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = NULL;
4120 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4122 struct lustre_cfg *lcfg = buf;
4123 struct lprocfs_static_vars lvars = { 0 };
4126 lprocfs_osc_init_vars(&lvars);
4128 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
4132 struct obd_ops osc_obd_ops = {
4133 .o_owner = THIS_MODULE,
4134 .o_setup = osc_setup,
4135 .o_precleanup = osc_precleanup,
4136 .o_cleanup = osc_cleanup,
4137 .o_add_conn = client_import_add_conn,
4138 .o_del_conn = client_import_del_conn,
4139 .o_connect = client_connect_import,
4140 .o_reconnect = osc_reconnect,
4141 .o_disconnect = osc_disconnect,
4142 .o_statfs = osc_statfs,
4143 .o_statfs_async = osc_statfs_async,
4144 .o_packmd = osc_packmd,
4145 .o_unpackmd = osc_unpackmd,
4146 .o_precreate = osc_precreate,
4147 .o_create = osc_create,
4148 .o_destroy = osc_destroy,
4149 .o_getattr = osc_getattr,
4150 .o_getattr_async = osc_getattr_async,
4151 .o_setattr = osc_setattr,
4152 .o_setattr_async = osc_setattr_async,
4154 .o_brw_async = osc_brw_async,
4155 .o_prep_async_page = osc_prep_async_page,
4156 .o_reget_short_lock = osc_reget_short_lock,
4157 .o_release_short_lock = osc_release_short_lock,
4158 .o_queue_async_io = osc_queue_async_io,
4159 .o_set_async_flags = osc_set_async_flags,
4160 .o_queue_group_io = osc_queue_group_io,
4161 .o_trigger_group_io = osc_trigger_group_io,
4162 .o_teardown_async_page = osc_teardown_async_page,
4163 .o_punch = osc_punch,
4165 .o_enqueue = osc_enqueue,
4166 .o_match = osc_match,
4167 .o_change_cbdata = osc_change_cbdata,
4168 .o_cancel = osc_cancel,
4169 .o_cancel_unused = osc_cancel_unused,
4170 .o_join_lru = osc_join_lru,
4171 .o_iocontrol = osc_iocontrol,
4172 .o_get_info = osc_get_info,
4173 .o_set_info_async = osc_set_info_async,
4174 .o_import_event = osc_import_event,
4175 .o_llog_init = osc_llog_init,
4176 .o_llog_finish = osc_llog_finish,
4177 .o_process_config = osc_process_config,
4178 .o_register_page_removal_cb = osc_register_page_removal_cb,
4179 .o_unregister_page_removal_cb = osc_unregister_page_removal_cb,
4180 .o_register_lock_cancel_cb = osc_register_lock_cancel_cb,
4181 .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb,
4183 int __init osc_init(void)
4185 struct lprocfs_static_vars lvars = { 0 };
4189 lprocfs_osc_init_vars(&lvars);
4191 request_module("lquota");
4192 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4193 lquota_init(quota_interface);
4194 init_obd_quota_ops(quota_interface, &osc_obd_ops);
4196 rc = class_register_type(&osc_obd_ops, lvars.module_vars,
4199 if (quota_interface)
4200 PORTAL_SYMBOL_PUT(osc_quota_interface);
4208 static void /*__exit*/ osc_exit(void)
4210 lquota_exit(quota_interface);
4211 if (quota_interface)
4212 PORTAL_SYMBOL_PUT(osc_quota_interface);
4214 class_unregister_type(LUSTRE_OSC_NAME);
4217 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4218 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4219 MODULE_LICENSE("GPL");
4221 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);