1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5 * Author Peter Braam <braam@clusterfs.com>
7 * This file is part of the Lustre file system, http://www.lustre.org
8 * Lustre is a trademark of Cluster File Systems, Inc.
10 * You may have signed or agreed to another license before downloading
11 * this software. If so, you are bound by the terms and conditions
12 * of that agreement, and the following does not apply to you. See the
13 * LICENSE file included with this distribution for more information.
15 * If you did not agree to a different license, then this copy of Lustre
16 * is open source software; you can redistribute it and/or modify it
17 * under the terms of version 2 of the GNU General Public License as
18 * published by the Free Software Foundation.
20 * In either case, Lustre is distributed in the hope that it will be
21 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23 * license text for more details.
25 * For testing and management it is treated as an obd_device,
26 * although it does not export a full OBD method table (the
27 * requests are coming in over the wire, so object target modules
28 * do not have a full method table.)
33 # define EXPORT_SYMTAB
35 #define DEBUG_SUBSYSTEM S_OSC
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
43 #include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include "osc_internal.h"
61 static quota_interface_t *quota_interface = NULL;
62 extern quota_interface_t osc_quota_interface;
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
66 /* Pack OSC object metadata for disk storage (LE byte order). */
67 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
68 struct lov_stripe_md *lsm)
73 lmm_size = sizeof(**lmmp);
78 OBD_FREE(*lmmp, lmm_size);
84 OBD_ALLOC(*lmmp, lmm_size);
90 LASSERT(lsm->lsm_object_id);
91 LASSERT(lsm->lsm_object_gr);
92 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
93 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
99 /* Unpack OSC object metadata from disk storage (LE byte order). */
100 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
101 struct lov_mds_md *lmm, int lmm_bytes)
107 if (lmm_bytes < sizeof (*lmm)) {
108 CERROR("lov_mds_md too small: %d, need %d\n",
109 lmm_bytes, (int)sizeof(*lmm));
112 /* XXX LOV_MAGIC etc check? */
114 if (lmm->lmm_object_id == 0) {
115 CERROR("lov_mds_md: zero lmm_object_id\n");
120 lsm_size = lov_stripe_md_size(1);
124 if (*lsmp != NULL && lmm == NULL) {
125 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
126 OBD_FREE(*lsmp, lsm_size);
132 OBD_ALLOC(*lsmp, lsm_size);
135 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
136 if ((*lsmp)->lsm_oinfo[0] == NULL) {
137 OBD_FREE(*lsmp, lsm_size);
140 loi_init((*lsmp)->lsm_oinfo[0]);
144 /* XXX zero *lsmp? */
145 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
146 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
147 LASSERT((*lsmp)->lsm_object_id);
148 LASSERT((*lsmp)->lsm_object_gr);
151 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
156 static inline void osc_pack_capa(struct ptlrpc_request *req,
157 struct ost_body *body, void *capa)
159 struct obd_capa *oc = (struct obd_capa *)capa;
160 struct lustre_capa *c;
165 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
168 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
169 DEBUG_CAPA(D_SEC, c, "pack");
172 static inline void osc_pack_req_body(struct ptlrpc_request *req,
173 struct obd_info *oinfo)
175 struct ost_body *body;
177 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
180 body->oa = *oinfo->oi_oa;
181 osc_pack_capa(req, body, oinfo->oi_capa);
184 static inline void osc_set_capa_size(struct ptlrpc_request *req,
185 const struct req_msg_field *field,
189 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
191 /* it is already calculated as sizeof struct obd_capa */
195 static int osc_getattr_interpret(struct ptlrpc_request *req,
196 struct osc_async_args *aa, int rc)
198 struct ost_body *body;
204 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
205 lustre_swab_ost_body);
207 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
208 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
210 /* This should really be sent by the OST */
211 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
212 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
214 CDEBUG(D_INFO, "can't unpack ost_body\n");
216 aa->aa_oi->oi_oa->o_valid = 0;
219 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
223 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
224 struct ptlrpc_request_set *set)
226 struct ptlrpc_request *req;
227 struct osc_async_args *aa;
231 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
235 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
236 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
238 ptlrpc_request_free(req);
242 osc_pack_req_body(req, oinfo);
244 ptlrpc_request_set_replen(req);
245 req->rq_interpret_reply = osc_getattr_interpret;
247 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
248 aa = (struct osc_async_args *)&req->rq_async_args;
251 ptlrpc_set_add_req(set, req);
255 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
257 struct ptlrpc_request *req;
258 struct ost_body *body;
262 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
266 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
267 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
269 ptlrpc_request_free(req);
273 osc_pack_req_body(req, oinfo);
275 ptlrpc_request_set_replen(req);
277 rc = ptlrpc_queue_wait(req);
281 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
283 GOTO(out, rc = -EPROTO);
285 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
286 *oinfo->oi_oa = body->oa;
288 /* This should really be sent by the OST */
289 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
290 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
294 ptlrpc_req_finished(req);
298 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
299 struct obd_trans_info *oti)
301 struct ptlrpc_request *req;
302 struct ost_body *body;
306 LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
307 oinfo->oi_oa->o_gr > 0);
309 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
313 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
314 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
316 ptlrpc_request_free(req);
320 osc_pack_req_body(req, oinfo);
322 ptlrpc_request_set_replen(req);
325 rc = ptlrpc_queue_wait(req);
329 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
331 GOTO(out, rc = -EPROTO);
333 *oinfo->oi_oa = body->oa;
337 ptlrpc_req_finished(req);
341 static int osc_setattr_interpret(struct ptlrpc_request *req,
342 struct osc_async_args *aa, int rc)
344 struct ost_body *body;
350 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
352 GOTO(out, rc = -EPROTO);
354 *aa->aa_oi->oi_oa = body->oa;
356 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
360 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
361 struct obd_trans_info *oti,
362 struct ptlrpc_request_set *rqset)
364 struct ptlrpc_request *req;
365 struct osc_async_args *aa;
369 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
373 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
374 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
376 ptlrpc_request_free(req);
380 osc_pack_req_body(req, oinfo);
382 ptlrpc_request_set_replen(req);
384 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
386 *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;
389 /* do mds to ost setattr asynchronouly */
391 /* Do not wait for response. */
392 ptlrpcd_add_req(req);
394 req->rq_interpret_reply = osc_setattr_interpret;
396 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
397 aa = (struct osc_async_args *)&req->rq_async_args;
400 ptlrpc_set_add_req(rqset, req);
406 int osc_real_create(struct obd_export *exp, struct obdo *oa,
407 struct lov_stripe_md **ea, struct obd_trans_info *oti)
409 struct ptlrpc_request *req;
410 struct ost_body *body;
411 struct lov_stripe_md *lsm;
420 rc = obd_alloc_memmd(exp, &lsm);
425 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
427 GOTO(out, rc = -ENOMEM);
429 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
431 ptlrpc_request_free(req);
435 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
439 ptlrpc_request_set_replen(req);
441 if (oa->o_valid & OBD_MD_FLINLINE) {
442 LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
443 oa->o_flags == OBD_FL_DELORPHAN);
445 "delorphan from OST integration");
446 /* Don't resend the delorphan req */
447 req->rq_no_resend = req->rq_no_delay = 1;
450 rc = ptlrpc_queue_wait(req);
454 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
456 GOTO(out_req, rc = -EPROTO);
460 /* This should really be sent by the OST */
461 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
462 oa->o_valid |= OBD_MD_FLBLKSZ;
464 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
465 * have valid lsm_oinfo data structs, so don't go touching that.
466 * This needs to be fixed in a big way.
468 lsm->lsm_object_id = oa->o_id;
469 lsm->lsm_object_gr = oa->o_gr;
473 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
475 if (oa->o_valid & OBD_MD_FLCOOKIE) {
476 if (!oti->oti_logcookies)
477 oti_alloc_cookies(oti, 1);
478 *oti->oti_logcookies = *obdo_logcookie(oa);
482 CDEBUG(D_HA, "transno: "LPD64"\n",
483 lustre_msg_get_transno(req->rq_repmsg));
485 ptlrpc_req_finished(req);
488 obd_free_memmd(exp, &lsm);
492 static int osc_punch_interpret(struct ptlrpc_request *req,
493 struct osc_async_args *aa, int rc)
495 struct ost_body *body;
501 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
503 GOTO(out, rc = -EPROTO);
505 *aa->aa_oi->oi_oa = body->oa;
507 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
511 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
512 struct obd_trans_info *oti,
513 struct ptlrpc_request_set *rqset)
515 struct ptlrpc_request *req;
516 struct osc_async_args *aa;
517 struct ost_body *body;
522 CDEBUG(D_INFO, "oa NULL\n");
526 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
530 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
531 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
533 ptlrpc_request_free(req);
536 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
537 osc_pack_req_body(req, oinfo);
539 /* overload the size and blocks fields in the oa with start/end */
540 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
542 body->oa.o_size = oinfo->oi_policy.l_extent.start;
543 body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
544 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
545 ptlrpc_request_set_replen(req);
548 req->rq_interpret_reply = osc_punch_interpret;
549 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
550 aa = (struct osc_async_args *)&req->rq_async_args;
552 ptlrpc_set_add_req(rqset, req);
557 static int osc_sync(struct obd_export *exp, struct obdo *oa,
558 struct lov_stripe_md *md, obd_size start, obd_size end,
561 struct ptlrpc_request *req;
562 struct ost_body *body;
567 CDEBUG(D_INFO, "oa NULL\n");
571 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
575 osc_set_capa_size(req, &RMF_CAPA1, capa);
576 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
578 ptlrpc_request_free(req);
582 /* overload the size and blocks fields in the oa with start/end */
583 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
586 body->oa.o_size = start;
587 body->oa.o_blocks = end;
588 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
589 osc_pack_capa(req, body, capa);
591 ptlrpc_request_set_replen(req);
593 rc = ptlrpc_queue_wait(req);
597 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
599 GOTO(out, rc = -EPROTO);
605 ptlrpc_req_finished(req);
609 /* Find and cancel locally locks matched by @mode in the resource found by
610 * @objid. Found locks are added into @cancel list. Returns the amount of
611 * locks added to @cancels list. */
612 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
613 struct list_head *cancels, ldlm_mode_t mode,
616 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
617 struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
618 struct ldlm_resource *res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
625 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
626 lock_flags, 0, NULL);
627 ldlm_resource_putref(res);
631 static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
634 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
636 atomic_dec(&cli->cl_destroy_in_flight);
637 cfs_waitq_signal(&cli->cl_destroy_waitq);
641 static int osc_can_send_destroy(struct client_obd *cli)
643 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
644 cli->cl_max_rpcs_in_flight) {
645 /* The destroy request can be sent */
648 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
649 cli->cl_max_rpcs_in_flight) {
651 * The counter has been modified between the two atomic
654 cfs_waitq_signal(&cli->cl_destroy_waitq);
659 /* Destroy requests can be async always on the client, and we don't even really
660 * care about the return code since the client cannot do anything at all about
662 * When the MDS is unlinking a filename, it saves the file objects into a
663 * recovery llog, and these object records are cancelled when the OST reports
664 * they were destroyed and sync'd to disk (i.e. transaction committed).
665 * If the client dies, or the OST is down when the object should be destroyed,
666 * the records are not cancelled, and when the OST reconnects to the MDS next,
667 * it will retrieve the llog unlink logs and then send the log cancellation
668 * cookies to the MDS after committing destroy transactions. */
669 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
670 struct lov_stripe_md *ea, struct obd_trans_info *oti,
671 struct obd_export *md_export)
673 struct client_obd *cli = &exp->exp_obd->u.cli;
674 struct ptlrpc_request *req;
675 struct ost_body *body;
676 CFS_LIST_HEAD(cancels);
681 CDEBUG(D_INFO, "oa NULL\n");
685 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
686 LDLM_FL_DISCARD_DATA);
688 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
690 ldlm_lock_list_put(&cancels, l_bl_ast, count);
694 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
697 ptlrpc_request_free(req);
701 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
702 req->rq_interpret_reply = osc_destroy_interpret;
704 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
705 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
706 sizeof(*oti->oti_logcookies));
707 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
711 ptlrpc_request_set_replen(req);
713 if (!osc_can_send_destroy(cli)) {
714 struct l_wait_info lwi = { 0 };
717 * Wait until the number of on-going destroy RPCs drops
718 * under max_rpc_in_flight
720 l_wait_event_exclusive(cli->cl_destroy_waitq,
721 osc_can_send_destroy(cli), &lwi);
724 /* Do not wait for response */
725 ptlrpcd_add_req(req);
729 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
732 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
734 LASSERT(!(oa->o_valid & bits));
737 client_obd_list_lock(&cli->cl_loi_list_lock);
738 oa->o_dirty = cli->cl_dirty;
739 if (cli->cl_dirty > cli->cl_dirty_max) {
740 CERROR("dirty %lu > dirty_max %lu\n",
741 cli->cl_dirty, cli->cl_dirty_max);
743 } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
744 CERROR("dirty %d > system dirty_max %d\n",
745 atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
747 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
748 CERROR("dirty %lu - dirty_max %lu too big???\n",
749 cli->cl_dirty, cli->cl_dirty_max);
752 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
753 (cli->cl_max_rpcs_in_flight + 1);
754 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
756 oa->o_grant = cli->cl_avail_grant;
757 oa->o_dropped = cli->cl_lost_grant;
758 cli->cl_lost_grant = 0;
759 client_obd_list_unlock(&cli->cl_loi_list_lock);
760 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
761 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
764 /* caller must hold loi_list_lock */
765 static void osc_consume_write_grant(struct client_obd *cli,
766 struct brw_page *pga)
768 atomic_inc(&obd_dirty_pages);
769 cli->cl_dirty += CFS_PAGE_SIZE;
770 cli->cl_avail_grant -= CFS_PAGE_SIZE;
771 pga->flag |= OBD_BRW_FROM_GRANT;
772 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
773 CFS_PAGE_SIZE, pga, pga->pg);
774 LASSERT(cli->cl_avail_grant >= 0);
777 /* the companion to osc_consume_write_grant, called when a brw has completed.
778 * must be called with the loi lock held. */
779 static void osc_release_write_grant(struct client_obd *cli,
780 struct brw_page *pga, int sent)
782 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
785 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
790 pga->flag &= ~OBD_BRW_FROM_GRANT;
791 atomic_dec(&obd_dirty_pages);
792 cli->cl_dirty -= CFS_PAGE_SIZE;
794 cli->cl_lost_grant += CFS_PAGE_SIZE;
795 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
796 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
797 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
798 /* For short writes we shouldn't count parts of pages that
799 * span a whole block on the OST side, or our accounting goes
800 * wrong. Should match the code in filter_grant_check. */
801 int offset = pga->off & ~CFS_PAGE_MASK;
802 int count = pga->count + (offset & (blocksize - 1));
803 int end = (offset + pga->count) & (blocksize - 1);
805 count += blocksize - end;
807 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
808 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
809 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
810 cli->cl_avail_grant, cli->cl_dirty);
816 static unsigned long rpcs_in_flight(struct client_obd *cli)
818 return cli->cl_r_in_flight + cli->cl_w_in_flight;
821 /* caller must hold loi_list_lock */
822 void osc_wake_cache_waiters(struct client_obd *cli)
824 struct list_head *l, *tmp;
825 struct osc_cache_waiter *ocw;
828 list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
829 /* if we can't dirty more, we must wait until some is written */
830 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
831 (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
832 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
833 "osc max %ld, sys max %d\n", cli->cl_dirty,
834 cli->cl_dirty_max, obd_max_dirty_pages);
838 /* if still dirty cache but no grant wait for pending RPCs that
839 * may yet return us some grant before doing sync writes */
840 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
841 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
842 cli->cl_w_in_flight);
846 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
847 list_del_init(&ocw->ocw_entry);
848 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
849 /* no more RPCs in flight to return grant, do sync IO */
850 ocw->ocw_rc = -EDQUOT;
851 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
853 osc_consume_write_grant(cli,
854 &ocw->ocw_oap->oap_brw_page);
857 cfs_waitq_signal(&ocw->ocw_waitq);
863 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
865 client_obd_list_lock(&cli->cl_loi_list_lock);
866 cli->cl_avail_grant = ocd->ocd_grant;
867 client_obd_list_unlock(&cli->cl_loi_list_lock);
869 CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
870 cli->cl_avail_grant, cli->cl_lost_grant);
871 LASSERT(cli->cl_avail_grant >= 0);
874 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
876 client_obd_list_lock(&cli->cl_loi_list_lock);
877 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
878 if (body->oa.o_valid & OBD_MD_FLGRANT)
879 cli->cl_avail_grant += body->oa.o_grant;
880 /* waiters are woken in brw_interpret_oap */
881 client_obd_list_unlock(&cli->cl_loi_list_lock);
884 /* We assume that the reason this OSC got a short read is because it read
885 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
886 * via the LOV, and it _knows_ it's reading inside the file, it's just that
887 * this stripe never got written at or beyond this stripe offset yet. */
888 static void handle_short_read(int nob_read, obd_count page_count,
889 struct brw_page **pga)
894 /* skip bytes read OK */
895 while (nob_read > 0) {
896 LASSERT (page_count > 0);
898 if (pga[i]->count > nob_read) {
899 /* EOF inside this page */
900 ptr = cfs_kmap(pga[i]->pg) +
901 (pga[i]->off & ~CFS_PAGE_MASK);
902 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
903 cfs_kunmap(pga[i]->pg);
909 nob_read -= pga[i]->count;
914 /* zero remaining pages */
915 while (page_count-- > 0) {
916 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
917 memset(ptr, 0, pga[i]->count);
918 cfs_kunmap(pga[i]->pg);
923 static int check_write_rcs(struct ptlrpc_request *req,
924 int requested_nob, int niocount,
925 obd_count page_count, struct brw_page **pga)
929 /* return error if any niobuf was in error */
930 remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
931 sizeof(*remote_rcs) * niocount, NULL);
932 if (remote_rcs == NULL) {
933 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
936 if (lustre_msg_swabbed(req->rq_repmsg))
937 for (i = 0; i < niocount; i++)
938 __swab32s(&remote_rcs[i]);
940 for (i = 0; i < niocount; i++) {
941 if (remote_rcs[i] < 0)
942 return(remote_rcs[i]);
944 if (remote_rcs[i] != 0) {
945 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
946 i, remote_rcs[i], req);
951 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
952 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
953 requested_nob, req->rq_bulk->bd_nob_transferred);
960 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
962 if (p1->flag != p2->flag) {
963 unsigned mask = ~OBD_BRW_FROM_GRANT;
965 /* warn if we try to combine flags that we don't know to be
967 if ((p1->flag & mask) != (p2->flag & mask))
968 CERROR("is it ok to have flags 0x%x and 0x%x in the "
969 "same brw?\n", p1->flag, p2->flag);
973 return (p1->off + p1->count == p2->off);
976 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
977 struct brw_page **pga, int opc,
978 cksum_type_t cksum_type)
983 LASSERT (pg_count > 0);
984 cksum = init_checksum(cksum_type);
985 while (nob > 0 && pg_count > 0) {
986 unsigned char *ptr = cfs_kmap(pga[i]->pg);
987 int off = pga[i]->off & ~CFS_PAGE_MASK;
988 int count = pga[i]->count > nob ? nob : pga[i]->count;
990 /* corrupt the data before we compute the checksum, to
991 * simulate an OST->client data error */
992 if (i == 0 && opc == OST_READ &&
993 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
994 memcpy(ptr + off, "bad1", min(4, nob));
995 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
996 cfs_kunmap(pga[i]->pg);
997 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1000 nob -= pga[i]->count;
1004 /* For sending we only compute the wrong checksum instead
1005 * of corrupting the data so it is still correct on a redo */
1006 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1012 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1013 struct lov_stripe_md *lsm, obd_count page_count,
1014 struct brw_page **pga,
1015 struct ptlrpc_request **reqp,
1016 struct obd_capa *ocapa)
1018 struct ptlrpc_request *req;
1019 struct ptlrpc_bulk_desc *desc;
1020 struct ost_body *body;
1021 struct obd_ioobj *ioobj;
1022 struct niobuf_remote *niobuf;
1023 int niocount, i, requested_nob, opc, rc;
1024 struct osc_brw_async_args *aa;
1025 struct req_capsule *pill;
1028 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1029 RETURN(-ENOMEM); /* Recoverable */
1030 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1031 RETURN(-EINVAL); /* Fatal */
1033 if ((cmd & OBD_BRW_WRITE) != 0) {
1035 req = ptlrpc_request_alloc_pool(cli->cl_import,
1036 cli->cl_import->imp_rq_pool,
1040 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
1046 for (niocount = i = 1; i < page_count; i++) {
1047 if (!can_merge_pages(pga[i - 1], pga[i]))
1051 pill = &req->rq_pill;
1052 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1053 niocount * sizeof(*niobuf));
1054 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1056 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1058 ptlrpc_request_free(req);
1061 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1063 if (opc == OST_WRITE)
1064 desc = ptlrpc_prep_bulk_imp(req, page_count,
1065 BULK_GET_SOURCE, OST_BULK_PORTAL);
1067 desc = ptlrpc_prep_bulk_imp(req, page_count,
1068 BULK_PUT_SINK, OST_BULK_PORTAL);
1071 GOTO(out, rc = -ENOMEM);
1072 /* NB request now owns desc and will free it when it gets freed */
1074 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1075 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1076 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1077 LASSERT(body && ioobj && niobuf);
1081 obdo_to_ioobj(oa, ioobj);
1082 ioobj->ioo_bufcnt = niocount;
1083 osc_pack_capa(req, body, ocapa);
1084 LASSERT (page_count > 0);
1085 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1086 struct brw_page *pg = pga[i];
1087 struct brw_page *pg_prev = pga[i - 1];
1089 LASSERT(pg->count > 0);
1090 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1091 "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1092 pg->off, pg->count);
1094 LASSERTF(i == 0 || pg->off > pg_prev->off,
1095 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1096 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1098 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1099 pg_prev->pg, page_private(pg_prev->pg),
1100 pg_prev->pg->index, pg_prev->off);
1102 LASSERTF(i == 0 || pg->off > pg_prev->off,
1103 "i %d p_c %u\n", i, page_count);
1105 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1106 (pg->flag & OBD_BRW_SRVLOCK));
1108 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1110 requested_nob += pg->count;
1112 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1114 niobuf->len += pg->count;
1116 niobuf->offset = pg->off;
1117 niobuf->len = pg->count;
1118 niobuf->flags = pg->flag;
1122 LASSERT((void *)(niobuf - niocount) ==
1123 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1124 niocount * sizeof(*niobuf)));
1125 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1127 /* size[REQ_REC_OFF] still sizeof (*body) */
1128 if (opc == OST_WRITE) {
1129 if (unlikely(cli->cl_checksum) &&
1130 req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
1131 /* store cl_cksum_type in a local variable since
1132 * it can be changed via lprocfs */
1133 cksum_type_t cksum_type = cli->cl_cksum_type;
1135 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1136 oa->o_flags = body->oa.o_flags = 0;
1137 body->oa.o_flags |= cksum_type_pack(cksum_type);
1138 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1139 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1143 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1145 /* save this in 'oa', too, for later checking */
1146 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1147 oa->o_flags |= cksum_type_pack(cksum_type);
1149 /* clear out the checksum flag, in case this is a
1150 * resend but cl_checksum is no longer set. b=11238 */
1151 oa->o_valid &= ~OBD_MD_FLCKSUM;
1153 oa->o_cksum = body->oa.o_cksum;
1154 /* 1 RC per niobuf */
1155 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
1156 sizeof(__u32) * niocount);
1158 if (unlikely(cli->cl_checksum) &&
1159 req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
1160 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1161 body->oa.o_flags = 0;
1162 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1163 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1165 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
1166 /* 1 RC for the whole I/O */
1168 ptlrpc_request_set_replen(req);
1170 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1171 aa = (struct osc_brw_async_args *)&req->rq_async_args;
1173 aa->aa_requested_nob = requested_nob;
1174 aa->aa_nio_count = niocount;
1175 aa->aa_page_count = page_count;
1179 INIT_LIST_HEAD(&aa->aa_oaps);
1185 ptlrpc_req_finished(req);
1189 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1190 __u32 client_cksum, __u32 server_cksum, int nob,
1191 obd_count page_count, struct brw_page **pga,
1192 cksum_type_t client_cksum_type)
1196 cksum_type_t cksum_type;
1198 if (server_cksum == client_cksum) {
1199 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1203 if (oa->o_valid & OBD_MD_FLFLAGS)
1204 cksum_type = cksum_type_unpack(oa->o_flags);
1206 cksum_type = OBD_CKSUM_CRC32;
1208 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1211 if (cksum_type != client_cksum_type)
1212 msg = "the server did not use the checksum type specified in "
1213 "the original request - likely a protocol problem";
1214 else if (new_cksum == server_cksum)
1215 msg = "changed on the client after we checksummed it - "
1216 "likely false positive due to mmap IO (bug 11742)";
1217 else if (new_cksum == client_cksum)
1218 msg = "changed in transit before arrival at OST";
1220 msg = "changed in transit AND doesn't match the original - "
1221 "likely false positive due to mmap IO (bug 11742)";
1223 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1224 LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1225 "["LPU64"-"LPU64"]\n",
1226 msg, libcfs_nid2str(peer->nid),
1227 oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1228 oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1231 oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1233 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1234 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1235 "client csum now %x\n", client_cksum, client_cksum_type,
1236 server_cksum, cksum_type, new_cksum);
1240 /* Note rc enters this function as number of bytes transferred */
/*
 * Finish a bulk read/write RPC.  On entry @rc is the byte count the bulk
 * layer reports (or a negative errno); on return it is 0, a negative
 * errno, or -EAGAIN to request a resend.  Unpacks the reply ost_body,
 * updates per-uid/gid quota flags and the client grant, and verifies
 * transfer sizes and (when OBD_MD_FLCKSUM is set) bulk checksums.
 */
1241 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1243 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1244 const lnet_process_id_t *peer =
1245 &req->rq_import->imp_connection->c_peer;
1246 struct client_obd *cli = aa->aa_cli;
1247 struct ost_body *body;
1248 __u32 client_cksum = 0;
/* hard errors (anything but quota exhaustion) are passed straight back */
1251 if (rc < 0 && rc != -EDQUOT)
1254 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1255 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1256 lustre_swab_ost_body);
1258 CDEBUG(D_INFO, "Can't unpack body\n");
1262 /* set/clear over quota flag for a uid/gid */
1263 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1264 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1265 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1266 body->oa.o_gid, body->oa.o_valid,
/* remember the checksum we sent; the reply's oa will overwrite aa_oa */
1272 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1273 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1275 osc_update_grant(cli, body);
/* write path: verify per-page RCs and, if enabled, the write checksum */
1277 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1279 CERROR("Unexpected +ve rc %d\n", rc);
1282 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1284 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1285 check_write_checksum(&body->oa, peer, client_cksum,
1286 body->oa.o_cksum, aa->aa_requested_nob,
1287 aa->aa_page_count, aa->aa_ppga,
1288 cksum_type_unpack(aa->aa_oa->o_flags)))
1291 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1294 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1295 aa->aa_page_count, aa->aa_ppga);
1299 /* The rest of this function executes only for OST_READs */
/* sanity: the server cannot legitimately return more than we asked for,
 * and its count must agree with what the bulk layer saw arrive */
1300 if (rc > aa->aa_requested_nob) {
1301 CERROR("Unexpected rc %d (%d requested)\n", rc,
1302 aa->aa_requested_nob);
1306 if (rc != req->rq_bulk->bd_nob_transferred) {
1307 CERROR ("Unexpected rc %d (%d transferred)\n",
1308 rc, req->rq_bulk->bd_nob_transferred);
1312 if (rc < aa->aa_requested_nob)
1313 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1315 if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
1317 GOTO(out, rc = -EAGAIN);
/* server sent a read checksum: recompute over the received pages and
 * compare; cksum_counter throttles the "confirmed" debug message */
1319 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1320 static int cksum_counter;
1321 __u32 server_cksum = body->oa.o_cksum;
1324 cksum_type_t cksum_type;
/* older servers don't send a checksum type; default to CRC32 */
1326 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1327 cksum_type = cksum_type_unpack(body->oa.o_flags);
1329 cksum_type = OBD_CKSUM_CRC32;
1330 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1331 aa->aa_ppga, OST_READ,
/* note whether the bulk came via a router (sender != peer) */
1334 if (peer->nid == req->rq_bulk->bd_sender) {
1338 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1341 if (server_cksum == ~0 && rc > 0) {
1342 CERROR("Protocol error: server %s set the 'checksum' "
1343 "bit, but didn't send a checksum. Not fatal, "
1344 "but please tell CFS.\n",
1345 libcfs_nid2str(peer->nid));
1346 } else if (server_cksum != client_cksum) {
1347 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1348 "%s%s%s inum "LPU64"/"LPU64" object "
1349 LPU64"/"LPU64" extent "
1350 "["LPU64"-"LPU64"]\n",
1351 req->rq_import->imp_obd->obd_name,
1352 libcfs_nid2str(peer->nid),
1354 body->oa.o_valid & OBD_MD_FLFID ?
1355 body->oa.o_fid : (__u64)0,
1356 body->oa.o_valid & OBD_MD_FLFID ?
1357 body->oa.o_generation :(__u64)0,
1359 body->oa.o_valid & OBD_MD_FLGROUP ?
1360 body->oa.o_gr : (__u64)0,
1361 aa->aa_ppga[0]->off,
1362 aa->aa_ppga[aa->aa_page_count-1]->off +
1363 aa->aa_ppga[aa->aa_page_count-1]->count -
1365 CERROR("client %x, server %x, cksum_type %x\n",
1366 client_cksum, server_cksum, cksum_type);
/* expose the locally-computed checksum to upper layers */
1368 aa->aa_oa->o_cksum = client_cksum;
1372 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1375 } else if (unlikely(client_cksum)) {
1376 static int cksum_missed;
/* log only at power-of-two miss counts to avoid console spam */
1379 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1380 CERROR("Checksum %u requested from %s but not sent\n",
1381 cksum_missed, libcfs_nid2str(peer->nid));
/* copy the server's returned attributes back to the caller's obdo */
1387 *aa->aa_oa = body->oa;
/*
 * Synchronous bulk I/O: build one BRW request, queue it and wait for the
 * reply, then post-process it with osc_brw_fini_request().  A bulk timeout
 * with rq_resend set, or any recoverable error, causes the request to be
 * rebuilt and resent; osc_should_resend() bounds the retry count and each
 * retry first sleeps @resends seconds (interruptible timeout wait).
 */
1392 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1393 struct lov_stripe_md *lsm,
1394 obd_count page_count, struct brw_page **pga,
1395 struct obd_capa *ocapa)
1397 struct ptlrpc_request *req;
1401 struct l_wait_info lwi;
1405 cfs_waitq_init(&waitq);
1408 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1409 page_count, pga, &req, ocapa);
1413 rc = ptlrpc_queue_wait(req);
1415 if (rc == -ETIMEDOUT && req->rq_resend) {
1416 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1417 ptlrpc_req_finished(req);
1421 rc = osc_brw_fini_request(req, rc);
1423 ptlrpc_req_finished(req);
1424 if (osc_recoverable_error(rc)) {
1426 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1427 CERROR("too many resend retries, returning error\n");
/* back off before the retry: wait (uninterrupted event never fires,
 * so this is a plain interruptible sleep of 'resends' seconds) */
1431 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1432 l_wait_event(waitq, 0, &lwi);
/*
 * Rebuild a BRW request that failed with a recoverable error and re-add it
 * to the original request set.  The new request takes over the page array
 * and the oap list from the old one; each oap's request reference is moved
 * to the new request.  Returns 0 on success or a negative errno (including
 * when the resend limit from osc_should_resend() is exceeded).
 */
1440 int osc_brw_redo_request(struct ptlrpc_request *request,
1441 struct osc_brw_async_args *aa)
1443 struct ptlrpc_request *new_req;
1444 struct ptlrpc_request_set *set = request->rq_set;
1445 struct osc_brw_async_args *new_aa;
1446 struct osc_async_page *oap;
1450 if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1451 CERROR("too many resend retries, returning error\n");
1455 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
/* recover the original request's capa, if it carried one */
1457 body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1458 if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
1459 ocapa = lustre_unpack_capa(request->rq_reqmsg,
1462 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1463 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1464 aa->aa_cli, aa->aa_oa,
1465 NULL /* lsm unused by osc currently */,
1466 aa->aa_page_count, aa->aa_ppga,
1467 &new_req, NULL /* ocapa */);
1471 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
/* if any oap in the old request was interrupted, give up on the redo */
1473 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1474 if (oap->oap_request != NULL) {
1475 LASSERTF(request == oap->oap_request,
1476 "request %p != oap_request %p\n",
1477 request, oap->oap_request);
1478 if (oap->oap_interrupted) {
1479 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1480 ptlrpc_req_finished(new_req);
1485 /* New request takes over pga and oaps from old request.
1486 * Note that copying a list_head doesn't work, need to move it... */
1488 new_req->rq_interpret_reply = request->rq_interpret_reply;
1489 new_req->rq_async_args = request->rq_async_args;
/* delay the resend slightly more for each retry already attempted */
1490 new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;
1492 new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
1494 INIT_LIST_HEAD(&new_aa->aa_oaps);
1495 list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1496 INIT_LIST_HEAD(&aa->aa_oaps);
/* repoint every oap's request reference at the replacement request */
1498 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1499 if (oap->oap_request) {
1500 ptlrpc_req_finished(oap->oap_request);
1501 oap->oap_request = ptlrpc_request_addref(new_req);
1505 /* use ptlrpc_set_add_req is safe because interpret functions work
1506 * in check_set context. only one way exist with access to request
1507 * from different thread got -EINTR - this way protected with
1508 * cl_loi_list_lock */
1509 ptlrpc_set_add_req(set, new_req);
1511 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1513 DEBUG_REQ(D_INFO, new_req, "new request");
/*
 * rq_interpret_reply callback for requests sent by async_internal().
 * Finishes the RPC, retries recoverable errors via osc_brw_redo_request(),
 * then (under the loi list lock) drops the read/write in-flight counter,
 * returns the write grant consumed for every page, and releases the page
 * pointer array.
 */
1517 static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
1519 struct osc_brw_async_args *aa = data;
1523 rc = osc_brw_fini_request(req, rc);
1524 if (osc_recoverable_error(rc)) {
1525 rc = osc_brw_redo_request(req, aa);
1530 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1531 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1532 aa->aa_cli->cl_w_in_flight--;
1534 aa->aa_cli->cl_r_in_flight--;
1535 for (i = 0; i < aa->aa_page_count; i++)
1536 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1537 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1539 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
/*
 * Queue an asynchronous BRW on request set @set.  For writes, grant is
 * consumed up-front for each page (while cl_avail_grant allows) so the OST
 * does not over-commit space; if building the request fails the grant is
 * released again.  Records read/write page and RPC-count histograms and
 * bumps the matching in-flight counter before handing the request to the
 * set with brw_interpret() as its completion callback.
 */
1544 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1545 struct lov_stripe_md *lsm, obd_count page_count,
1546 struct brw_page **pga, struct ptlrpc_request_set *set,
1547 struct obd_capa *ocapa)
1549 struct ptlrpc_request *req;
1550 struct client_obd *cli = &exp->exp_obd->u.cli;
1552 struct osc_brw_async_args *aa;
1555 /* Consume write credits even if doing a sync write -
1556 * otherwise we may run out of space on OST due to grant. */
1557 if (cmd == OBD_BRW_WRITE) {
1558 spin_lock(&cli->cl_loi_list_lock);
1559 for (i = 0; i < page_count; i++) {
1560 if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1561 osc_consume_write_grant(cli, pga[i]);
1563 spin_unlock(&cli->cl_loi_list_lock);
1566 rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
1569 aa = (struct osc_brw_async_args *)&req->rq_async_args;
/* lproc accounting: page-count and RPCs-in-flight histograms */
1570 if (cmd == OBD_BRW_READ) {
1571 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1572 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1573 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
1575 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1576 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1577 cli->cl_w_in_flight);
1578 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
1582 req->rq_interpret_reply = brw_interpret;
1583 ptlrpc_set_add_req(set, req);
1584 client_obd_list_lock(&cli->cl_loi_list_lock);
1585 if (cmd == OBD_BRW_READ)
1586 cli->cl_r_in_flight++;
1588 cli->cl_w_in_flight++;
1589 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* request build failed for a write: undo the up-front grant consumption */
1590 } else if (cmd == OBD_BRW_WRITE) {
1591 client_obd_list_lock(&cli->cl_loi_list_lock);
1592 for (i = 0; i < page_count; i++)
1593 osc_release_write_grant(cli, pga[i], 0);
1594 client_obd_list_unlock(&cli->cl_loi_list_lock);
1600 * ugh, we want disk allocation on the target to happen in offset order. we'll
1601 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1602 * fine for our small page arrays and doesn't require allocation. its an
1603 * insertion sort that swaps elements that are strides apart, shrinking the
1604 * stride down until its '1' and the array is sorted.
/* In-place shellsort of array[0..num-1] by ascending page offset (->off). */
1606 static void sort_brw_pages(struct brw_page **array, int num)
1609 struct brw_page *tmp;
/* grow the stride using the 3h+1 sequence, then shrink it back down */
1613 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1618 for (i = stride ; i < num ; i++) {
1621 while (j >= stride && array[j - stride]->off > tmp->off) {
1622 array[j] = array[j - stride];
1627 } while (stride > 1);
/*
 * Return the length of the longest prefix of @pg that the network can
 * move as a single unfragmented bulk transfer: counting stops at a page
 * that does not end on a CFS_PAGE_SIZE boundary, or a subsequent page
 * that does not start on one.  (off & ~CFS_PAGE_MASK gives the intra-page
 * offset.)  Requires pages > 0.
 */
1630 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1636 LASSERT (pages > 0);
1637 offset = pg[i]->off & ~CFS_PAGE_MASK;
1641 if (pages == 0) /* that's all */
1644 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1645 return count; /* doesn't end on page boundary */
1648 offset = pg[i]->off & ~CFS_PAGE_MASK;
1649 if (offset != 0) /* doesn't start on page boundary */
/*
 * Build an array of pointers into the caller's flat brw_page array so the
 * pages can be sorted/split without moving the pages themselves.
 * Returns the allocated pointer array (presumably NULL on allocation
 * failure — the failure branch is not visible here).  Freed with
 * osc_release_ppga().
 */
1656 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1658 struct brw_page **ppga;
1661 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1665 for (i = 0; i < count; i++)
/* Free a pointer array previously built by osc_build_ppga(). */
1670 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1672 LASSERT(ppga != NULL);
1673 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * Synchronous read/write entry point.  Sorts the pages by offset and
 * issues them as a series of osc_brw_internal() calls, each at most
 * cl_max_pages_per_rpc pages and trimmed to an unfragmented run.  Because
 * the brw clobbers the obdo, a copy is saved before the first chunk of a
 * multi-chunk transfer and restored before each subsequent chunk.
 * OBD_BRW_CHECK only probes whether the import is usable.
 */
1676 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1677 obd_count page_count, struct brw_page *pga,
1678 struct obd_trans_info *oti)
1680 struct obdo *saved_oa = NULL;
1681 struct brw_page **ppga, **orig;
1682 struct obd_import *imp = class_exp2cliimp(exp);
1683 struct client_obd *cli = &imp->imp_obd->u.cli;
1684 int rc, page_count_orig;
1687 if (cmd & OBD_BRW_CHECK) {
1688 /* The caller just wants to know if there's a chance that this
1689 * I/O can succeed */
1691 if (imp == NULL || imp->imp_invalid)
1696 /* test_brw with a failed create can trip this, maybe others. */
1697 LASSERT(cli->cl_max_pages_per_rpc);
1701 orig = ppga = osc_build_ppga(pga, page_count);
/* remember the original count: ppga/page_count are advanced per chunk */
1704 page_count_orig = page_count;
1706 sort_brw_pages(ppga, page_count);
1707 while (page_count) {
1708 obd_count pages_per_brw;
1710 if (page_count > cli->cl_max_pages_per_rpc)
1711 pages_per_brw = cli->cl_max_pages_per_rpc;
1713 pages_per_brw = page_count;
1715 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1717 if (saved_oa != NULL) {
1718 /* restore previously saved oa */
1719 *oinfo->oi_oa = *saved_oa;
1720 } else if (page_count > pages_per_brw) {
1721 /* save a copy of oa (brw will clobber it) */
1722 OBDO_ALLOC(saved_oa);
1723 if (saved_oa == NULL)
1724 GOTO(out, rc = -ENOMEM);
1725 *saved_oa = *oinfo->oi_oa;
1728 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1729 pages_per_brw, ppga, oinfo->oi_capa);
1734 page_count -= pages_per_brw;
1735 ppga += pages_per_brw;
1739 osc_release_ppga(orig, page_count_orig);
1741 if (saved_oa != NULL)
1742 OBDO_FREE(saved_oa);
/*
 * Asynchronous counterpart of osc_brw(): sorts the pages and submits them
 * to @set in chunks via async_internal().  When more than one RPC will be
 * issued, each chunk gets its own copy of the pointer array because
 * async_internal() (via brw_interpret) takes ownership and frees it; the
 * original ppga is passed directly only when a single RPC covers the whole
 * transfer.  OBD_BRW_CHECK only probes whether the import is usable.
 */
1747 static int osc_brw_async(int cmd, struct obd_export *exp,
1748 struct obd_info *oinfo, obd_count page_count,
1749 struct brw_page *pga, struct obd_trans_info *oti,
1750 struct ptlrpc_request_set *set)
1752 struct brw_page **ppga, **orig;
1753 struct client_obd *cli = &exp->exp_obd->u.cli;
1754 int page_count_orig;
1758 if (cmd & OBD_BRW_CHECK) {
1759 struct obd_import *imp = class_exp2cliimp(exp);
1760 /* The caller just wants to know if there's a chance that this
1761 * I/O can succeed */
1763 if (imp == NULL || imp->imp_invalid)
1768 orig = ppga = osc_build_ppga(pga, page_count);
1771 page_count_orig = page_count;
1773 sort_brw_pages(ppga, page_count);
1774 while (page_count) {
1775 struct brw_page **copy;
1776 obd_count pages_per_brw;
1778 pages_per_brw = min_t(obd_count, page_count,
1779 cli->cl_max_pages_per_rpc);
1781 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1783 /* use ppga only if single RPC is going to fly */
1784 if (pages_per_brw != page_count_orig || ppga != orig) {
1785 OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
1787 GOTO(out, rc = -ENOMEM);
1788 memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
1792 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1793 pages_per_brw, copy, set, oinfo->oi_capa);
/* on failure we must free the per-chunk copy ourselves */
1797 OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
1801 /* we passed it to async_internal() which is
1802 * now responsible for releasing memory */
1806 page_count -= pages_per_brw;
1807 ppga += pages_per_brw;
1811 osc_release_ppga(orig, page_count_orig);
1815 static void osc_check_rpcs(struct client_obd *cli);
1817 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1818 * the dirty accounting. Writeback completes or truncate happens before
1819 * writing starts. Must be called with the loi lock held. */
/* @sent is forwarded to osc_release_write_grant() — presumably nonzero when
 * the page was actually written out; confirm against that helper. */
1820 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1823 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1827 /* This maintains the lists of pending pages to read/write for a given object
1828 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1829 * to quickly find objects that are ready to send an RPC. */
/*
 * Decide whether @lop has enough (or urgent enough) pending pages for @cmd
 * to justify building an RPC now.  Returns nonzero when an RPC should be
 * sent: nothing pending -> no; invalid import, urgent pages, cache waiters
 * (writes), or pending count reaching the 'optimal' RPC size -> yes.
 */
1830 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1836 if (lop->lop_num_pending == 0)
1839 /* if we have an invalid import we want to drain the queued pages
1840 * by forcing them through rpcs that immediately fail and complete
1841 * the pages. recovery relies on this to empty the queued pages
1842 * before canceling the locks and evicting down the llite pages */
1843 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1846 /* stream rpcs in queue order as long as as there is an urgent page
1847 * queued. this is our cheap solution for good batching in the case
1848 * where writepage marks some random page in the middle of the file
1849 * as urgent because of, say, memory pressure */
1850 if (!list_empty(&lop->lop_urgent)) {
1851 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1854 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1855 optimal = cli->cl_max_pages_per_rpc;
1856 if (cmd & OBD_BRW_WRITE) {
1857 /* trigger a write rpc stream as long as there are dirtiers
1858 * waiting for space. as they're waiting, they're not going to
1859 * create more pages to coallesce with what's waiting.. */
1860 if (!list_empty(&cli->cl_cache_waiters)) {
1861 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1864 /* +16 to avoid triggering rpcs that would want to include pages
1865 * that are being queued but which can't be made ready until
1866 * the queuer finishes with the page. this is a wart for
1867 * llite::commit_write() */
1870 if (lop->lop_num_pending >= optimal)
/* Make @item's membership of @list match should_be_on: link it at the tail
 * when it should be on the list but isn't, unlink it when the reverse. */
1876 static void on_list(struct list_head *item, struct list_head *list,
1879 if (list_empty(item) && should_be_on)
1880 list_add_tail(item, list);
1881 else if (!list_empty(item) && !should_be_on)
1882 list_del_init(item);
1885 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1886 * can find pages to build into rpcs quickly */
1887 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
/* ready list: object has enough pending read or write pages for an RPC */
1889 on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1890 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1891 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
/* write/read lists: object has any pending pages of that kind at all */
1893 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1894 loi->loi_write_lop.lop_num_pending);
1896 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1897 loi->loi_read_lop.lop_num_pending);
/* Adjust @lop's pending-page count by @delta and mirror the change in the
 * client-wide pending read or write page counter. */
1900 static void lop_update_pending(struct client_obd *cli,
1901 struct loi_oap_pages *lop, int cmd, int delta)
1903 lop->lop_num_pending += delta;
1904 if (cmd & OBD_BRW_WRITE)
1905 cli->cl_pending_w_pages += delta;
1907 cli->cl_pending_r_pages += delta;
1910 /* this is called when a sync waiter receives an interruption. Its job is to
1911 * get the caller woken as soon as possible. If its page hasn't been put in an
1912 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
1913 * desiring interruption which will forcefully complete the rpc once the rpc
1915 static void osc_occ_interrupted(struct oig_callback_context *occ)
1917 struct osc_async_page *oap;
1918 struct loi_oap_pages *lop;
1919 struct lov_oinfo *loi;
1922 /* XXX member_of() */
/* recover the oap that embeds this callback context */
1923 oap = list_entry(occ, struct osc_async_page, oap_occ);
1925 client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1927 oap->oap_interrupted = 1;
1929 /* ok, it's been put in an rpc. only one oap gets a request reference */
1930 if (oap->oap_request != NULL) {
1931 ptlrpc_mark_interrupted(oap->oap_request);
/* poke ptlrpcd so the interrupted rpc is processed promptly */
1932 ptlrpcd_wake(oap->oap_request);
1936 /* we don't get interruption callbacks until osc_trigger_group_io()
1937 * has been called and put the sync oaps in the pending/urgent lists.*/
1938 if (!list_empty(&oap->oap_pending_item)) {
/* not in an rpc yet: dequeue it and complete the group IO with -EINTR */
1939 list_del_init(&oap->oap_pending_item);
1940 list_del_init(&oap->oap_urgent_item);
1943 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1944 &loi->loi_write_lop : &loi->loi_read_lop;
1945 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1946 loi_list_maint(oap->oap_cli, oap->oap_loi);
1948 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
1949 oap->oap_oig = NULL;
1953 client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
1956 /* this is trying to propogate async writeback errors back up to the
1957 * application. As an async write fails we record the error code for later if
1958 * the app does an fsync. As long as errors persist we force future rpcs to be
1959 * sync so that the app can get a sync error and break the cycle of queueing
1960 * pages for which writeback will fail. */
1961 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
/* on a write failure (condition on elided lines): force sync writes and
 * record the xid floor below which completions don't clear the state */
1968 ar->ar_force_sync = 1;
1969 ar->ar_min_xid = ptlrpc_sample_next_xid();
/* a successful write at or past the recorded xid ends the forced-sync era */
1974 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1975 ar->ar_force_sync = 0;
/* Queue @oap on its object's read or write pending list (plus the urgent
 * list when ASYNC_URGENT is set) and update the pending-page accounting. */
1978 static void osc_oap_to_pending(struct osc_async_page *oap)
1980 struct loi_oap_pages *lop;
1982 if (oap->oap_cmd & OBD_BRW_WRITE)
1983 lop = &oap->oap_loi->loi_write_lop;
1985 lop = &oap->oap_loi->loi_read_lop;
1987 if (oap->oap_async_flags & ASYNC_URGENT)
1988 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1989 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1990 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1993 /* this must be called holding the loi list lock to give coverage to exit_cache,
1994 * async_flag maintenance, and oap_request */
/*
 * Complete one async page: drop its request reference, update async-error
 * tracking for writes, propagate returned obdo attributes into the loi's
 * lvb, finish group IO via oig, or call the caller's ap_completion hook
 * (which may ask for the page to be requeued).
 */
1995 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1996 struct osc_async_page *oap, int sent, int rc)
2001 if (oap->oap_request != NULL) {
/* remember the xid before dropping the reference, for osc_process_ar */
2002 xid = ptlrpc_req_xid(oap->oap_request);
2003 ptlrpc_req_finished(oap->oap_request);
2004 oap->oap_request = NULL;
2007 oap->oap_async_flags = 0;
2008 oap->oap_interrupted = 0;
2010 if (oap->oap_cmd & OBD_BRW_WRITE) {
/* record/clear forced-sync state both client-wide and per-object */
2011 osc_process_ar(&cli->cl_ar, xid, rc);
2012 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
/* mirror the attributes the server returned into the cached lvb */
2015 if (rc == 0 && oa != NULL) {
2016 if (oa->o_valid & OBD_MD_FLBLOCKS)
2017 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2018 if (oa->o_valid & OBD_MD_FLMTIME)
2019 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2020 if (oa->o_valid & OBD_MD_FLATIME)
2021 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2022 if (oa->o_valid & OBD_MD_FLCTIME)
2023 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
/* group-IO page: leave the cache and complete the oig entry instead of
 * calling the ap_completion hook */
2027 osc_exit_cache(cli, oap, sent);
2028 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
2029 oap->oap_oig = NULL;
2034 rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
2035 oap->oap_cmd, oa, rc);
2037 /* ll_ap_completion (from llite) drops PG_locked. so, a new
2038 * I/O on the page could start, but OSC calls it under lock
2039 * and thus we can add oap back to pending safely */
2041 /* upper layer wants to leave the page on pending queue */
2042 osc_oap_to_pending(oap);
2044 osc_exit_cache(cli, oap, sent);
/*
 * Interpret callback for the cached-page BRW RPCs built by
 * osc_send_oap_rpc().  Finishes the RPC (retrying recoverable errors),
 * then under the loi list lock drops the in-flight counter, completes
 * every oap the RPC carried, wakes cache waiters and tries to launch more
 * RPCs, and finally frees the obdo and page array.
 */
2048 static int brw_interpret_oap(struct ptlrpc_request *req, void *data, int rc)
2050 struct osc_async_page *oap, *tmp;
2051 struct osc_brw_async_args *aa = data;
2052 struct client_obd *cli;
2055 rc = osc_brw_fini_request(req, rc);
2056 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2057 if (osc_recoverable_error(rc)) {
2058 rc = osc_brw_redo_request(req, aa);
2065 client_obd_list_lock(&cli->cl_loi_list_lock);
2067 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2068 * is called so we know whether to go to sync BRWs or wait for more
2069 * RPCs to complete */
2070 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2071 cli->cl_w_in_flight--;
2073 cli->cl_r_in_flight--;
2075 /* the caller may re-use the oap after the completion call so
2076 * we need to clean it up a little */
2077 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2078 list_del_init(&oap->oap_rpc_item);
2079 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
2082 osc_wake_cache_waiters(cli);
2083 osc_check_rpcs(cli);
2085 client_obd_list_unlock(&cli->cl_loi_list_lock);
2087 OBDO_FREE(aa->aa_oa);
2089 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
/*
 * Turn @rpc_list (a list of ready oaps) into one BRW ptlrpc request:
 * build a brw_page pointer array, fill the obdo and capa via the caller's
 * async-page ops, sort the pages and prep the request, then refresh the
 * timestamps (see bug 10150 note below) and splice the oaps into the
 * request's async args.  Returns the request or an ERR_PTR.
 */
2093 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
2094 struct list_head *rpc_list,
2095 int page_count, int cmd)
2097 struct ptlrpc_request *req;
2098 struct brw_page **pga = NULL;
2099 struct osc_brw_async_args *aa;
2100 struct obdo *oa = NULL;
2101 struct obd_async_page_ops *ops = NULL;
2102 void *caller_data = NULL;
2103 struct obd_capa *ocapa;
2104 struct osc_async_page *oap;
2108 LASSERT(!list_empty(rpc_list));
2110 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2112 RETURN(ERR_PTR(-ENOMEM));
2116 GOTO(out, req = ERR_PTR(-ENOMEM));
/* collect each oap's brw_page, computing its absolute file offset */
2119 list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2121 ops = oap->oap_caller_ops;
2122 caller_data = oap->oap_caller_data;
2124 pga[i] = &oap->oap_brw_page;
2125 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2126 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2127 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2131 /* always get the data for the obdo for the rpc */
2132 LASSERT(ops != NULL);
2133 ops->ap_fill_obdo(caller_data, cmd, oa);
2134 ocapa = ops->ap_lookup_capa(caller_data, cmd);
2136 sort_brw_pages(pga, page_count);
2137 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2141 CERROR("prep_req failed: %d\n", rc);
2142 GOTO(out, req = ERR_PTR(rc));
2145 /* Need to update the timestamps after the request is built in case
2146 * we race with setattr (locally or in queue at OST). If OST gets
2147 * later setattr before earlier BRW (as determined by the request xid),
2148 * the OST will not use BRW timestamps. Sadly, there is no obvious
2149 * way to do this in a single call. bug 10150 */
2150 ops->ap_update_obdo(caller_data, cmd, oa,
2151 OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
2153 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2154 aa = (struct osc_brw_async_args *)&req->rq_async_args;
/* move (not copy) the oap list into the request's async args */
2155 INIT_LIST_HEAD(&aa->aa_oaps);
2156 list_splice(rpc_list, &aa->aa_oaps);
2157 INIT_LIST_HEAD(rpc_list);
2164 OBD_FREE(pga, sizeof(*pga) * page_count);
2169 /* the loi lock is held across this function but it's allowed to release
2170 * and reacquire it during its work */
/*
 * Gather up to cl_max_pages_per_rpc ready, unfragmented pages from @lop,
 * build one BRW RPC and hand it to ptlrpcd.  Returns a negative errno on
 * failure; a 0 return means make_ready asked us to back off (see the
 * race_counter comment in osc_check_rpcs).
 */
2171 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2172 int cmd, struct loi_oap_pages *lop)
2174 struct ptlrpc_request *req;
2175 obd_count page_count = 0;
2176 struct osc_async_page *oap = NULL, *tmp;
2177 struct osc_brw_async_args *aa;
2178 struct obd_async_page_ops *ops;
2179 CFS_LIST_HEAD(rpc_list);
2180 unsigned int ending_offset;
2181 unsigned starting_offset = 0;
2184 /* first we find the pages we're allowed to work with */
2185 list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2187 ops = oap->oap_caller_ops;
2189 LASSERT(oap->oap_magic == OAP_MAGIC);
2191 /* in llite being 'ready' equates to the page being locked
2192 * until completion unlocks it. commit_write submits a page
2193 * as not ready because its unlock will happen unconditionally
2194 * as the call returns. if we race with commit_write giving
2195 * us that page we dont' want to create a hole in the page
2196 * stream, so we stop and leave the rpc to be fired by
2197 * another dirtier or kupdated interval (the not ready page
2198 * will still be on the dirty list). we could call in
2199 * at the end of ll_file_write to process the queue again. */
2200 if (!(oap->oap_async_flags & ASYNC_READY)) {
2201 int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2203 CDEBUG(D_INODE, "oap %p page %p returned %d "
2204 "instead of ready\n", oap,
2208 /* llite is telling us that the page is still
2209 * in commit_write and that we should try
2210 * and put it in an rpc again later. we
2211 * break out of the loop so we don't create
2212 * a hole in the sequence of pages in the rpc
2217 /* the io isn't needed.. tell the checks
2218 * below to complete the rpc with EINTR */
2219 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2220 oap->oap_count = -EINTR;
2223 oap->oap_async_flags |= ASYNC_READY;
2226 LASSERTF(0, "oap %p page %p returned %d "
2227 "from make_ready\n", oap,
2235 * Page submitted for IO has to be locked. Either by
2236 * ->ap_make_ready() or by higher layers.
2238 * XXX nikita: this assertion should be adjusted when lustre
2239 * starts using PG_writeback for pages being written out.
2241 #if defined(__KERNEL__) && defined(__linux__)
2242 LASSERT(PageLocked(oap->oap_page));
2244 /* If there is a gap at the start of this page, it can't merge
2245 * with any previous page, so we'll hand the network a
2246 * "fragmented" page array that it can't transfer in 1 RDMA */
2247 if (page_count != 0 && oap->oap_page_off != 0)
2250 /* take the page out of our book-keeping */
2251 list_del_init(&oap->oap_pending_item);
2252 lop_update_pending(cli, lop, cmd, -1);
2253 list_del_init(&oap->oap_urgent_item);
2255 if (page_count == 0)
2256 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2257 (PTLRPC_MAX_BRW_SIZE - 1);
2259 /* ask the caller for the size of the io as the rpc leaves. */
2260 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2262 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2263 if (oap->oap_count <= 0) {
/* nothing (or -EINTR) to send for this page: complete it now */
2264 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2266 osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2270 /* now put the page back in our accounting */
2271 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2272 if (++page_count >= cli->cl_max_pages_per_rpc)
2275 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2276 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2277 * have the same alignment as the initial writes that allocated
2278 * extents on the server. */
2279 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2280 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2281 if (ending_offset == 0)
2284 /* If there is a gap at the end of this page, it can't merge
2285 * with any subsequent pages, so we'll hand the network a
2286 * "fragmented" page array that it can't transfer in 1 RDMA */
2287 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2291 osc_wake_cache_waiters(cli);
2293 if (page_count == 0)
2296 loi_list_maint(cli, loi);
/* drop the list lock while building the request; we reacquire below */
2298 client_obd_list_unlock(&cli->cl_loi_list_lock);
2300 req = osc_build_req(cli, &rpc_list, page_count, cmd);
2302 /* this should happen rarely and is pretty bad, it makes the
2303 * pending list not follow the dirty order */
2304 client_obd_list_lock(&cli->cl_loi_list_lock);
2305 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2306 list_del_init(&oap->oap_rpc_item);
2308 /* queued sync pages can be torn down while the pages
2309 * were between the pending list and the rpc */
2310 if (oap->oap_interrupted) {
2311 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2312 osc_ap_completion(cli, NULL, oap, 0,
2316 osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2318 loi_list_maint(cli, loi);
2319 RETURN(PTR_ERR(req));
2322 aa = (struct osc_brw_async_args *)&req->rq_async_args;
/* lproc accounting, including the RPC's starting offset histogram */
2324 if (cmd == OBD_BRW_READ) {
2325 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2326 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2327 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2328 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2329 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2331 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2332 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2333 cli->cl_w_in_flight);
2334 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2335 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2336 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2339 client_obd_list_lock(&cli->cl_loi_list_lock);
2341 if (cmd == OBD_BRW_READ)
2342 cli->cl_r_in_flight++;
2344 cli->cl_w_in_flight++;
2346 /* queued sync pages can be torn down while the pages
2347 * were between the pending list and the rpc */
2349 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2350 /* only one oap gets a request reference */
2353 if (oap->oap_interrupted && !req->rq_intr) {
2354 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2356 ptlrpc_mark_interrupted(req);
2360 tmp->oap_request = ptlrpc_request_addref(req);
2362 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2363 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2365 req->rq_interpret_reply = brw_interpret_oap;
2366 ptlrpcd_add_req(req);
/* Debug-print helper: dumps a lov_oinfo's ready-list membership and its
 * pending/urgent read and write page state, then the caller's message. */
2370 #define LOI_DEBUG(LOI, STR, args...) \
2371 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2372 !list_empty(&(LOI)->loi_cli_item), \
2373 (LOI)->loi_write_lop.lop_num_pending, \
2374 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2375 (LOI)->loi_read_lop.lop_num_pending, \
2376 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2379 /* This is called by osc_check_rpcs() to find which objects have pages that
2380 * we could be sending. These lists are maintained by lop_makes_rpc(). */
/* Preference order: ready objects, then (when cache waiters exist) any
 * object with queued writes, then — on an invalid import — anything queued
 * at all so it gets flushed.  Presumably returns NULL when nothing
 * qualifies (the final return is on an elided line). */
2381 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2384 /* first return all objects which we already know to have
2385 * pages ready to be stuffed into rpcs */
2386 if (!list_empty(&cli->cl_loi_ready_list))
2387 RETURN(list_entry(cli->cl_loi_ready_list.next,
2388 struct lov_oinfo, loi_cli_item));
2390 /* then if we have cache waiters, return all objects with queued
2391 * writes. This is especially important when many small files
2392 * have filled up the cache and not been fired into rpcs because
2393 * they don't pass the nr_pending/object threshhold */
2394 if (!list_empty(&cli->cl_cache_waiters) &&
2395 !list_empty(&cli->cl_loi_write_list))
2396 RETURN(list_entry(cli->cl_loi_write_list.next,
2397 struct lov_oinfo, loi_write_item));
2399 /* then return all queued objects when we have an invalid import
2400 * so that they get flushed */
2401 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2402 if (!list_empty(&cli->cl_loi_write_list))
2403 RETURN(list_entry(cli->cl_loi_write_list.next,
2404 struct lov_oinfo, loi_write_item));
2405 if (!list_empty(&cli->cl_loi_read_list))
2406 RETURN(list_entry(cli->cl_loi_read_list.next,
2407 struct lov_oinfo, loi_read_item));
2412 /* called with the loi list lock held */
/*
 * Main RPC scheduler: while objects have sendable pages and we are under
 * cl_max_rpcs_in_flight, pick the next object (osc_next_loi), try a write
 * RPC then a read RPC for it, rotate it to the back of its lists for
 * fairness, and stop after too many 0-return ("back off") attempts.
 */
2413 static void osc_check_rpcs(struct client_obd *cli)
2415 struct lov_oinfo *loi;
2416 int rc = 0, race_counter = 0;
2419 while ((loi = osc_next_loi(cli)) != NULL) {
2420 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2422 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2425 /* attempt some read/write balancing by alternating between
2426 * reads and writes in an object. The makes_rpc checks here
2427 * would be redundant if we were getting read/write work items
2428 * instead of objects. we don't want send_oap_rpc to drain a
2429 * partial read pending queue when we're given this object to
2430 * do io on writes while there are cache waiters */
2431 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2432 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2433 &loi->loi_write_lop);
2441 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2442 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2443 &loi->loi_read_lop);
2452 /* attempt some inter-object balancing by issueing rpcs
2453 * for each object in turn */
2454 if (!list_empty(&loi->loi_cli_item))
2455 list_del_init(&loi->loi_cli_item);
2456 if (!list_empty(&loi->loi_write_item))
2457 list_del_init(&loi->loi_write_item);
2458 if (!list_empty(&loi->loi_read_item))
2459 list_del_init(&loi->loi_read_item);
2461 loi_list_maint(cli, loi);
2463 /* send_oap_rpc fails with 0 when make_ready tells it to
2464 * back off. llite's make_ready does this when it tries
2465 * to lock a page queued for write that is already locked.
2466 * we want to try sending rpcs from many objects, but we
2467 * don't want to spin failing with 0. */
2468 if (race_counter == 10)
2474 /* we're trying to queue a page in the osc so we're subject to the
2475 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2476 * If the osc's queued pages are already at that limit, then we want to sleep
2477 * until there is space in the osc's queue for us. We also may be waiting for
2478 * write credits from the OST if there are RPCs in flight that may return some
2479 * before we fall back to sync writes.
2481 * We need this know our allocation was granted in the presence of signals */
/* l_wait_event() condition for osc_enter_cache(): true when our cache-waiter
 * entry was removed (grant arrived) or there are no RPCs in flight that could
 * ever return grant (so waiting longer is pointless).  Takes and drops the
 * loi list lock around the check. */
2482 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2486 client_obd_list_lock(&cli->cl_loi_list_lock);
2487 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2488 client_obd_list_unlock(&cli->cl_loi_list_lock);
2492 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2493 * grant or cache space. */
/* Admit a page into the OSC dirty cache.  Fast path: if per-OSC dirty,
 * global dirty and available grant limits all allow it, consume one page of
 * write grant and return.  Otherwise, if write RPCs are in flight that may
 * return grant, register as a cache waiter, kick RPC generation, drop the
 * lock and sleep in l_wait_event(ocw_granted) until woken or hopeless.
 * Forces the caller to sync I/O when caching is disabled or force_sync is
 * set.  Error-return paths (EDQUOT/sync fallback) are elided in this view. */
2494 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2495 struct osc_async_page *oap)
2497 struct osc_cache_waiter ocw;
2498 struct l_wait_info lwi = { 0 };
2502 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2503 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2504 cli->cl_dirty_max, obd_max_dirty_pages,
2505 cli->cl_lost_grant, cli->cl_avail_grant);
2507 /* force the caller to try sync io. this can jump the list
2508 * of queued writes and create a discontiguous rpc stream */
2509 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2510 loi->loi_ar.ar_force_sync)
2513 /* Hopefully normal case - cache space and write credits available */
2514 if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2515 (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2516 (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2517 /* account for ourselves */
2518 osc_consume_write_grant(cli, &oap->oap_brw_page);
2522 /* Make sure that there are write rpcs in flight to wait for. This
2523 * is a little silly as this object may not have any pending but
2524 * other objects sure might. */
2525 if (cli->cl_w_in_flight) {
2526 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2527 cfs_waitq_init(&ocw.ocw_waitq);
/* Push queued pages toward RPCs so in-flight writes can return grant. */
2531 loi_list_maint(cli, loi);
2532 osc_check_rpcs(cli);
2533 client_obd_list_unlock(&cli->cl_loi_list_lock);
2535 CDEBUG(D_CACHE, "sleeping for cache space\n");
2536 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2538 client_obd_list_lock(&cli->cl_loi_list_lock);
/* Still on the waiter list: we were woken without being granted. */
2539 if (!list_empty(&ocw.ocw_entry)) {
2540 list_del(&ocw.ocw_entry);
/* Initialize the per-page osc_async_page cookie stored in caller-provided
 * space (*res).  When called with res == NULL (allocation-size query) it
 * returns the rounded size the caller must reserve; otherwise it fills in
 * the oap magic, back-pointers, page/offset and list heads.  The res-NULL
 * branch and final RETURN are elided in this view — TODO confirm. */
2549 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2550 struct lov_oinfo *loi, cfs_page_t *page,
2551 obd_off offset, struct obd_async_page_ops *ops,
2552 void *data, void **res)
2554 struct osc_async_page *oap;
/* Size query: tell the caller how much space an oap needs. */
2558 return size_round(sizeof(*oap));
2561 oap->oap_magic = OAP_MAGIC;
2562 oap->oap_cli = &exp->exp_obd->u.cli;
2565 oap->oap_caller_ops = ops;
2566 oap->oap_caller_data = data;
2568 oap->oap_page = page;
2569 oap->oap_obj_off = offset;
/* Not yet on any pending/urgent/rpc list. */
2571 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2572 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2573 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2575 oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2577 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
/* Validate and convert an opaque cookie (handed out by osc_prep_async_page)
 * back to its osc_async_page; returns ERR_PTR(-EINVAL) on bad magic. */
2581 struct osc_async_page *oap_from_cookie(void *cookie)
2583 struct osc_async_page *oap = cookie;
2584 if (oap->oap_magic != OAP_MAGIC)
2585 return ERR_PTR(-EINVAL);
/* Queue one page of async I/O on an object.  Validates the cookie, rejects
 * pages already on a list and dead imports, enforces quota for non-NOQUOTA
 * writes (HAVE_QUOTA_SUPPORT), reserves cache/grant for writes via
 * osc_enter_cache(), then moves the oap to the pending queue and kicks RPC
 * generation.  Several error-return lines are elided in this view. */
2589 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2590 struct lov_oinfo *loi, void *cookie,
2591 int cmd, obd_off off, int count,
2592 obd_flag brw_flags, enum async_flags async_flags)
2594 struct client_obd *cli = &exp->exp_obd->u.cli;
2595 struct osc_async_page *oap;
2599 oap = oap_from_cookie(cookie);
2601 RETURN(PTR_ERR(oap));
/* Import gone or invalid: refuse new async I/O. */
2603 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
/* Already queued somewhere — caller error. */
2606 if (!list_empty(&oap->oap_pending_item) ||
2607 !list_empty(&oap->oap_urgent_item) ||
2608 !list_empty(&oap->oap_rpc_item))
2611 /* check if the file's owner/group is over quota */
2612 #ifdef HAVE_QUOTA_SUPPORT
2613 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2614 struct obd_async_page_ops *ops;
2621 ops = oap->oap_caller_ops;
2622 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2623 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
/* Single-stripe object: operate on stripe 0's oinfo. */
2634 loi = lsm->lsm_oinfo[0];
2636 client_obd_list_lock(&cli->cl_loi_list_lock);
2639 oap->oap_page_off = off;
2640 oap->oap_count = count;
2641 oap->oap_brw_flags = brw_flags;
2642 oap->oap_async_flags = async_flags;
2644 if (cmd & OBD_BRW_WRITE) {
2645 rc = osc_enter_cache(cli, loi, oap);
/* Cache admission failed: unlock and bail (RETURN elided). */
2647 client_obd_list_unlock(&cli->cl_loi_list_lock);
2652 osc_oap_to_pending(oap);
2653 loi_list_maint(cli, loi);
2655 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2658 osc_check_rpcs(cli);
2659 client_obd_list_unlock(&cli->cl_loi_list_lock);
2664 /* aka (~was & now & flag), but this is more clear :) */
2665 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
/* Raise async flags (ASYNC_READY / ASYNC_URGENT) on a queued page.  A page
 * becoming URGENT moves to the lop urgent list unless it is already part of
 * an RPC.  Fails with -EINVAL for pages not on the pending list; no-op when
 * all requested flags are already set.  Server-side (SRVLOCK) pages are
 * asserted against — see bug 7311 note below. */
2667 static int osc_set_async_flags(struct obd_export *exp,
2668 struct lov_stripe_md *lsm,
2669 struct lov_oinfo *loi, void *cookie,
2670 obd_flag async_flags)
2672 struct client_obd *cli = &exp->exp_obd->u.cli;
2673 struct loi_oap_pages *lop;
2674 struct osc_async_page *oap;
2678 oap = oap_from_cookie(cookie);
2680 RETURN(PTR_ERR(oap));
2683 * bug 7311: OST-side locking is only supported for liblustre for now
2684 * (and liblustre never calls obd_set_async_flags(). I hope.), generic
2685 * implementation has to handle case where OST-locked page was picked
2686 * up by, e.g., ->writepage().
2688 LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2689 LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2692 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2696 loi = lsm->lsm_oinfo[0];
/* Choose the read or write page queue based on the oap's command. */
2698 if (oap->oap_cmd & OBD_BRW_WRITE) {
2699 lop = &loi->loi_write_lop;
2701 lop = &loi->loi_read_lop;
2704 client_obd_list_lock(&cli->cl_loi_list_lock);
2706 if (list_empty(&oap->oap_pending_item))
2707 GOTO(out, rc = -EINVAL);
/* Nothing new to set. */
2709 if ((oap->oap_async_flags & async_flags) == async_flags)
2712 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2713 oap->oap_async_flags |= ASYNC_READY;
2715 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2716 if (list_empty(&oap->oap_rpc_item)) {
2717 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2718 loi_list_maint(cli, loi);
2722 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2723 oap->oap_async_flags);
2725 osc_check_rpcs(cli);
2726 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Queue a page on the group-I/O pending list (lop_pending_group) instead of
 * the normal pending queue; the group is later released as a unit by
 * osc_trigger_group_io().  ASYNC_GROUP_SYNC pages are also registered with
 * the obd_io_group so completion can be waited on.  Same cookie/import/
 * already-queued validation as osc_queue_async_io(). */
2730 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2731 struct lov_oinfo *loi,
2732 struct obd_io_group *oig, void *cookie,
2733 int cmd, obd_off off, int count,
2735 obd_flag async_flags)
2737 struct client_obd *cli = &exp->exp_obd->u.cli;
2738 struct osc_async_page *oap;
2739 struct loi_oap_pages *lop;
2743 oap = oap_from_cookie(cookie);
2745 RETURN(PTR_ERR(oap));
2747 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2750 if (!list_empty(&oap->oap_pending_item) ||
2751 !list_empty(&oap->oap_urgent_item) ||
2752 !list_empty(&oap->oap_rpc_item))
2756 loi = lsm->lsm_oinfo[0];
2758 client_obd_list_lock(&cli->cl_loi_list_lock);
2761 oap->oap_page_off = off;
2762 oap->oap_count = count;
2763 oap->oap_brw_flags = brw_flags;
2764 oap->oap_async_flags = async_flags;
2766 if (cmd & OBD_BRW_WRITE)
2767 lop = &loi->loi_write_lop;
2769 lop = &loi->loi_read_lop;
/* Park on the group list; not visible to osc_check_rpcs() until triggered. */
2771 list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2772 if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2774 rc = oig_add_one(oig, &oap->oap_occ);
2777 LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2778 oap, oap->oap_page, rc);
2780 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Move every page parked on lop->lop_pending_group onto the real pending
 * queue (osc_oap_to_pending) and refresh the object's scheduling lists.
 * Uses list_for_each_safe because each iteration unlinks the entry. */
2785 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2786 struct loi_oap_pages *lop, int cmd)
2788 struct list_head *pos, *tmp;
2789 struct osc_async_page *oap;
2791 list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2792 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2793 list_del(&oap->oap_pending_item);
2794 osc_oap_to_pending(oap);
2796 loi_list_maint(cli, loi);
/* Release a previously queued I/O group: promote both the read and write
 * group-pending lists to the live pending queues, then kick RPC generation.
 * All under the loi list lock. */
2799 static int osc_trigger_group_io(struct obd_export *exp,
2800 struct lov_stripe_md *lsm,
2801 struct lov_oinfo *loi,
2802 struct obd_io_group *oig)
2804 struct client_obd *cli = &exp->exp_obd->u.cli;
2808 loi = lsm->lsm_oinfo[0];
2810 client_obd_list_lock(&cli->cl_loi_list_lock);
2812 osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2813 osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2815 osc_check_rpcs(cli);
2816 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Remove a page from the OSC before it was ever sent: release its cache
 * grant, wake anyone waiting for that grant, and unlink it from the urgent
 * and pending lists (updating pending counters).  Fails with -EBUSY if the
 * page is already part of an in-flight RPC. */
2821 static int osc_teardown_async_page(struct obd_export *exp,
2822 struct lov_stripe_md *lsm,
2823 struct lov_oinfo *loi, void *cookie)
2825 struct client_obd *cli = &exp->exp_obd->u.cli;
2826 struct loi_oap_pages *lop;
2827 struct osc_async_page *oap;
2831 oap = oap_from_cookie(cookie);
2833 RETURN(PTR_ERR(oap));
2836 loi = lsm->lsm_oinfo[0];
2838 if (oap->oap_cmd & OBD_BRW_WRITE) {
2839 lop = &loi->loi_write_lop;
2841 lop = &loi->loi_read_lop;
2844 client_obd_list_lock(&cli->cl_loi_list_lock);
/* On an RPC already: cannot tear down now. */
2846 if (!list_empty(&oap->oap_rpc_item))
2847 GOTO(out, rc = -EBUSY);
/* Return this page's grant/dirty accounting and wake cache waiters. */
2849 osc_exit_cache(cli, oap, 0);
2850 osc_wake_cache_waiters(cli);
2852 if (!list_empty(&oap->oap_urgent_item)) {
2853 list_del_init(&oap->oap_urgent_item);
2854 oap->oap_async_flags &= ~ASYNC_URGENT;
2856 if (!list_empty(&oap->oap_pending_item)) {
2857 list_del_init(&oap->oap_pending_item);
2858 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2860 loi_list_maint(cli, loi);
2862 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2864 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Attach ast_data (the inode, on Linux) to a DLM lock found by handle,
 * asserting that any pre-existing, different l_ast_data belongs to an inode
 * being freed (I_FREEING) — otherwise two live inodes would share the lock.
 * Also propagates LDLM_FL_NO_LRU from flags.  NULL-lock (evicted client)
 * early-return path is elided in this view. */
2868 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2871 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2874 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2877 lock_res_and_lock(lock);
2878 #if defined (__KERNEL__) && defined (__linux__)
2879 /* Liang XXX: Darwin and Winnt checking should be added */
2880 if (lock->l_ast_data && lock->l_ast_data != data) {
2881 struct inode *new_inode = data;
2882 struct inode *old_inode = lock->l_ast_data;
2883 if (!(old_inode->i_state & I_FREEING))
2884 LDLM_ERROR(lock, "inconsistent l_ast_data found");
2885 LASSERTF(old_inode->i_state & I_FREEING,
2886 "Found existing inode %p/%lu/%u state %lu in lock: "
2887 "setting data to %p/%lu/%u\n", old_inode,
2888 old_inode->i_ino, old_inode->i_generation,
2890 new_inode, new_inode->i_ino, new_inode->i_generation);
2893 lock->l_ast_data = data;
2894 lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2895 unlock_res_and_lock(lock);
2896 LDLM_LOCK_PUT(lock);
/* Iterate all DLM locks on this object's resource (id/group from the lsm)
 * and apply 'replace' with 'data' to each — used to swap the ast_data
 * callbacks on existing locks. */
2899 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2900 ldlm_iterator_t replace, void *data)
2902 struct ldlm_res_id res_id = { .name = {0} };
2903 struct obd_device *obd = class_exp2obd(exp);
2905 res_id.name[0] = lsm->lsm_object_id;
2906 res_id.name[2] = lsm->lsm_object_gr;
2908 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
/* Post-enqueue finalization shared by sync and async paths: for an intent
 * enqueue aborted by the server, recover the real status from the ldlm
 * reply's lock_policy_res1; on success (or aborted intent) log the LVB
 * (kms/blocks/mtime) received, then invoke the caller's oi_cb_up update
 * callback with the final rc. */
2912 static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
2918 /* The request was created before ldlm_cli_enqueue call. */
2919 if (rc == ELDLM_LOCK_ABORTED) {
2920 struct ldlm_reply *rep;
2921 rep = req_capsule_server_get(&req->rq_pill,
2924 LASSERT(rep != NULL);
2925 if (rep->lock_policy_res1)
2926 rc = rep->lock_policy_res1;
2930 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2931 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2932 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
2933 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
2934 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
2937 /* Call the update callback. */
2938 rc = oinfo->oi_cb_up(oinfo, rc);
/* Async-enqueue reply interpreter: completes the ldlm enqueue with the LVB
 * from stripe 0, runs osc_enqueue_fini(), and — because async locks are
 * released immediately after being obtained (see comment above osc_enqueue)
 * — drops the lock reference on success.  The lock pointer taken up front
 * must be valid; otherwise the client was evicted (LASSERTF). */
2942 static int osc_enqueue_interpret(struct ptlrpc_request *req,
2943 struct osc_enqueue_args *aa, int rc)
2945 int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
2946 struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
2947 struct ldlm_lock *lock;
2949 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2951 lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
2953 /* Complete obtaining the lock procedure. */
2954 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2956 &aa->oa_oi->oi_flags,
2957 &lsm->lsm_oinfo[0]->loi_lvb,
2958 sizeof(lsm->lsm_oinfo[0]->loi_lvb),
2959 lustre_swab_ost_lvb,
2960 aa->oa_oi->oi_lockh, rc);
2962 /* Complete osc stuff. */
2963 rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
2965 /* Release the lock for async request. */
2966 if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
2967 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
2969 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2970 aa->oa_oi->oi_lockh, req, aa);
2971 LDLM_LOCK_PUT(lock);
2975 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2976 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2977 * other synchronous requests, however keeping some locks and trying to obtain
2978 * others may take a considerable amount of time in a case of ost failure; and
2979 * when other sync requests do not get released lock from a client, the client
2980 * is excluded from the cluster -- such scenarious make the life difficult, so
2981 * release locks just after they are obtained. */
/* Acquire an extent lock on the object.  Extents are rounded to page
 * boundaries, then an existing compatible lock is searched for (a PW lock
 * satisfies a PR request — readers and writers share via the page cache);
 * on a match the update callback runs and no RPC is sent.  Otherwise an
 * LDLM enqueue request (with LVB reply buffer) is built and either added to
 * rqset for async completion in osc_enqueue_interpret() or completed
 * synchronously via osc_enqueue_fini().  Several branch/return lines are
 * elided in this view. */
2982 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2983 struct ldlm_enqueue_info *einfo,
2984 struct ptlrpc_request_set *rqset)
2986 struct ldlm_res_id res_id = { .name = {0} };
2987 struct obd_device *obd = exp->exp_obd;
2988 struct ptlrpc_request *req = NULL;
2989 int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
2994 res_id.name[0] = oinfo->oi_md->lsm_object_id;
2995 res_id.name[2] = oinfo->oi_md->lsm_object_gr;
2997 /* Filesystem lock extents are extended to page boundaries so that
2998 * dealing with the page cache is a little smoother. */
2999 oinfo->oi_policy.l_extent.start -=
3000 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
3001 oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
3003 if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
3006 /* Next, search for already existing extent locks that will cover us */
3007 /* If we're trying to read, we also search for an existing PW lock. The
3008 * VFS and page cache already protect us locally, so lots of readers/
3009 * writers can share a single PW lock.
3011 * There are problems with conversion deadlocks, so instead of
3012 * converting a read lock to a write lock, we'll just enqueue a new
3015 * At some point we should cancel the read lock instead of making them
3016 * send us a blocking callback, but there are problems with canceling
3017 * locks out from other users right now, too. */
3018 mode = einfo->ei_mode;
/* For PR requests also try matching PW (mode widened; line elided). */
3019 if (einfo->ei_mode == LCK_PR)
3021 mode = ldlm_lock_match(obd->obd_namespace,
3022 oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
3023 einfo->ei_type, &oinfo->oi_policy, mode,
3026 /* addref the lock only if not async requests and PW lock is
3027 * matched whereas we asked for PR. */
3028 if (!rqset && einfo->ei_mode != mode)
3029 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
3030 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
3033 /* I would like to be able to ASSERT here that rss <=
3034 * kms, but I can't, for reasons which are explained in
3038 /* We already have a lock, and it's referenced */
3039 oinfo->oi_cb_up(oinfo, ELDLM_OK);
3041 /* For async requests, decref the lock. */
3042 if (einfo->ei_mode != mode)
3043 ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
3045 ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
/* No match: build the enqueue RPC for the intent case. */
3052 CFS_LIST_HEAD(cancels);
3053 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3054 &RQF_LDLM_ENQUEUE_LVB);
3058 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
/* Reserve reply space for the LVB (size from stripe 0). */
3062 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3063 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb));
3064 ptlrpc_request_set_replen(req);
3067 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3068 oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
3070 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
3071 &oinfo->oi_policy, &oinfo->oi_flags,
3072 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3073 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
3074 lustre_swab_ost_lvb, oinfo->oi_lockh,
/* Async: stash args in rq_async_args and hand off to the request set. */
3078 struct osc_enqueue_args *aa;
3079 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3080 aa = (struct osc_enqueue_args *)&req->rq_async_args;
3085 req->rq_interpret_reply = osc_enqueue_interpret;
3086 ptlrpc_set_add_req(rqset, req);
3087 } else if (intent) {
3088 ptlrpc_req_finished(req);
3093 rc = osc_enqueue_fini(req, oinfo, intent, rc);
3095 ptlrpc_req_finished(req);
/* Match (without enqueuing) an existing extent lock covering the given
 * policy.  Extents are page-aligned first; a PW lock may satisfy a PR
 * request, in which case (unless TEST_LOCK) the reference is converted by
 * addref(PR)+decref(PW) so the caller holds the mode it asked for.  Returns
 * the matched mode, or 0 on no match (return lines elided in this view). */
3100 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3101 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3102 int *flags, void *data, struct lustre_handle *lockh)
3104 struct ldlm_res_id res_id = { .name = {0} };
3105 struct obd_device *obd = exp->exp_obd;
3106 int lflags = *flags;
3110 res_id.name[0] = lsm->lsm_object_id;
3111 res_id.name[2] = lsm->lsm_object_gr;
/* Fault-injection hook for testing. */
3113 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3116 /* Filesystem lock extents are extended to page boundaries so that
3117 * dealing with the page cache is a little smoother */
3118 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3119 policy->l_extent.end |= ~CFS_PAGE_MASK;
3121 /* Next, search for already existing extent locks that will cover us */
3122 /* If we're trying to read, we also search for an existing PW lock. The
3123 * VFS and page cache already protect us locally, so lots of readers/
3124 * writers can share a single PW lock. */
3128 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3129 &res_id, type, policy, rc, lockh);
3131 osc_set_data_with_check(lockh, data, lflags);
3132 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3133 ldlm_lock_addref(lockh, LCK_PR);
3134 ldlm_lock_decref(lockh, LCK_PW);
/* Drop a lock reference; GROUP locks are additionally cancelled since they
 * are never kept in the LRU (else branch line elided in this view). */
3141 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3142 __u32 mode, struct lustre_handle *lockh)
3146 if (unlikely(mode == LCK_GROUP))
3147 ldlm_lock_decref_and_cancel(lockh, mode);
3149 ldlm_lock_decref(lockh, mode);
/* Cancel all unused locks, either namespace-wide (lsm == NULL, resp stays
 * NULL) or restricted to this object's resource.  The lsm-NULL guard and
 * resp assignment are elided in this view — TODO confirm. */
3154 static int osc_cancel_unused(struct obd_export *exp,
3155 struct lov_stripe_md *lsm, int flags,
3158 struct obd_device *obd = class_exp2obd(exp);
3159 struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3162 res_id.name[0] = lsm->lsm_object_id;
3163 res_id.name[2] = lsm->lsm_object_gr;
3167 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
/* Move this object's locks into or out of the LRU (join != 0 joins).
 * Mirrors osc_cancel_unused()'s resource-id setup; the resp assignment is
 * elided in this view. */
3170 static int osc_join_lru(struct obd_export *exp,
3171 struct lov_stripe_md *lsm, int join)
3173 struct obd_device *obd = class_exp2obd(exp);
3174 struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3177 res_id.name[0] = lsm->lsm_object_id;
3178 res_id.name[2] = lsm->lsm_object_gr;
3182 return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
/* Async statfs reply interpreter: unpack the obd_statfs from the reply
 * capsule (-EPROTO if absent), copy it into the caller's oi_osfs, then run
 * the caller's oi_cb_up callback with the final rc. */
3185 static int osc_statfs_interpret(struct ptlrpc_request *req,
3186 struct osc_async_args *aa, int rc)
3188 struct obd_statfs *msfs;
3194 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3196 GOTO(out, rc = -EPROTO);
3199 *aa->aa_oi->oi_osfs = *msfs;
3201 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Fire an OST_STATFS request asynchronously: pack the request, route it to
 * the create portal (bug 249 workaround), attach osc_statfs_interpret and
 * the obd_info in rq_async_args, and add it to the caller's request set. */
3205 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3206 __u64 max_age, struct ptlrpc_request_set *rqset)
3208 struct ptlrpc_request *req;
3209 struct osc_async_args *aa;
3213 /* We could possibly pass max_age in the request (as an absolute
3214 * timestamp or a "seconds.usec ago") so the target can avoid doing
3215 * extra calls into the filesystem if that isn't necessary (e.g.
3216 * during mount that would help a bit). Having relative timestamps
3217 * is not so great if request processing is slow, while absolute
3218 * timestamps are not ideal because they need time synchronization. */
3219 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3223 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3225 ptlrpc_request_free(req);
3228 ptlrpc_request_set_replen(req);
3229 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3231 req->rq_interpret_reply = osc_statfs_interpret;
3232 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3233 aa = (struct osc_async_args *)&req->rq_async_args;
3236 ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_STATFS: pack, send via ptlrpc_queue_wait, unpack the
 * obd_statfs reply (-EPROTO if missing) into *osfs (copy line elided), and
 * finish the request on all paths. */
3240 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3243 struct obd_statfs *msfs;
3244 struct ptlrpc_request *req;
3248 /* We could possibly pass max_age in the request (as an absolute
3249 * timestamp or a "seconds.usec ago") so the target can avoid doing
3250 * extra calls into the filesystem if that isn't necessary (e.g.
3251 * during mount that would help a bit). Having relative timestamps
3252 * is not so great if request processing is slow, while absolute
3253 * timestamps are not ideal because they need time synchronization. */
3254 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3258 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3260 ptlrpc_request_free(req);
3263 ptlrpc_request_set_replen(req);
3264 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3266 rc = ptlrpc_queue_wait(req);
3270 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3272 GOTO(out, rc = -EPROTO);
3279 ptlrpc_req_finished(req);
3283 /* Retrieve object striping information.
3285 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3286 * the maximum number of OST indices which will fit in the user buffer.
3287 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3289 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
/* Copy the user's lov_user_md in, validate magic, then build a one-stripe
 * reply (id/group/stripe_count=1, plus the single object entry when the
 * caller asked for objects) and copy it back out.  Error returns
 * (-EFAULT/-EINVAL/-ENOMEM) are on lines elided in this view. */
3291 struct lov_user_md lum, *lumk;
3292 int rc = 0, lum_size;
3298 if (copy_from_user(&lum, lump, sizeof(lum)))
3301 if (lum.lmm_magic != LOV_USER_MAGIC)
/* Caller wants the per-stripe object array: allocate header + 1 entry. */
3304 if (lum.lmm_stripe_count > 0) {
3305 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3306 OBD_ALLOC(lumk, lum_size);
3310 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3311 lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
/* Header only (lumk points at the stack lum; assignment elided). */
3313 lum_size = sizeof(lum);
3317 lumk->lmm_object_id = lsm->lsm_object_id;
3318 lumk->lmm_object_gr = lsm->lsm_object_gr;
3319 lumk->lmm_stripe_count = 1;
3321 if (copy_to_user(lump, lumk, lum_size))
/* Free only when lumk was heap-allocated (guard elided). */
3325 OBD_FREE(lumk, lum_size);
/* OSC ioctl dispatcher.  Pins the module for the duration, then handles:
 * LOV_GET_CONFIG (synthesize a one-target lov_desc + uuid for userspace),
 * LOV_SETSTRIPE/GETSTRIPE, CLIENT_RECOVER, SET_ACTIVE, POLL_QUOTACHECK;
 * unknown commands return -ENOTTY.  The switch statement line itself is
 * elided in this view. */
3331 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3332 void *karg, void *uarg)
3334 struct obd_device *obd = exp->exp_obd;
3335 struct obd_ioctl_data *data = karg;
3339 if (!try_module_get(THIS_MODULE)) {
3340 CERROR("Can't get module. Is it alive?");
3344 case OBD_IOC_LOV_GET_CONFIG: {
3346 struct lov_desc *desc;
3347 struct obd_uuid uuid;
3351 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3352 GOTO(out, err = -EINVAL);
3354 data = (struct obd_ioctl_data *)buf;
/* Validate user buffer sizes before writing into them. */
3356 if (sizeof(*desc) > data->ioc_inllen1) {
3357 obd_ioctl_freedata(buf, len);
3358 GOTO(out, err = -EINVAL);
3361 if (data->ioc_inllen2 < sizeof(uuid)) {
3362 obd_ioctl_freedata(buf, len);
3363 GOTO(out, err = -EINVAL);
/* An OSC looks like a single-target LOV to this ioctl. */
3366 desc = (struct lov_desc *)data->ioc_inlbuf1;
3367 desc->ld_tgt_count = 1;
3368 desc->ld_active_tgt_count = 1;
3369 desc->ld_default_stripe_count = 1;
3370 desc->ld_default_stripe_size = 0;
3371 desc->ld_default_stripe_offset = 0;
3372 desc->ld_pattern = 0;
3373 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3375 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3377 err = copy_to_user((void *)uarg, buf, len);
3380 obd_ioctl_freedata(buf, len);
3383 case LL_IOC_LOV_SETSTRIPE:
3384 err = obd_alloc_memmd(exp, karg);
3388 case LL_IOC_LOV_GETSTRIPE:
3389 err = osc_getstripe(karg, uarg);
3391 case OBD_IOC_CLIENT_RECOVER:
3392 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3397 case IOC_OSC_SET_ACTIVE:
3398 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3401 case OBD_IOC_POLL_QUOTACHECK:
3402 err = lquota_poll_check(quota_interface, exp,
3403 (struct if_quotacheck *)karg);
3406 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3407 cmd, cfs_curproc_comm());
3408 GOTO(out, err = -ENOTTY);
3411 module_put(THIS_MODULE);
/* obd_get_info handler.  KEY_LOCK_TO_STRIPE: an OSC is a single stripe, so
 * report stripe index (value assignment elided).  KEY_LAST_ID: do a
 * synchronous OST_GET_INFO RPC and return the server's last allocated
 * object id.  Other keys fall through (tail elided in this view). */
3415 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3416 void *key, __u32 *vallen, void *val)
3419 if (!vallen || !val)
3422 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3423 __u32 *stripe = val;
3424 *vallen = sizeof(*stripe);
3427 } else if (KEY_IS(KEY_LAST_ID)) {
3428 struct ptlrpc_request *req;
3433 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3434 &RQF_OST_GET_INFO_LAST_ID);
3438 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3439 RCL_CLIENT, keylen);
3440 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3442 ptlrpc_request_free(req);
3446 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3447 memcpy(tmp, key, keylen);
3449 ptlrpc_request_set_replen(req);
3450 rc = ptlrpc_queue_wait(req);
3454 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3456 GOTO(out, rc = -EPROTO);
3458 *((obd_id *)val) = *reply;
3460 ptlrpc_req_finished(req);
/* Reply interpreter for the KEY_MDS_CONN set_info RPC (MDS-side OSC only):
 * connect the llog initiator context to the OST, then mark the import as
 * server-timeout + pingable so the MDS keeps the OST connection alive. */
3466 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3469 struct llog_ctxt *ctxt;
3470 struct obd_import *imp = req->rq_import;
3476 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3479 rc = llog_initiator_connect(ctxt);
3481 CERROR("cannot establish connection for "
3482 "ctxt %p: %d\n", ctxt, rc);
3485 llog_ctxt_put(ctxt);
/* imp_lock guards the import flag updates. */
3486 spin_lock(&imp->imp_lock);
3487 imp->imp_server_timeout = 1;
3488 imp->imp_pingable = 1;
3489 spin_unlock(&imp->imp_lock);
3490 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
/* obd_set_info_async handler.  Keys handled locally without an RPC:
 * NEXT_ID (seed the object creator's next id), "unlinked" (clear NOSPC),
 * INIT_RECOV, "checksum", FLUSH_CTX.  Everything else is packed into an
 * OST_SET_INFO request and queued on 'set'; for KEY_MDS_CONN the creator's
 * group is recorded and osc_setinfo_mds_conn_interpret handles the reply. */
3495 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3496 void *key, obd_count vallen, void *val,
3497 struct ptlrpc_request_set *set)
3499 struct ptlrpc_request *req;
3500 struct obd_device *obd = exp->exp_obd;
3501 struct obd_import *imp = class_exp2cliimp(exp);
3506 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3508 if (KEY_IS(KEY_NEXT_ID)) {
3509 if (vallen != sizeof(obd_id))
3513 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3514 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3515 exp->exp_obd->obd_name,
3516 obd->u.cli.cl_oscc.oscc_next_id);
3521 if (KEY_IS("unlinked")) {
3522 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3523 spin_lock(&oscc->oscc_lock);
3524 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3525 spin_unlock(&oscc->oscc_lock);
3529 if (KEY_IS(KEY_INIT_RECOV)) {
3530 if (vallen != sizeof(int))
3532 spin_lock(&imp->imp_lock);
3533 imp->imp_initial_recov = *(int *)val;
3534 spin_unlock(&imp->imp_lock);
3535 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3536 exp->exp_obd->obd_name,
3537 imp->imp_initial_recov);
3541 if (KEY_IS("checksum")) {
3542 if (vallen != sizeof(int))
3544 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3548 if (KEY_IS(KEY_FLUSH_CTX)) {
3549 sptlrpc_import_flush_my_ctx(imp);
3556 /* We pass all other commands directly to OST. Since nobody calls osc
3557 methods directly and everybody is supposed to go through LOV, we
3558 assume lov checked invalid values for us.
3559 The only recognised values so far are evict_by_nid and mds_conn.
3560 Even if something bad goes through, we'd get a -EINVAL from OST
3564 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
3568 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3569 RCL_CLIENT, keylen);
3570 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3571 RCL_CLIENT, vallen);
3572 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3574 ptlrpc_request_free(req);
3578 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3579 memcpy(tmp, key, keylen);
3580 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
3581 memcpy(tmp, val, vallen);
3583 if (KEY_IS(KEY_MDS_CONN)) {
3584 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
/* Record the OST group the MDS creates objects in. */
3586 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3587 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3588 LASSERT(oscc->oscc_oa.o_gr > 0);
3589 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3592 ptlrpc_request_set_replen(req);
3593 ptlrpc_set_add_req(set, req);
3594 ptlrpc_check_set(set);
/* llog ops for the size-replication context: only cancel is needed.
 * osc_mds_ost_orig_logops is filled in lazily by osc_llog_init(). */
3600 static struct llog_operations osc_size_repl_logops = {
3601 lop_cancel: llog_obd_repl_cancel
3604 static struct llog_operations osc_mds_ost_orig_logops;
/* Set up this OSC's two llog contexts: the MDS->OST originator context
 * (ops built once, guarded by obd_dev_lock, from llog_lvfs_ops with
 * origin setup/cleanup/add/connect) and the size-replication context.
 * On failure both CERRORs at the bottom dump the full context. */
3605 static int osc_llog_init(struct obd_device *obd, int group,
3606 struct obd_device *tgt, int count,
3607 struct llog_catid *catid, struct obd_uuid *uuid)
3611 LASSERT(group == OBD_LLOG_GROUP);
3612 spin_lock(&obd->obd_dev_lock);
/* One-time lazy initialization of the shared originator ops table. */
3613 if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3614 osc_mds_ost_orig_logops = llog_lvfs_ops;
3615 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3616 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3617 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3618 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3620 spin_unlock(&obd->obd_dev_lock);
3622 rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3623 &catid->lci_logid, &osc_mds_ost_orig_logops);
3625 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3629 rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
3630 NULL, &osc_size_repl_logops);
3632 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3635 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3636 obd->obd_name, tgt->obd_name, count, catid, rc);
3637 CERROR("logid "LPX64":0x%x\n",
3638 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
/* Tear down both llog contexts created by osc_llog_init(); the second
 * cleanup's status (rc2) replaces rc when rc was 0 (merge line elided). */
3643 static int osc_llog_finish(struct obd_device *obd, int count)
3645 struct llog_ctxt *ctxt;
3646 int rc = 0, rc2 = 0;
3649 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3651 rc = llog_cleanup(ctxt);
3653 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3655 rc2 = llog_cleanup(ctxt);
/* Connect/reconnect hook: when the server supports grants, request our
 * current available grant (or a 2-max-RPC default when we have none) in
 * ocd_grant, and fold any lost grant back in — all under the loi list
 * lock so the grant counters stay consistent. */
3662 static int osc_reconnect(const struct lu_env *env,
3663 struct obd_export *exp, struct obd_device *obd,
3664 struct obd_uuid *cluuid,
3665 struct obd_connect_data *data)
3667 struct client_obd *cli = &obd->u.cli;
3669 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3672 client_obd_list_lock(&cli->cl_loi_list_lock);
3673 data->ocd_grant = cli->cl_avail_grant ?:
3674 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3675 lost_grant = cli->cl_lost_grant;
3676 cli->cl_lost_grant = 0;
3677 client_obd_list_unlock(&cli->cl_loi_list_lock);
3679 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3680 "cl_lost_grant: %ld\n", data->ocd_grant,
3681 cli->cl_avail_grant, lost_grant);
3682 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3683 " ocd_grant: %d\n", data->ocd_connect_flags,
3684 data->ocd_version, data->ocd_grant);
/* Disconnect hook: on the last connection, sync the size-replication llog
 * so outstanding cancels reach the OST, then do the generic client
 * disconnect. */
3690 static int osc_disconnect(struct obd_export *exp)
3692 struct obd_device *obd = class_exp2obd(exp);
3693 struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3696 if (obd->u.cli.cl_conn_count == 1)
3697 /* flush any remaining cancel messages out to the target */
3698 llog_sync(ctxt, exp);
3700 llog_ctxt_put(ctxt);
3702 rc = client_disconnect_export(exp);
/*
 * React to state transitions of the import behind this OSC.
 *
 * Dispatches on the obd_import_event:
 *  - DISCON:     mark the object creator as recovering (MDS-side OSCs
 *                only) and discard cached grant state;
 *  - INACTIVE:   tell the observer the device went inactive;
 *  - INVALIDATE: fail outstanding page RPCs and flush local DLM locks;
 *  - ACTIVE:     clear the no-space flag (MDS-side OSCs) and notify;
 *  - OCD:        re-read connect data (grant, request portal), notify.
 *
 * NOTE(review): sampled excerpt -- the assignment of 'cli' (presumably
 * cli = &obd->u.cli), the 'break' statements, the switch braces and the
 * final return are not visible here; confirm against the full source.
 */
3706 static int osc_import_event(struct obd_device *obd,
3707 struct obd_import *imp,
3708 enum obd_import_event event)
3710 struct client_obd *cli;
3714 LASSERT(imp->imp_obd == obd);
3717 case IMP_EVENT_DISCON: {
3718 /* Only do this on the MDS OSC's */
3719 if (imp->imp_server_timeout) {
3720 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
/* Flag object pre-creation as recovering until reconnect. */
3722 spin_lock(&oscc->oscc_lock);
3723 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3724 spin_unlock(&oscc->oscc_lock);
/* Grant from the old connection is no longer valid. */
3727 client_obd_list_lock(&cli->cl_loi_list_lock);
3728 cli->cl_avail_grant = 0;
3729 cli->cl_lost_grant = 0;
3730 client_obd_list_unlock(&cli->cl_loi_list_lock);
3733 case IMP_EVENT_INACTIVE: {
3734 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3737 case IMP_EVENT_INVALIDATE: {
3738 struct ldlm_namespace *ns = obd->obd_namespace;
3742 client_obd_list_lock(&cli->cl_loi_list_lock);
3743 /* all pages go to failing rpcs due to the invalid import */
3744 osc_check_rpcs(cli);
3745 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Drop client-side locks without contacting the (invalid) server. */
3747 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3751 case IMP_EVENT_ACTIVE: {
3752 /* Only do this on the MDS OSC's */
3753 if (imp->imp_server_timeout) {
3754 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
/* The OST may have space again after recovery. */
3756 spin_lock(&oscc->oscc_lock);
3757 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3758 spin_unlock(&oscc->oscc_lock);
3760 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3763 case IMP_EVENT_OCD: {
3764 struct obd_connect_data *ocd = &imp->imp_connect_data;
3766 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3767 osc_init_grant(&obd->u.cli, ocd);
/* Server supports a dedicated OST request portal; switch to it. */
3770 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3771 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3773 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3777 CERROR("Unknown import event %d\n", event);
/*
 * Set up an OSC obd device.
 *
 * Takes a ptlrpcd reference, performs the generic client setup, wires
 * up /proc entries, and pre-allocates a small request pool for the
 * import.  NOTE(review): sampled excerpt -- the error-handling branches
 * after ptlrpcd_addref()/client_obd_setup(), the request-size argument
 * to ptlrpc_init_rq_pool() and the return are not visible here.
 */
3783 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3789 rc = ptlrpcd_addref();
3793 rc = client_obd_setup(obd, lcfg);
3797 struct lprocfs_static_vars lvars = { 0 };
3798 struct client_obd *cli = &obd->u.cli;
/* Register the OSC's lprocfs variables and stats files. */
3800 lprocfs_osc_init_vars(&lvars);
3801 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3802 lproc_osc_attach_seqstat(obd);
3803 sptlrpc_lprocfs_cliobd_attach(obd);
3804 ptlrpc_lprocfs_register_obd(obd);
3808 /* We need to allocate a few requests more, because
3809 brw_interpret_oap tries to create new requests before freeing
3810 previous ones. Ideally we want to have 2x max_rpcs_in_flight
3811 reserved, but I afraid that might be too much wasted RAM
3812 in fact, so 2 is just my guess and still should work. */
3813 cli->cl_import->imp_rq_pool =
3814 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3816 ptlrpc_add_rqs_to_pool);
/*
 * Staged pre-cleanup of an OSC obd device.
 *
 * OBD_CLEANUP_EARLY deactivates the import and stops pinging it;
 * OBD_CLEANUP_EXPORTS frees an import that was set up but never
 * connected; OBD_CLEANUP_SELF_EXP shuts down llog.  NOTE(review):
 * sampled excerpt -- the switch statement itself, 'break's, and the
 * return are not visible here; confirm against the full source.
 */
3822 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3828 case OBD_CLEANUP_EARLY: {
3829 struct obd_import *imp;
3830 imp = obd->u.cli.cl_import;
3831 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3832 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3833 ptlrpc_deactivate_import(imp);
/* imp_pingable is protected by imp_lock. */
3834 spin_lock(&imp->imp_lock);
3835 imp->imp_pingable = 0;
3836 spin_unlock(&imp->imp_lock);
3839 case OBD_CLEANUP_EXPORTS: {
3840 /* If we set up but never connected, the
3841 client import will not have been cleaned. */
3842 if (obd->u.cli.cl_import) {
3843 struct obd_import *imp;
3844 imp = obd->u.cli.cl_import;
3845 CDEBUG(D_CONFIG, "%s: client import never connected\n",
/* Abort anything still queued, release the request pool,
 * then drop the import itself. */
3847 ptlrpc_invalidate_import(imp);
3848 ptlrpc_free_rq_pool(imp->imp_rq_pool);
3849 class_destroy_import(imp);
3850 obd->u.cli.cl_import = NULL;
3854 case OBD_CLEANUP_SELF_EXP:
3855 rc = obd_llog_finish(obd, 0);
3857 CERROR("failed to cleanup llogging subsystems\n");
3859 case OBD_CLEANUP_OBD:
/*
 * Final cleanup of an OSC obd device.
 *
 * Unregisters /proc entries, tells the object creator to stop
 * (OSCC_FLAG_EXITING), releases the per-obd quota cache, and runs the
 * generic client cleanup.  NOTE(review): sampled excerpt -- the
 * declaration of rc and the return are not visible here.
 */
3865 int osc_cleanup(struct obd_device *obd)
3867 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3871 ptlrpc_lprocfs_unregister_obd(obd);
3872 lprocfs_obd_cleanup(obd);
/* oscc_flags are protected by oscc_lock; mark the creator as exiting
 * so no further pre-creation is attempted. */
3874 spin_lock(&oscc->oscc_lock);
3875 oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3876 oscc->oscc_flags |= OSCC_FLAG_EXITING;
3877 spin_unlock(&oscc->oscc_lock);
3879 /* free memory of osc quota cache */
3880 lquota_cleanup(quota_interface, obd);
3882 rc = client_obd_cleanup(obd);
/*
 * Handle a configuration command delivered to this OSC.
 *
 * LCFG_SPTLRPC_CONF is forwarded to the sptlrpc layer; other commands
 * are treated as proc parameter updates via class_process_proc_param().
 * NOTE(review): sampled excerpt -- the 'default:' label, the remaining
 * arguments of class_process_proc_param() and the return are not
 * visible here; confirm against the full source.
 */
3888 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3890 struct lustre_cfg *lcfg = buf;
3891 struct lprocfs_static_vars lvars = { 0 };
3894 lprocfs_osc_init_vars(&lvars);
3896 switch (lcfg->lcfg_command) {
3897 case LCFG_SPTLRPC_CONF:
3898 rc = sptlrpc_cliobd_process_config(obd, lcfg);
3901 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
/*
 * OBD method table for the OSC.
 *
 * Connection management is shared with the generic client code
 * (client_import_add_conn/del_conn, client_connect_import) while
 * object, I/O, lock and llog operations are OSC-specific.
 */
3909 struct obd_ops osc_obd_ops = {
3910 .o_owner = THIS_MODULE,
/* device lifecycle */
3911 .o_setup = osc_setup,
3912 .o_precleanup = osc_precleanup,
3913 .o_cleanup = osc_cleanup,
/* connection handling (generic client helpers + OSC hooks) */
3914 .o_add_conn = client_import_add_conn,
3915 .o_del_conn = client_import_del_conn,
3916 .o_connect = client_connect_import,
3917 .o_reconnect = osc_reconnect,
3918 .o_disconnect = osc_disconnect,
3919 .o_statfs = osc_statfs,
3920 .o_statfs_async = osc_statfs_async,
/* object metadata pack/unpack and attribute operations */
3921 .o_packmd = osc_packmd,
3922 .o_unpackmd = osc_unpackmd,
3923 .o_precreate = osc_precreate,
3924 .o_create = osc_create,
3925 .o_destroy = osc_destroy,
3926 .o_getattr = osc_getattr,
3927 .o_getattr_async = osc_getattr_async,
3928 .o_setattr = osc_setattr,
3929 .o_setattr_async = osc_setattr_async,
/* bulk/page I/O */
3931 .o_brw_async = osc_brw_async,
3932 .o_prep_async_page = osc_prep_async_page,
3933 .o_queue_async_io = osc_queue_async_io,
3934 .o_set_async_flags = osc_set_async_flags,
3935 .o_queue_group_io = osc_queue_group_io,
3936 .o_trigger_group_io = osc_trigger_group_io,
3937 .o_teardown_async_page = osc_teardown_async_page,
3938 .o_punch = osc_punch,
/* DLM locking */
3940 .o_enqueue = osc_enqueue,
3941 .o_match = osc_match,
3942 .o_change_cbdata = osc_change_cbdata,
3943 .o_cancel = osc_cancel,
3944 .o_cancel_unused = osc_cancel_unused,
3945 .o_join_lru = osc_join_lru,
/* misc control, llog and configuration */
3946 .o_iocontrol = osc_iocontrol,
3947 .o_get_info = osc_get_info,
3948 .o_set_info_async = osc_set_info_async,
3949 .o_import_event = osc_import_event,
3950 .o_llog_init = osc_llog_init,
3951 .o_llog_finish = osc_llog_finish,
3952 .o_process_config = osc_process_config,
/*
 * Module init: hook the quota interface into the OSC method table and
 * register the OSC obd type.
 *
 * The quota interface symbol reference taken with PORTAL_SYMBOL_GET()
 * is released if class_register_type() fails.  NOTE(review): sampled
 * excerpt -- the declaration of rc, the check of rc before the
 * failure branch, and the return are not visible here.
 */
3954 int __init osc_init(void)
3956 struct lprocfs_static_vars lvars = { 0 };
3960 lprocfs_osc_init_vars(&lvars);
/* Pull in the quota module (best effort) and latch its interface. */
3962 request_module("lquota");
3963 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3964 lquota_init(quota_interface);
3965 init_obd_quota_ops(quota_interface, &osc_obd_ops);
3967 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3968 LUSTRE_OSC_NAME, NULL);
/* Registration failed: drop the quota interface reference again. */
3970 if (quota_interface)
3971 PORTAL_SYMBOL_PUT(osc_quota_interface);
/*
 * Module exit: shut down quota support, drop the quota interface
 * symbol reference taken in osc_init(), and unregister the OSC type.
 */
3979 static void /*__exit*/ osc_exit(void)
3981 lquota_exit(quota_interface);
3982 if (quota_interface)
3983 PORTAL_SYMBOL_PUT(osc_quota_interface);
3985 class_unregister_type(LUSTRE_OSC_NAME);
/* Kernel module metadata and entry/exit registration. */
3988 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3989 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3990 MODULE_LICENSE("GPL");
3992 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);