1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5 * Author Peter Braam <braam@clusterfs.com>
7 * This file is part of the Lustre file system, http://www.lustre.org
8 * Lustre is a trademark of Cluster File Systems, Inc.
10 * You may have signed or agreed to another license before downloading
11 * this software. If so, you are bound by the terms and conditions
12 * of that agreement, and the following does not apply to you. See the
13 * LICENSE file included with this distribution for more information.
15 * If you did not agree to a different license, then this copy of Lustre
16 * is open source software; you can redistribute it and/or modify it
17 * under the terms of version 2 of the GNU General Public License as
18 * published by the Free Software Foundation.
20 * In either case, Lustre is distributed in the hope that it will be
21 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23 * license text for more details.
25 * For testing and management it is treated as an obd_device,
26 * although * it does not export a full OBD method table (the
27 * requests are coming * in over the wire, so object target modules
28 * do not have a full * method table.)
33 # define EXPORT_SYMTAB
35 #define DEBUG_SUBSYSTEM S_OSC
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
43 #include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include "osc_internal.h"
/* Quota interface hook used by the BRW reply path (lquota_setdq);
 * osc_quota_interface is defined elsewhere in the OSC module.
 * NOTE(review): presumably wired up/torn down at module init/exit --
 * confirm against the module setup code (not visible in this excerpt). */
61 static quota_interface_t *quota_interface = NULL;
62 extern quota_interface_t osc_quota_interface;
/* Forward declaration: releases a brw_page vector built for a bulk RPC. */
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
66 /* Pack OSC object metadata for disk storage (LE byte order). */
/*
 * osc_packmd(): convert in-memory stripe metadata @lsm into the on-wire/
 * on-disk little-endian lov_mds_md at *lmmp.  Visible behaviour:
 *  - lmm_size is the size of a single lov_mds_md (one stripe per OSC);
 *  - depending on the caller's request, an existing *lmmp is freed or a
 *    fresh one allocated (branch conditions are elided from this excerpt);
 *  - object id/group must be non-zero and are stored as LE64.
 * NOTE(review): return value and ENOMEM handling not visible here.
 */
67 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
68 struct lov_stripe_md *lsm)
73 lmm_size = sizeof(**lmmp);
/* Caller asked us to free a previously packed lmm. */
78 OBD_FREE(*lmmp, lmm_size);
/* Fresh allocation for the outgoing metadata buffer. */
84 OBD_ALLOC(*lmmp, lmm_size);
/* A valid object must have non-zero id and group before packing. */
90 LASSERT(lsm->lsm_object_id);
91 LASSERT(lsm->lsm_object_gr);
92 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
93 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
99 /* Unpack OSC object metadata from disk storage (LE byte order). */
/*
 * osc_unpackmd(): validate the wire lov_mds_md @lmm and build an in-memory
 * lov_stripe_md at *lsmp (single stripe).  Passing lmm == NULL with an
 * existing *lsmp frees the stripe md instead.
 */
100 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
101 struct lov_mds_md *lmm, int lmm_bytes)
/* Reject wire buffers shorter than a full lov_mds_md. */
107 if (lmm_bytes < sizeof (*lmm)) {
108 CERROR("lov_mds_md too small: %d, need %d\n",
109 lmm_bytes, (int)sizeof(*lmm));
112 /* XXX LOV_MAGIC etc check? */
/* Object id 0 is never valid on the wire. */
114 if (lmm->lmm_object_id == 0) {
115 CERROR("lov_mds_md: zero lmm_object_id\n");
/* An OSC stripe md always describes exactly one stripe. */
120 lsm_size = lov_stripe_md_size(1);
/* lmm == NULL with an existing *lsmp means "free the old md". */
124 if (*lsmp != NULL && lmm == NULL) {
125 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
126 OBD_FREE(*lsmp, lsm_size);
132 OBD_ALLOC(*lsmp, lsm_size);
135 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
/* Unwind the lsm allocation if the oinfo allocation failed. */
136 if ((*lsmp)->lsm_oinfo[0] == NULL) {
137 OBD_FREE(*lsmp, lsm_size);
140 loi_init((*lsmp)->lsm_oinfo[0]);
144 /* XXX zero *lsmp? */
/* Copy wire (LE) identifiers to host byte order; both must be non-zero. */
145 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
146 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
147 LASSERT((*lsmp)->lsm_object_id);
148 LASSERT((*lsmp)->lsm_object_gr);
151 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/* Copy a client capability (if any) into the request capsule's CAPA1 field
 * and flag it in the ost_body so the server validates the operation.
 * NOTE(review): the NULL-capa early return and the capa copy into @c are
 * elided from this excerpt -- confirm against the full source. */
156 static inline void osc_pack_capa(struct ptlrpc_request *req,
157 struct ost_body *body, void *capa)
159 struct obd_capa *oc = (struct obd_capa *)capa;
160 struct lustre_capa *c;
165 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
168 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
169 DEBUG_CAPA(D_SEC, c, "pack");
/* Fill the request's ost_body from @oinfo: copy the obdo wholesale and
 * pack the capability (if one was supplied). */
172 static inline void osc_pack_req_body(struct ptlrpc_request *req,
173 struct obd_info *oinfo)
175 struct ost_body *body;
177 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
180 body->oa = *oinfo->oi_oa;
181 osc_pack_capa(req, body, oinfo->oi_capa);
/* Size the given capability field in the request capsule before packing:
 * zero when no capability is being sent, otherwise the capsule's default
 * size (sizeof struct obd_capa) is left in place. */
184 static inline void osc_set_capa_size(struct ptlrpc_request *req,
185 const struct req_msg_field *field,
189 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
191 /* it is already calculated as sizeof struct obd_capa */
/* Async GETATTR reply handler: unpack/swab the ost_body, copy the returned
 * attributes into the caller's obdo, then invoke the completion callback
 * (oi_cb_up).  On unpack failure the obdo's valid mask is cleared so the
 * stale attributes cannot be trusted. */
195 static int osc_getattr_interpret(struct ptlrpc_request *req,
196 struct osc_async_args *aa, int rc)
198 struct ost_body *body;
204 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
205 lustre_swab_ost_body);
207 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
208 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
210 /* This should really be sent by the OST */
211 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
212 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
/* Unpack failure path: invalidate the attrs so callers don't use them. */
214 CDEBUG(D_INFO, "can't unpack ost_body\n");
216 aa->aa_oi->oi_oa->o_valid = 0;
219 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue a non-blocking OST_GETATTR on @set; the reply is handled by
 * osc_getattr_interpret().  Error paths free the request before returning. */
223 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
224 struct ptlrpc_request_set *set)
226 struct ptlrpc_request *req;
227 struct osc_async_args *aa;
231 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
235 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
236 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
/* Packing failed: release the request before returning the error. */
238 ptlrpc_request_free(req);
242 osc_pack_req_body(req, oinfo);
244 ptlrpc_request_set_replen(req);
245 req->rq_interpret_reply = osc_getattr_interpret;
/* Stash per-request state in the request's async-args scratch space. */
247 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
248 aa = (struct osc_async_args *)&req->rq_async_args;
251 ptlrpc_set_add_req(set, req);
/* Synchronous OST_GETATTR: send the request, wait for the reply, and copy
 * the returned attributes into oinfo->oi_oa. */
255 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
257 struct ptlrpc_request *req;
258 struct ost_body *body;
262 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
266 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
267 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
269 ptlrpc_request_free(req);
273 osc_pack_req_body(req, oinfo);
275 ptlrpc_request_set_replen(req);
277 rc = ptlrpc_queue_wait(req);
/* Missing reply body is a protocol violation. */
281 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
283 GOTO(out, rc = -EPROTO);
285 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
286 *oinfo->oi_oa = body->oa;
288 /* This should really be sent by the OST */
289 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
290 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
294 ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR: push attribute changes to the OST and copy the
 * server's resulting view of the attributes back into oinfo->oi_oa. */
298 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
299 struct obd_trans_info *oti)
301 struct ptlrpc_request *req;
302 struct ost_body *body;
/* If a group is supplied it must be a real (non-zero) group. */
306 LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
307 oinfo->oi_oa->o_gr > 0);
309 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
313 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
314 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
316 ptlrpc_request_free(req);
320 osc_pack_req_body(req, oinfo);
322 ptlrpc_request_set_replen(req);
325 rc = ptlrpc_queue_wait(req);
/* Missing reply body is a protocol violation. */
329 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
331 GOTO(out, rc = -EPROTO);
333 *oinfo->oi_oa = body->oa;
337 ptlrpc_req_finished(req);
/* Async SETATTR reply handler: copy the returned attrs back into the
 * caller's obdo and run the completion callback (oi_cb_up). */
341 static int osc_setattr_interpret(struct ptlrpc_request *req,
342 struct osc_async_args *aa, int rc)
344 struct ost_body *body;
350 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
352 GOTO(out, rc = -EPROTO);
354 *aa->aa_oi->oi_oa = body->oa;
356 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Asynchronous OST_SETATTR.  When a log cookie is present (MDS->OST path)
 * the request is fired via ptlrpcd without waiting for a reply; otherwise
 * it joins @rqset with osc_setattr_interpret() as the reply handler. */
360 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
361 struct obd_trans_info *oti,
362 struct ptlrpc_request_set *rqset)
364 struct ptlrpc_request *req;
365 struct osc_async_args *aa;
369 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
373 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
374 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
376 ptlrpc_request_free(req);
380 osc_pack_req_body(req, oinfo);
382 ptlrpc_request_set_replen(req);
/* Carry the llog cancellation cookie inside the obdo. */
384 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
386 *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;
389 /* do mds to ost setattr asynchronouly */
391 /* Do not wait for response. */
392 ptlrpcd_add_req(req);
394 req->rq_interpret_reply = osc_setattr_interpret;
396 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
397 aa = (struct osc_async_args *)&req->rq_async_args;
400 ptlrpc_set_add_req(rqset, req);
/* Create an object on the OST (synchronous OST_CREATE).  Allocates a
 * temporary lsm when needed, records the returned object id/group and
 * transno, and optionally returns the unlink llog cookie via @oti.
 * NOTE(review): the success/cleanup branching between out_req and out is
 * partially elided from this excerpt -- consult the full source. */
406 int osc_real_create(struct obd_export *exp, struct obdo *oa,
407 struct lov_stripe_md **ea, struct obd_trans_info *oti)
409 struct ptlrpc_request *req;
410 struct ost_body *body;
411 struct lov_stripe_md *lsm;
420 rc = obd_alloc_memmd(exp, &lsm);
425 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
427 GOTO(out, rc = -ENOMEM);
429 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
431 ptlrpc_request_free(req);
435 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
439 ptlrpc_request_set_replen(req);
/* DELORPHAN recreation of orphaned objects must not be resent/delayed. */
441 if (oa->o_valid & OBD_MD_FLINLINE) {
442 LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
443 oa->o_flags == OBD_FL_DELORPHAN);
445 "delorphan from OST integration");
446 /* Don't resend the delorphan req */
447 req->rq_no_resend = req->rq_no_delay = 1;
450 rc = ptlrpc_queue_wait(req);
454 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
456 GOTO(out_req, rc = -EPROTO);
460 /* This should really be sent by the OST */
461 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
462 oa->o_valid |= OBD_MD_FLBLKSZ;
464 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
465 * have valid lsm_oinfo data structs, so don't go touching that.
466 * This needs to be fixed in a big way.
468 lsm->lsm_object_id = oa->o_id;
469 lsm->lsm_object_gr = oa->o_gr;
/* Hand the create transno back so the caller can track recovery. */
473 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
475 if (oa->o_valid & OBD_MD_FLCOOKIE) {
476 if (!oti->oti_logcookies)
477 oti_alloc_cookies(oti, 1);
478 *oti->oti_logcookies = *obdo_logcookie(oa);
482 CDEBUG(D_HA, "transno: "LPD64"\n",
483 lustre_msg_get_transno(req->rq_repmsg));
485 ptlrpc_req_finished(req);
488 obd_free_memmd(exp, &lsm);
/* Async PUNCH (truncate) reply handler: copy the returned attrs back and
 * run the completion callback. */
492 static int osc_punch_interpret(struct ptlrpc_request *req,
493 struct osc_async_args *aa, int rc)
495 struct ost_body *body;
501 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
503 GOTO(out, rc = -EPROTO);
505 *aa->aa_oi->oi_oa = body->oa;
507 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Asynchronous OST_PUNCH (truncate a byte range).  The punch extent rides
 * in the obdo's size/blocks fields (start/end); the request is queued on
 * @rqset with osc_punch_interpret() as the reply handler. */
511 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
512 struct obd_trans_info *oti,
513 struct ptlrpc_request_set *rqset)
515 struct ptlrpc_request *req;
516 struct osc_async_args *aa;
517 struct ost_body *body;
/* Guard: a NULL obdo means there is nothing to punch. */
522 CDEBUG(D_INFO, "oa NULL\n");
526 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
530 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
531 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
533 ptlrpc_request_free(req);
536 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
537 osc_pack_req_body(req, oinfo);
539 /* overload the size and blocks fields in the oa with start/end */
540 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
542 body->oa.o_size = oinfo->oi_policy.l_extent.start;
543 body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
544 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
545 ptlrpc_request_set_replen(req);
548 req->rq_interpret_reply = osc_punch_interpret;
549 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
550 aa = (struct osc_async_args *)&req->rq_async_args;
552 ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_SYNC: ask the OST to commit [start, end] of the object
 * to stable storage.  The range rides in the obdo size/blocks fields, the
 * same overloading trick as osc_punch(). */
557 static int osc_sync(struct obd_export *exp, struct obdo *oa,
558 struct lov_stripe_md *md, obd_size start, obd_size end,
561 struct ptlrpc_request *req;
562 struct ost_body *body;
/* Guard: a NULL obdo means there is nothing to sync. */
567 CDEBUG(D_INFO, "oa NULL\n");
571 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
575 osc_set_capa_size(req, &RMF_CAPA1, capa);
576 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
578 ptlrpc_request_free(req);
582 /* overload the size and blocks fields in the oa with start/end */
583 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
586 body->oa.o_size = start;
587 body->oa.o_blocks = end;
588 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
589 osc_pack_capa(req, body, capa);
591 ptlrpc_request_set_replen(req);
593 rc = ptlrpc_queue_wait(req);
/* Missing reply body is a protocol violation. */
597 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
599 GOTO(out, rc = -EPROTO);
605 ptlrpc_req_finished(req);
609 /* Find and cancel locally locks matched by @mode in the resource found by
610 * @objid. Found locks are added into @cancel list. Returns the amount of
611 * locks added to @cancels list. */
612 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
613 struct list_head *cancels, ldlm_mode_t mode,
/* The LDLM resource name is built from the obdo's object id and group. */
616 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
617 struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
618 struct ldlm_resource *res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
625 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
626 lock_flags, 0, NULL);
/* Drop the reference taken by ldlm_resource_get(). */
627 ldlm_resource_putref(res);
/* DESTROY reply handler: release our slot in the destroy-in-flight window
 * and wake any thread throttled in osc_destroy(). */
631 static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
634 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
636 atomic_dec(&cli->cl_destroy_in_flight);
637 cfs_waitq_signal(&cli->cl_destroy_waitq);
/* Try to reserve a destroy-RPC slot.  Optimistically increment the
 * in-flight counter; if that exceeded cl_max_rpcs_in_flight, back out.
 * When the decrement shows another slot was freed concurrently, re-signal
 * the waitqueue so no waiter is lost.  Returns non-zero when the destroy
 * may be sent (the return statements are elided from this excerpt). */
641 static int osc_can_send_destroy(struct client_obd *cli)
643 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
644 cli->cl_max_rpcs_in_flight) {
645 /* The destroy request can be sent */
648 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
649 cli->cl_max_rpcs_in_flight) {
651 * The counter has been modified between the two atomic
654 cfs_waitq_signal(&cli->cl_destroy_waitq);
659 /* Destroy requests can be async always on the client, and we don't even really
660 * care about the return code since the client cannot do anything at all about
662 * When the MDS is unlinking a filename, it saves the file objects into a
663 * recovery llog, and these object records are cancelled when the OST reports
664 * they were destroyed and sync'd to disk (i.e. transaction committed).
665 * If the client dies, or the OST is down when the object should be destroyed,
666 * the records are not cancelled, and when the OST reconnects to the MDS next,
667 * it will retrieve the llog unlink logs and then sends the log cancellation
668 * cookies to the MDS after committing destroy transactions. */
669 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
670 struct lov_stripe_md *ea, struct obd_trans_info *oti,
671 struct obd_export *md_export)
673 struct client_obd *cli = &exp->exp_obd->u.cli;
674 struct ptlrpc_request *req;
675 struct ost_body *body;
676 CFS_LIST_HEAD(cancels);
/* Guard: a NULL obdo means there is nothing to destroy. */
681 CDEBUG(D_INFO, "oa NULL\n");
/* Cancel matching local PW locks first (early lock cancel piggybacked on
 * the destroy), discarding cached data for the doomed object. */
685 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
686 LDLM_FL_DISCARD_DATA);
688 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* Allocation failed: release the lock refs gathered above. */
690 ldlm_lock_list_put(&cancels, l_bl_ast, count);
694 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
697 ldlm_lock_list_put(&cancels, l_bl_ast, count);
698 ptlrpc_request_free(req);
702 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
703 req->rq_interpret_reply = osc_destroy_interpret;
/* Ship the llog cancellation cookie inside the obdo. */
705 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
706 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
707 sizeof(*oti->oti_logcookies));
708 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
712 ptlrpc_request_set_replen(req);
/* Throttle: don't let destroys monopolize all the RPC slots. */
714 if (!osc_can_send_destroy(cli)) {
715 struct l_wait_info lwi = { 0 };
718 * Wait until the number of on-going destroy RPCs drops
719 * under max_rpc_in_flight
721 l_wait_event_exclusive(cli->cl_destroy_waitq,
722 osc_can_send_destroy(cli), &lwi);
725 /* Do not wait for response */
726 ptlrpcd_add_req(req);
/* Report this client's dirty-cache and grant state to the OST in the
 * outgoing obdo (piggybacked on BRW requests).  Sanity-checks the dirty
 * counters against per-OSC and system limits, advertises how much more
 * the client could dirty (o_undirty) and returns accumulated lost grant. */
730 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
733 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
/* The caller must not have set the fields we are about to fill. */
735 LASSERT(!(oa->o_valid & bits));
738 client_obd_list_lock(&cli->cl_loi_list_lock);
739 oa->o_dirty = cli->cl_dirty;
740 if (cli->cl_dirty > cli->cl_dirty_max) {
741 CERROR("dirty %lu > dirty_max %lu\n",
742 cli->cl_dirty, cli->cl_dirty_max);
744 } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
745 CERROR("dirty %d > system dirty_max %d\n",
746 atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
748 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
749 CERROR("dirty %lu - dirty_max %lu too big???\n",
750 cli->cl_dirty, cli->cl_dirty_max);
/* Normal case: advertise room for at least a full pipeline of RPCs. */
753 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
754 (cli->cl_max_rpcs_in_flight + 1);
755 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
757 oa->o_grant = cli->cl_avail_grant;
758 oa->o_dropped = cli->cl_lost_grant;
/* Lost grant has now been reported; reset the accumulator. */
759 cli->cl_lost_grant = 0;
760 client_obd_list_unlock(&cli->cl_loi_list_lock);
761 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
762 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
765 /* caller must hold loi_list_lock */
/* Account one page of dirty cache against the write grant: bump the global
 * and per-client dirty counters, consume one page's worth of available
 * grant, and mark the page grant-backed so osc_release_write_grant() can
 * undo the accounting on completion. */
766 static void osc_consume_write_grant(struct client_obd *cli,
767 struct brw_page *pga)
769 atomic_inc(&obd_dirty_pages);
770 cli->cl_dirty += CFS_PAGE_SIZE;
771 cli->cl_avail_grant -= CFS_PAGE_SIZE;
772 pga->flag |= OBD_BRW_FROM_GRANT;
773 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
774 CFS_PAGE_SIZE, pga, pga->pg);
/* Callers must only consume grant they verified was available. */
775 LASSERT(cli->cl_avail_grant >= 0);
778 /* the companion to osc_consume_write_grant, called when a brw has completed.
779 * must be called with the loi lock held. */
780 static void osc_release_write_grant(struct client_obd *cli,
781 struct brw_page *pga, int sent)
/* OST backing-filesystem block size; default to 4k when unknown. */
783 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
/* Nothing to release if this page never consumed grant. */
786 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
791 pga->flag &= ~OBD_BRW_FROM_GRANT;
792 atomic_dec(&obd_dirty_pages);
793 cli->cl_dirty -= CFS_PAGE_SIZE;
/* Page was dropped without being sent: its whole grant is lost. */
795 cli->cl_lost_grant += CFS_PAGE_SIZE;
796 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
797 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
798 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
799 /* For short writes we shouldn't count parts of pages that
800 * span a whole block on the OST side, or our accounting goes
801 * wrong. Should match the code in filter_grant_check. */
802 int offset = pga->off & ~CFS_PAGE_MASK;
803 int count = pga->count + (offset & (blocksize - 1));
804 int end = (offset + pga->count) & (blocksize - 1);
/* Round the written extent up to a whole backend block. */
806 count += blocksize - end;
808 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
809 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
810 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
811 cli->cl_avail_grant, cli->cl_dirty);
/* Total read + write RPCs currently in flight for this client. */
817 static unsigned long rpcs_in_flight(struct client_obd *cli)
819 return cli->cl_r_in_flight + cli->cl_w_in_flight;
822 /* caller must hold loi_list_lock */
/* Walk the cache-waiter list and wake waiters that can now make progress:
 * either grant is available (consume it on the waiter's behalf), or no
 * write RPCs remain to return grant, in which case the waiter is told to
 * fall back to sync I/O via -EDQUOT.  Bails out early while the per-OSC
 * or system dirty limits are still exceeded. */
823 void osc_wake_cache_waiters(struct client_obd *cli)
825 struct list_head *l, *tmp;
826 struct osc_cache_waiter *ocw;
829 list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
830 /* if we can't dirty more, we must wait until some is written */
831 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
832 (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
833 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
834 "osc max %ld, sys max %d\n", cli->cl_dirty,
835 cli->cl_dirty_max, obd_max_dirty_pages);
839 /* if still dirty cache but no grant wait for pending RPCs that
840 * may yet return us some grant before doing sync writes */
841 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
842 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
843 cli->cl_w_in_flight);
847 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
848 list_del_init(&ocw->ocw_entry);
849 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
850 /* no more RPCs in flight to return grant, do sync IO */
851 ocw->ocw_rc = -EDQUOT;
852 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
/* Grant available: charge it to the waiter's page before waking it. */
854 osc_consume_write_grant(cli,
855 &ocw->ocw_oap->oap_brw_page);
858 cfs_waitq_signal(&ocw->ocw_waitq);
/* Record the initial grant handed out by the server in the connect reply. */
864 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
866 client_obd_list_lock(&cli->cl_loi_list_lock);
867 cli->cl_avail_grant = ocd->ocd_grant;
868 client_obd_list_unlock(&cli->cl_loi_list_lock);
870 CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
871 cli->cl_avail_grant, cli->cl_lost_grant);
/* Servers never grant a negative amount. */
872 LASSERT(cli->cl_avail_grant >= 0);
/* Add any grant returned in a BRW reply body to the client's available
 * grant, under the loi list lock. */
875 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
877 client_obd_list_lock(&cli->cl_loi_list_lock);
878 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
879 if (body->oa.o_valid & OBD_MD_FLGRANT)
880 cli->cl_avail_grant += body->oa.o_grant;
881 /* waiters are woken in brw_interpret_oap */
882 client_obd_list_unlock(&cli->cl_loi_list_lock);
885 /* We assume that the reason this OSC got a short read is because it read
886 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
887 * via the LOV, and it _knows_ it's reading inside the file, it's just that
888 * this stripe never got written at or beyond this stripe offset yet. */
/* Zero-fill the unread tail of the page array after a short bulk read:
 * walk past fully-read pages, clear the partially-read page's tail, then
 * clear every remaining page. */
889 static void handle_short_read(int nob_read, obd_count page_count,
890 struct brw_page **pga)
895 /* skip bytes read OK */
896 while (nob_read > 0) {
897 LASSERT (page_count > 0);
899 if (pga[i]->count > nob_read) {
900 /* EOF inside this page */
901 ptr = cfs_kmap(pga[i]->pg) +
902 (pga[i]->off & ~CFS_PAGE_MASK);
903 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
904 cfs_kunmap(pga[i]->pg);
910 nob_read -= pga[i]->count;
915 /* zero remaining pages */
916 while (page_count-- > 0) {
917 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
918 memset(ptr, 0, pga[i]->count);
919 cfs_kunmap(pga[i]->pg);
/* Validate the per-niobuf return codes in a BRW_WRITE reply and confirm
 * the bulk layer transferred exactly the requested number of bytes.
 * Returns the first negative per-niobuf rc, or an error for malformed
 * replies (return statements are partially elided from this excerpt). */
924 static int check_write_rcs(struct ptlrpc_request *req,
925 int requested_nob, int niocount,
926 obd_count page_count, struct brw_page **pga)
930 /* return error if any niobuf was in error */
931 remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
932 sizeof(*remote_rcs) * niocount, NULL);
933 if (remote_rcs == NULL) {
934 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
/* Byte-swap the rc vector if the reply came from the other endianness. */
937 if (lustre_msg_swabbed(req->rq_repmsg))
938 for (i = 0; i < niocount; i++)
939 __swab32s(&remote_rcs[i]);
941 for (i = 0; i < niocount; i++) {
942 if (remote_rcs[i] < 0)
943 return(remote_rcs[i]);
/* Positive per-niobuf rcs are not defined by the protocol. */
945 if (remote_rcs[i] != 0) {
946 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
947 i, remote_rcs[i], req);
952 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
953 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
954 requested_nob, req->rq_bulk->bd_nob_transferred);
/* Two brw_pages can share one niobuf iff they are byte-contiguous and have
 * compatible flags (OBD_BRW_FROM_GRANT is ignored for the comparison,
 * since it is purely client-side accounting). */
961 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
963 if (p1->flag != p2->flag) {
964 unsigned mask = ~OBD_BRW_FROM_GRANT;
966 /* warn if we try to combine flags that we don't know to be
968 if ((p1->flag & mask) != (p2->flag & mask))
969 CERROR("is it ok to have flags 0x%x and 0x%x in the "
970 "same brw?\n", p1->flag, p2->flag);
/* Mergeable only when p2 starts exactly where p1 ends. */
974 return (p1->off + p1->count == p2->off);
/* Compute a CRC32 checksum over the first @nob bytes of the page array.
 * Under fault injection the data (reads) or the returned checksum
 * (writes) is deliberately corrupted to exercise the checksum-retry
 * paths; for writes only the checksum is perturbed so the data stays
 * correct on a resend. */
977 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
978 struct brw_page **pga, int opc)
983 LASSERT (pg_count > 0);
984 while (nob > 0 && pg_count > 0) {
985 unsigned char *ptr = cfs_kmap(pga[i]->pg);
986 int off = pga[i]->off & ~CFS_PAGE_MASK;
987 int count = pga[i]->count > nob ? nob : pga[i]->count;
989 /* corrupt the data before we compute the checksum, to
990 * simulate an OST->client data error */
991 if (i == 0 && opc == OST_READ &&
992 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
993 memcpy(ptr + off, "bad1", min(4, nob));
994 cksum = crc32_le(cksum, ptr + off, count);
995 cfs_kunmap(pga[i]->pg);
996 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
999 nob -= pga[i]->count;
1003 /* For sending we only compute the wrong checksum instead
1004 * of corrupting the data so it is still correct on a redo */
1005 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/* Build an OST_BRW (read or write) request for @page_count pages:
 * allocate the request (writes draw from the import's pre-allocated pool
 * so dirty data can always be flushed under memory pressure), set up the
 * bulk descriptor, merge adjacent pages into niobufs, announce cache/grant
 * state, and optionally attach a bulk checksum.  On success *reqp holds
 * the prepared request; the caller owns it. */
1011 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1012 struct lov_stripe_md *lsm, obd_count page_count,
1013 struct brw_page **pga,
1014 struct ptlrpc_request **reqp,
1015 struct obd_capa *ocapa)
1017 struct ptlrpc_request *req;
1018 struct ptlrpc_bulk_desc *desc;
1019 struct ost_body *body;
1020 struct obd_ioobj *ioobj;
1021 struct niobuf_remote *niobuf;
1022 int niocount, i, requested_nob, opc, rc;
1023 struct osc_brw_async_args *aa;
1024 struct req_capsule *pill;
/* Fault-injection points for recoverable/fatal prep failures. */
1027 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1028 RETURN(-ENOMEM); /* Recoverable */
1029 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1030 RETURN(-EINVAL); /* Fatal */
/* Writes allocate from the request pool so low memory can never block
 * flushing of dirty pages; reads allocate normally. */
1032 if ((cmd & OBD_BRW_WRITE) != 0) {
1034 req = ptlrpc_request_alloc_pool(cli->cl_import,
1035 cli->cl_import->imp_rq_pool,
1039 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
/* Count niobufs: adjacent mergeable pages share one niobuf. */
1045 for (niocount = i = 1; i < page_count; i++) {
1046 if (!can_merge_pages(pga[i - 1], pga[i]))
1050 pill = &req->rq_pill;
1051 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1052 niocount * sizeof(*niobuf));
1053 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1055 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1057 ptlrpc_request_free(req);
1060 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
/* Writes have the server GET data from us; reads PUT data into us. */
1062 if (opc == OST_WRITE)
1063 desc = ptlrpc_prep_bulk_imp(req, page_count,
1064 BULK_GET_SOURCE, OST_BULK_PORTAL);
1066 desc = ptlrpc_prep_bulk_imp(req, page_count,
1067 BULK_PUT_SINK, OST_BULK_PORTAL);
1070 GOTO(out, rc = -ENOMEM);
1071 /* NB request now owns desc and will free it when it gets freed */
1073 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1074 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1075 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1076 LASSERT(body && ioobj && niobuf);
1080 obdo_to_ioobj(oa, ioobj);
1081 ioobj->ioo_bufcnt = niocount;
1082 osc_pack_capa(req, body, ocapa);
1083 LASSERT (page_count > 0);
/* Attach pages to the bulk descriptor and build the niobuf array,
 * coalescing byte-contiguous pages into a single niobuf. */
1084 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1085 struct brw_page *pg = pga[i];
1086 struct brw_page *pg_prev = pga[i - 1];
1088 LASSERT(pg->count > 0);
/* Each brw_page fragment must fit inside a single page. */
1089 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1090 "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1091 pg->off, pg->count);
/* Pages must arrive in strictly increasing file-offset order. */
1093 LASSERTF(i == 0 || pg->off > pg_prev->off,
1094 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1095 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1097 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1098 pg_prev->pg, page_private(pg_prev->pg),
1099 pg_prev->pg->index, pg_prev->off);
1101 LASSERTF(i == 0 || pg->off > pg_prev->off,
1102 "i %d p_c %u\n", i, page_count);
/* The server-lock flag must be uniform across the whole request. */
1104 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1105 (pg->flag & OBD_BRW_SRVLOCK));
1107 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1109 requested_nob += pg->count;
/* Contiguous with the previous page: extend the current niobuf. */
1111 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1113 niobuf->len += pg->count;
1115 niobuf->offset = pg->off;
1116 niobuf->len = pg->count;
1117 niobuf->flags = pg->flag;
/* We should have consumed exactly the niobufs we reserved above. */
1121 LASSERT((void *)(niobuf - niocount) ==
1122 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1123 niocount * sizeof(*niobuf)));
1124 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1126 /* size[REQ_REC_OFF] still sizeof (*body) */
1127 if (opc == OST_WRITE) {
/* Checksum only when enabled and the sptlrpc flavor isn't already
 * checksumming the bulk for us. */
1128 if (unlikely(cli->cl_checksum) &&
1129 req->rq_flvr.sf_bulk_csum == BULK_CSUM_ALG_NULL) {
1130 body->oa.o_valid |= OBD_MD_FLCKSUM;
1131 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1134 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1136 /* save this in 'oa', too, for later checking */
1137 oa->o_valid |= OBD_MD_FLCKSUM;
1139 /* clear out the checksum flag, in case this is a
1140 * resend but cl_checksum is no longer set. b=11238 */
1141 oa->o_valid &= ~OBD_MD_FLCKSUM;
1143 oa->o_cksum = body->oa.o_cksum;
1144 /* 1 RC per niobuf */
1145 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
1146 sizeof(__u32) * niocount);
1148 if (unlikely(cli->cl_checksum) &&
1149 req->rq_flvr.sf_bulk_csum == BULK_CSUM_ALG_NULL)
1150 body->oa.o_valid |= OBD_MD_FLCKSUM;
1151 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
1152 /* 1 RC for the whole I/O */
1154 ptlrpc_request_set_replen(req);
/* Record bookkeeping for the reply handler in the async-args area. */
1156 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1157 aa = (struct osc_brw_async_args *)&req->rq_async_args;
1159 aa->aa_requested_nob = requested_nob;
1160 aa->aa_nio_count = niocount;
1161 aa->aa_page_count = page_count;
1165 INIT_LIST_HEAD(&aa->aa_oaps);
1171 ptlrpc_req_finished(req);
/* A BRW write reply carried a checksum that doesn't match what we sent.
 * Recompute the checksum over the (still-mapped) pages and compare it
 * against both the original client and server values to diagnose whether
 * the data changed on the client after checksumming (mmap race), was
 * corrupted in transit, or both; log a console error with object/extent
 * details.  NOTE(review): return-value lines are elided from this view. */
1175 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1176 __u32 client_cksum, __u32 server_cksum,
1177 int nob, obd_count page_count,
1178 struct brw_page **pga)
/* Fast path: server agrees with what we sent. */
1183 if (server_cksum == client_cksum) {
1184 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1188 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE);
1190 if (new_cksum == server_cksum)
1191 msg = "changed on the client after we checksummed it - "
1192 "likely false positive due to mmap IO (bug 11742)";
1193 else if (new_cksum == client_cksum)
1194 msg = "changed in transit before arrival at OST";
1196 msg = "changed in transit AND doesn't match the original - "
1197 "likely false positive due to mmap IO (bug 11742)";
1199 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1200 LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1201 "["LPU64"-"LPU64"]\n",
1202 msg, libcfs_nid2str(peer->nid),
1203 oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1204 oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1207 oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1209 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1210 CERROR("original client csum %x, server csum %x, client csum now %x\n",
1211 client_cksum, server_cksum, new_cksum);
1215 /* Note rc enters this function as number of bytes transferred */
/* Common BRW completion: unpack the reply, update per-uid/gid quota flags
 * and the grant, verify bulk checksums (both directions), zero-fill short
 * reads, and copy the returned obdo back for the caller. */
1216 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1218 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1219 const lnet_process_id_t *peer =
1220 &req->rq_import->imp_connection->c_peer;
1221 struct client_obd *cli = aa->aa_cli;
1222 struct ost_body *body;
1223 __u32 client_cksum = 0;
/* -EDQUOT still carries a reply body we must process (quota flags). */
1226 if (rc < 0 && rc != -EDQUOT)
1229 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1230 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1231 lustre_swab_ost_body);
1233 CDEBUG(D_INFO, "Can't unpack body\n");
1237 /* set/clear over quota flag for a uid/gid */
1238 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1239 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1240 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1241 body->oa.o_gid, body->oa.o_valid,
/* Remember the checksum we sent, for write-side verification below. */
1247 if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
1248 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1250 osc_update_grant(cli, body);
1252 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
/* Writes never return a positive byte count. */
1254 CERROR("Unexpected +ve rc %d\n", rc);
1257 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1259 if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
1261 check_write_checksum(&body->oa, peer, client_cksum,
1263 aa->aa_requested_nob,
1268 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1271 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1272 aa->aa_page_count, aa->aa_ppga);
1276 /* The rest of this function executes only for OST_READs */
/* Sanity: the server cannot claim more bytes than were requested or
 * than the bulk layer says actually moved. */
1277 if (rc > aa->aa_requested_nob) {
1278 CERROR("Unexpected rc %d (%d requested)\n", rc,
1279 aa->aa_requested_nob);
1283 if (rc != req->rq_bulk->bd_nob_transferred) {
1284 CERROR ("Unexpected rc %d (%d transferred)\n",
1285 rc, req->rq_bulk->bd_nob_transferred);
/* Short read: zero-fill the tail so callers see defined data. */
1289 if (rc < aa->aa_requested_nob)
1290 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1292 if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
1294 GOTO(out, rc = -EAGAIN);
/* Read-side checksum verification when the server sent one. */
1296 if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
1297 static int cksum_counter;
1298 __u32 server_cksum = body->oa.o_cksum;
1302 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1303 aa->aa_ppga, OST_READ);
/* Identify whether the data came through an LNET router. */
1305 if (peer->nid == req->rq_bulk->bd_sender) {
1309 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1312 if (server_cksum == ~0 && rc > 0) {
1313 CERROR("Protocol error: server %s set the 'checksum' "
1314 "bit, but didn't send a checksum. Not fatal, "
1315 "but please tell CFS.\n",
1316 libcfs_nid2str(peer->nid));
1317 } else if (server_cksum != client_cksum) {
1318 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1319 "%s%s%s inum "LPU64"/"LPU64" object "
1320 LPU64"/"LPU64" extent "
1321 "["LPU64"-"LPU64"]\n",
1322 req->rq_import->imp_obd->obd_name,
1323 libcfs_nid2str(peer->nid),
1325 body->oa.o_valid & OBD_MD_FLFID ?
1326 body->oa.o_fid : (__u64)0,
1327 body->oa.o_valid & OBD_MD_FLFID ?
1328 body->oa.o_generation :(__u64)0,
1330 body->oa.o_valid & OBD_MD_FLGROUP ?
1331 body->oa.o_gr : (__u64)0,
1332 aa->aa_ppga[0]->off,
1333 aa->aa_ppga[aa->aa_page_count-1]->off +
1334 aa->aa_ppga[aa->aa_page_count-1]->count -
1336 CERROR("client %x, server %x\n",
1337 client_cksum, server_cksum);
1339 aa->aa_oa->o_cksum = client_cksum;
1343 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* We asked for a checksum but the server didn't send one; warn with
 * exponential backoff (only at power-of-two counter values). */
1346 } else if (unlikely(client_cksum)) {
1347 static int cksum_missed;
1350 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1351 CERROR("Checksum %u requested from %s but not sent\n",
1352 cksum_missed, libcfs_nid2str(peer->nid));
/* Hand the server's view of the attributes back to the caller. */
1358 *aa->aa_oa = body->oa;
/* Synchronous bulk read/write: build one BRW request, queue it and wait,
 * retrying recoverable errors with a timed backoff bounded by
 * osc_should_resend().
 * NOTE(review): partial extract -- some original lines (braces, ENTRY/
 * RETURN, resend loop label) are not visible in this chunk. */
1363 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1364 struct lov_stripe_md *lsm,
1365 obd_count page_count, struct brw_page **pga,
1366 struct obd_capa *ocapa)
1368 struct ptlrpc_request *req;
1372 struct l_wait_info lwi;
1376 cfs_waitq_init(&waitq);
/* Build the BRW request; this is the restart point on resend. */
1379 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1380 page_count, pga, &req, ocapa);
1384 rc = ptlrpc_queue_wait(req);
/* A bulk timeout with rq_resend set means the request should be
 * rebuilt and sent again rather than failed outright. */
1386 if (rc == -ETIMEDOUT && req->rq_resend) {
1387 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1388 ptlrpc_req_finished(req);
1392 rc = osc_brw_fini_request(req, rc);
1394 ptlrpc_req_finished(req);
1395 if (osc_recoverable_error(rc)) {
1397 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1398 CERROR("too many resend retries, returning error\n");
/* Sleep 'resends' seconds before retrying; the wait condition is
 * constant 0, so this always runs to the timeout. */
1402 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1403 l_wait_event(waitq, 0, &lwi);
/* Rebuild a BRW request that failed with a recoverable error and add the
 * replacement to the same request set.  Ownership of the pga array and the
 * oap list moves from the old request's async args to the new one's.
 * Returns 0 on success (new request queued) or a negative errno.
 * NOTE(review): partial extract -- error-path lines are not all visible. */
1411 int osc_brw_redo_request(struct ptlrpc_request *request,
1412 struct osc_brw_async_args *aa)
1414 struct ptlrpc_request *new_req;
1415 struct ptlrpc_request_set *set = request->rq_set;
1416 struct osc_brw_async_args *new_aa;
1417 struct osc_async_page *oap;
/* Give up once the per-client resend budget is exhausted. */
1421 if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1422 CERROR("too many resend retries, returning error\n");
1426 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1428 body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1429 if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
1430 ocapa = lustre_unpack_capa(request->rq_reqmsg,
1433 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1434 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1435 aa->aa_cli, aa->aa_oa,
1436 NULL /* lsm unused by osc currently */,
1437 aa->aa_page_count, aa->aa_ppga,
1438 &new_req, NULL /* ocapa */);
1442 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
/* If any oap was interrupted while we were rebuilding, abandon the
 * new request instead of resending interrupted I/O. */
1444 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1445 if (oap->oap_request != NULL) {
1446 LASSERTF(request == oap->oap_request,
1447 "request %p != oap_request %p\n",
1448 request, oap->oap_request);
1449 if (oap->oap_interrupted) {
1450 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1451 ptlrpc_req_finished(new_req);
1456 /* New request takes over pga and oaps from old request.
1457 * Note that copying a list_head doesn't work, need to move it... */
1459 new_req->rq_interpret_reply = request->rq_interpret_reply;
1460 new_req->rq_async_args = request->rq_async_args;
/* Delay the resend by aa_resends seconds from now. */
1461 new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;
1463 new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
1465 INIT_LIST_HEAD(&new_aa->aa_oaps);
1466 list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1467 INIT_LIST_HEAD(&aa->aa_oaps);
/* Repoint each oap's request reference from the old request to the
 * new one. */
1469 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1470 if (oap->oap_request) {
1471 ptlrpc_req_finished(oap->oap_request);
1472 oap->oap_request = ptlrpc_request_addref(new_req);
1476 /* Using ptlrpc_set_add_req() here is safe because interpret callbacks
1477 * run in check_set context.  The only path by which another thread can
1478 * reach this request after -EINTR is protected by cl_loi_list_lock,
1479 * which we hold. */
1480 ptlrpc_set_add_req(set, new_req);
1482 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1484 DEBUG_REQ(D_INFO, new_req, "new request");
/* Interpret callback for async BRW RPCs sent via async_internal().
 * Finishes the request, retries recoverable errors via redo_request,
 * then drops the in-flight count, releases per-page write grants and
 * frees the pga array.  Runs with reply in hand; rc is the RPC result. */
1488 static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
1490 struct osc_brw_async_args *aa = data;
1494 rc = osc_brw_fini_request(req, rc);
1495 if (osc_recoverable_error(rc)) {
1496 rc = osc_brw_redo_request(req, aa);
1501 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
/* One RPC fewer in flight for the matching direction. */
1502 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1503 aa->aa_cli->cl_w_in_flight--;
1505 aa->aa_cli->cl_r_in_flight--;
/* Return the grant consumed for each page (sent = 1). */
1506 for (i = 0; i < aa->aa_page_count; i++)
1507 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1508 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1510 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
/* Build an async BRW request and add it to the caller's request set.
 * On the write path, write grant is consumed up front for each page and
 * released again if request preparation fails.  Statistics histograms
 * are updated and the in-flight counter is bumped on success.
 * NOTE(review): partial extract -- the success/failure branch structure
 * around 1553/1561 is only partly visible. */
1515 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1516 struct lov_stripe_md *lsm, obd_count page_count,
1517 struct brw_page **pga, struct ptlrpc_request_set *set,
1518 struct obd_capa *ocapa)
1520 struct ptlrpc_request *req;
1521 struct client_obd *cli = &exp->exp_obd->u.cli;
1523 struct osc_brw_async_args *aa;
1526 /* Consume write credits even if doing a sync write -
1527 * otherwise we may run out of space on OST due to grant. */
1528 if (cmd == OBD_BRW_WRITE) {
1529 spin_lock(&cli->cl_loi_list_lock);
1530 for (i = 0; i < page_count; i++) {
1531 if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1532 osc_consume_write_grant(cli, pga[i]);
1534 spin_unlock(&cli->cl_loi_list_lock);
1537 rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
1540 aa = (struct osc_brw_async_args *)&req->rq_async_args;
/* Record page-count / RPCs-in-flight histograms for lprocfs. */
1541 if (cmd == OBD_BRW_READ) {
1542 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1543 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1544 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
1546 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1547 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1548 cli->cl_w_in_flight);
1549 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
1553 req->rq_interpret_reply = brw_interpret;
1554 ptlrpc_set_add_req(set, req);
1555 client_obd_list_lock(&cli->cl_loi_list_lock);
1556 if (cmd == OBD_BRW_READ)
1557 cli->cl_r_in_flight++;
1559 cli->cl_w_in_flight++;
1560 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Prep failed on a write: give back the grant taken above (sent = 0). */
1561 } else if (cmd == OBD_BRW_WRITE) {
1562 client_obd_list_lock(&cli->cl_loi_list_lock);
1563 for (i = 0; i < page_count; i++)
1564 osc_release_write_grant(cli, pga[i], 0);
1565 client_obd_list_unlock(&cli->cl_loi_list_lock);
1571 * ugh, we want disk allocation on the target to happen in offset order. we'll
1572 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1573 * fine for our small page arrays and doesn't require allocation. it's an
1574 * insertion sort that swaps elements that are strides apart, shrinking the
1575 * stride down until it's '1' and the array is sorted.
/* Shellsort the brw_page pointer array by file offset (ascending), so the
 * target sees pages in offset order.  In-place, no allocation.
 * NOTE(review): partial extract -- the stride-shrink step and inner-loop
 * tail lines are not visible here. */
1577 static void sort_brw_pages(struct brw_page **array, int num)
1580 struct brw_page *tmp;
/* Knuth stride sequence: 1, 4, 13, 40, ... largest stride < num. */
1584 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1589 for (i = stride ; i < num ; i++) {
1592 while (j >= stride && array[j - stride]->off > tmp->off) {
1593 array[j] = array[j - stride];
1598 } while (stride > 1);
/* Return how many leading pages of the (offset-sorted) pga form a run the
 * network can move in a single unfragmented RDMA: the run ends at the
 * first page that does not end on a page boundary, or before a page that
 * does not start on one.
 * NOTE(review): partial extract -- loop framing and count updates are
 * not all visible. */
1601 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1607 LASSERT (pages > 0);
1608 offset = pg[i]->off & ~CFS_PAGE_MASK;
1612 if (pages == 0) /* that's all */
1615 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1616 return count; /* doesn't end on page boundary */
1619 offset = pg[i]->off & ~CFS_PAGE_MASK;
1620 if (offset != 0) /* doesn't start on page boundary */
/* Allocate an array of pointers into the caller's contiguous brw_page
 * array, so the pages can be sorted/split without moving the structs.
 * Caller frees with osc_release_ppga(). */
1627 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1629 struct brw_page **ppga;
1632 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1636 for (i = 0; i < count; i++)
/* Free a pointer array obtained from osc_build_ppga(); count must be the
 * original allocation count, not a post-split remainder. */
1641 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1643 LASSERT(ppga != NULL);
1644 OBD_FREE(ppga, sizeof(*ppga) * count);
/* Synchronous BRW entry point: splits the page array into RPC-sized,
 * unfragmented chunks and issues them one at a time via
 * osc_brw_internal().  The oa is saved/restored across chunks because
 * each brw clobbers it.  OBD_BRW_CHECK only probes import validity.
 * NOTE(review): partial extract -- early returns and the out: label
 * region are only partly visible. */
1647 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1648 obd_count page_count, struct brw_page *pga,
1649 struct obd_trans_info *oti)
1651 struct obdo *saved_oa = NULL;
1652 struct brw_page **ppga, **orig;
1653 struct obd_import *imp = class_exp2cliimp(exp);
1654 struct client_obd *cli = &imp->imp_obd->u.cli;
1655 int rc, page_count_orig;
1658 if (cmd & OBD_BRW_CHECK) {
1659 /* The caller just wants to know if there's a chance that this
1660 * I/O can succeed */
1662 if (imp == NULL || imp->imp_invalid)
1667 /* test_brw with a failed create can trip this, maybe others. */
1668 LASSERT(cli->cl_max_pages_per_rpc);
1672 orig = ppga = osc_build_ppga(pga, page_count);
/* Remember the full count: ppga is advanced per chunk below, but the
 * release at the end must cover the whole allocation. */
1675 page_count_orig = page_count;
1677 sort_brw_pages(ppga, page_count);
1678 while (page_count) {
1679 obd_count pages_per_brw;
1681 if (page_count > cli->cl_max_pages_per_rpc)
1682 pages_per_brw = cli->cl_max_pages_per_rpc;
1684 pages_per_brw = page_count;
/* Shrink the chunk to the longest unfragmented run. */
1686 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1688 if (saved_oa != NULL) {
1689 /* restore previously saved oa */
1690 *oinfo->oi_oa = *saved_oa;
1691 } else if (page_count > pages_per_brw) {
1692 /* save a copy of oa (brw will clobber it) */
1693 OBDO_ALLOC(saved_oa);
1694 if (saved_oa == NULL)
1695 GOTO(out, rc = -ENOMEM);
1696 *saved_oa = *oinfo->oi_oa;
1699 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1700 pages_per_brw, ppga, oinfo->oi_capa);
1705 page_count -= pages_per_brw;
1706 ppga += pages_per_brw;
1710 osc_release_ppga(orig, page_count_orig);
1712 if (saved_oa != NULL)
1713 OBDO_FREE(saved_oa);
/* Asynchronous BRW entry point: like osc_brw() but each chunk is handed
 * to async_internal(), which takes ownership of a (possibly copied)
 * pointer array and queues the RPC on the caller's set.
 * NOTE(review): partial extract -- some branch/cleanup lines around
 * 1758-1777 are not visible. */
1718 static int osc_brw_async(int cmd, struct obd_export *exp,
1719 struct obd_info *oinfo, obd_count page_count,
1720 struct brw_page *pga, struct obd_trans_info *oti,
1721 struct ptlrpc_request_set *set)
1723 struct brw_page **ppga, **orig;
1724 struct client_obd *cli = &exp->exp_obd->u.cli;
1725 int page_count_orig;
1729 if (cmd & OBD_BRW_CHECK) {
1730 struct obd_import *imp = class_exp2cliimp(exp);
1731 /* The caller just wants to know if there's a chance that this
1732 * I/O can succeed */
1734 if (imp == NULL || imp->imp_invalid)
1739 orig = ppga = osc_build_ppga(pga, page_count);
1742 page_count_orig = page_count;
1744 sort_brw_pages(ppga, page_count);
1745 while (page_count) {
1746 struct brw_page **copy;
1747 obd_count pages_per_brw;
1749 pages_per_brw = min_t(obd_count, page_count,
1750 cli->cl_max_pages_per_rpc);
1752 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1754 /* use ppga only if single RPC is going to fly */
1755 if (pages_per_brw != page_count_orig || ppga != orig) {
1756 OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
1758 GOTO(out, rc = -ENOMEM);
1759 memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
1763 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1764 pages_per_brw, copy, set, oinfo->oi_capa);
/* On failure, free the chunk copy ourselves... */
1768 OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
1772 /* we passed it to async_internal() which is
1773 * now responsible for releasing memory */
1777 page_count -= pages_per_brw;
1778 ppga += pages_per_brw;
1782 osc_release_ppga(orig, page_count_orig);
1786 static void osc_check_rpcs(struct client_obd *cli);
1788 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1789 * the dirty accounting. Writeback completes or truncate happens before
1790 * writing starts. Must be called with the loi lock held. */
/* Drop @oap from dirty accounting by returning its write grant;
 * @sent tells the grant layer whether the page actually went out. */
1791 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1794 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1798 /* This maintains the lists of pending pages to read/write for a given object
1799 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1800 * to quickly find objects that are ready to send an RPC. */
/* Decide whether this lop (per-object pending read or write pages) has
 * enough, or urgent enough, work to justify firing an RPC now.
 * Returns nonzero when an RPC should be made.
 * NOTE(review): partial extract -- the RETURN lines for each branch are
 * not visible. */
1801 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1807 if (lop->lop_num_pending == 0)
1810 /* if we have an invalid import we want to drain the queued pages
1811 * by forcing them through rpcs that immediately fail and complete
1812 * the pages. recovery relies on this to empty the queued pages
1813 * before canceling the locks and evicting down the llite pages */
1814 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1817 /* stream rpcs in queue order as long as there is an urgent page
1818 * queued. this is our cheap solution for good batching in the case
1819 * where writepage marks some random page in the middle of the file
1820 * as urgent because of, say, memory pressure */
1821 if (!list_empty(&lop->lop_urgent)) {
1822 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1825 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1826 optimal = cli->cl_max_pages_per_rpc;
1827 if (cmd & OBD_BRW_WRITE) {
1828 /* trigger a write rpc stream as long as there are dirtiers
1829 * waiting for space. as they're waiting, they're not going to
1830 * create more pages to coalesce with what's waiting.. */
1831 if (!list_empty(&cli->cl_cache_waiters)) {
1832 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1835 /* +16 to avoid triggering rpcs that would want to include pages
1836 * that are being queued but which can't be made ready until
1837 * the queuer finishes with the page. this is a wart for
1838 * llite::commit_write() */
1841 if (lop->lop_num_pending >= optimal)
/* Idempotently add @item to @list when should_be_on is true, or remove
 * it when false; no-op if it is already in the desired state. */
1847 static void on_list(struct list_head *item, struct list_head *list,
1850 if (list_empty(item) && should_be_on)
1851 list_add_tail(item, list);
1852 else if (!list_empty(item) && !should_be_on)
1853 list_del_init(item);
1856 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1857 * can find pages to build into rpcs quickly */
/* Re-derive the loi's membership on the client's ready/write/read lists
 * from its current pending-page state, so osc_next_loi() scans stay
 * cheap.  Caller holds the loi list lock (per file convention). */
1858 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
/* Ready list: object can make a read or write RPC right now. */
1860 on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1861 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1862 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
/* Write/read lists: object merely has pending pages of that kind. */
1864 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1865 loi->loi_write_lop.lop_num_pending);
1867 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1868 loi->loi_read_lop.lop_num_pending);
/* Adjust the lop's pending-page count by @delta (may be negative) and
 * mirror the change into the client-wide read or write pending total. */
1871 static void lop_update_pending(struct client_obd *cli,
1872 struct loi_oap_pages *lop, int cmd, int delta)
1874 lop->lop_num_pending += delta;
1875 if (cmd & OBD_BRW_WRITE)
1876 cli->cl_pending_w_pages += delta;
1878 cli->cl_pending_r_pages += delta;
1881 /* this is called when a sync waiter receives an interruption. Its job is to
1882 * get the caller woken as soon as possible. If its page hasn't been put in an
1883 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
1884 * desiring interruption which will forcefully complete the rpc once the rpc
/* Interruption callback for a sync-waiting page: wake the waiter as
 * fast as possible.  If the page is already in an RPC, mark that RPC
 * interrupted; otherwise dequeue the page and complete its group-I/O
 * slot with -EINTR directly. */
1886 static void osc_occ_interrupted(struct oig_callback_context *occ)
1888 struct osc_async_page *oap;
1889 struct loi_oap_pages *lop;
1890 struct lov_oinfo *loi;
1893 /* XXX member_of() */
1894 oap = list_entry(occ, struct osc_async_page, oap_occ);
1896 client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1898 oap->oap_interrupted = 1;
1900 /* ok, it's been put in an rpc. only one oap gets a request reference */
1901 if (oap->oap_request != NULL) {
1902 ptlrpc_mark_interrupted(oap->oap_request);
/* Kick ptlrpcd so the interrupted request is processed promptly. */
1903 ptlrpcd_wake(oap->oap_request);
1907 /* we don't get interruption callbacks until osc_trigger_group_io()
1908 * has been called and put the sync oaps in the pending/urgent lists.*/
1909 if (!list_empty(&oap->oap_pending_item)) {
1910 list_del_init(&oap->oap_pending_item);
1911 list_del_init(&oap->oap_urgent_item);
1914 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1915 &loi->loi_write_lop : &loi->loi_read_lop;
1916 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1917 loi_list_maint(oap->oap_cli, oap->oap_loi);
1919 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
1920 oap->oap_oig = NULL;
1924 client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
1927 /* this is trying to propagate async writeback errors back up to the
1928 * application. As an async write fails we record the error code for later if
1929 * the app does an fsync. As long as errors persist we force future rpcs to be
1930 * sync so that the app can get a sync error and break the cycle of queueing
1931 * pages for which writeback will fail. */
/* Track async-write results for later fsync reporting: on error, latch
 * force_sync and remember the next xid as the point after which writes
 * are "clean"; once a write at/after that xid succeeds, clear it.
 * NOTE(review): partial extract -- the error-recording branch header is
 * not visible. */
1932 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1939 ar->ar_force_sync = 1;
1940 ar->ar_min_xid = ptlrpc_sample_next_xid();
1945 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1946 ar->ar_force_sync = 0;
/* Queue @oap on its object's read or write pending list (and the urgent
 * list if flagged), updating the pending-page accounting. */
1949 static void osc_oap_to_pending(struct osc_async_page *oap)
1951 struct loi_oap_pages *lop;
1953 if (oap->oap_cmd & OBD_BRW_WRITE)
1954 lop = &oap->oap_loi->loi_write_lop;
1956 lop = &oap->oap_loi->loi_read_lop;
1958 if (oap->oap_async_flags & ASYNC_URGENT)
1959 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
/* Tail-add keeps pending pages in dirty (submission) order. */
1960 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1961 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1964 /* this must be called holding the loi list lock to give coverage to exit_cache,
1965 * async_flag maintenance, and oap_request */
/* Finish one async page after its RPC completes (or is aborted): drop
 * the request reference, record write errors for fsync, refresh the
 * cached lvb attributes from @oa on success, complete group I/O if any,
 * and call the upper layer's ap_completion.  Must be called with the
 * loi list lock held (see comment above this function in the file).
 * NOTE(review): partial extract -- some branch framing is not visible. */
1966 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1967 struct osc_async_page *oap, int sent, int rc)
1972 if (oap->oap_request != NULL) {
/* Capture the xid before releasing the request; needed below for
 * the async-error bookkeeping. */
1973 xid = ptlrpc_req_xid(oap->oap_request);
1974 ptlrpc_req_finished(oap->oap_request);
1975 oap->oap_request = NULL;
1978 oap->oap_async_flags = 0;
1979 oap->oap_interrupted = 0;
1981 if (oap->oap_cmd & OBD_BRW_WRITE) {
1982 osc_process_ar(&cli->cl_ar, xid, rc);
1983 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
1986 if (rc == 0 && oa != NULL) {
1987 if (oa->o_valid & OBD_MD_FLBLOCKS)
1988 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
1989 if (oa->o_valid & OBD_MD_FLMTIME)
1990 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
1991 if (oa->o_valid & OBD_MD_FLATIME)
1992 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
1993 if (oa->o_valid & OBD_MD_FLCTIME)
1994 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
1998 osc_exit_cache(cli, oap, sent);
1999 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
2000 oap->oap_oig = NULL;
2005 rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
2006 oap->oap_cmd, oa, rc);
2008 /* ll_ap_completion (from llite) drops PG_locked. so, a new
2009 * I/O on the page could start, but OSC calls it under lock
2010 * and thus we can add oap back to pending safely */
2012 /* upper layer wants to leave the page on pending queue */
2013 osc_oap_to_pending(oap);
2015 osc_exit_cache(cli, oap, sent);
/* Interpret callback for cache-driven BRW RPCs built by osc_build_req():
 * finishes the request (retrying recoverable errors), drops the
 * in-flight count, completes every oap in the RPC, then wakes cache
 * waiters and kicks osc_check_rpcs() to keep the pipeline full. */
2019 static int brw_interpret_oap(struct ptlrpc_request *req, void *data, int rc)
2021 struct osc_async_page *oap, *tmp;
2022 struct osc_brw_async_args *aa = data;
2023 struct client_obd *cli;
2026 rc = osc_brw_fini_request(req, rc);
2027 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2028 if (osc_recoverable_error(rc)) {
2029 rc = osc_brw_redo_request(req, aa);
2036 client_obd_list_lock(&cli->cl_loi_list_lock);
2038 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2039 * is called so we know whether to go to sync BRWs or wait for more
2040 * RPCs to complete */
2041 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2042 cli->cl_w_in_flight--;
2044 cli->cl_r_in_flight--;
2046 /* the caller may re-use the oap after the completion call so
2047 * we need to clean it up a little */
2048 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2049 list_del_init(&oap->oap_rpc_item);
2050 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
2053 osc_wake_cache_waiters(cli);
2054 osc_check_rpcs(cli);
2056 client_obd_list_unlock(&cli->cl_loi_list_lock);
2058 OBDO_FREE(aa->aa_oa);
2060 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
/* Build a BRW ptlrpc request from a list of ready oaps: collect their
 * brw_pages into a freshly allocated pga, fill/refresh the obdo via the
 * caller ops, sort by offset and call osc_brw_prep_request().  On
 * success the oaps are moved from @rpc_list into the request's async
 * args (aa_oaps); returns the request or an ERR_PTR.
 * NOTE(review): partial extract -- allocations at 2083-2087 and the
 * out:/return framing are only partly visible. */
2064 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
2065 struct list_head *rpc_list,
2066 int page_count, int cmd)
2068 struct ptlrpc_request *req;
2069 struct brw_page **pga = NULL;
2070 struct osc_brw_async_args *aa;
2071 struct obdo *oa = NULL;
2072 struct obd_async_page_ops *ops = NULL;
2073 void *caller_data = NULL;
2074 struct obd_capa *ocapa;
2075 struct osc_async_page *oap;
2079 LASSERT(!list_empty(rpc_list));
2081 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2083 RETURN(ERR_PTR(-ENOMEM));
2087 GOTO(out, req = ERR_PTR(-ENOMEM));
2090 list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2092 ops = oap->oap_caller_ops;
2093 caller_data = oap->oap_caller_data;
2095 pga[i] = &oap->oap_brw_page;
2096 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2097 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2098 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2102 /* always get the data for the obdo for the rpc */
2103 LASSERT(ops != NULL);
2104 ops->ap_fill_obdo(caller_data, cmd, oa);
2105 ocapa = ops->ap_lookup_capa(caller_data, cmd);
2107 sort_brw_pages(pga, page_count);
2108 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2112 CERROR("prep_req failed: %d\n", rc);
2113 GOTO(out, req = ERR_PTR(rc));
2116 /* Need to update the timestamps after the request is built in case
2117 * we race with setattr (locally or in queue at OST). If OST gets
2118 * later setattr before earlier BRW (as determined by the request xid),
2119 * the OST will not use BRW timestamps. Sadly, there is no obvious
2120 * way to do this in a single call. bug 10150 */
2121 ops->ap_update_obdo(caller_data, cmd, oa,
2122 OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
2124 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2125 aa = (struct osc_brw_async_args *)&req->rq_async_args;
/* Move (not copy) the oap list into the request's async args. */
2126 INIT_LIST_HEAD(&aa->aa_oaps);
2127 list_splice(rpc_list, &aa->aa_oaps);
2128 INIT_LIST_HEAD(rpc_list);
2135 OBD_FREE(pga, sizeof(*pga) * page_count);
2140 /* the loi lock is held across this function but it's allowed to release
2141 * and reacquire it during its work */
/* Gather ready pages from @lop into one RPC and hand it to ptlrpcd.
 * Walks lop_pending, making each page ready via ap_make_ready() and
 * stopping at RPC-size or PTLRPC_MAX_BRW_SIZE alignment boundaries or
 * fragmentation points; builds the request with osc_build_req(),
 * updates stats/in-flight counts, and propagates interruption.  Called
 * with the loi list lock held; drops and reacquires it around the
 * request build (see comment above this function in the file).
 * Returns >0 when an RPC was sent, 0 for back-off, <0 on error.
 * NOTE(review): partial extract -- several branch/return lines are not
 * visible; comments below only describe what is shown. */
2142 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2143 int cmd, struct loi_oap_pages *lop)
2145 struct ptlrpc_request *req;
2146 obd_count page_count = 0;
2147 struct osc_async_page *oap = NULL, *tmp;
2148 struct osc_brw_async_args *aa;
2149 struct obd_async_page_ops *ops;
2150 CFS_LIST_HEAD(rpc_list);
2151 unsigned int ending_offset;
2152 unsigned starting_offset = 0;
2155 /* first we find the pages we're allowed to work with */
2156 list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2158 ops = oap->oap_caller_ops;
2160 LASSERT(oap->oap_magic == OAP_MAGIC);
2162 /* in llite being 'ready' equates to the page being locked
2163 * until completion unlocks it. commit_write submits a page
2164 * as not ready because its unlock will happen unconditionally
2165 * as the call returns. if we race with commit_write giving
2166 * us that page we dont' want to create a hole in the page
2167 * stream, so we stop and leave the rpc to be fired by
2168 * another dirtier or kupdated interval (the not ready page
2169 * will still be on the dirty list). we could call in
2170 * at the end of ll_file_write to process the queue again. */
2171 if (!(oap->oap_async_flags & ASYNC_READY)) {
2172 int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2174 CDEBUG(D_INODE, "oap %p page %p returned %d "
2175 "instead of ready\n", oap,
2179 /* llite is telling us that the page is still
2180 * in commit_write and that we should try
2181 * and put it in an rpc again later. we
2182 * break out of the loop so we don't create
2183 * a hole in the sequence of pages in the rpc
2188 /* the io isn't needed.. tell the checks
2189 * below to complete the rpc with EINTR */
2190 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2191 oap->oap_count = -EINTR;
2194 oap->oap_async_flags |= ASYNC_READY;
2197 LASSERTF(0, "oap %p page %p returned %d "
2198 "from make_ready\n", oap,
2206 * Page submitted for IO has to be locked. Either by
2207 * ->ap_make_ready() or by higher layers.
2209 * XXX nikita: this assertion should be adjusted when lustre
2210 * starts using PG_writeback for pages being written out.
2212 #if defined(__KERNEL__) && defined(__linux__)
2213 LASSERT(PageLocked(oap->oap_page));
2215 /* If there is a gap at the start of this page, it can't merge
2216 * with any previous page, so we'll hand the network a
2217 * "fragmented" page array that it can't transfer in 1 RDMA */
2218 if (page_count != 0 && oap->oap_page_off != 0)
2221 /* take the page out of our book-keeping */
2222 list_del_init(&oap->oap_pending_item);
2223 lop_update_pending(cli, lop, cmd, -1);
2224 list_del_init(&oap->oap_urgent_item);
/* Remember where the RPC starts (for the offset histogram). */
2226 if (page_count == 0)
2227 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2228 (PTLRPC_MAX_BRW_SIZE - 1);
2230 /* ask the caller for the size of the io as the rpc leaves. */
2231 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2233 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2234 if (oap->oap_count <= 0) {
2235 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2237 osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2241 /* now put the page back in our accounting */
2242 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2243 if (++page_count >= cli->cl_max_pages_per_rpc)
2246 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2247 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2248 * have the same alignment as the initial writes that allocated
2249 * extents on the server. */
2250 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2251 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2252 if (ending_offset == 0)
2255 /* If there is a gap at the end of this page, it can't merge
2256 * with any subsequent pages, so we'll hand the network a
2257 * "fragmented" page array that it can't transfer in 1 RDMA */
2258 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2262 osc_wake_cache_waiters(cli);
2264 if (page_count == 0)
2267 loi_list_maint(cli, loi);
/* Drop the list lock while building the request; reacquired below. */
2269 client_obd_list_unlock(&cli->cl_loi_list_lock);
2271 req = osc_build_req(cli, &rpc_list, page_count, cmd);
2273 /* this should happen rarely and is pretty bad, it makes the
2274 * pending list not follow the dirty order */
2275 client_obd_list_lock(&cli->cl_loi_list_lock);
2276 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2277 list_del_init(&oap->oap_rpc_item);
2279 /* queued sync pages can be torn down while the pages
2280 * were between the pending list and the rpc */
2281 if (oap->oap_interrupted) {
2282 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2283 osc_ap_completion(cli, NULL, oap, 0,
2287 osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2289 loi_list_maint(cli, loi);
2290 RETURN(PTR_ERR(req));
2293 aa = (struct osc_brw_async_args *)&req->rq_async_args;
/* Tally read/write page counts, RPCs in flight and start offsets. */
2295 if (cmd == OBD_BRW_READ) {
2296 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2297 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2298 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2299 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2300 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2302 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2303 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2304 cli->cl_w_in_flight);
2305 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2306 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2307 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2310 client_obd_list_lock(&cli->cl_loi_list_lock);
2312 if (cmd == OBD_BRW_READ)
2313 cli->cl_r_in_flight++;
2315 cli->cl_w_in_flight++;
2317 /* queued sync pages can be torn down while the pages
2318 * were between the pending list and the rpc */
2320 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2321 /* only one oap gets a request reference */
2324 if (oap->oap_interrupted && !req->rq_intr) {
2325 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2327 ptlrpc_mark_interrupted(req);
2331 tmp->oap_request = ptlrpc_request_addref(req);
2333 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2334 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2336 req->rq_interpret_reply = brw_interpret_oap;
2337 ptlrpcd_add_req(req);
/* Debug-print one loi's RPC-readiness state: whether it is on the ready
 * list, plus pending/urgent counts for its write and read lops.
 * NOTE(review): partial extract -- the trailing args line of the macro
 * is not visible here. */
2341 #define LOI_DEBUG(LOI, STR, args...) \
2342 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2343 !list_empty(&(LOI)->loi_cli_item), \
2344 (LOI)->loi_write_lop.lop_num_pending, \
2345 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2346 (LOI)->loi_read_lop.lop_num_pending, \
2347 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2350 /* This is called by osc_check_rpcs() to find which objects have pages that
2351 * we could be sending. These lists are maintained by lop_makes_rpc(). */
/* Pick the next object (loi) that osc_check_rpcs() should try to send
 * I/O for, in priority order: ready list, then (if writers are waiting
 * on cache space) any object with queued writes, then - on an invalid
 * import - anything queued at all so it can be flushed.
 * NOTE(review): partial extract -- the final RETURN(NULL) is not
 * visible. */
2352 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2355 /* first return all objects which we already know to have
2356 * pages ready to be stuffed into rpcs */
2357 if (!list_empty(&cli->cl_loi_ready_list))
2358 RETURN(list_entry(cli->cl_loi_ready_list.next,
2359 struct lov_oinfo, loi_cli_item));
2361 /* then if we have cache waiters, return all objects with queued
2362 * writes. This is especially important when many small files
2363 * have filled up the cache and not been fired into rpcs because
2364 * they don't pass the nr_pending/object threshold */
2365 if (!list_empty(&cli->cl_cache_waiters) &&
2366 !list_empty(&cli->cl_loi_write_list))
2367 RETURN(list_entry(cli->cl_loi_write_list.next,
2368 struct lov_oinfo, loi_write_item));
2370 /* then return all queued objects when we have an invalid import
2371 * so that they get flushed */
2372 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2373 if (!list_empty(&cli->cl_loi_write_list))
2374 RETURN(list_entry(cli->cl_loi_write_list.next,
2375 struct lov_oinfo, loi_write_item));
2376 if (!list_empty(&cli->cl_loi_read_list))
2377 RETURN(list_entry(cli->cl_loi_read_list.next,
2378 struct lov_oinfo, loi_read_item));
2383 /* called with the loi list lock held */
/* Main RPC scheduler: while objects are available and the in-flight
 * limit permits, fire write then read RPCs per object, rotating objects
 * to balance work and bailing out after repeated 0 ("back off") results
 * from make_ready.  Called with the loi list lock held (per the comment
 * above this function in the file).
 * NOTE(review): partial extract -- race_counter increments and loop
 * framing are only partly visible. */
2384 static void osc_check_rpcs(struct client_obd *cli)
2386 struct lov_oinfo *loi;
2387 int rc = 0, race_counter = 0;
2390 while ((loi = osc_next_loi(cli)) != NULL) {
2391 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2393 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2396 /* attempt some read/write balancing by alternating between
2397 * reads and writes in an object. The makes_rpc checks here
2398 * would be redundant if we were getting read/write work items
2399 * instead of objects. we don't want send_oap_rpc to drain a
2400 * partial read pending queue when we're given this object to
2401 * do io on writes while there are cache waiters */
2402 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2403 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2404 &loi->loi_write_lop);
2412 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2413 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2414 &loi->loi_read_lop);
2423 /* attempt some inter-object balancing by issuing rpcs
2424 * for each object in turn */
2425 if (!list_empty(&loi->loi_cli_item))
2426 list_del_init(&loi->loi_cli_item);
2427 if (!list_empty(&loi->loi_write_item))
2428 list_del_init(&loi->loi_write_item);
2429 if (!list_empty(&loi->loi_read_item))
2430 list_del_init(&loi->loi_read_item);
2432 loi_list_maint(cli, loi);
2434 /* send_oap_rpc fails with 0 when make_ready tells it to
2435 * back off. llite's make_ready does this when it tries
2436 * to lock a page queued for write that is already locked.
2437 * we want to try sending rpcs from many objects, but we
2438 * don't want to spin failing with 0. */
2439 if (race_counter == 10)
2445 /* we're trying to queue a page in the osc so we're subject to the
2446 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2447 * If the osc's queued pages are already at that limit, then we want to sleep
2448 * until there is space in the osc's queue for us. We also may be waiting for
2449 * write credits from the OST if there are RPCs in flight that may return some
2450 * before we fall back to sync writes.
2452 * We need this know our allocation was granted in the presence of signals */
/* Wait condition for osc_enter_cache()'s l_wait_event: returns nonzero
 * when the waiter was taken off the cache-waiter list (i.e. granted by
 * the waker) or when there are no RPCs in flight left to wait on, in
 * which case the caller must recheck and possibly fall back to sync IO.
 * Takes/drops cl_loi_list_lock itself, so callers must not hold it. */
2453 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2457 client_obd_list_lock(&cli->cl_loi_list_lock);
2458 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2459 client_obd_list_unlock(&cli->cl_loi_list_lock);
2463 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2464 * grant or cache space. */
/* Admit one page into the OSC dirty cache.  Fast path: if dirty-page and
 * grant limits allow, consume a page of write grant and return.  Slow
 * path: if write RPCs are in flight that may return grant, register a
 * cache waiter, kick osc_check_rpcs, drop the lock, and sleep in
 * l_wait_event until ocw_granted fires.  Forces sync IO when the dirty
 * limit is below one page or ar_force_sync is set.  NOTE(review): the
 * return statements and the no-rpcs-in-flight fallback are elided here. */
2465 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2466 struct osc_async_page *oap)
2468 struct osc_cache_waiter ocw;
2469 struct l_wait_info lwi = { 0 };
2473 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2474 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2475 cli->cl_dirty_max, obd_max_dirty_pages,
2476 cli->cl_lost_grant, cli->cl_avail_grant);
2478 /* force the caller to try sync io. this can jump the list
2479 * of queued writes and create a discontiguous rpc stream */
2480 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2481 loi->loi_ar.ar_force_sync)
2484 /* Hopefully normal case - cache space and write credits available */
2485 if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2486 (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2487 (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2488 /* account for ourselves */
2489 osc_consume_write_grant(cli, &oap->oap_brw_page);
2493 /* Make sure that there are write rpcs in flight to wait for. This
2494 * is a little silly as this object may not have any pending but
2495 * other objects sure might. */
2496 if (cli->cl_w_in_flight) {
2497 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2498 cfs_waitq_init(&ocw.ocw_waitq);
/* try to get the pending IO moving before we sleep */
2502 loi_list_maint(cli, loi);
2503 osc_check_rpcs(cli);
2504 client_obd_list_unlock(&cli->cl_loi_list_lock);
2506 CDEBUG(D_CACHE, "sleeping for cache space\n");
2507 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2509 client_obd_list_lock(&cli->cl_loi_list_lock);
/* still on the waiter list: we woke without being granted (e.g.
 * no RPCs left in flight) — delist ourselves before returning */
2510 if (!list_empty(&ocw.ocw_entry)) {
2511 list_del(&ocw.ocw_entry);
/* Initialize an osc_async_page for @page at @offset within the object.
 * When called with res == NULL (not visible here, but implied by the
 * early size_round return) it reports the per-page cookie size so the
 * caller can embed the oap in its own allocation; otherwise it fills in
 * the oap fields and list heads.  The oap pointer is returned to the
 * caller through *res as the opaque cookie used by the other async-page
 * entry points (see oap_from_cookie). */
2520 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2521 struct lov_oinfo *loi, cfs_page_t *page,
2522 obd_off offset, struct obd_async_page_ops *ops,
2523 void *data, void **res)
2525 struct osc_async_page *oap;
/* size query path: tell the caller how much room the cookie needs */
2529 return size_round(sizeof(*oap));
2532 oap->oap_magic = OAP_MAGIC;
2533 oap->oap_cli = &exp->exp_obd->u.cli;
2536 oap->oap_caller_ops = ops;
2537 oap->oap_caller_data = data;
2539 oap->oap_page = page;
2540 oap->oap_obj_off = offset;
2542 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2543 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2544 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2546 oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2548 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
/* Convert an opaque async-page cookie back into its osc_async_page,
 * validating the magic set by osc_prep_async_page.  Returns
 * ERR_PTR(-EINVAL) on a corrupt or foreign cookie. */
2552 struct osc_async_page *oap_from_cookie(void *cookie)
2554 struct osc_async_page *oap = cookie;
2555 if (oap->oap_magic != OAP_MAGIC)
2556 return ERR_PTR(-EINVAL);
/* Queue one async page for IO.  Validates the cookie and that the page
 * is not already queued anywhere, checks quota for writes, then (under
 * cl_loi_list_lock) records the IO parameters in the oap, reserves cache
 * space via osc_enter_cache for writes, moves the oap to the pending
 * list and kicks osc_check_rpcs to dispatch.  Returns negative errno on
 * failure (e.g. invalid import, page already queued, over quota). */
2560 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2561 struct lov_oinfo *loi, void *cookie,
2562 int cmd, obd_off off, int count,
2563 obd_flag brw_flags, enum async_flags async_flags)
2565 struct client_obd *cli = &exp->exp_obd->u.cli;
2566 struct osc_async_page *oap;
2570 oap = oap_from_cookie(cookie);
2572 RETURN(PTR_ERR(oap));
/* refuse new IO against a dead or invalidated import */
2574 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
/* a page may only be on one set of queues at a time */
2577 if (!list_empty(&oap->oap_pending_item) ||
2578 !list_empty(&oap->oap_urgent_item) ||
2579 !list_empty(&oap->oap_rpc_item))
2582 /* check if the file's owner/group is over quota */
2583 #ifdef HAVE_QUOTA_SUPPORT
2584 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2585 struct obd_async_page_ops *ops;
2592 ops = oap->oap_caller_ops;
/* ask the caller (llite) for the obdo so we know uid/gid */
2593 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2594 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
/* default to stripe 0 when no loi was supplied — TODO confirm the
 * guard (loi == NULL) sits on the elided line above */
2605 loi = lsm->lsm_oinfo[0];
2607 client_obd_list_lock(&cli->cl_loi_list_lock);
2610 oap->oap_page_off = off;
2611 oap->oap_count = count;
2612 oap->oap_brw_flags = brw_flags;
2613 oap->oap_async_flags = async_flags;
2615 if (cmd & OBD_BRW_WRITE) {
/* may drop and retake the list lock while waiting for grant */
2616 rc = osc_enter_cache(cli, loi, oap);
2618 client_obd_list_unlock(&cli->cl_loi_list_lock);
2623 osc_oap_to_pending(oap);
2624 loi_list_maint(cli, loi);
2626 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2629 osc_check_rpcs(cli);
2630 client_obd_list_unlock(&cli->cl_loi_list_lock);
2635 /* aka (~was & now & flag), but this is more clear :) */
2636 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
/* Update the async flags on an already-queued page.  Only transitions
 * that newly set a flag matter (SETTING): ASYNC_READY marks the page
 * ready for RPC packing; ASYNC_URGENT moves it onto the urgent list
 * (unless it is already part of an RPC).  Kicks osc_check_rpcs so a
 * newly-urgent/ready page can be dispatched promptly.  Returns -EINVAL
 * if the page is not on a pending list. */
2638 static int osc_set_async_flags(struct obd_export *exp,
2639 struct lov_stripe_md *lsm,
2640 struct lov_oinfo *loi, void *cookie,
2641 obd_flag async_flags)
2643 struct client_obd *cli = &exp->exp_obd->u.cli;
2644 struct loi_oap_pages *lop;
2645 struct osc_async_page *oap;
2649 oap = oap_from_cookie(cookie);
2651 RETURN(PTR_ERR(oap));
2654 * bug 7311: OST-side locking is only supported for liblustre for now
2655 * (and liblustre never calls obd_set_async_flags(). I hope.), generic
2656 * implementation has to handle case where OST-locked page was picked
2657 * up by, e.g., ->writepage().
2659 LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2660 LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2663 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2667 loi = lsm->lsm_oinfo[0];
/* pick the read or write page list matching the page's original cmd */
2669 if (oap->oap_cmd & OBD_BRW_WRITE) {
2670 lop = &loi->loi_write_lop;
2672 lop = &loi->loi_read_lop;
2675 client_obd_list_lock(&cli->cl_loi_list_lock);
2677 if (list_empty(&oap->oap_pending_item))
2678 GOTO(out, rc = -EINVAL);
/* nothing to do if no new flag bit is being set */
2680 if ((oap->oap_async_flags & async_flags) == async_flags)
2683 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2684 oap->oap_async_flags |= ASYNC_READY;
2686 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
/* don't touch pages already being packed into an RPC */
2687 if (list_empty(&oap->oap_rpc_item)) {
2688 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2689 loi_list_maint(cli, loi);
2693 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2694 oap->oap_async_flags);
2696 osc_check_rpcs(cli);
2697 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Queue a page onto an IO group (obd_io_group) rather than the normal
 * pending list; group pages are held on lop_pending_group until
 * osc_trigger_group_io moves them to pending.  For ASYNC_GROUP_SYNC
 * pages the oap's completion cookie is registered with the group via
 * oig_add_one.  Same validation as osc_queue_async_io: live import and
 * page not already queued. */
2701 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2702 struct lov_oinfo *loi,
2703 struct obd_io_group *oig, void *cookie,
2704 int cmd, obd_off off, int count,
2706 obd_flag async_flags)
2708 struct client_obd *cli = &exp->exp_obd->u.cli;
2709 struct osc_async_page *oap;
2710 struct loi_oap_pages *lop;
2714 oap = oap_from_cookie(cookie);
2716 RETURN(PTR_ERR(oap));
2718 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2721 if (!list_empty(&oap->oap_pending_item) ||
2722 !list_empty(&oap->oap_urgent_item) ||
2723 !list_empty(&oap->oap_rpc_item))
2727 loi = lsm->lsm_oinfo[0];
2729 client_obd_list_lock(&cli->cl_loi_list_lock);
/* record the IO parameters on the page, as in osc_queue_async_io */
2732 oap->oap_page_off = off;
2733 oap->oap_count = count;
2734 oap->oap_brw_flags = brw_flags;
2735 oap->oap_async_flags = async_flags;
2737 if (cmd & OBD_BRW_WRITE)
2738 lop = &loi->loi_write_lop;
2740 lop = &loi->loi_read_lop;
/* parked on the group list until the group is triggered */
2742 list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2743 if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2745 rc = oig_add_one(oig, &oap->oap_occ);
2748 LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2749 oap, oap->oap_page, rc);
2751 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Move every page parked on @lop's group-pending list to the regular
 * pending queue (osc_oap_to_pending) and refresh the loi's position on
 * the client lists.  Called with the loi list lock held by
 * osc_trigger_group_io; @cmd is currently unused in the visible body. */
2756 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2757 struct loi_oap_pages *lop, int cmd)
2759 struct list_head *pos, *tmp;
2760 struct osc_async_page *oap;
/* _safe variant because each oap is deleted from the list as we go */
2762 list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2763 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2764 list_del(&oap->oap_pending_item);
2765 osc_oap_to_pending(oap);
2767 loi_list_maint(cli, loi);
/* Release all pages queued on this object's read and write group lists
 * into the normal pending queues and kick RPC dispatch.  The oig
 * argument is not referenced in the visible body; completion is tracked
 * through the occ cookies registered at osc_queue_group_io time. */
2770 static int osc_trigger_group_io(struct obd_export *exp,
2771 struct lov_stripe_md *lsm,
2772 struct lov_oinfo *loi,
2773 struct obd_io_group *oig)
2775 struct client_obd *cli = &exp->exp_obd->u.cli;
2779 loi = lsm->lsm_oinfo[0];
2781 client_obd_list_lock(&cli->cl_loi_list_lock);
2783 osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2784 osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2786 osc_check_rpcs(cli);
2787 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Remove an async page from all OSC queues before the caller frees it.
 * Fails with -EBUSY if the page is currently part of an in-flight RPC
 * (oap_rpc_item non-empty).  Otherwise releases its cache/grant
 * accounting (osc_exit_cache), wakes any cache waiters that space may
 * now satisfy, delists it from urgent/pending, and fixes up the pending
 * page counts (lop_update_pending) and loi list placement. */
2792 static int osc_teardown_async_page(struct obd_export *exp,
2793 struct lov_stripe_md *lsm,
2794 struct lov_oinfo *loi, void *cookie)
2796 struct client_obd *cli = &exp->exp_obd->u.cli;
2797 struct loi_oap_pages *lop;
2798 struct osc_async_page *oap;
2802 oap = oap_from_cookie(cookie);
2804 RETURN(PTR_ERR(oap));
2807 loi = lsm->lsm_oinfo[0];
2809 if (oap->oap_cmd & OBD_BRW_WRITE) {
2810 lop = &loi->loi_write_lop;
2812 lop = &loi->loi_read_lop;
2815 client_obd_list_lock(&cli->cl_loi_list_lock);
/* can't tear down a page an RPC engine still owns */
2817 if (!list_empty(&oap->oap_rpc_item))
2818 GOTO(out, rc = -EBUSY);
2820 osc_exit_cache(cli, oap, 0);
2821 osc_wake_cache_waiters(cli);
2823 if (!list_empty(&oap->oap_urgent_item)) {
2824 list_del_init(&oap->oap_urgent_item);
2825 oap->oap_async_flags &= ~ASYNC_URGENT;
2827 if (!list_empty(&oap->oap_pending_item)) {
2828 list_del_init(&oap->oap_pending_item);
/* one fewer pending page of this cmd on the loi */
2829 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2831 loi_list_maint(cli, loi);
2833 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2835 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Attach @data (an inode pointer on Linux) to the DLM lock behind
 * @lockh as l_ast_data, sanity-checking that any existing l_ast_data
 * either matches or belongs to an inode being freed (I_FREEING) —
 * anything else indicates two live inodes sharing a lock and is fatal
 * (LASSERTF).  Also propagates LDLM_FL_NO_LRU from @flags onto the
 * lock.  Logs and bails (path elided) if the handle no longer resolves,
 * e.g. after an eviction. */
2839 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2842 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2845 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2848 lock_res_and_lock(lock);
2849 #if defined (__KERNEL__) && defined (__linux__)
2850 /* Liang XXX: Darwin and Winnt checking should be added */
2851 if (lock->l_ast_data && lock->l_ast_data != data) {
2852 struct inode *new_inode = data;
2853 struct inode *old_inode = lock->l_ast_data;
2854 if (!(old_inode->i_state & I_FREEING))
2855 LDLM_ERROR(lock, "inconsistent l_ast_data found");
2856 LASSERTF(old_inode->i_state & I_FREEING,
2857 "Found existing inode %p/%lu/%u state %lu in lock: "
2858 "setting data to %p/%lu/%u\n", old_inode,
2859 old_inode->i_ino, old_inode->i_generation,
2861 new_inode, new_inode->i_ino, new_inode->i_generation);
2864 lock->l_ast_data = data;
2865 lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2866 unlock_res_and_lock(lock);
/* drop the reference taken by ldlm_handle2lock */
2867 LDLM_LOCK_PUT(lock);
/* Apply @replace to every lock on the resource named by the stripe's
 * object id/group — used to swap or clear the l_ast_data callback data
 * on all of an object's extent locks. */
2870 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2871 ldlm_iterator_t replace, void *data)
2873 struct ldlm_res_id res_id = { .name = {0} };
2874 struct obd_device *obd = class_exp2obd(exp);
2876 res_id.name[0] = lsm->lsm_object_id;
2877 res_id.name[2] = lsm->lsm_object_gr;
2879 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
/* Common completion for osc_enqueue (sync and async paths).  For an
 * intent enqueue that the server aborted, pull the real status out of
 * the ldlm_reply's lock_policy_res1.  On success (or intent abort) the
 * stripe's LVB (size/blocks/mtime) has been refreshed by the enqueue;
 * log it.  Always ends by invoking the caller's oi_cb_up callback with
 * the final rc, whose return value becomes ours. */
2883 static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
2889 /* The request was created before ldlm_cli_enqueue call. */
2890 if (rc == ELDLM_LOCK_ABORTED) {
2891 struct ldlm_reply *rep;
2892 rep = req_capsule_server_get(&req->rq_pill,
2895 LASSERT(rep != NULL);
/* server put the intent's real disposition in policy_res1 */
2896 if (rep->lock_policy_res1)
2897 rc = rep->lock_policy_res1;
2901 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2902 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2903 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
2904 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
2905 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
2908 /* Call the update callback. */
2909 rc = oinfo->oi_cb_up(oinfo, rc);
/* ptlrpc interpret callback for an async osc_enqueue.  Finishes the
 * DLM enqueue (ldlm_cli_enqueue_fini, which also swabs the returned
 * LVB into the stripe), runs osc_enqueue_fini to notify the caller,
 * and — per the policy documented above osc_enqueue — immediately
 * drops the lock reference for async requests so locks are not held
 * across OST failures. */
2913 static int osc_enqueue_interpret(struct ptlrpc_request *req,
2914 struct osc_enqueue_args *aa, int rc)
2916 int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
2917 struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
2918 struct ldlm_lock *lock;
2920 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2922 lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
2924 /* Complete obtaining the lock procedure. */
2925 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2927 &aa->oa_oi->oi_flags,
2928 &lsm->lsm_oinfo[0]->loi_lvb,
2929 sizeof(lsm->lsm_oinfo[0]->loi_lvb),
2930 lustre_swab_ost_lvb,
2931 aa->oa_oi->oi_lockh, rc);
2933 /* Complete osc stuff. */
2934 rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
2936 /* Release the lock for async request. */
2937 if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
2938 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
2940 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2941 aa->oa_oi->oi_lockh, req, aa);
/* balance the reference taken by ldlm_handle2lock above */
2942 LDLM_LOCK_PUT(lock);
2946 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2947 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2948 * other synchronous requests, however keeping some locks and trying to obtain
2949 * others may take a considerable amount of time in a case of ost failure; and
2950 * when other sync requests do not get released lock from a client, the client
2951 * is excluded from the cluster -- such scenarious make the life difficult, so
2952 * release locks just after they are obtained. */
/* Take an extent lock on the object behind oinfo->oi_md.  First rounds
 * the requested extent out to page boundaries, then tries to match an
 * existing granted lock (accepting a PW lock when PR was asked for);
 * on a match the caller's oi_cb_up is invoked directly with ELDLM_OK.
 * Otherwise builds an LDLM_ENQUEUE_LVB request and calls
 * ldlm_cli_enqueue — asynchronously via @rqset with
 * osc_enqueue_interpret as completion, or synchronously finishing with
 * osc_enqueue_fini.  NOTE(review): some branch/return lines are elided
 * in this excerpt. */
2953 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2954 struct ldlm_enqueue_info *einfo,
2955 struct ptlrpc_request_set *rqset)
2957 struct ldlm_res_id res_id = { .name = {0} };
2958 struct obd_device *obd = exp->exp_obd;
2959 struct ptlrpc_request *req = NULL;
2960 int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
2965 res_id.name[0] = oinfo->oi_md->lsm_object_id;
2966 res_id.name[2] = oinfo->oi_md->lsm_object_gr;
2968 /* Filesystem lock extents are extended to page boundaries so that
2969 * dealing with the page cache is a little smoother. */
2970 oinfo->oi_policy.l_extent.start -=
2971 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
2972 oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
/* without a valid KMS there is nothing to match against */
2974 if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
2977 /* Next, search for already existing extent locks that will cover us */
2978 /* If we're trying to read, we also search for an existing PW lock. The
2979 * VFS and page cache already protect us locally, so lots of readers/
2980 * writers can share a single PW lock.
2982 * There are problems with conversion deadlocks, so instead of
2983 * converting a read lock to a write lock, we'll just enqueue a new
2986 * At some point we should cancel the read lock instead of making them
2987 * send us a blocking callback, but there are problems with canceling
2988 * locks out from other users right now, too. */
2989 mode = einfo->ei_mode;
2990 if (einfo->ei_mode == LCK_PR)
2992 mode = ldlm_lock_match(obd->obd_namespace,
2993 oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
2994 einfo->ei_type, &oinfo->oi_policy, mode,
2997 /* addref the lock only if not async requests and PW lock is
2998 * matched whereas we asked for PR. */
2999 if (!rqset && einfo->ei_mode != mode)
3000 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR)
3001 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
3004 /* I would like to be able to ASSERT here that rss <=
3005 * kms, but I can't, for reasons which are explained in
3009 /* We already have a lock, and it's referenced */
3010 oinfo->oi_cb_up(oinfo, ELDLM_OK);
3012 /* For async requests, decref the lock. */
3013 if (einfo->ei_mode != mode)
3014 ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
3016 ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
3023 CFS_LIST_HEAD(cancels);
3024 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3025 &RQF_LDLM_ENQUEUE_LVB);
3029 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
/* reserve reply space for the LVB the server sends back */
3033 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3034 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb));
3035 ptlrpc_request_set_replen(req);
3038 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3039 oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
3041 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
3042 &oinfo->oi_policy, &oinfo->oi_flags,
3043 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3044 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
3045 lustre_swab_ost_lvb, oinfo->oi_lockh,
3049 struct osc_enqueue_args *aa;
3050 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3051 aa = (struct osc_enqueue_args *)&req->rq_async_args;
/* completion runs in ptlrpcd context via osc_enqueue_interpret */
3056 req->rq_interpret_reply = osc_enqueue_interpret;
3057 ptlrpc_set_add_req(rqset, req);
3058 } else if (intent) {
3059 ptlrpc_req_finished(req);
3064 rc = osc_enqueue_fini(req, oinfo, intent, rc);
3066 ptlrpc_req_finished(req);
/* Match an existing granted extent lock covering @policy on the
 * object; never enqueues.  Extents are widened to page boundaries as
 * in osc_enqueue, and PR requests may be satisfied by a PW lock (the
 * PR/PW reference juggling at the end converts the match reference).
 * OBD_FAIL_OSC_MATCH fault injection can force a miss.  Returns the
 * matched mode (via ldlm_lock_match) — 0 means no match. */
3071 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3072 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3073 int *flags, void *data, struct lustre_handle *lockh)
3075 struct ldlm_res_id res_id = { .name = {0} };
3076 struct obd_device *obd = exp->exp_obd;
3077 int lflags = *flags;
3081 res_id.name[0] = lsm->lsm_object_id;
3082 res_id.name[2] = lsm->lsm_object_gr;
3084 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3087 /* Filesystem lock extents are extended to page boundaries so that
3088 * dealing with the page cache is a little smoother */
3089 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3090 policy->l_extent.end |= ~CFS_PAGE_MASK;
3092 /* Next, search for already existing extent locks that will cover us */
3093 /* If we're trying to read, we also search for an existing PW lock. The
3094 * VFS and page cache already protect us locally, so lots of readers/
3095 * writers can share a single PW lock. */
3099 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3100 &res_id, type, policy, rc, lockh)
3102 osc_set_data_with_check(lockh, data, lflags);
/* asked PR, matched PW: swap the reference so the caller holds PR */
3103 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3104 ldlm_lock_addref(lockh, LCK_PR);
3105 ldlm_lock_decref(lockh, LCK_PW);
/* Drop one reference on the lock behind @lockh.  GROUP locks are also
 * cancelled outright (decref_and_cancel) because they are never left
 * cached in the LRU; other modes are just decref'd. */
3112 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3113 __u32 mode, struct lustre_handle *lockh)
3117 if (unlikely(mode == LCK_GROUP))
3118 ldlm_lock_decref_and_cancel(lockh, mode);
3120 ldlm_lock_decref(lockh, mode);
/* Cancel all unused cached locks, either namespace-wide (resp stays
 * NULL when no lsm — the branch setting resp = &res_id is elided here)
 * or only those on the object named by @lsm. */
3125 static int osc_cancel_unused(struct obd_export *exp,
3126 struct lov_stripe_md *lsm, int flags,
3129 struct obd_device *obd = class_exp2obd(exp);
3130 struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3133 res_id.name[0] = lsm->lsm_object_id;
3134 res_id.name[2] = lsm->lsm_object_gr;
3138 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
/* Move the object's locks into (@join != 0) or out of the namespace
 * LRU; mirrors osc_cancel_unused's resource-vs-namespace selection. */
3141 static int osc_join_lru(struct obd_export *exp,
3142 struct lov_stripe_md *lsm, int join)
3144 struct obd_device *obd = class_exp2obd(exp);
3145 struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3148 res_id.name[0] = lsm->lsm_object_id;
3149 res_id.name[2] = lsm->lsm_object_gr;
3153 return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
/* Interpret callback for osc_statfs_async: copy the obd_statfs payload
 * from the reply into the caller's buffer (-EPROTO if missing) and
 * fire the caller's oi_cb_up completion. */
3156 static int osc_statfs_interpret(struct ptlrpc_request *req,
3157 struct osc_async_args *aa, int rc)
3159 struct obd_statfs *msfs;
3165 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3167 GOTO(out, rc = -EPROTO);
3170 *aa->aa_oi->oi_osfs = *msfs;
3172 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Fire-and-forget OST_STATFS: build the request, queue it on @rqset,
 * and deliver the result through osc_statfs_interpret / oinfo->oi_cb_up.
 * @max_age is currently not transmitted (see comment below). */
3176 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3177 __u64 max_age, struct ptlrpc_request_set *rqset)
3179 struct ptlrpc_request *req;
3180 struct osc_async_args *aa;
3184 /* We could possibly pass max_age in the request (as an absolute
3185 * timestamp or a "seconds.usec ago") so the target can avoid doing
3186 * extra calls into the filesystem if that isn't necessary (e.g.
3187 * during mount that would help a bit). Having relative timestamps
3188 * is not so great if request processing is slow, while absolute
3189 * timestamps are not ideal because they need time synchronization. */
3190 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3194 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3196 ptlrpc_request_free(req);
3199 ptlrpc_request_set_replen(req);
3200 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3202 req->rq_interpret_reply = osc_statfs_interpret;
3203 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3204 aa = (struct osc_async_args *)&req->rq_async_args;
3207 ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_STATFS: same request shape as osc_statfs_async but
 * waits inline (ptlrpc_queue_wait) and copies the reply's obd_statfs
 * into @osfs; -EPROTO when the reply lacks the statfs buffer. */
3211 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3214 struct obd_statfs *msfs;
3215 struct ptlrpc_request *req;
3219 /* We could possibly pass max_age in the request (as an absolute
3220 * timestamp or a "seconds.usec ago") so the target can avoid doing
3221 * extra calls into the filesystem if that isn't necessary (e.g.
3222 * during mount that would help a bit). Having relative timestamps
3223 * is not so great if request processing is slow, while absolute
3224 * timestamps are not ideal because they need time synchronization. */
3225 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3229 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3231 ptlrpc_request_free(req);
3234 ptlrpc_request_set_replen(req);
3235 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3237 rc = ptlrpc_queue_wait(req);
3241 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3243 GOTO(out, rc = -EPROTO);
3250 ptlrpc_req_finished(req);
3254 /* Retrieve object striping information.
3256 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3257 * the maximum number of OST indices which will fit in the user buffer.
3258 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
/* Copy a single-stripe lov_user_md describing @lsm out to userspace.
 * If the user asked for object entries (lmm_stripe_count > 0) a
 * temporary buffer with one lov_user_ost_data is allocated and filled;
 * otherwise only the header is returned.  -EFAULT/-EINVAL paths are
 * partly elided in this excerpt. */
3260 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3262 struct lov_user_md lum, *lumk;
3263 int rc = 0, lum_size;
3269 if (copy_from_user(&lum, lump, sizeof(lum)))
3272 if (lum.lmm_magic != LOV_USER_MAGIC)
3275 if (lum.lmm_stripe_count > 0) {
/* header plus exactly one object entry — the OSC is single-stripe */
3276 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3277 OBD_ALLOC(lumk, lum_size);
3281 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3282 lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
3284 lum_size = sizeof(lum);
3288 lumk->lmm_object_id = lsm->lsm_object_id;
3289 lumk->lmm_object_gr = lsm->lsm_object_gr;
3290 lumk->lmm_stripe_count = 1;
3292 if (copy_to_user(lump, lumk, lum_size))
/* only free when lumk was OBD_ALLOC'd (guard elided above) */
3296 OBD_FREE(lumk, lum_size);
/* ioctl dispatcher for the OSC obd device.  Pins the module for the
 * duration of the call, then switches on cmd: fabricates a one-target
 * lov_desc for OBD_IOC_LOV_GET_CONFIG, handles stripe get/set, import
 * recovery/activation, and quota polling; unknown cmds get -ENOTTY. */
3302 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3303 void *karg, void *uarg)
3305 struct obd_device *obd = exp->exp_obd;
3306 struct obd_ioctl_data *data = karg;
/* hold a module reference so we can't be unloaded mid-ioctl */
3310 if (!try_module_get(THIS_MODULE)) {
3311 CERROR("Can't get module. Is it alive?");
3315 case OBD_IOC_LOV_GET_CONFIG: {
3317 struct lov_desc *desc;
3318 struct obd_uuid uuid;
3322 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3323 GOTO(out, err = -EINVAL);
3325 data = (struct obd_ioctl_data *)buf;
/* caller's buffers must be big enough for a desc and a uuid */
3327 if (sizeof(*desc) > data->ioc_inllen1) {
3328 obd_ioctl_freedata(buf, len);
3329 GOTO(out, err = -EINVAL);
3332 if (data->ioc_inllen2 < sizeof(uuid)) {
3333 obd_ioctl_freedata(buf, len);
3334 GOTO(out, err = -EINVAL);
/* an OSC presents itself as a one-target, one-stripe LOV */
3337 desc = (struct lov_desc *)data->ioc_inlbuf1;
3338 desc->ld_tgt_count = 1;
3339 desc->ld_active_tgt_count = 1;
3340 desc->ld_default_stripe_count = 1;
3341 desc->ld_default_stripe_size = 0;
3342 desc->ld_default_stripe_offset = 0;
3343 desc->ld_pattern = 0;
3344 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3346 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3348 err = copy_to_user((void *)uarg, buf, len);
3351 obd_ioctl_freedata(buf, len);
3354 case LL_IOC_LOV_SETSTRIPE:
3355 err = obd_alloc_memmd(exp, karg);
3359 case LL_IOC_LOV_GETSTRIPE:
3360 err = osc_getstripe(karg, uarg);
3362 case OBD_IOC_CLIENT_RECOVER:
3363 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3368 case IOC_OSC_SET_ACTIVE:
3369 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3372 case OBD_IOC_POLL_QUOTACHECK:
3373 err = lquota_poll_check(quota_interface, exp,
3374 (struct if_quotacheck *)karg);
3377 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3378 cmd, cfs_curproc_comm());
3379 GOTO(out, err = -ENOTTY);
/* release the reference taken at entry */
3382 module_put(THIS_MODULE);
/* obd_get_info handler.  "lock_to_stripe" is answered locally (stripe
 * number for this single-stripe OSC); "last_id" round-trips an
 * OST_GET_INFO request to the OST and copies the returned obd_id into
 * *val.  Requires non-NULL vallen/val. */
3386 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3387 void *key, __u32 *vallen, void *val)
3390 if (!vallen || !val)
3393 if (KEY_IS("lock_to_stripe")) {
3394 __u32 *stripe = val;
3395 *vallen = sizeof(*stripe);
3398 } else if (KEY_IS("last_id")) {
3399 struct ptlrpc_request *req;
3404 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
/* variable-length key: size the request field before packing */
3409 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3410 RCL_CLIENT, keylen);
3411 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3413 ptlrpc_request_free(req);
3417 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3419 memcpy(tmp, key, keylen);
3421 req_capsule_set_size(&req->rq_pill, &RMF_OBD_ID,
3422 RCL_SERVER, *vallen);
3423 ptlrpc_request_set_replen(req);
3424 rc = ptlrpc_queue_wait(req);
3428 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3430 GOTO(out, rc = -EPROTO);
3432 *((obd_id *)val) = *reply;
3434 ptlrpc_req_finished(req);
/* Interpret callback for the KEY_MDS_CONN set_info request: once the
 * OST has acknowledged the MDS connection, connect the llog initiator
 * for the MDS->OST orig context and mark the import as a pingable
 * server-timeout import (MDS-side OSC behavior). */
3440 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3443 struct llog_ctxt *ctxt;
3444 struct obd_import *imp = req->rq_import;
3450 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3453 rc = llog_initiator_connect(ctxt);
3455 CERROR("cannot establish connection for "
3456 "ctxt %p: %d\n", ctxt, rc);
/* imp_lock guards the import flag updates */
3459 spin_lock(&imp->imp_lock);
3460 imp->imp_server_timeout = 1;
3461 imp->imp_pingable = 1;
3462 spin_unlock(&imp->imp_lock);
3463 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
/* obd_set_info_async handler.  Local-only keys (KEY_NEXT_ID,
 * "unlinked", KEY_INIT_RECOV, "checksum", KEY_FLUSH_CTX) are handled
 * client-side and return without an RPC.  Everything else is forwarded
 * to the OST as an OST_SET_INFO request on @set; KEY_MDS_CONN
 * additionally records the MDS group on the object creator and chains
 * osc_setinfo_mds_conn_interpret for the reply. */
3468 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3469 void *key, obd_count vallen, void *val,
3470 struct ptlrpc_request_set *set)
3472 struct ptlrpc_request *req;
3473 struct obd_device *obd = exp->exp_obd;
3474 struct obd_import *imp = class_exp2cliimp(exp);
3479 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3481 if (KEY_IS(KEY_NEXT_ID)) {
3482 if (vallen != sizeof(obd_id))
/* +1: the value passed is the last used id, next is one beyond */
3484 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3485 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3486 exp->exp_obd->obd_name,
3487 obd->u.cli.cl_oscc.oscc_next_id);
3492 if (KEY_IS("unlinked")) {
3493 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
/* an unlink freed space: clear the no-space flag on the creator */
3494 spin_lock(&oscc->oscc_lock);
3495 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3496 spin_unlock(&oscc->oscc_lock);
3500 if (KEY_IS(KEY_INIT_RECOV)) {
3501 if (vallen != sizeof(int))
3503 spin_lock(&imp->imp_lock);
3504 imp->imp_initial_recov = *(int *)val;
3505 spin_unlock(&imp->imp_lock);
3506 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3507 exp->exp_obd->obd_name,
3508 imp->imp_initial_recov);
3512 if (KEY_IS("checksum")) {
3513 if (vallen != sizeof(int))
3515 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3519 if (KEY_IS(KEY_FLUSH_CTX)) {
3520 sptlrpc_import_flush_my_ctx(imp);
3527 /* We pass all other commands directly to OST. Since nobody calls osc
3528 methods directly and everybody is supposed to go through LOV, we
3529 assume lov checked invalid values for us.
3530 The only recognised values so far are evict_by_nid and mds_conn.
3531 Even if something bad goes through, we'd get a -EINVAL from OST
3535 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
3539 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3540 RCL_CLIENT, keylen);
3541 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3542 RCL_CLIENT, vallen);
3543 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3545 ptlrpc_request_free(req);
3549 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3550 memcpy(tmp, key, keylen);
3551 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
3552 memcpy(tmp, val, vallen);
3554 if (KEY_IS(KEY_MDS_CONN)) {
3555 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
/* remember which object group the MDS owns on this OST */
3557 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3558 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3559 LASSERT(oscc->oscc_oa.o_gr > 0);
3560 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3563 ptlrpc_request_set_replen(req);
3564 ptlrpc_set_add_req(set, req);
/* poke the set so the request starts without waiting for the caller */
3565 ptlrpc_check_set(set);
/* llog ops for the size-replication context: cancel-only on the client. */
3571 static struct llog_operations osc_size_repl_logops = {
3572 lop_cancel: llog_obd_repl_cancel
3575 static struct llog_operations osc_mds_ost_orig_logops;
/* Set up the two llog contexts the OSC uses: the MDS->OST originator
 * context (lazy one-time init of osc_mds_ost_orig_logops from
 * llog_lvfs_ops under obd_dev_lock) and the size-replication context.
 * On failure, logs the setup parameters and the catalog logid. */
3576 static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
3577 struct obd_device *tgt, int count,
3578 struct llog_catid *catid, struct obd_uuid *uuid)
3583 spin_lock(&obd->obd_dev_lock);
/* first caller populates the static ops table; lop_setup doubles
 * as the "already initialized" marker */
3584 if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3585 osc_mds_ost_orig_logops = llog_lvfs_ops;
3586 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3587 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3588 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3589 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3591 spin_unlock(&obd->obd_dev_lock);
3593 rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3594 &catid->lci_logid, &osc_mds_ost_orig_logops);
3596 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3600 rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3601 &osc_size_repl_logops);
3603 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3606 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3607 obd->obd_name, tgt->obd_name, count, catid, rc);
3608 CERROR("logid "LPX64":0x%x\n",
3609 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
/* Tear down both llog contexts created by osc_llog_init; each cleanup
 * error is captured separately (rc/rc2) so one failure doesn't hide
 * the other. */
3614 static int osc_llog_finish(struct obd_device *obd, int count)
3616 struct llog_ctxt *ctxt;
3617 int rc = 0, rc2 = 0;
3620 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3622 rc = llog_cleanup(ctxt);
3624 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3626 rc2 = llog_cleanup(ctxt);
/* Reconnect hook: when the server supports grants, re-request our
 * current available grant (or a 2x-max-RPC default when we have none)
 * and hand back any grant lost during the disconnect, all under
 * cl_loi_list_lock. */
3633 static int osc_reconnect(const struct lu_env *env,
3634 struct obd_export *exp, struct obd_device *obd,
3635 struct obd_uuid *cluuid,
3636 struct obd_connect_data *data)
3638 struct client_obd *cli = &obd->u.cli;
3640 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3643 client_obd_list_lock(&cli->cl_loi_list_lock);
/* ?: — keep existing grant, else ask for two full RPCs' worth */
3644 data->ocd_grant = cli->cl_avail_grant ?:
3645 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3646 lost_grant = cli->cl_lost_grant;
3647 cli->cl_lost_grant = 0;
3648 client_obd_list_unlock(&cli->cl_loi_list_lock);
3650 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3651 "cl_lost_grant: %ld\n", data->ocd_grant,
3652 cli->cl_avail_grant, lost_grant);
3653 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3654 " ocd_grant: %d\n", data->ocd_connect_flags,
3655 data->ocd_version, data->ocd_grant);
/* Disconnect from the OST: on the last connection, flush pending llog
 * cancels for the size-replication context to the target first, then
 * run the generic client disconnect. */
3661 static int osc_disconnect(struct obd_export *exp)
3663 struct obd_device *obd = class_exp2obd(exp);
3664 struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3667 if (obd->u.cli.cl_conn_count == 1)
3668 /* flush any remaining cancel messages out to the target */
3669 llog_sync(ctxt, exp);
3671 rc = client_disconnect_export(exp);
/* React to import state changes.  DISCON: mark the MDS-side creator
 * recovering and zero our grant accounting.  INACTIVE/ACTIVE: notify
 * the observer (and clear NOSPC on reactivation of an MDS OSC).
 * INVALIDATE: fail all queued pages via osc_check_rpcs against the
 * invalid import and flush local DLM state.  OCD: re-init grants and
 * pick the request portal from the negotiated connect flags. */
3675 static int osc_import_event(struct obd_device *obd,
3676 struct obd_import *imp,
3677 enum obd_import_event event)
3679 struct client_obd *cli;
3683 LASSERT(imp->imp_obd == obd);
3686 case IMP_EVENT_DISCON: {
3687 /* Only do this on the MDS OSC's */
3688 if (imp->imp_server_timeout) {
3689 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3691 spin_lock(&oscc->oscc_lock);
3692 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3693 spin_unlock(&oscc->oscc_lock);
/* grants are meaningless while disconnected */
3696 client_obd_list_lock(&cli->cl_loi_list_lock);
3697 cli->cl_avail_grant = 0;
3698 cli->cl_lost_grant = 0;
3699 client_obd_list_unlock(&cli->cl_loi_list_lock);
3702 case IMP_EVENT_INACTIVE: {
3703 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3706 case IMP_EVENT_INVALIDATE: {
3707 struct ldlm_namespace *ns = obd->obd_namespace;
3711 client_obd_list_lock(&cli->cl_loi_list_lock);
3712 /* all pages go to failing rpcs due to the invalid import */
3713 osc_check_rpcs(cli);
3714 client_obd_list_unlock(&cli->cl_loi_list_lock);
3716 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3720 case IMP_EVENT_ACTIVE: {
3721 /* Only do this on the MDS OSC's */
3722 if (imp->imp_server_timeout) {
3723 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3725 spin_lock(&oscc->oscc_lock);
3726 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3727 spin_unlock(&oscc->oscc_lock);
3729 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3732 case IMP_EVENT_OCD: {
3733 struct obd_connect_data *ocd = &imp->imp_connect_data;
3735 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3736 osc_init_grant(&obd->u.cli, ocd);
3739 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3740 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3742 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3746 CERROR("Unknown import event %d\n", event);
/*
 * Set up an OSC obd device from its lustre_cfg record.
 *
 * Takes a reference on the ptlrpcd service threads, runs the generic
 * client setup (import creation etc.), registers /proc entries
 * (lprocfs, sptlrpc and ptlrpc stats), and pre-allocates a request
 * pool so brw RPCs can still be sent under memory pressure.
 */
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
        rc = ptlrpcd_addref();

        rc = client_obd_setup(obd, lcfg);
                struct lprocfs_static_vars lvars = { 0 };
                struct client_obd *cli = &obd->u.cli;

                lprocfs_osc_init_vars(&lvars);
                if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
                        /* proc stats are best-effort; failures are ignored */
                        lproc_osc_attach_seqstat(obd);
                        sptlrpc_lprocfs_cliobd_attach(obd);
                        ptlrpc_lprocfs_register_obd(obd);

                /* We need to allocate a few requests more, because
                   brw_interpret_oap tries to create new requests before freeing
                   previous ones. Ideally we want to have 2x max_rpcs_in_flight
                   reserved, but I afraid that might be too much wasted RAM
                   in fact, so 2 is just my guess and still should work. */
                cli->cl_import->imp_rq_pool =
                        ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
                                            ptlrpc_add_rqs_to_pool);
/*
 * Staged pre-cleanup of the OSC device.
 *
 * EARLY:      deactivate the import and stop pinging it, so that any
 *             in-flight RPCs (e.g. an mds_lov_synchronize) abort
 *             instead of waiting for recovery.
 * EXPORTS:    if the device was set up but never connected, the
 *             import was never torn down by disconnect — invalidate
 *             it, free its request pool and destroy it here.
 * SELF_EXP:   shut down the llog subsystem for this obd.
 * OBD:        nothing visible to do at this stage.
 */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                spin_lock(&imp->imp_lock);
                imp->imp_pingable = 0;
                spin_unlock(&imp->imp_lock);
        case OBD_CLEANUP_EXPORTS: {
                /* If we set up but never connected, the
                   client import will not have been cleaned. */
                if (obd->u.cli.cl_import) {
                        struct obd_import *imp;
                        imp = obd->u.cli.cl_import;
                        CDEBUG(D_CONFIG, "%s: client import never connected\n",
                        ptlrpc_invalidate_import(imp);
                        ptlrpc_free_rq_pool(imp->imp_rq_pool);
                        class_destroy_import(imp);
                        obd->u.cli.cl_import = NULL;
        case OBD_CLEANUP_SELF_EXP:
                rc = obd_llog_finish(obd, 0);
                        CERROR("failed to cleanup llogging subsystems\n");
        case OBD_CLEANUP_OBD:
/*
 * Final cleanup of the OSC device: unregister /proc entries, mark the
 * object creator as exiting (and no longer recovering) so no further
 * precreates are attempted, release the quota cache, then run the
 * generic client teardown.
 */
int osc_cleanup(struct obd_device *obd)
        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

        ptlrpc_lprocfs_unregister_obd(obd);
        lprocfs_obd_cleanup(obd);

        spin_lock(&oscc->oscc_lock);
        oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
        oscc->oscc_flags |= OSCC_FLAG_EXITING;
        spin_unlock(&oscc->oscc_lock);

        /* free memory of osc quota cache */
        lquota_cleanup(quota_interface, obd);

        rc = client_obd_cleanup(obd);
/*
 * Handle a configuration log record addressed to this OSC.
 *
 * LCFG_SPTLRPC_CONF records go to the sptlrpc layer; everything else
 * is treated as an "osc.*" tunable and applied through the lprocfs
 * parameter handler (class_process_proc_param).
 */
static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
        struct lustre_cfg *lcfg = buf;
        struct lprocfs_static_vars lvars = { 0 };

        lprocfs_osc_init_vars(&lvars);

        switch (lcfg->lcfg_command) {
        case LCFG_SPTLRPC_CONF:
                rc = sptlrpc_cliobd_process_config(obd, lcfg);
                rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
/*
 * OBD method table for the OSC.  Lifecycle and connection handling is
 * largely delegated to the generic client_* helpers; object, page-I/O
 * and lock operations are the osc_* implementations in this file.
 */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        /* device lifecycle */
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        /* connection management (mostly generic client code) */
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        /* statfs and striping metadata */
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        /* object lifecycle and attributes */
        .o_precreate            = osc_precreate,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        /* bulk and async page I/O */
        .o_brw_async            = osc_brw_async,
        .o_prep_async_page      = osc_prep_async_page,
        .o_queue_async_io       = osc_queue_async_io,
        .o_set_async_flags      = osc_set_async_flags,
        .o_queue_group_io       = osc_queue_group_io,
        .o_trigger_group_io     = osc_trigger_group_io,
        .o_teardown_async_page  = osc_teardown_async_page,
        .o_punch                = osc_punch,
        /* DLM locking */
        .o_enqueue              = osc_enqueue,
        .o_match                = osc_match,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_join_lru             = osc_join_lru,
        /* miscellaneous */
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
/*
 * Module init: hook up the quota interface (best-effort — the lquota
 * module may be absent, PORTAL_SYMBOL_GET can return NULL) and
 * register the "osc" obd type with the class layer.
 *
 * The trailing PORTAL_SYMBOL_PUT drops the quota module reference on
 * the error path; presumably it is guarded by a failed-rc check in
 * the elided line above it — verify.
 */
int __init osc_init(void)
        struct lprocfs_static_vars lvars = { 0 };

        lprocfs_osc_init_vars(&lvars);

        request_module("lquota");
        quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
        lquota_init(quota_interface);
        init_obd_quota_ops(quota_interface, &osc_obd_ops);

        rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
                                 LUSTRE_OSC_NAME, NULL);

        if (quota_interface)
                PORTAL_SYMBOL_PUT(osc_quota_interface);
/*
 * Module exit: tear down quota state, drop the lquota module
 * reference taken in osc_init(), and unregister the "osc" obd type.
 */
static void /*__exit*/ osc_exit(void)
        lquota_exit(quota_interface);
        if (quota_interface)
                PORTAL_SYMBOL_PUT(osc_quota_interface);

        class_unregister_type(LUSTRE_OSC_NAME);
/* Module metadata and registration of the init/exit entry points. */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);