1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5 * Author Peter Braam <braam@clusterfs.com>
7 * This file is part of the Lustre file system, http://www.lustre.org
8 * Lustre is a trademark of Cluster File Systems, Inc.
10 * You may have signed or agreed to another license before downloading
11 * this software. If so, you are bound by the terms and conditions
12 * of that agreement, and the following does not apply to you. See the
13 * LICENSE file included with this distribution for more information.
15 * If you did not agree to a different license, then this copy of Lustre
16 * is open source software; you can redistribute it and/or modify it
17 * under the terms of version 2 of the GNU General Public License as
18 * published by the Free Software Foundation.
20 * In either case, Lustre is distributed in the hope that it will be
21 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23 * license text for more details.
25 * For testing and management it is treated as an obd_device,
26 * although it does not export a full OBD method table (the
27 * requests are coming in over the wire, so object target modules
28 * do not have a full method table.)
33 # define EXPORT_SYMTAB
35 #define DEBUG_SUBSYSTEM S_OSC
37 #include <libcfs/libcfs.h>
40 # include <liblustre.h>
43 #include <lustre_dlm.h>
44 #include <lustre_net.h>
45 #include <lustre/lustre_user.h>
46 #include <obd_cksum.h>
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include <lustre_cache.h>
60 #include "osc_internal.h"
/* File-local pointer to the quota operations table; starts out NULL.
 * It is the handle passed to lquota_setdq() in the BRW completion path. */
62 static quota_interface_t *quota_interface = NULL;
/* Quota interface object defined outside this section of the file. */
63 extern quota_interface_t osc_quota_interface;
/* Prototypes for routines whose definitions are not part of this section. */
65 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
66 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc);
67 int osc_cleanup(struct obd_device *obd);
/*
 * osc_packmd: fill *lmmp from the in-memory stripe descriptor lsm.
 *
 * exp   export handle (not referenced by the visible lines)
 * lmmp  in/out pointer to the packed metadata buffer
 * lsm   source stripe metadata
 *
 * The packed size is always sizeof(**lmmp).  The visible lines show one
 * OBD_FREE and one OBD_ALLOC of *lmmp with that size; the conditions that
 * select between them are on lines missing from this listing.
 * NOTE(review): return values are not visible here; confirm against the
 * complete file.
 */
69 /* Pack OSC object metadata for disk storage (LE byte order). */
70 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
71 struct lov_stripe_md *lsm)
76 lmm_size = sizeof(**lmmp);
81 OBD_FREE(*lmmp, lmm_size);
87 OBD_ALLOC(*lmmp, lmm_size);
/* Both identifiers must be non-zero before they are stored. */
93 LASSERT(lsm->lsm_object_id);
94 LASSERT(lsm->lsm_object_gr);
/* Convert from CPU byte order to little-endian for the packed form. */
95 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
96 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
/*
 * osc_unpackmd: build or release an in-memory stripe descriptor from the
 * packed little-endian form.
 *
 * exp        export handle (not referenced by the visible lines)
 * lsmp       in/out pointer to the in-memory stripe metadata
 * lmm        packed metadata, may be NULL
 * lmm_bytes  size in bytes of the buffer behind lmm
 *
 * Visible steps:
 *  - a buffer shorter than sizeof(*lmm) and a zero lmm_object_id are both
 *    reported with CERROR;
 *  - the descriptor size is lov_stripe_md_size(1), i.e. a single stripe;
 *  - when *lsmp is set and lmm is NULL, lsm_oinfo[0] and then *lsmp are
 *    freed;
 *  - otherwise *lsmp and its lsm_oinfo[0] are allocated, with *lsmp freed
 *    again if the second allocation fails, and loi_init() is run on the
 *    new lov_oinfo;
 *  - the object id and group are converted from little-endian and asserted
 *    non-zero, and lsm_maxbytes is set to LUSTRE_STRIPE_MAXBYTES.
 * NOTE(review): the enclosing conditions and return statements are on lines
 * missing from this listing.
 */
102 /* Unpack OSC object metadata from disk storage (LE byte order). */
103 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
104 struct lov_mds_md *lmm, int lmm_bytes)
110 if (lmm_bytes < sizeof (*lmm)) {
111 CERROR("lov_mds_md too small: %d, need %d\n",
112 lmm_bytes, (int)sizeof(*lmm));
115 /* XXX LOV_MAGIC etc check? */
117 if (lmm->lmm_object_id == 0) {
118 CERROR("lov_mds_md: zero lmm_object_id\n");
123 lsm_size = lov_stripe_md_size(1);
127 if (*lsmp != NULL && lmm == NULL) {
128 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
129 OBD_FREE(*lsmp, lsm_size);
135 OBD_ALLOC(*lsmp, lsm_size);
138 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
139 if ((*lsmp)->lsm_oinfo[0] == NULL) {
140 OBD_FREE(*lsmp, lsm_size);
143 loi_init((*lsmp)->lsm_oinfo[0]);
147 /* XXX zero *lsmp? */
148 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
149 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
150 LASSERT((*lsmp)->lsm_object_id);
151 LASSERT((*lsmp)->lsm_object_gr);
154 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/*
 * osc_pack_capa: attach a capability to an outgoing OST request.
 *
 * req   request whose capsule holds the RMF_CAPA1 client field
 * body  OST body of the same request; OBD_MD_FLOSSCAPA is set in its
 *       o_valid mask
 * capa  opaque pointer, interpreted as struct obd_capa
 *
 * NOTE(review): the lines that guard against a missing capability and that
 * copy it into the request field are not part of this listing.
 */
159 static inline void osc_pack_capa(struct ptlrpc_request *req,
160 struct ost_body *body, void *capa)
162 struct obd_capa *oc = (struct obd_capa *)capa;
163 struct lustre_capa *c;
/* Locate the capability slot in the client side of the request. */
168 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
171 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
172 DEBUG_CAPA(D_SEC, c, "pack");
/*
 * osc_pack_req_body: fill the OST body of req from oinfo.
 *
 * The object attributes *oinfo->oi_oa are copied by value into the
 * RMF_OST_BODY client field, and oinfo->oi_capa is then packed through
 * osc_pack_capa().
 */
175 static inline void osc_pack_req_body(struct ptlrpc_request *req,
176 struct obd_info *oinfo)
178 struct ost_body *body;
180 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
183 body->oa = *oinfo->oi_oa;
184 osc_pack_capa(req, body, oinfo->oi_capa);
/*
 * osc_set_capa_size: adjust the client-side size of a capability field
 * before the request is packed.
 *
 * req    request whose capsule is adjusted
 * field  capsule field to resize
 * The third parameter is on a line missing from this listing; callers in
 * this file pass a capability pointer in that position.
 *
 * The visible call sets the field size to 0.  NOTE(review): presumably this
 * happens only when no capability is supplied; the condition line is
 * missing, confirm against the complete file.
 */
187 static inline void osc_set_capa_size(struct ptlrpc_request *req,
188 const struct req_msg_field *field,
192 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
194 /* it is already calculated as sizeof struct obd_capa */
/*
 * osc_getattr_interpret: reply callback for an asynchronous OST_GETATTR.
 *
 * req  completed request
 * aa   async arguments; aa->aa_oi is the obd_info supplied by the caller
 * rc   completion status of the request
 *
 * The reply body is unpacked with lustre_swab_repbuf().  On the success
 * path the returned attributes are copied into aa->aa_oi->oi_oa and the
 * block size is forced to PTLRPC_MAX_BRW_SIZE with OBD_MD_FLBLKSZ set.  On
 * the unpack-failure path a debug message is logged and o_valid is cleared.
 * The upcall aa->aa_oi->oi_cb_up is then invoked and its result becomes rc.
 * NOTE(review): the branch lines separating those paths are missing from
 * this listing.
 */
198 static int osc_getattr_interpret(struct ptlrpc_request *req,
199 struct osc_async_args *aa, int rc)
201 struct ost_body *body;
207 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
208 lustre_swab_ost_body);
210 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
211 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
213 /* This should really be sent by the OST */
214 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
215 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
217 CDEBUG(D_INFO, "can't unpack ost_body\n");
219 aa->aa_oi->oi_oa->o_valid = 0;
222 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * osc_getattr_async: queue an OST_GETATTR request on a request set.
 *
 * exp    export used to find the client import
 * oinfo  object attributes and capability to send
 * set    request set the new request is added to
 *
 * Visible sequence: allocate an RQF_OST_GETATTR request, size the capability
 * field, pack the request (the request is freed on a path whose condition
 * is not shown), pack the body, set the reply length, install
 * osc_getattr_interpret as the reply callback, and add the request to set.
 * The async arguments live inside req->rq_async_args; CLASSERT checks at
 * compile time that they fit.
 */
226 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
227 struct ptlrpc_request_set *set)
229 struct ptlrpc_request *req;
230 struct osc_async_args *aa;
234 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
238 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
239 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
241 ptlrpc_request_free(req);
245 osc_pack_req_body(req, oinfo);
247 ptlrpc_request_set_replen(req);
248 req->rq_interpret_reply = osc_getattr_interpret;
250 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
251 aa = (struct osc_async_args *)&req->rq_async_args;
254 ptlrpc_set_add_req(set, req);
/*
 * osc_getattr: synchronous OST_GETATTR.
 *
 * exp    export used to find the client import
 * oinfo  carries the object attributes sent, and receives the reply
 *
 * The request is built like the asynchronous variant, then sent with
 * ptlrpc_queue_wait().  The reply body is fetched from the server side of
 * the capsule; one visible path jumps to the out label with -EPROTO.  The
 * returned attributes overwrite *oinfo->oi_oa, the block size is forced to
 * PTLRPC_MAX_BRW_SIZE, and the request is released with
 * ptlrpc_req_finished().
 */
258 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
260 struct ptlrpc_request *req;
261 struct ost_body *body;
265 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
269 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
270 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
272 ptlrpc_request_free(req);
276 osc_pack_req_body(req, oinfo);
278 ptlrpc_request_set_replen(req);
280 rc = ptlrpc_queue_wait(req);
284 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
286 GOTO(out, rc = -EPROTO);
288 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
289 *oinfo->oi_oa = body->oa;
291 /* This should really be sent by the OST */
292 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
293 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
297 ptlrpc_req_finished(req);
/*
 * osc_setattr: synchronous OST_SETATTR.
 *
 * exp    export used to find the client import
 * oinfo  attributes to set; overwritten with the attributes in the reply
 * oti    transaction info (not referenced by the visible lines)
 *
 * Precondition asserted here: if OBD_MD_FLGROUP is set in o_valid, the
 * group o_gr must be greater than zero.  The request is packed, sent with
 * ptlrpc_queue_wait(), and the reply body is copied back into
 * *oinfo->oi_oa.  One visible path jumps to the out label with -EPROTO.
 */
301 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
302 struct obd_trans_info *oti)
304 struct ptlrpc_request *req;
305 struct ost_body *body;
309 LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
310 oinfo->oi_oa->o_gr > 0);
312 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
316 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
317 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
319 ptlrpc_request_free(req);
323 osc_pack_req_body(req, oinfo);
325 ptlrpc_request_set_replen(req);
328 rc = ptlrpc_queue_wait(req);
332 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
334 GOTO(out, rc = -EPROTO);
336 *oinfo->oi_oa = body->oa;
340 ptlrpc_req_finished(req);
/*
 * osc_setattr_interpret: reply callback for an asynchronous OST_SETATTR.
 *
 * The reply body is read from the server side of the capsule and copied
 * into aa->aa_oi->oi_oa; one visible path jumps to the out label with
 * -EPROTO.  The upcall aa->aa_oi->oi_cb_up is invoked with the resulting
 * rc and its return value replaces rc.
 */
344 static int osc_setattr_interpret(struct ptlrpc_request *req,
345 struct osc_async_args *aa, int rc)
347 struct ost_body *body;
353 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
355 GOTO(out, rc = -EPROTO);
357 *aa->aa_oi->oi_oa = body->oa;
359 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * osc_setattr_async: send an OST_SETATTR without blocking the caller.
 *
 * exp    export used to find the client import
 * oinfo  attributes and capability to send
 * oti    transaction info; its log cookie is copied into the attributes
 *        when OBD_MD_FLCOOKIE is set
 * rqset  request set used on the path that expects a reply callback
 *
 * Two dispatch paths are visible: the request is either handed to
 * ptlrpcd_add_req() with no reply handling, or osc_setattr_interpret is
 * installed and the request is added to rqset.  NOTE(review): the condition
 * that selects between them is on a line missing from this listing.
 */
363 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
364 struct obd_trans_info *oti,
365 struct ptlrpc_request_set *rqset)
367 struct ptlrpc_request *req;
368 struct osc_async_args *aa;
372 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
376 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
377 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
379 ptlrpc_request_free(req);
383 osc_pack_req_body(req, oinfo);
385 ptlrpc_request_set_replen(req);
387 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
389 *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;
392 /* do mds to ost setattr asynchronously */
394 /* Do not wait for response. */
395 ptlrpcd_add_req(req);
397 req->rq_interpret_reply = osc_setattr_interpret;
399 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
400 aa = (struct osc_async_args *)&req->rq_async_args;
403 ptlrpc_set_add_req(rqset, req);
/*
 * osc_real_create: synchronous OST_CREATE of one object.
 *
 * exp  export used to find the client import
 * oa   object attributes sent with the request and updated from the reply
 * ea   in/out pointer to stripe metadata
 * oti  transaction info that receives the transno and, when
 *      OBD_MD_FLCOOKIE is set in oa, one log cookie
 *
 * Visible sequence: obtain a stripe descriptor with obd_alloc_memmd(),
 * allocate and pack the request, and send it with ptlrpc_queue_wait().
 * When OBD_MD_FLINLINE is set the flags must be exactly OBD_FL_DELORPHAN
 * and the request is marked rq_no_resend and rq_no_delay.  After the reply
 * the block size is forced to PTLRPC_MAX_BRW_SIZE and the object id and
 * group are copied into the stripe descriptor.  Cleanup releases the
 * request with ptlrpc_req_finished() and the descriptor with
 * obd_free_memmd().
 * NOTE(review): error branches, labels and the conditions around the
 * cleanup calls are on lines missing from this listing.
 */
409 int osc_real_create(struct obd_export *exp, struct obdo *oa,
410 struct lov_stripe_md **ea, struct obd_trans_info *oti)
412 struct ptlrpc_request *req;
413 struct ost_body *body;
414 struct lov_stripe_md *lsm;
423 rc = obd_alloc_memmd(exp, &lsm);
428 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
430 GOTO(out, rc = -ENOMEM);
432 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
434 ptlrpc_request_free(req);
438 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
442 ptlrpc_request_set_replen(req);
444 if (oa->o_valid & OBD_MD_FLINLINE) {
445 LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
446 oa->o_flags == OBD_FL_DELORPHAN);
448 "delorphan from OST integration");
449 /* Don't resend the delorphan req */
450 req->rq_no_resend = req->rq_no_delay = 1;
453 rc = ptlrpc_queue_wait(req);
457 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
459 GOTO(out_req, rc = -EPROTO);
463 /* This should really be sent by the OST */
464 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
465 oa->o_valid |= OBD_MD_FLBLKSZ;
467 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
468 * have valid lsm_oinfo data structs, so don't go touching that.
469 * This needs to be fixed in a big way.
471 lsm->lsm_object_id = oa->o_id;
472 lsm->lsm_object_gr = oa->o_gr;
476 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
478 if (oa->o_valid & OBD_MD_FLCOOKIE) {
479 if (!oti->oti_logcookies)
480 oti_alloc_cookies(oti, 1);
481 *oti->oti_logcookies = *obdo_logcookie(oa);
485 CDEBUG(D_HA, "transno: "LPD64"\n",
486 lustre_msg_get_transno(req->rq_repmsg));
488 ptlrpc_req_finished(req);
491 obd_free_memmd(exp, &lsm);
/*
 * osc_punch_interpret: reply callback for an asynchronous OST_PUNCH.
 *
 * The reply body is read from the server side of the capsule and copied
 * into aa->aa_oi->oi_oa; one visible path jumps to the out label with
 * -EPROTO.  The upcall aa->aa_oi->oi_cb_up is invoked with the resulting
 * rc and its return value replaces rc.
 */
495 static int osc_punch_interpret(struct ptlrpc_request *req,
496 struct osc_async_args *aa, int rc)
498 struct ost_body *body;
504 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
506 GOTO(out, rc = -EPROTO);
508 *aa->aa_oi->oi_oa = body->oa;
510 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * osc_punch: queue an OST_PUNCH request on a request set.
 *
 * exp    export used to find the client import
 * oinfo  object attributes, capability and the extent in
 *        oi_policy.l_extent
 * oti    transaction info (not referenced by the visible lines)
 * rqset  request set the new request is added to
 *
 * The request is routed to OST_IO_PORTAL.  The extent is carried in the
 * attribute fields of the request body: o_size holds the extent start and
 * o_blocks the extent end, and both OBD_MD_FLSIZE and OBD_MD_FLBLOCKS are
 * marked valid.  osc_punch_interpret handles the reply.
 */
514 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
515 struct obd_trans_info *oti,
516 struct ptlrpc_request_set *rqset)
518 struct ptlrpc_request *req;
519 struct osc_async_args *aa;
520 struct ost_body *body;
525 CDEBUG(D_INFO, "oa NULL\n");
529 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
533 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
534 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
536 ptlrpc_request_free(req);
539 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
540 osc_pack_req_body(req, oinfo);
542 /* overload the size and blocks fields in the oa with start/end */
543 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
545 body->oa.o_size = oinfo->oi_policy.l_extent.start;
546 body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
547 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
548 ptlrpc_request_set_replen(req);
551 req->rq_interpret_reply = osc_punch_interpret;
552 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
553 aa = (struct osc_async_args *)&req->rq_async_args;
555 ptlrpc_set_add_req(rqset, req);
/*
 * osc_sync: synchronous OST_SYNC over a byte range.
 *
 * exp    export used to find the client import
 * oa     object attributes (a debug message "oa NULL" is logged on a path
 *        whose condition is not shown)
 * md     stripe metadata (not referenced by the visible lines)
 * start  first value of the range, sent in o_size
 * end    second value of the range, sent in o_blocks
 * The remaining parameter line is missing from this listing; the body uses
 * a capability pointer named capa.
 *
 * The request is sent with ptlrpc_queue_wait(); one visible path jumps to
 * the out label with -EPROTO after fetching the reply body, and the request
 * is released with ptlrpc_req_finished().
 */
560 static int osc_sync(struct obd_export *exp, struct obdo *oa,
561 struct lov_stripe_md *md, obd_size start, obd_size end,
564 struct ptlrpc_request *req;
565 struct ost_body *body;
570 CDEBUG(D_INFO, "oa NULL\n");
574 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
578 osc_set_capa_size(req, &RMF_CAPA1, capa);
579 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
581 ptlrpc_request_free(req);
585 /* overload the size and blocks fields in the oa with start/end */
586 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
589 body->oa.o_size = start;
590 body->oa.o_blocks = end;
591 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
592 osc_pack_capa(req, body, capa);
594 ptlrpc_request_set_replen(req);
596 rc = ptlrpc_queue_wait(req);
600 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
602 GOTO(out, rc = -EPROTO);
608 ptlrpc_req_finished(req);
/*
 * osc_resource_get_unused: collect cancellable locks for one object.
 *
 * The resource name is built from oa->o_id (slot 0) and oa->o_gr (slot 2)
 * and looked up in the namespace of the export's obd device.  The count
 * comes from ldlm_cancel_resource_local(), after which the resource
 * reference is dropped.  The last parameter line is missing from this
 * listing; the body passes a value named lock_flags to the cancel call.
 */
612 /* Find and cancel locally locks matched by @mode in the resource found by
613 * @objid. Found locks are added into @cancels list. Returns the amount of
614 * locks added to @cancels list. */
615 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
616 struct list_head *cancels, ldlm_mode_t mode,
619 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
620 struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
621 struct ldlm_resource *res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
628 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
629 lock_flags, 0, NULL);
630 ldlm_resource_putref(res);
/*
 * osc_destroy_interpret: reply callback for OST_DESTROY.
 *
 * Decrements the count of destroy RPCs in flight on the client obd that
 * owns the request import, then signals cl_destroy_waitq so that a sender
 * waiting for a free slot can retry.
 */
634 static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
637 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
639 atomic_dec(&cli->cl_destroy_in_flight);
640 cfs_waitq_signal(&cli->cl_destroy_waitq);
/*
 * osc_can_send_destroy: try to reserve a slot for one destroy RPC.
 *
 * The in-flight counter is incremented first.  If the new value is within
 * cl_max_rpcs_in_flight the slot is kept.  Otherwise the increment is
 * undone, and if the counter is then below the limit another thread has
 * changed it in between, so cl_destroy_waitq is signalled.
 * NOTE(review): the return statements are on lines missing from this
 * listing.
 */
644 static int osc_can_send_destroy(struct client_obd *cli)
646 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
647 cli->cl_max_rpcs_in_flight) {
648 /* The destroy request can be sent */
651 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
652 cli->cl_max_rpcs_in_flight) {
654 * The counter has been modified between the two atomic
657 cfs_waitq_signal(&cli->cl_destroy_waitq);
/*
 * osc_destroy: send an OST_DESTROY for one object without waiting for the
 * reply.
 *
 * exp        export used to find the client import
 * oa         attributes of the object to destroy
 * ea         stripe metadata (not referenced by the visible lines)
 * oti        optional transaction info supplying a log cookie
 * md_export  not referenced by the visible lines
 *
 * Visible sequence: collect unused LCK_PW locks on the object with
 * LDLM_FL_DISCARD_DATA, allocate the request (the collected locks are put
 * back on a path whose condition is not shown), prepare it through
 * ldlm_prep_elc_req(), route it to OST_IO_PORTAL, install
 * osc_destroy_interpret, wait until osc_can_send_destroy() grants a slot,
 * and hand the request to ptlrpcd_add_req().
 */
662 /* Destroy requests can be async always on the client, and we don't even really
663 * care about the return code since the client cannot do anything at all about
665 * When the MDS is unlinking a filename, it saves the file objects into a
666 * recovery llog, and these object records are cancelled when the OST reports
667 * they were destroyed and sync'd to disk (i.e. transaction committed).
668 * If the client dies, or the OST is down when the object should be destroyed,
669 * the records are not cancelled, and when the OST reconnects to the MDS next,
670 * it will retrieve the llog unlink logs and then sends the log cancellation
671 * cookies to the MDS after committing destroy transactions. */
672 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
673 struct lov_stripe_md *ea, struct obd_trans_info *oti,
674 struct obd_export *md_export)
676 struct client_obd *cli = &exp->exp_obd->u.cli;
677 struct ptlrpc_request *req;
678 struct ost_body *body;
679 CFS_LIST_HEAD(cancels);
684 CDEBUG(D_INFO, "oa NULL\n");
688 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
689 LDLM_FL_DISCARD_DATA);
691 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
693 ldlm_lock_list_put(&cancels, l_bl_ast, count);
697 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
700 ptlrpc_request_free(req);
704 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
705 req->rq_interpret_reply = osc_destroy_interpret;
707 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
708 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
709 sizeof(*oti->oti_logcookies));
710 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
714 ptlrpc_request_set_replen(req);
716 if (!osc_can_send_destroy(cli)) {
717 struct l_wait_info lwi = { 0 };
720 * Wait until the number of on-going destroy RPCs drops
721 * under max_rpc_in_flight
723 l_wait_event_exclusive(cli->cl_destroy_waitq,
724 osc_can_send_destroy(cli), &lwi);
727 /* Do not wait for response */
728 ptlrpcd_add_req(req);
/*
 * osc_announce_cached: report the client cache and grant state in the
 * attributes of an outgoing request.
 *
 * cli  client obd whose counters are read
 * oa   attributes to fill; OBD_MD_FLBLOCKS and OBD_MD_FLGRANT must not
 *      already be set in o_valid
 * The last parameter line is missing from this listing.
 *
 * Under cl_loi_list_lock: o_dirty is set from cl_dirty.  Three inconsistent
 * states are reported with CERROR: dirty above the per-client limit, the
 * global dirty page count above obd_max_dirty_pages, and a limit-minus-dirty
 * gap larger than 0x7fffffff.  Otherwise o_undirty is the larger of
 * cl_dirty_max and the bytes needed for (cl_max_rpcs_in_flight + 1) RPCs of
 * full size.  o_grant and o_dropped are then set from cl_avail_grant and
 * cl_lost_grant, and cl_lost_grant is reset to 0.
 * NOTE(review): the third CERROR labels its values "dirty" and "dirty_max"
 * in that order, while the tested expression is dirty_max minus dirty.
 */
732 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
735 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
737 LASSERT(!(oa->o_valid & bits));
740 client_obd_list_lock(&cli->cl_loi_list_lock);
741 oa->o_dirty = cli->cl_dirty;
742 if (cli->cl_dirty > cli->cl_dirty_max) {
743 CERROR("dirty %lu > dirty_max %lu\n",
744 cli->cl_dirty, cli->cl_dirty_max);
746 } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
747 CERROR("dirty %d > system dirty_max %d\n",
748 atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
750 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
751 CERROR("dirty %lu - dirty_max %lu too big???\n",
752 cli->cl_dirty, cli->cl_dirty_max);
755 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
756 (cli->cl_max_rpcs_in_flight + 1);
757 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
759 oa->o_grant = cli->cl_avail_grant;
760 oa->o_dropped = cli->cl_lost_grant;
761 cli->cl_lost_grant = 0;
762 client_obd_list_unlock(&cli->cl_loi_list_lock);
763 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
764 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/*
 * osc_consume_write_grant: charge one page of write grant to pga.
 *
 * Increments the global dirty page counter, adds CFS_PAGE_SIZE to cl_dirty,
 * subtracts CFS_PAGE_SIZE from cl_avail_grant and marks the page with
 * OBD_BRW_FROM_GRANT.  The available grant is asserted to be non-negative
 * afterwards.
 */
767 /* caller must hold loi_list_lock */
768 static void osc_consume_write_grant(struct client_obd *cli,
769 struct brw_page *pga)
771 atomic_inc(&obd_dirty_pages);
772 cli->cl_dirty += CFS_PAGE_SIZE;
773 cli->cl_avail_grant -= CFS_PAGE_SIZE;
774 pga->flag |= OBD_BRW_FROM_GRANT;
775 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
776 CFS_PAGE_SIZE, pga, pga->pg);
777 LASSERT(cli->cl_avail_grant >= 0);
/*
 * osc_release_write_grant: undo the accounting of a granted page.
 *
 * cli   client obd whose counters are adjusted
 * pga   page that may carry OBD_BRW_FROM_GRANT
 * sent  tells whether the page was sent (the line testing it is missing
 *       from this listing)
 *
 * Pages without OBD_BRW_FROM_GRANT take a separate path that is not shown.
 * Otherwise the flag is cleared, the global dirty page counter is
 * decremented and CFS_PAGE_SIZE is removed from cl_dirty.  One path adds a
 * whole page to cl_lost_grant.  The other, taken for a partial page when
 * the page size differs from the server block size, adds only the part of
 * the page outside the blocks touched by the write.  The block size is
 * taken from the cached os_bsize of the import obd, or 4096 when that value
 * is zero.
 */
780 /* the companion to osc_consume_write_grant, called when a brw has completed.
781 * must be called with the loi lock held. */
782 static void osc_release_write_grant(struct client_obd *cli,
783 struct brw_page *pga, int sent)
785 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
788 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
793 pga->flag &= ~OBD_BRW_FROM_GRANT;
794 atomic_dec(&obd_dirty_pages);
795 cli->cl_dirty -= CFS_PAGE_SIZE;
797 cli->cl_lost_grant += CFS_PAGE_SIZE;
798 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
799 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
800 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
801 /* For short writes we shouldn't count parts of pages that
802 * span a whole block on the OST side, or our accounting goes
803 * wrong. Should match the code in filter_grant_check. */
804 int offset = pga->off & ~CFS_PAGE_MASK;
805 int count = pga->count + (offset & (blocksize - 1));
806 int end = (offset + pga->count) & (blocksize - 1);
808 count += blocksize - end;
810 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
811 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
812 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
813 cli->cl_avail_grant, cli->cl_dirty);
/* rpcs_in_flight: total of the read and write in-flight counters of cli. */
819 static unsigned long rpcs_in_flight(struct client_obd *cli)
821 return cli->cl_r_in_flight + cli->cl_w_in_flight;
/*
 * osc_wake_cache_waiters: wake threads queued on cl_cache_waiters.
 *
 * The list is walked in order.  The walk reports "no dirty room" when one
 * more page would exceed either the per-client or the global dirty limit,
 * and reports "no grant" when less than one page of grant is available
 * while writes are still in flight.  Otherwise the waiter is unlinked: with
 * less than a page of grant left its ocw_rc is set to -EDQUOT, and on the
 * other visible path a page of grant is consumed for its page.  The waiter
 * is then signalled.
 * NOTE(review): the statements that leave the loop after the two reports
 * are on lines missing from this listing.
 */
824 /* caller must hold loi_list_lock */
825 void osc_wake_cache_waiters(struct client_obd *cli)
827 struct list_head *l, *tmp;
828 struct osc_cache_waiter *ocw;
831 list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
832 /* if we can't dirty more, we must wait until some is written */
833 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
834 (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
835 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
836 "osc max %ld, sys max %d\n", cli->cl_dirty,
837 cli->cl_dirty_max, obd_max_dirty_pages);
841 /* if still dirty cache but no grant wait for pending RPCs that
842 * may yet return us some grant before doing sync writes */
843 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
844 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
845 cli->cl_w_in_flight);
849 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
850 list_del_init(&ocw->ocw_entry);
851 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
852 /* no more RPCs in flight to return grant, do sync IO */
853 ocw->ocw_rc = -EDQUOT;
854 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
856 osc_consume_write_grant(cli,
857 &ocw->ocw_oap->oap_brw_page);
860 cfs_waitq_signal(&ocw->ocw_waitq);
/*
 * osc_init_grant: seed the available grant from connect data.
 *
 * cl_avail_grant is set to ocd->ocd_grant while cl_loi_list_lock is held.
 * The debug message and the non-negative assertion read the counters after
 * the lock has been released.
 */
866 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
868 client_obd_list_lock(&cli->cl_loi_list_lock);
869 cli->cl_avail_grant = ocd->ocd_grant;
870 client_obd_list_unlock(&cli->cl_loi_list_lock);
872 CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
873 cli->cl_avail_grant, cli->cl_lost_grant);
874 LASSERT(cli->cl_avail_grant >= 0);
/*
 * osc_update_grant: add grant returned in a reply body to the client.
 *
 * Under cl_loi_list_lock, body->oa.o_grant is added to cl_avail_grant only
 * when OBD_MD_FLGRANT is set in o_valid.  The debug message prints o_grant
 * before that validity check.
 */
877 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
879 client_obd_list_lock(&cli->cl_loi_list_lock);
880 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
881 if (body->oa.o_valid & OBD_MD_FLGRANT)
882 cli->cl_avail_grant += body->oa.o_grant;
883 /* waiters are woken in brw_interpret */
884 client_obd_list_unlock(&cli->cl_loi_list_lock);
/*
 * handle_short_read: zero-fill the part of a read that returned no data.
 *
 * nob_read    number of bytes actually read
 * page_count  number of entries in pga
 * pga         pages of the read, in request order
 *
 * The first loop consumes pages that were read completely.  For the page in
 * which the data ends, the tail after the last byte read is cleared.  The
 * second loop clears the used region of every remaining page.  Pages are
 * mapped with cfs_kmap() only for the duration of each memset.
 * NOTE(review): the index and counter updates between the visible lines are
 * missing from this listing.
 */
887 /* We assume that the reason this OSC got a short read is because it read
888 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
889 * via the LOV, and it _knows_ it's reading inside the file, it's just that
890 * this stripe never got written at or beyond this stripe offset yet. */
891 static void handle_short_read(int nob_read, obd_count page_count,
892 struct brw_page **pga)
897 /* skip bytes read OK */
898 while (nob_read > 0) {
899 LASSERT (page_count > 0);
901 if (pga[i]->count > nob_read) {
902 /* EOF inside this page */
903 ptr = cfs_kmap(pga[i]->pg) +
904 (pga[i]->off & ~CFS_PAGE_MASK);
905 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
906 cfs_kunmap(pga[i]->pg);
912 nob_read -= pga[i]->count;
917 /* zero remaining pages */
918 while (page_count-- > 0) {
919 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
920 memset(ptr, 0, pga[i]->count);
921 cfs_kunmap(pga[i]->pg);
/*
 * check_write_rcs: validate the reply of a bulk write.
 *
 * req            completed OST_WRITE request
 * requested_nob  number of bytes the client asked to write
 * niocount       number of remote niobufs, one return code each
 * page_count     number of pages in pga (not referenced by visible lines)
 * pga            pages of the write (not referenced by visible lines)
 *
 * The vector of per-niobuf return codes is read from the reply and byte
 * swapped when the reply message was swabbed.  The first negative code is
 * returned as the error.  A non-zero positive code is reported as invalid.
 * Finally the number of bytes moved by the bulk transfer must equal
 * requested_nob.
 *
 * Fix: the mismatch message prints the transferred byte count first and the
 * requested count second, so the arguments are now passed in that order.
 * They were swapped before, which made the log misleading.
 */
926 static int check_write_rcs(struct ptlrpc_request *req,
927 int requested_nob, int niocount,
928 obd_count page_count, struct brw_page **pga)
932 /* return error if any niobuf was in error */
933 remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
934 sizeof(*remote_rcs) * niocount, NULL);
935 if (remote_rcs == NULL) {
936 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
939 if (lustre_msg_swabbed(req->rq_repmsg))
940 for (i = 0; i < niocount; i++)
941 __swab32s(&remote_rcs[i]);
943 for (i = 0; i < niocount; i++) {
944 if (remote_rcs[i] < 0)
945 return(remote_rcs[i]);
947 if (remote_rcs[i] != 0) {
948 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
949 i, remote_rcs[i], req);
954 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
955 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
956 req->rq_bulk->bd_nob_transferred, requested_nob);
/*
 * can_merge_pages: tell whether p2 can extend the niobuf that ends with p1.
 *
 * When the flags of the two pages differ, a CERROR is emitted if they
 * differ in any bit other than OBD_BRW_FROM_GRANT.  The visible return
 * expression is true when p2 starts exactly where p1 ends.
 * NOTE(review): the return taken on the differing-flags path is on a line
 * missing from this listing.
 */
963 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
965 if (p1->flag != p2->flag) {
966 unsigned mask = ~OBD_BRW_FROM_GRANT;
968 /* warn if we try to combine flags that we don't know to be
970 if ((p1->flag & mask) != (p2->flag & mask))
971 CERROR("is it ok to have flags 0x%x and 0x%x in the "
972 "same brw?\n", p1->flag, p2->flag);
976 return (p1->off + p1->count == p2->off);
/*
 * osc_checksum_bulk: checksum the payload of a bulk transfer.
 *
 * nob         number of payload bytes to cover
 * pg_count    number of entries in pga, must be greater than zero
 * pga         pages of the transfer, in request order
 * opc         OST_READ or OST_WRITE, used only for fault injection
 * cksum_type  checksum algorithm selector
 *
 * Each page is mapped, and at most the remaining nob bytes of its used
 * region are folded into the running checksum.  Two fault-injection hooks
 * exist: for reads the first page can be overwritten with the bytes "bad1"
 * before checksumming, and for writes a second hook is tested after the
 * loop (its effect is on a line missing from this listing).
 */
979 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
980 struct brw_page **pga, int opc,
981 cksum_type_t cksum_type)
986 LASSERT (pg_count > 0);
987 cksum = init_checksum(cksum_type);
988 while (nob > 0 && pg_count > 0) {
989 unsigned char *ptr = cfs_kmap(pga[i]->pg);
990 int off = pga[i]->off & ~CFS_PAGE_MASK;
991 int count = pga[i]->count > nob ? nob : pga[i]->count;
993 /* corrupt the data before we compute the checksum, to
994 * simulate an OST->client data error */
995 if (i == 0 && opc == OST_READ &&
996 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
997 memcpy(ptr + off, "bad1", min(4, nob));
998 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
999 cfs_kunmap(pga[i]->pg);
1000 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1003 nob -= pga[i]->count;
1007 /* For sending we only compute the wrong checksum instead
1008 * of corrupting the data so it is still correct on a redo */
1009 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/*
 * osc_brw_prep_request: build a bulk read or write request.
 *
 * cmd         OBD_BRW_WRITE selects a write, otherwise a read
 * cli         client obd that owns the import and the grant counters
 * oa          object attributes; checksum fields are updated for writes
 * lsm         stripe metadata (not referenced by the visible lines)
 * page_count  number of entries in pga, must be greater than zero
 * pga         pages sorted by increasing offset
 * reqp        receives the request (assignment is on a missing line)
 * ocapa       capability to pack, may be absent
 *
 * Visible sequence:
 *  - two fault-injection hooks return -ENOMEM and -EINVAL;
 *  - writes allocate from the import request pool, reads allocate normally;
 *  - niocount is the number of runs of mergeable pages;
 *  - the request is packed, routed to OST_IO_PORTAL and given a bulk
 *    descriptor sized for page_count pages;
 *  - every page is added to the bulk descriptor and either merged into the
 *    previous remote niobuf or described by a new one;
 *  - cache and grant state is announced through osc_announce_cached();
 *  - when checksums are enabled and the bulk hash algorithm is
 *    BULK_HASH_ALG_NULL, the checksum type is packed into the flags, and
 *    for writes the payload checksum is computed and also saved in oa;
 *  - the reply niobuf field is sized to one 32-bit code per niobuf for
 *    writes and to zero for reads;
 *  - the async arguments stored in the request record the byte, niobuf and
 *    page counts.
 *
 * Fix: pg_prev used to be loaded as pga[i - 1] on every iteration,
 * including the first, which reads one element before the start of the
 * array.  The load is now skipped for i == 0.  Every visible use of pg_prev
 * is already guarded by an i == 0 or i > 0 test.
 */
1015 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1016 struct lov_stripe_md *lsm, obd_count page_count,
1017 struct brw_page **pga,
1018 struct ptlrpc_request **reqp,
1019 struct obd_capa *ocapa)
1021 struct ptlrpc_request *req;
1022 struct ptlrpc_bulk_desc *desc;
1023 struct ost_body *body;
1024 struct obd_ioobj *ioobj;
1025 struct niobuf_remote *niobuf;
1026 int niocount, i, requested_nob, opc, rc;
1027 struct osc_brw_async_args *aa;
1028 struct req_capsule *pill;
1031 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1032 RETURN(-ENOMEM); /* Recoverable */
1033 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1034 RETURN(-EINVAL); /* Fatal */
1036 if ((cmd & OBD_BRW_WRITE) != 0) {
1038 req = ptlrpc_request_alloc_pool(cli->cl_import,
1039 cli->cl_import->imp_rq_pool,
1043 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
1049 for (niocount = i = 1; i < page_count; i++) {
1050 if (!can_merge_pages(pga[i - 1], pga[i]))
1054 pill = &req->rq_pill;
1055 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1056 niocount * sizeof(*niobuf));
1057 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1059 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1061 ptlrpc_request_free(req);
1064 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1066 if (opc == OST_WRITE)
1067 desc = ptlrpc_prep_bulk_imp(req, page_count,
1068 BULK_GET_SOURCE, OST_BULK_PORTAL);
1070 desc = ptlrpc_prep_bulk_imp(req, page_count,
1071 BULK_PUT_SINK, OST_BULK_PORTAL);
1074 GOTO(out, rc = -ENOMEM);
1075 /* NB request now owns desc and will free it when it gets freed */
1077 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1078 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1079 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1080 LASSERT(body && ioobj && niobuf);
1084 obdo_to_ioobj(oa, ioobj);
1085 ioobj->ioo_bufcnt = niocount;
1086 osc_pack_capa(req, body, ocapa);
1087 LASSERT (page_count > 0);
1088 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1089 struct brw_page *pg = pga[i];
/* There is no previous page on the first pass; do not read pga[-1]. */
1090 struct brw_page *pg_prev = i > 0 ? pga[i - 1] : NULL;
1092 LASSERT(pg->count > 0);
1093 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1094 "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1095 pg->off, pg->count);
1097 LASSERTF(i == 0 || pg->off > pg_prev->off,
1098 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1099 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1101 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1102 pg_prev->pg, page_private(pg_prev->pg),
1103 pg_prev->pg->index, pg_prev->off);
1105 LASSERTF(i == 0 || pg->off > pg_prev->off,
1106 "i %d p_c %u\n", i, page_count);
1108 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1109 (pg->flag & OBD_BRW_SRVLOCK));
1111 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1113 requested_nob += pg->count;
1115 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1117 niobuf->len += pg->count;
1119 niobuf->offset = pg->off;
1120 niobuf->len = pg->count;
1121 niobuf->flags = pg->flag;
/* After the loop niobuf must sit exactly niocount entries past the start. */
1125 LASSERT((void *)(niobuf - niocount) ==
1126 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1127 niocount * sizeof(*niobuf)));
1128 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1130 /* size[REQ_REC_OFF] still sizeof (*body) */
1131 if (opc == OST_WRITE) {
1132 if (unlikely(cli->cl_checksum) &&
1133 req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
1134 /* store cl_cksum_type in a local variable since
1135 * it can be changed via lprocfs */
1136 cksum_type_t cksum_type = cli->cl_cksum_type;
1138 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1139 oa->o_flags = body->oa.o_flags = 0;
1140 body->oa.o_flags |= cksum_type_pack(cksum_type);
1141 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1142 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1146 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1148 /* save this in 'oa', too, for later checking */
1149 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1150 oa->o_flags |= cksum_type_pack(cksum_type);
1152 /* clear out the checksum flag, in case this is a
1153 * resend but cl_checksum is no longer set. b=11238 */
1154 oa->o_valid &= ~OBD_MD_FLCKSUM;
1156 oa->o_cksum = body->oa.o_cksum;
1157 /* 1 RC per niobuf */
1158 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
1159 sizeof(__u32) * niocount);
1161 if (unlikely(cli->cl_checksum) &&
1162 req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
1163 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1164 body->oa.o_flags = 0;
1165 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1166 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1168 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
1169 /* 1 RC for the whole I/O */
1171 ptlrpc_request_set_replen(req);
1173 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1174 aa = (struct osc_brw_async_args *)&req->rq_async_args;
1176 aa->aa_requested_nob = requested_nob;
1177 aa->aa_nio_count = niocount;
1178 aa->aa_page_count = page_count;
1182 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1188 ptlrpc_req_finished(req);
/*
 * check_write_checksum: diagnose a write checksum mismatch.
 *
 * oa                 attributes returned by the server
 * peer               identity of the server, used in the console message
 * client_cksum       checksum computed when the request was built
 * server_cksum       checksum reported by the server
 * nob                number of bytes written
 * page_count, pga    pages of the write, checksummed again here
 * client_cksum_type  checksum type the client requested
 *
 * Equal checksums are confirmed with a debug message.  Otherwise the type
 * used by the server is taken from oa->o_flags when OBD_MD_FLFLAGS is set
 * (OBD_CKSUM_CRC32 on the other visible path), the pages are checksummed
 * again, and the three values are compared to pick an explanation: wrong
 * checksum type, data changed on the client after checksumming, data
 * changed in transit, or both.  The result is reported on the console and
 * in the error log.
 * NOTE(review): return values are on lines missing from this listing.
 */
1192 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1193 __u32 client_cksum, __u32 server_cksum, int nob,
1194 obd_count page_count, struct brw_page **pga,
1195 cksum_type_t client_cksum_type)
1199 cksum_type_t cksum_type;
1201 if (server_cksum == client_cksum) {
1202 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1206 if (oa->o_valid & OBD_MD_FLFLAGS)
1207 cksum_type = cksum_type_unpack(oa->o_flags);
1209 cksum_type = OBD_CKSUM_CRC32;
1211 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1214 if (cksum_type != client_cksum_type)
1215 msg = "the server did not use the checksum type specified in "
1216 "the original request - likely a protocol problem";
1217 else if (new_cksum == server_cksum)
1218 msg = "changed on the client after we checksummed it - "
1219 "likely false positive due to mmap IO (bug 11742)";
1220 else if (new_cksum == client_cksum)
1221 msg = "changed in transit before arrival at OST";
1223 msg = "changed in transit AND doesn't match the original - "
1224 "likely false positive due to mmap IO (bug 11742)";
1226 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1227 LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1228 "["LPU64"-"LPU64"]\n",
1229 msg, libcfs_nid2str(peer->nid),
1230 oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1231 oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1234 oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1236 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1237 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1238 "client csum now %x\n", client_cksum, client_cksum_type,
1239 server_cksum, cksum_type, new_cksum);
1243 /* Note rc enters this function as number of bytes transferred */
/* Post-process the reply of a bulk read/write (BRW) RPC.
 *
 * \param req  request whose reply is examined; its rq_async_args are read as
 *             a struct osc_brw_async_args
 * \param rc   status so far; a non-negative value is a byte count
 *
 * The visible code unpacks the ost_body from the reply, applies quota flags
 * (OST_WRITE only) and grant updates, and then validates the transfer: write
 * replies go through check_write_checksum() and check_write_rcs(), read
 * replies have their size and checksum verified before the reply obdo is
 * copied over *aa->aa_oa. */
1244 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1246 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1247 const lnet_process_id_t *peer =
1248 &req->rq_import->imp_connection->c_peer;
1249 struct client_obd *cli = aa->aa_cli;
1250 struct ost_body *body;
1251 __u32 client_cksum = 0;
/* -EDQUOT is excluded from this test, so with that status the reply body
 * (including its quota flags) is still processed below */
1254 if (rc < 0 && rc != -EDQUOT)
1257 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1258 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1259 lustre_swab_ost_body);
1261 CDEBUG(D_INFO, "Can't unpack body\n");
1265 /* set/clear over quota flag for a uid/gid */
1266 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1267 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1268 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1269 body->oa.o_gid, body->oa.o_valid,
/* remember the checksum that was sent: *aa->aa_oa is overwritten with the
 * reply obdo at the end of this function */
1275 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1276 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1278 osc_update_grant(cli, body);
1280 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1282 CERROR("Unexpected +ve rc %d\n", rc);
1285 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
/* the write checksum is only compared when a non-zero checksum was sent
 * with the request */
1287 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1288 check_write_checksum(&body->oa, peer, client_cksum,
1289 body->oa.o_cksum, aa->aa_requested_nob,
1290 aa->aa_page_count, aa->aa_ppga,
1291 cksum_type_unpack(aa->aa_oa->o_flags)))
1294 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1297 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1298 aa->aa_page_count, aa->aa_ppga);
1302 /* The rest of this function executes only for OST_READs */
/* a read may return fewer bytes than requested but never more, and rc must
 * agree with what the bulk descriptor reports as transferred */
1303 if (rc > aa->aa_requested_nob) {
1304 CERROR("Unexpected rc %d (%d requested)\n", rc,
1305 aa->aa_requested_nob);
1309 if (rc != req->rq_bulk->bd_nob_transferred) {
1310 CERROR ("Unexpected rc %d (%d transferred)\n",
1311 rc, req->rq_bulk->bd_nob_transferred);
1315 if (rc < aa->aa_requested_nob)
1316 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1318 if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
1320 GOTO(out, rc = -EAGAIN);
/* the server included a checksum in its reply: recompute it over the pages
 * that were received and compare */
1322 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1323 static int cksum_counter;
1324 __u32 server_cksum = body->oa.o_cksum;
1327 cksum_type_t cksum_type;
/* the checksum type is unpacked from the reply flags when OBD_MD_FLFLAGS is
 * set; OBD_CKSUM_CRC32 is the other value assigned here */
1329 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1330 cksum_type = cksum_type_unpack(body->oa.o_flags);
1332 cksum_type = OBD_CKSUM_CRC32;
1333 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1334 aa->aa_ppga, OST_READ,
/* compare the peer nid with the nid that sent the bulk data; router is
 * taken from bd_sender for use in the error message below */
1337 if (peer->nid == req->rq_bulk->bd_sender) {
1341 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1344 if (server_cksum == ~0 && rc > 0) {
1345 CERROR("Protocol error: server %s set the 'checksum' "
1346 "bit, but didn't send a checksum. Not fatal, "
1347 "but please tell CFS.\n",
1348 libcfs_nid2str(peer->nid));
1349 } else if (server_cksum != client_cksum) {
1350 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1351 "%s%s%s inum "LPU64"/"LPU64" object "
1352 LPU64"/"LPU64" extent "
1353 "["LPU64"-"LPU64"]\n",
1354 req->rq_import->imp_obd->obd_name,
1355 libcfs_nid2str(peer->nid),
1357 body->oa.o_valid & OBD_MD_FLFID ?
1358 body->oa.o_fid : (__u64)0,
1359 body->oa.o_valid & OBD_MD_FLFID ?
1360 body->oa.o_generation :(__u64)0,
1362 body->oa.o_valid & OBD_MD_FLGROUP ?
1363 body->oa.o_gr : (__u64)0,
1364 aa->aa_ppga[0]->off,
1365 aa->aa_ppga[aa->aa_page_count-1]->off +
1366 aa->aa_ppga[aa->aa_page_count-1]->count -
1368 CERROR("client %x, server %x, cksum_type %x\n",
1369 client_cksum, server_cksum, cksum_type);
1371 aa->aa_oa->o_cksum = client_cksum;
1375 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* a checksum was sent with the request (client_cksum is non-zero) but the
 * reply carries none */
1378 } else if (unlikely(client_cksum)) {
1379 static int cksum_missed;
/* the test below holds only when cksum_missed is 0 or a power of two, which
 * thins the message out as the counter grows -- assumes the counter is
 * incremented once per miss (TODO confirm) */
1382 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1383 CERROR("Checksum %u requested from %s but not sent\n",
1384 cksum_missed, libcfs_nid2str(peer->nid));
/* hand the attributes returned by the server back through aa_oa */
1390 *aa->aa_oa = body->oa;
/* Issue one BRW RPC synchronously and wait for its completion.
 *
 * The request is built by osc_brw_prep_request(), sent with
 * ptlrpc_queue_wait() and post-processed by osc_brw_fini_request().  When the
 * result is a recoverable error and osc_should_resend() still allows it, the
 * function sleeps for cfs_time_seconds(resends) before what appears to be
 * another attempt (TODO confirm: the retry jump is not among these lines). */
1395 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1396 struct lov_stripe_md *lsm,
1397 obd_count page_count, struct brw_page **pga,
1398 struct obd_capa *ocapa)
1400 struct ptlrpc_request *req;
1404 struct l_wait_info lwi;
1408 cfs_waitq_init(&waitq);
1411 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1412 page_count, pga, &req, ocapa);
1416 rc = ptlrpc_queue_wait(req);
/* a timed-out request that is flagged for resend is dropped here without
 * going through osc_brw_fini_request() */
1418 if (rc == -ETIMEDOUT && req->rq_resend) {
1419 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1420 ptlrpc_req_finished(req);
1424 rc = osc_brw_fini_request(req, rc);
1426 ptlrpc_req_finished(req);
1427 if (osc_recoverable_error(rc)) {
1429 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1430 CERROR("too many resend retries, returning error\n");
/* the wait condition is the constant 0 and nothing visible here wakes
 * waitq, so this acts as a delay of cfs_time_seconds(resends) */
1434 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1435 l_wait_event(waitq, 0, &lwi);
/* Re-issue a BRW request that failed with a recoverable error.
 *
 * \param request  the failed request
 * \param aa       its async args (cli, obdo, page array, oap list)
 *
 * A fresh request is prepared from the same obdo and pages, takes over the
 * interpret callback, async args and oap list of the old request, and is
 * added to the request set the old one belonged to.  The attempt is abandoned
 * when osc_should_resend() refuses or when an oap tied to the old request
 * has been interrupted. */
1443 int osc_brw_redo_request(struct ptlrpc_request *request,
1444 struct osc_brw_async_args *aa)
1446 struct ptlrpc_request *new_req;
1447 struct ptlrpc_request_set *set = request->rq_set;
1448 struct osc_brw_async_args *new_aa;
1449 struct osc_async_page *oap;
1453 if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1454 CERROR("too many resend retries, returning error\n");
1458 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1460 body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
/* NOTE(review): ocapa is unpacked here, yet the osc_brw_prep_request() call
 * below passes NULL for the capability argument -- confirm this is intended */
1461 if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
1462 ocapa = lustre_unpack_capa(request->rq_reqmsg,
/* the BRW direction is derived from the opcode of the old request */
1465 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1466 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1467 aa->aa_cli, aa->aa_oa,
1468 NULL /* lsm unused by osc currently */,
1469 aa->aa_page_count, aa->aa_ppga,
1470 &new_req, NULL /* ocapa */);
1474 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
/* an oap holding a request reference must reference the old request; if it
 * was interrupted the new request is dropped again under the lock */
1476 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1477 if (oap->oap_request != NULL) {
1478 LASSERTF(request == oap->oap_request,
1479 "request %p != oap_request %p\n",
1480 request, oap->oap_request);
1481 if (oap->oap_interrupted) {
1482 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1483 ptlrpc_req_finished(new_req);
1488 /* New request takes over pga and oaps from old request.
1489 * Note that copying a list_head doesn't work, need to move it... */
1491 new_req->rq_interpret_reply = request->rq_interpret_reply;
/* struct copy of the async args; the embedded list head is re-initialised
 * and the oaps are spliced over just below */
1492 new_req->rq_async_args = request->rq_async_args;
/* rq_sent is set aa_resends seconds past the current time */
1493 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1495 new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
1497 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1498 list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1499 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
/* move each request reference held by an oap from the old request to the
 * new one */
1501 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1502 if (oap->oap_request) {
1503 ptlrpc_req_finished(oap->oap_request);
1504 oap->oap_request = ptlrpc_request_addref(new_req);
1508 /* use ptlrpc_set_add_req is safe because interpret functions work
1509 * in check_set context. only one way exist with access to request
1510 * from different thread got -EINTR - this way protected with
1511 * cl_loi_list_lock */
1512 ptlrpc_set_add_req(set, new_req);
1514 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1516 DEBUG_REQ(D_INFO, new_req, "new request");
/* Build one BRW request for the given pages and add it to the request set
 * without waiting for it; brw_interpret() is installed as the reply handler.
 *
 * For writes, grant is consumed for the pages up front.  The trailing
 * else-if branch releases that grant again and wakes cache waiters --
 * presumably the path taken when request preparation failed (TODO confirm:
 * the test that selects between the two branches is not among these lines). */
1520 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1521 struct lov_stripe_md *lsm, obd_count page_count,
1522 struct brw_page **pga, struct ptlrpc_request_set *set,
1523 struct obd_capa *ocapa)
1525 struct ptlrpc_request *req;
1526 struct client_obd *cli = &exp->exp_obd->u.cli;
1528 struct osc_brw_async_args *aa;
1531 /* Consume write credits even if doing a sync write -
1532 * otherwise we may run out of space on OST due to grant. */
/* NOTE(review): cl_loi_list_lock is taken with spin_lock()/spin_unlock()
 * here but with client_obd_list_lock()/client_obd_list_unlock() further
 * down in this function -- confirm the two are interchangeable */
1533 if (cmd == OBD_BRW_WRITE) {
1534 spin_lock(&cli->cl_loi_list_lock);
/* grant is consumed per page only while at least one page worth of grant
 * remains available */
1535 for (i = 0; i < page_count; i++) {
1536 if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1537 osc_consume_write_grant(cli, pga[i]);
1539 spin_unlock(&cli->cl_loi_list_lock);
1542 rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
1545 aa = (struct osc_brw_async_args *)&req->rq_async_args;
/* account the RPC in the read or write histograms */
1546 if (cmd == OBD_BRW_READ) {
1547 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1548 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1549 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
1551 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1552 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1553 cli->cl_w_in_flight);
1554 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
/* requests built here carry no oaps; brw_interpret() tells the two kinds of
 * request apart by whether aa_oaps is empty */
1557 LASSERT(list_empty(&aa->aa_oaps));
1559 req->rq_interpret_reply = brw_interpret;
1560 ptlrpc_set_add_req(set, req);
/* the in-flight counters are updated under the loi list lock */
1561 client_obd_list_lock(&cli->cl_loi_list_lock);
1562 if (cmd == OBD_BRW_READ)
1563 cli->cl_r_in_flight++;
1565 cli->cl_w_in_flight++;
1566 client_obd_list_unlock(&cli->cl_loi_list_lock);
1567 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DIO_PAUSE, 3);
1568 } else if (cmd == OBD_BRW_WRITE) {
1569 client_obd_list_lock(&cli->cl_loi_list_lock);
1570 for (i = 0; i < page_count; i++)
1571 osc_release_write_grant(cli, pga[i], 0);
1572 osc_wake_cache_waiters(cli);
1573 client_obd_list_unlock(&cli->cl_loi_list_lock);
1579 /* ugh, we want disk allocation on the target to happen in offset order. We'll
1580 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1581 * fine for our small page arrays and doesn't require allocation. It's an
1582 * insertion sort that swaps elements that are strides apart, shrinking the
1583 * stride down until it's '1' and the array is sorted. */
/* Sort array[0..num) in place into ascending ->off order using a shellsort. */
1585 static void sort_brw_pages(struct brw_page **array, int num)
1588 struct brw_page *tmp;
/* grow the stride through 1, 4, 13, ... until it is no longer below num */
1592 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1597 for (i = stride ; i < num ; i++) {
/* shift entries with a larger offset up by one stride to make room for tmp */
1600 while (j >= stride && array[j - stride]->off > tmp->off) {
1601 array[j] = array[j - stride];
1606 } while (stride > 1);
/* Work out how many pages from the front of pg[] can be taken as one
 * unfragmented run: counting stops at a page that does not end on a page
 * boundary, or before a page that does not start on one.
 *
 * \param pg     page array, already in the order it will be sent
 * \param pages  number of entries to consider; must be greater than zero */
1609 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1615 LASSERT (pages > 0);
/* presumably the byte offset of the data within its page (assumes
 * CFS_PAGE_MASK selects the page-number bits -- TODO confirm) */
1616 offset = pg[i]->off & ~CFS_PAGE_MASK;
1620 if (pages == 0) /* that's all */
1623 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1624 return count; /* doesn't end on page boundary */
1627 offset = pg[i]->off & ~CFS_PAGE_MASK;
1628 if (offset != 0) /* doesn't start on page boundary */
/* Allocate an array of count brw_page pointers for the page array pga.
 *
 * The array is allocated with the same size expression that
 * osc_release_ppga(ppga, count) frees.  The loop presumably points entry i
 * at pga[i] (TODO confirm: the loop body is not among these lines). */
1635 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1637 struct brw_page **ppga;
1640 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1644 for (i = 0; i < count; i++)
/* Free a pointer array of count entries; ppga must not be NULL.  Only the
 * pointer array is freed here, not the brw_page structures it refers to. */
1649 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1651 LASSERT(ppga != NULL);
1652 OBD_FREE(ppga, sizeof(*ppga) * count);
/* Synchronous BRW entry point.
 *
 * With OBD_BRW_CHECK set in cmd only the state of the import is examined.
 * Otherwise the pages are sorted by offset and sent as a series of
 * osc_brw_internal() calls, each limited to cl_max_pages_per_rpc pages and
 * to one unfragmented run.  When more than one RPC is needed, the obdo in
 * oinfo->oi_oa is saved before the first and restored before each later one.
 * The pointer array and the saved obdo are released before returning. */
1655 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1656 obd_count page_count, struct brw_page *pga,
1657 struct obd_trans_info *oti)
1659 struct obdo *saved_oa = NULL;
1660 struct brw_page **ppga, **orig;
1661 struct obd_import *imp = class_exp2cliimp(exp);
1662 struct client_obd *cli = &imp->imp_obd->u.cli;
1663 int rc, page_count_orig;
1666 if (cmd & OBD_BRW_CHECK) {
1667 /* The caller just wants to know if there's a chance that this
1668 * I/O can succeed */
/* NOTE(review): imp is already dereferenced by the initialiser of cli above,
 * so the NULL test below comes too late to protect that access */
1670 if (imp == NULL || imp->imp_invalid)
1675 /* test_brw with a failed create can trip this, maybe others. */
1676 LASSERT(cli->cl_max_pages_per_rpc);
/* orig and page_count_orig keep the start and size of the pointer array for
 * the final release; ppga and page_count advance as pages are sent */
1680 orig = ppga = osc_build_ppga(pga, page_count);
1683 page_count_orig = page_count;
1685 sort_brw_pages(ppga, page_count);
1686 while (page_count) {
1687 obd_count pages_per_brw;
1689 if (page_count > cli->cl_max_pages_per_rpc)
1690 pages_per_brw = cli->cl_max_pages_per_rpc;
1692 pages_per_brw = page_count;
1694 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1696 if (saved_oa != NULL) {
1697 /* restore previously saved oa */
1698 *oinfo->oi_oa = *saved_oa;
1699 } else if (page_count > pages_per_brw) {
1700 /* save a copy of oa (brw will clobber it) */
1701 OBDO_ALLOC(saved_oa);
1702 if (saved_oa == NULL)
1703 GOTO(out, rc = -ENOMEM);
1704 *saved_oa = *oinfo->oi_oa;
1707 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1708 pages_per_brw, ppga, oinfo->oi_capa);
1713 page_count -= pages_per_brw;
1714 ppga += pages_per_brw;
1718 osc_release_ppga(orig, page_count_orig);
1720 if (saved_oa != NULL)
1721 OBDO_FREE(saved_oa);
/* Asynchronous BRW entry point: like osc_brw() the pages are sorted and cut
 * into runs of at most cl_max_pages_per_rpc unfragmented pages, but each run
 * is handed to async_internal(), which queues a request on the set.
 *
 * Each queued request needs its own pointer array, so a copy of the relevant
 * slice is made unless a single RPC covers the whole original array. */
1726 static int osc_brw_async(int cmd, struct obd_export *exp,
1727 struct obd_info *oinfo, obd_count page_count,
1728 struct brw_page *pga, struct obd_trans_info *oti,
1729 struct ptlrpc_request_set *set)
1731 struct brw_page **ppga, **orig;
1732 struct client_obd *cli = &exp->exp_obd->u.cli;
1733 int page_count_orig;
1737 if (cmd & OBD_BRW_CHECK) {
1738 struct obd_import *imp = class_exp2cliimp(exp);
1739 /* The caller just wants to know if there's a chance that this
1740 * I/O can succeed */
1742 if (imp == NULL || imp->imp_invalid)
1747 orig = ppga = osc_build_ppga(pga, page_count);
1750 page_count_orig = page_count;
1752 sort_brw_pages(ppga, page_count);
1753 while (page_count) {
1754 struct brw_page **copy;
1755 obd_count pages_per_brw;
1757 pages_per_brw = min_t(obd_count, page_count,
1758 cli->cl_max_pages_per_rpc);
1760 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1762 /* use ppga only if single RPC is going to fly */
/* the test is true whenever this run is not the entire original array */
1763 if (pages_per_brw != page_count_orig || ppga != orig) {
1764 OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
1766 GOTO(out, rc = -ENOMEM);
1767 memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
1771 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1772 pages_per_brw, copy, set, oinfo->oi_capa);
1776 OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
1780 /* we passed it to async_internal() which is
1781 * now responsible for releasing memory */
1785 page_count -= pages_per_brw;
1786 ppga += pages_per_brw;
1790 osc_release_ppga(orig, page_count_orig);
1794 static void osc_check_rpcs(struct client_obd *cli);
1796 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1797 * the dirty accounting. Writeback completes or truncate happens before
1798 * writing starts. Must be called with the loi lock held. */
/* Releases the write grant accounted to the brw_page embedded in oap; the
 * sent argument is passed through unchanged to osc_release_write_grant(). */
1799 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1802 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1806 /* This maintains the lists of pending pages to read/write for a given object
1807 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1808 * to quickly find objects that are ready to send an RPC. */
/* Decide whether the pages pending in lop justify sending an RPC now.  The
 * result is used as a boolean by loi_list_maint() and osc_check_rpcs().
 *
 * Conditions examined, in order: nothing pending; missing or invalid import;
 * an urgent page queued; for writes, dirtiers waiting for cache space; and
 * finally whether lop_num_pending has reached the optimal RPC size. */
1809 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1815 if (lop->lop_num_pending == 0)
1818 /* if we have an invalid import we want to drain the queued pages
1819 * by forcing them through rpcs that immediately fail and complete
1820 * the pages. recovery relies on this to empty the queued pages
1821 * before canceling the locks and evicting down the llite pages */
1822 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1825 /* stream rpcs in queue order as long as as there is an urgent page
1826 * queued. this is our cheap solution for good batching in the case
1827 * where writepage marks some random page in the middle of the file
1828 * as urgent because of, say, memory pressure */
1829 if (!list_empty(&lop->lop_urgent)) {
1830 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1833 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1834 optimal = cli->cl_max_pages_per_rpc;
1835 if (cmd & OBD_BRW_WRITE) {
1836 /* trigger a write rpc stream as long as there are dirtiers
1837 * waiting for space. as they're waiting, they're not going to
1838 * create more pages to coallesce with what's waiting.. */
1839 if (!list_empty(&cli->cl_cache_waiters)) {
1840 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1843 /* +16 to avoid triggering rpcs that would want to include pages
1844 * that are being queued but which can't be made ready until
1845 * the queuer finishes with the page. this is a wart for
1846 * llite::commit_write() */
1849 if (lop->lop_num_pending >= optimal)
/* Make the list membership of item agree with should_be_on: an unlinked item
 * that should be on the list is appended to its tail, a linked item that
 * should not be is unlinked and re-initialised.  Otherwise nothing changes. */
1855 static void on_list(struct list_head *item, struct list_head *list,
1858 if (list_empty(item) && should_be_on)
1859 list_add_tail(item, list);
1860 else if (!list_empty(item) && !should_be_on)
1861 list_del_init(item);
1864 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1865 * can find pages to build into rpcs quickly */
/* Re-evaluate which of the three per-client lists loi belongs on:
 *  - cl_loi_ready_list when its write or read lop would make an RPC,
 *  - cl_loi_write_list when it has any pending write pages,
 *  - cl_loi_read_list when it has any pending read pages. */
1866 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1868 on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1869 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1870 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1872 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1873 loi->loi_write_lop.lop_num_pending);
1875 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1876 loi->loi_read_lop.lop_num_pending);
/* Adjust the pending-page count of lop by delta (which may be negative) and
 * apply the same delta to the matching per-client counter:
 * cl_pending_w_pages when cmd has OBD_BRW_WRITE set, cl_pending_r_pages
 * being the other counter updated here. */
1879 static void lop_update_pending(struct client_obd *cli,
1880 struct loi_oap_pages *lop, int cmd, int delta)
1882 lop->lop_num_pending += delta;
1883 if (cmd & OBD_BRW_WRITE)
1884 cli->cl_pending_w_pages += delta;
1886 cli->cl_pending_r_pages += delta;
1889 /* this is called when a sync waiter receives an interruption. Its job is to
1890 * get the caller woken as soon as possible. If its page hasn't been put in an
1891 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
1892 * desiring interruption which will forcefully complete the rpc once the rpc
1892 * completes. */
1894 static void osc_occ_interrupted(struct oig_callback_context *occ)
1896 struct osc_async_page *oap;
1897 struct loi_oap_pages *lop;
1898 struct lov_oinfo *loi;
1901 /* XXX member_of() */
/* recover the osc_async_page that embeds this callback context as oap_occ */
1902 oap = list_entry(occ, struct osc_async_page, oap_occ);
/* everything below runs under the loi list lock of the owning client */
1904 client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1906 oap->oap_interrupted = 1;
1908 /* ok, it's been put in an rpc. only one oap gets a request reference */
1909 if (oap->oap_request != NULL) {
1910 ptlrpc_mark_interrupted(oap->oap_request);
1911 ptlrpcd_wake(oap->oap_request);
1915 /* we don't get interruption callbacks until osc_trigger_group_io()
1916 * has been called and put the sync oaps in the pending/urgent lists.*/
/* still queued: take the page off the pending and urgent lists, fix up the
 * pending counters and list membership, and complete it with -EINTR */
1917 if (!list_empty(&oap->oap_pending_item)) {
1918 list_del_init(&oap->oap_pending_item);
1919 list_del_init(&oap->oap_urgent_item);
/* NOTE(review): loi is not assigned in the lines shown here; presumably it
 * is set from oap->oap_loi just before this point -- confirm */
1922 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1923 &loi->loi_write_lop : &loi->loi_read_lop;
1924 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1925 loi_list_maint(oap->oap_cli, oap->oap_loi);
1927 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
1928 oap->oap_oig = NULL;
1932 client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
1935 /* this is trying to propagate async writeback errors back up to the
1936 * application. As an async write fails we record the error code for later if
1937 * the app does an fsync. As long as errors persist we force future rpcs to be
1938 * sync so that the app can get a sync error and break the cycle of queueing
1939 * pages for which writeback will fail. */
/* Update the async-error record ar for a completed write.
 *
 * One visible path sets ar_force_sync and records the next xid to be issued
 * as ar_min_xid -- presumably the path taken for a failed write (TODO
 * confirm: the selecting condition is not among these lines).  The other
 * clears ar_force_sync once a request whose xid is at least ar_min_xid is
 * processed. */
1940 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1947 ar->ar_force_sync = 1;
1948 ar->ar_min_xid = ptlrpc_sample_next_xid();
1953 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1954 ar->ar_force_sync = 0;
/* Queue oap on the pending list of its object: the write lop is chosen when
 * oap_cmd has OBD_BRW_WRITE set, the read lop being the other one assigned
 * here.  A page flagged ASYNC_URGENT is also put at the head of the urgent
 * list.  The pending counters are bumped by one. */
1957 static void osc_oap_to_pending(struct osc_async_page *oap)
1959 struct loi_oap_pages *lop;
1961 if (oap->oap_cmd & OBD_BRW_WRITE)
1962 lop = &oap->oap_loi->loi_write_lop;
1964 lop = &oap->oap_loi->loi_read_lop;
1966 if (oap->oap_async_flags & ASYNC_URGENT)
1967 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1968 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1969 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1972 /* this must be called holding the loi list lock to give coverage to exit_cache,
1973 * async_flag maintenance, and oap_request */
/* Complete one async page after its I/O has finished or been abandoned.
 *
 * \param cli   client the page belongs to
 * \param oa    obdo from the reply, or NULL when there is none
 * \param oap   the page being completed
 * \param sent  passed on to osc_exit_cache()
 * \param rc    result to report for the page
 *
 * Drops the request reference held by the page, resets its async flags,
 * feeds write results into the per-client and per-object error records,
 * copies block count and timestamps from oa on success, and then notifies
 * the owner, either through oig_complete_one() or the ap_completion
 * callback. */
1974 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1975 struct osc_async_page *oap, int sent, int rc)
/* only the oap that holds a request reference supplies an xid here */
1980 if (oap->oap_request != NULL) {
1981 xid = ptlrpc_req_xid(oap->oap_request);
1982 ptlrpc_req_finished(oap->oap_request);
1983 oap->oap_request = NULL;
1986 oap->oap_async_flags = 0;
1987 oap->oap_interrupted = 0;
1989 if (oap->oap_cmd & OBD_BRW_WRITE) {
1990 osc_process_ar(&cli->cl_ar, xid, rc);
1991 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
/* refresh the cached lvb attributes with whatever the reply marked valid */
1994 if (rc == 0 && oa != NULL) {
1995 if (oa->o_valid & OBD_MD_FLBLOCKS)
1996 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
1997 if (oa->o_valid & OBD_MD_FLMTIME)
1998 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
1999 if (oa->o_valid & OBD_MD_FLATIME)
2000 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2001 if (oa->o_valid & OBD_MD_FLCTIME)
2002 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
/* first completion path: leave the cache accounting and complete the page
 * through its oig -- presumably taken when oap_oig is set (TODO confirm) */
2006 osc_exit_cache(cli, oap, sent);
2007 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
2008 oap->oap_oig = NULL;
/* second completion path: hand the result to the ap_completion callback of
 * the upper layer and act on what it returns */
2013 rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
2014 oap->oap_cmd, oa, rc);
2016 /* ll_ap_completion (from llite) drops PG_locked. so, a new
2017 * I/O on the page could start, but OSC calls it under lock
2018 * and thus we can add oap back to pending safely */
2020 /* upper layer wants to leave the page on pending queue */
2021 osc_oap_to_pending(oap);
2023 osc_exit_cache(cli, oap, sent);
/* Reply handler installed on async BRW requests by async_internal() and
 * osc_send_oap_rpc().
 *
 * \param req   the request being interpreted
 * \param data  its struct osc_brw_async_args
 * \param rc    status passed in by the caller
 *
 * The reply is post-processed by osc_brw_fini_request(); a recoverable error
 * leads to osc_brw_redo_request().  Under the loi list lock the in-flight
 * counter is dropped, the pages are completed (oap-based requests) or their
 * write grant released (requests from async_internal()), cache waiters are
 * woken and further RPCs are started.  Finally the pointer array is freed. */
2027 static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
2029 struct osc_brw_async_args *aa = data;
2030 struct client_obd *cli;
2033 rc = osc_brw_fini_request(req, rc);
2034 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2035 if (osc_recoverable_error(rc)) {
2036 rc = osc_brw_redo_request(req, aa);
2043 client_obd_list_lock(&cli->cl_loi_list_lock);
2045 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2046 * is called so we know whether to go to sync BRWs or wait for more
2047 * RPCs to complete */
2048 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2049 cli->cl_w_in_flight--;
2051 cli->cl_r_in_flight--;
2053 if (!list_empty(&aa->aa_oaps)) { /* from osc_send_oap_rpc() */
2054 struct osc_async_page *oap, *tmp;
2055 /* the caller may re-use the oap after the completion call so
2056 * we need to clean it up a little */
2057 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2058 list_del_init(&oap->oap_rpc_item);
2059 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
/* on this path aa_oa is freed here once every page has been completed */
2061 OBDO_FREE(aa->aa_oa);
2062 } else { /* from async_internal() */
2064 for (i = 0; i < aa->aa_page_count; i++)
2065 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2067 osc_wake_cache_waiters(cli);
2068 osc_check_rpcs(cli);
2069 client_obd_list_unlock(&cli->cl_loi_list_lock);
2071 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
/* Build a BRW request from the async pages queued on rpc_list.
 *
 * \param cli         client the pages belong to
 * \param rpc_list    non-empty list of osc_async_page, linked by oap_rpc_item
 * \param page_count  number of pages on rpc_list
 * \param cmd         BRW command passed on to osc_brw_prep_request()
 *
 * \return the new request, or an ERR_PTR() value on failure
 *
 * A pointer array over the brw_page of every oap is allocated and sorted,
 * the obdo is filled through the callbacks of the upper layer, and on
 * success the oaps are moved from rpc_list onto aa_oaps of the request. */
2075 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
2076 struct list_head *rpc_list,
2077 int page_count, int cmd)
2079 struct ptlrpc_request *req;
2080 struct brw_page **pga = NULL;
2081 struct osc_brw_async_args *aa;
2082 struct obdo *oa = NULL;
2083 struct obd_async_page_ops *ops = NULL;
2084 void *caller_data = NULL;
2085 struct obd_capa *ocapa;
2086 struct osc_async_page *oap;
2090 LASSERT(!list_empty(rpc_list));
2092 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2094 RETURN(ERR_PTR(-ENOMEM));
2098 GOTO(out, req = ERR_PTR(-ENOMEM));
/* collect the brw_page of every oap and set its file offset; ops and
 * caller_data are picked up from the oaps for the callbacks below */
2101 list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2103 ops = oap->oap_caller_ops;
2104 caller_data = oap->oap_caller_data;
2106 pga[i] = &oap->oap_brw_page;
2107 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2108 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2109 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2113 /* always get the data for the obdo for the rpc */
2114 LASSERT(ops != NULL);
2115 ops->ap_fill_obdo(caller_data, cmd, oa);
2116 ocapa = ops->ap_lookup_capa(caller_data, cmd);
2118 sort_brw_pages(pga, page_count);
2119 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2123 CERROR("prep_req failed: %d\n", rc);
2124 GOTO(out, req = ERR_PTR(rc));
2127 /* Need to update the timestamps after the request is built in case
2128 * we race with setattr (locally or in queue at OST). If OST gets
2129 * later setattr before earlier BRW (as determined by the request xid),
2130 * the OST will not use BRW timestamps. Sadly, there is no obvious
2131 * way to do this in a single call. bug 10150 */
2132 ops->ap_update_obdo(caller_data, cmd, oa,
2133 OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
/* the async args live inside the request; the compile-time assertion checks
 * that they fit */
2135 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2136 aa = (struct osc_brw_async_args *)&req->rq_async_args;
/* move the oaps onto the request and leave rpc_list empty */
2137 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2138 list_splice(rpc_list, &aa->aa_oaps);
2139 CFS_INIT_LIST_HEAD(rpc_list);
2146 OBD_FREE(pga, sizeof(*pga) * page_count);
2151 /* the loi lock is held across this function but it's allowed to release
2152 * and reacquire it during its work */
2154 /** prepare pages for ASYNC io and put pages in send queue.
2158 * \param cmd - OBD_BRW_* macros
2159 * \param lop - pending pages
2161 * \return zero if pages were successfully added to the send queue.
2162 * \return non-zero if an error occurred. */
/* Gather pending pages of one object into a single BRW RPC and queue it.
 *
 * Phase 1 walks lop_pending under the loi list lock and moves pages that can
 * go into the RPC onto a private rpc_list, stopping at the first page that
 * would break the run (flag mismatch, not ready, fragment boundary, RPC size
 * or alignment limit).  Phase 2 drops the lock, builds the request with
 * osc_build_req() and retakes the lock; on failure every collected page is
 * completed with an error.  Phase 3 updates statistics and in-flight
 * counters, attaches a request reference to one oap and passes the request
 * to ptlrpcd_add_req() with brw_interpret() as its reply handler. */
2164 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2165 int cmd, struct loi_oap_pages *lop)
2167 struct ptlrpc_request *req;
2168 obd_count page_count = 0;
2169 struct osc_async_page *oap = NULL, *tmp;
2170 struct osc_brw_async_args *aa;
2171 struct obd_async_page_ops *ops;
2172 CFS_LIST_HEAD(rpc_list);
2173 unsigned int ending_offset;
2174 unsigned starting_offset = 0;
2178 /* first we find the pages we're allowed to work with */
2179 list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2181 ops = oap->oap_caller_ops;
2183 LASSERT(oap->oap_magic == OAP_MAGIC);
/* every page in one RPC must agree with the first page on OBD_BRW_SRVLOCK */
2185 if (page_count != 0 &&
2186 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2187 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2188 " oap %p, page %p, srvlock %u\n",
2189 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2192 /* in llite being 'ready' equates to the page being locked
2193 * until completion unlocks it. commit_write submits a page
2194 * as not ready because its unlock will happen unconditionally
2195 * as the call returns. if we race with commit_write giving
2196 * us that page we dont' want to create a hole in the page
2197 * stream, so we stop and leave the rpc to be fired by
2198 * another dirtier or kupdated interval (the not ready page
2199 * will still be on the dirty list). we could call in
2200 * at the end of ll_file_write to process the queue again. */
2201 if (!(oap->oap_async_flags & ASYNC_READY)) {
2202 int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2204 CDEBUG(D_INODE, "oap %p page %p returned %d "
2205 "instead of ready\n", oap,
2209 /* llite is telling us that the page is still
2210 * in commit_write and that we should try
2211 * and put it in an rpc again later. we
2212 * break out of the loop so we don't create
2213 * a hole in the sequence of pages in the rpc
2218 /* the io isn't needed.. tell the checks
2219 * below to complete the rpc with EINTR */
2220 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2221 oap->oap_count = -EINTR;
2224 oap->oap_async_flags |= ASYNC_READY;
2227 LASSERTF(0, "oap %p page %p returned %d "
2228 "from make_ready\n", oap,
2236 * Page submitted for IO has to be locked. Either by
2237 * ->ap_make_ready() or by higher layers.
2239 #if defined(__KERNEL__) && defined(__linux__)
2240 if(!(PageLocked(oap->oap_page) &&
2241 (CheckWriteback(oap->oap_page, cmd) || oap->oap_oig !=NULL))) {
2242 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2243 oap->oap_page, (long)oap->oap_page->flags, oap->oap_async_flags);
2247 /* If there is a gap at the start of this page, it can't merge
2248 * with any previous page, so we'll hand the network a
2249 * "fragmented" page array that it can't transfer in 1 RDMA */
2250 if (page_count != 0 && oap->oap_page_off != 0)
2253 /* take the page out of our book-keeping */
2254 list_del_init(&oap->oap_pending_item);
2255 lop_update_pending(cli, lop, cmd, -1);
2256 list_del_init(&oap->oap_urgent_item);
/* remember where the first page starts within a PTLRPC_MAX_BRW_SIZE window;
 * this is only used for the offset histograms further down */
2258 if (page_count == 0)
2259 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2260 (PTLRPC_MAX_BRW_SIZE - 1);
2262 /* ask the caller for the size of the io as the rpc leaves. */
2263 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2265 ops->ap_refresh_count(oap->oap_caller_data,cmd);
/* a count of zero or below means there is nothing to transfer: the page is
 * completed right away, with oap_count itself as the result code */
2266 if (oap->oap_count <= 0) {
2267 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2269 osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2273 /* now put the page back in our accounting */
2274 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2275 if (page_count == 0)
2276 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2277 if (++page_count >= cli->cl_max_pages_per_rpc)
2280 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2281 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2282 * have the same alignment as the initial writes that allocated
2283 * extents on the server. */
2284 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2285 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2286 if (ending_offset == 0)
2289 /* If there is a gap at the end of this page, it can't merge
2290 * with any subsequent pages, so we'll hand the network a
2291 * "fragmented" page array that it can't transfer in 1 RDMA */
2292 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2296 osc_wake_cache_waiters(cli);
2298 if (page_count == 0)
2301 loi_list_maint(cli, loi);
/* the loi list lock is dropped while the request is being built */
2303 client_obd_list_unlock(&cli->cl_loi_list_lock);
2305 req = osc_build_req(cli, &rpc_list, page_count, cmd);
2307 /* this should happen rarely and is pretty bad, it makes the
2308 * pending list not follow the dirty order */
2309 client_obd_list_lock(&cli->cl_loi_list_lock);
2310 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2311 list_del_init(&oap->oap_rpc_item);
2313 /* queued sync pages can be torn down while the pages
2314 * were between the pending list and the rpc */
2315 if (oap->oap_interrupted) {
2316 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2317 osc_ap_completion(cli, NULL, oap, 0,
2321 osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2323 loi_list_maint(cli, loi);
2324 RETURN(PTR_ERR(req));
2327 aa = (struct osc_brw_async_args *)&req->rq_async_args;
/* account the RPC in the read or write histograms */
2329 if (cmd == OBD_BRW_READ) {
2330 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2331 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2332 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2333 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2334 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2336 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2337 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2338 cli->cl_w_in_flight);
2339 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2340 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2341 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2344 client_obd_list_lock(&cli->cl_loi_list_lock);
2346 if (cmd == OBD_BRW_READ)
2347 cli->cl_r_in_flight++;
2349 cli->cl_w_in_flight++;
2351 /* queued sync pages can be torn down while the pages
2352 * were between the pending list and the rpc */
2354 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2355 /* only one oap gets a request reference */
2358 if (oap->oap_interrupted && !req->rq_intr) {
2359 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2361 ptlrpc_mark_interrupted(req);
/* NOTE(review): tmp is assigned in lines not shown here; presumably it
 * points at the oap chosen in the loop above to hold the reference --
 * confirm, including that it cannot be NULL at this point */
2365 tmp->oap_request = ptlrpc_request_addref(req);
2367 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2368 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2370 req->rq_interpret_reply = brw_interpret;
2371 ptlrpcd_add_req(req);
/* Debug helper: logs at D_INODE whether LOI is on the ready list, followed
 * by the pending count and urgent-list state of its write and read lops,
 * then the caller-supplied format STR. */
2375 #define LOI_DEBUG(LOI, STR, args...) \
2376 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2377 !list_empty(&(LOI)->loi_cli_item), \
2378 (LOI)->loi_write_lop.lop_num_pending, \
2379 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2380 (LOI)->loi_read_lop.lop_num_pending, \
2381 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2384 /* This is called by osc_check_rpcs() to find which objects have pages that
2385 * we could be sending. These lists are maintained by lop_makes_rpc(). */
/* Pick the next object to build RPCs for.  Candidates are tried in this
 * order: the head of cl_loi_ready_list; the head of cl_loi_write_list when
 * there are cache waiters; and, with a missing or invalid import, the head
 * of the write list and then of the read list. */
2386 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2389 /* first return all objects which we already know to have
2390 * pages ready to be stuffed into rpcs */
2391 if (!list_empty(&cli->cl_loi_ready_list))
2392 RETURN(list_entry(cli->cl_loi_ready_list.next,
2393 struct lov_oinfo, loi_cli_item));
2395 /* then if we have cache waiters, return all objects with queued
2396 * writes. This is especially important when many small files
2397 * have filled up the cache and not been fired into rpcs because
2398 * they don't pass the nr_pending/object threshhold */
2399 if (!list_empty(&cli->cl_cache_waiters) &&
2400 !list_empty(&cli->cl_loi_write_list))
2401 RETURN(list_entry(cli->cl_loi_write_list.next,
2402 struct lov_oinfo, loi_write_item));
2404 /* then return all queued objects when we have an invalid import
2405 * so that they get flushed */
2406 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2407 if (!list_empty(&cli->cl_loi_write_list))
2408 RETURN(list_entry(cli->cl_loi_write_list.next,
2409 struct lov_oinfo, loi_write_item));
2410 if (!list_empty(&cli->cl_loi_read_list))
2411 RETURN(list_entry(cli->cl_loi_read_list.next,
2412 struct lov_oinfo, loi_read_item));
2417 /* called with the loi list lock held */
/* Start as many RPCs as the in-flight limit allows.  Objects are taken one
 * at a time from osc_next_loi(); for each, a write RPC and then a read RPC
 * is attempted when lop_makes_rpc() says so, after which the object is
 * unlinked from the client lists and re-filed by loi_list_maint(). */
2418 static void osc_check_rpcs(struct client_obd *cli)
2420 struct lov_oinfo *loi;
2421 int rc = 0, race_counter = 0;
2424 while ((loi = osc_next_loi(cli)) != NULL) {
2425 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
/* checked against the cap on concurrently outstanding RPCs */
2427 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2430 /* attempt some read/write balancing by alternating between
2431 * reads and writes in an object. The makes_rpc checks here
2432 * would be redundant if we were getting read/write work items
2433 * instead of objects. we don't want send_oap_rpc to drain a
2434 * partial read pending queue when we're given this object to
2435 * do io on writes while there are cache waiters */
2436 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2437 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2438 &loi->loi_write_lop);
2446 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2447 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2448 &loi->loi_read_lop);
2457 /* attempt some inter-object balancing by issueing rpcs
2458 * for each object in turn */
2459 if (!list_empty(&loi->loi_cli_item))
2460 list_del_init(&loi->loi_cli_item);
2461 if (!list_empty(&loi->loi_write_item))
2462 list_del_init(&loi->loi_write_item);
2463 if (!list_empty(&loi->loi_read_item))
2464 list_del_init(&loi->loi_read_item);
2466 loi_list_maint(cli, loi);
2468 /* send_oap_rpc fails with 0 when make_ready tells it to
2469 * back off. llite's make_ready does this when it tries
2470 * to lock a page queued for write that is already locked.
2471 * we want to try sending rpcs from many objects, but we
2472 * don't want to spin failing with 0. */
/* NOTE(review): race_counter is not updated in the lines shown here;
 * presumably it counts attempts that returned 0 -- confirm */
2473 if (race_counter == 10)
2479 /* we're trying to queue a page in the osc so we're subject to the
2480 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2481 * If the osc's queued pages are already at that limit, then we want to sleep
2482 * until there is space in the osc's queue for us. We also may be waiting for
2483 * write credits from the OST if there are RPCs in flight that may return some
2484 * before we fall back to sync writes.
2486 * We need this to know our allocation was granted in the presence of signals */
/* Wait condition used by osc_enter_cache(): under cl_loi_list_lock, rc is
 * set non-zero when the waiter is no longer linked through ocw_entry or when
 * no rpcs are in flight.
 * NOTE(review): the return statement is not visible in this excerpt;
 * presumably rc is returned. */
2487 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2491 client_obd_list_lock(&cli->cl_loi_list_lock);
2492 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2493 client_obd_list_unlock(&cli->cl_loi_list_lock);
2497 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2498 * grant or cache space. */
/* Tries to account one page of dirty cache and write grant for oap.
 * Visible flow:
 *  - logs the current dirty and grant counters;
 *  - tests the conditions under which the caller is forced to sync io
 *    (cl_dirty_max below one page, or ar_force_sync set on the client or on
 *    the object);
 *  - if the per-client dirty limit, the global dirty page limit and the
 *    available grant all allow one more page, calls
 *    osc_consume_write_grant();
 *  - otherwise, if write rpcs are in flight, queues an on-stack waiter on
 *    cl_cache_waiters, calls loi_list_maint() and osc_check_rpcs(), drops
 *    the list lock, sleeps until ocw_granted() holds, retakes the lock and
 *    unlinks the waiter if it is still queued.
 * NOTE(review): the return statements and a few assignments are not visible
 * in this excerpt, so the return code of each path must be confirmed in the
 * full source. */
2499 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2500 struct osc_async_page *oap)
2502 struct osc_cache_waiter ocw;
2503 struct l_wait_info lwi = { 0 };
2507 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2508 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2509 cli->cl_dirty_max, obd_max_dirty_pages,
2510 cli->cl_lost_grant, cli->cl_avail_grant);
2512 /* force the caller to try sync io. this can jump the list
2513 * of queued writes and create a discontiguous rpc stream */
2514 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2515 loi->loi_ar.ar_force_sync)
2518 /* Hopefully normal case - cache space and write credits available */
2519 if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2520 (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2521 (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2522 /* account for ourselves */
2523 osc_consume_write_grant(cli, &oap->oap_brw_page);
2527 /* Make sure that there are write rpcs in flight to wait for. This
2528 * is a little silly as this object may not have any pending but
2529 * other objects sure might. */
2530 if (cli->cl_w_in_flight) {
2531 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2532 cfs_waitq_init(&ocw.ocw_waitq);
2536 loi_list_maint(cli, loi);
2537 osc_check_rpcs(cli);
2538 client_obd_list_unlock(&cli->cl_loi_list_lock);
2540 CDEBUG(D_CACHE, "sleeping for cache space\n");
2541 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2543 client_obd_list_lock(&cli->cl_loi_list_lock);
2544 if (!list_empty(&ocw.ocw_entry)) {
2545 list_del(&ocw.ocw_entry);
/* With oap taken from *res, calls ldlm_lock_fast_match() on the page's
 * oap_ldlm_lock with rw, start, end and cookie while holding oap_lock, and
 * stores the result in rc.
 * NOTE(review): part of the parameter list and the return statement are not
 * visible in this excerpt. */
2554 static int osc_reget_short_lock(struct obd_export *exp,
2555 struct lov_stripe_md *lsm,
2557 obd_off start, obd_off end,
2560 struct osc_async_page *oap = *res;
2565 spin_lock(&oap->oap_lock);
2566 rc = ldlm_lock_fast_match(oap->oap_ldlm_lock, rw,
2567 start, end, cookie);
2568 spin_unlock(&oap->oap_lock);
/* Hands cookie and rw to ldlm_lock_fast_release(); exp, lsm and end are not
 * used by the visible code.
 * NOTE(review): the return statement is not visible in this excerpt; the
 * trailing comment suggests a success code is returned unconditionally. */
2573 static int osc_release_short_lock(struct obd_export *exp,
2574 struct lov_stripe_md *lsm, obd_off end,
2575 void *cookie, int rw)
2578 ldlm_lock_fast_release(cookie, rw);
2579 /* no error could have happened at this layer */
/* Initialises an osc_async_page for page at object offset offset.
 * Visible behaviour:
 *  - one path returns size_round(sizeof(*oap)), i.e. the storage a caller
 *    needs for an oap (the guarding condition is not visible here);
 *  - otherwise fills in the magic, owning client_obd, caller ops/data, page
 *    and offset, initialises the pending/urgent/rpc/page list heads, the
 *    interrupt callback and oap_lock;
 *  - builds a resource id from loi_id and loi_gr and registers the page with
 *    the lock referenced by lockh through cache_add_extent().
 * NOTE(review): how oap is derived from res, the nocache test and the return
 * statements are not visible in this excerpt. */
2583 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2584 struct lov_oinfo *loi, cfs_page_t *page,
2585 obd_off offset, struct obd_async_page_ops *ops,
2586 void *data, void **res, int nocache,
2587 struct lustre_handle *lockh)
2589 struct osc_async_page *oap;
2590 struct ldlm_res_id oid = {{0}};
2595 return size_round(sizeof(*oap));
2598 oap->oap_magic = OAP_MAGIC;
2599 oap->oap_cli = &exp->exp_obd->u.cli;
2602 oap->oap_caller_ops = ops;
2603 oap->oap_caller_data = data;
2605 oap->oap_page = page;
2606 oap->oap_obj_off = offset;
2608 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2609 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2610 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2611 CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2613 oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2615 spin_lock_init(&oap->oap_lock);
2617 /* If the page was marked as notcacheable - don't add to any locks */
2619 oid.name[0] = loi->loi_id;
2620 oid.name[2] = loi->loi_gr;
2621 /* This is the only place where we can call cache_add_extent
2622 without oap_lock, because this page is locked now, and
2623 the lock we are adding it to is referenced, so cannot lose
2624 any pages either. */
2625 rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh);
2630 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
/* Interprets an opaque cookie as an osc_async_page and validates it by its
 * magic: a mismatch yields ERR_PTR(-EINVAL).
 * NOTE(review): the success return is not visible in this excerpt;
 * presumably the oap itself is returned. */
2634 struct osc_async_page *oap_from_cookie(void *cookie)
2636 struct osc_async_page *oap = cookie;
2637 if (oap->oap_magic != OAP_MAGIC)
2638 return ERR_PTR(-EINVAL);
/* Queues the async page identified by cookie for cmd io.
 * Visible steps:
 *  - resolves the cookie with oap_from_cookie() and propagates its error;
 *  - tests that the import exists and is not invalid, and that the page is
 *    not already on a pending, urgent or rpc list;
 *  - with HAVE_QUOTA_SUPPORT, for writes without OBD_BRW_NOQUOTA, fills an
 *    obdo through the ap_fill_obdo callback and consults lquota_chkdq();
 *  - under cl_loi_list_lock, records off, count and both flag sets in the
 *    oap, calls osc_enter_cache() for writes, moves the page to the pending
 *    list with osc_oap_to_pending(), calls loi_list_maint() and finally
 *    osc_check_rpcs().
 * NOTE(review): the error codes returned by the failed checks, the condition
 * guarding loi = lsm->lsm_oinfo[0] and the final return are not visible in
 * this excerpt. */
2642 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2643 struct lov_oinfo *loi, void *cookie,
2644 int cmd, obd_off off, int count,
2645 obd_flag brw_flags, enum async_flags async_flags)
2647 struct client_obd *cli = &exp->exp_obd->u.cli;
2648 struct osc_async_page *oap;
2652 oap = oap_from_cookie(cookie);
2654 RETURN(PTR_ERR(oap));
2656 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2659 if (!list_empty(&oap->oap_pending_item) ||
2660 !list_empty(&oap->oap_urgent_item) ||
2661 !list_empty(&oap->oap_rpc_item))
2664 /* check if the file's owner/group is over quota */
2665 #ifdef HAVE_QUOTA_SUPPORT
2666 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2667 struct obd_async_page_ops *ops;
2674 ops = oap->oap_caller_ops;
2675 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2676 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2687 loi = lsm->lsm_oinfo[0];
2689 client_obd_list_lock(&cli->cl_loi_list_lock);
2692 oap->oap_page_off = off;
2693 oap->oap_count = count;
2694 oap->oap_brw_flags = brw_flags;
2695 oap->oap_async_flags = async_flags;
2697 if (cmd & OBD_BRW_WRITE) {
2698 rc = osc_enter_cache(cli, loi, oap);
2700 client_obd_list_unlock(&cli->cl_loi_list_lock);
2705 osc_oap_to_pending(oap);
2706 loi_list_maint(cli, loi);
2708 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2711 osc_check_rpcs(cli);
2712 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Evaluates to 1 when flag is clear in was and set in now, i.e. the flag is
 * being newly set; otherwise 0.  aka (~was & now & flag), but this is more
 * clear :)
 * Every argument is parenthesized so that compound expressions such as
 * A | B expand with the intended precedence. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
/* Adds ASYNC_READY and/or ASYNC_URGENT to a page that is already pending.
 * Visible steps:
 *  - resolves the cookie with oap_from_cookie() and propagates its error;
 *  - asserts that the page does not carry OBD_BRW_SRVLOCK and that this is
 *    not a liblustre client;
 *  - picks the write or read lop of the object from oap_cmd;
 *  - under cl_loi_list_lock: fails with -EINVAL if the page is not on a
 *    pending list; sets ASYNC_READY when it is newly requested; when
 *    ASYNC_URGENT is newly requested and the page is not on an rpc list,
 *    links it on lop_urgent and calls loi_list_maint(); logs the resulting
 *    flags and calls osc_check_rpcs().
 * NOTE(review): several short statements (early returns, the out label, the
 * action taken when all requested flags are already set) are not visible in
 * this excerpt. */
2720 static int osc_set_async_flags(struct obd_export *exp,
2721 struct lov_stripe_md *lsm,
2722 struct lov_oinfo *loi, void *cookie,
2723 obd_flag async_flags)
2725 struct client_obd *cli = &exp->exp_obd->u.cli;
2726 struct loi_oap_pages *lop;
2727 struct osc_async_page *oap;
2731 oap = oap_from_cookie(cookie);
2733 RETURN(PTR_ERR(oap));
2736 * bug 7311: OST-side locking is only supported for liblustre for now
2737 * (and liblustre never calls obd_set_async_flags(). I hope.), generic
2738 * implementation has to handle case where OST-locked page was picked
2739 * up by, e.g., ->writepage().
2741 LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2742 LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2745 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2749 loi = lsm->lsm_oinfo[0];
2751 if (oap->oap_cmd & OBD_BRW_WRITE) {
2752 lop = &loi->loi_write_lop;
2754 lop = &loi->loi_read_lop;
2757 client_obd_list_lock(&cli->cl_loi_list_lock);
2759 if (list_empty(&oap->oap_pending_item))
2760 GOTO(out, rc = -EINVAL);
2762 if ((oap->oap_async_flags & async_flags) == async_flags)
2765 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2766 oap->oap_async_flags |= ASYNC_READY;
2768 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2769 if (list_empty(&oap->oap_rpc_item)) {
2770 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2771 loi_list_maint(cli, loi);
2775 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2776 oap->oap_async_flags);
2778 osc_check_rpcs(cli);
2779 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Queues the async page identified by cookie on the group-pending list.
 * Visible steps:
 *  - resolves the cookie with oap_from_cookie() and propagates its error;
 *  - tests that the import exists and is not invalid, and that the page is
 *    not already on a pending, urgent or rpc list;
 *  - under cl_loi_list_lock, records off, count and both flag sets in the
 *    oap and appends the page to lop_pending_group of the write or read lop
 *    (chosen from cmd); for ASYNC_GROUP_SYNC pages the oap_occ is also
 *    registered with the io group through oig_add_one().
 * NOTE(review): the error codes of the failed checks, one parameter line and
 * the final return are not visible in this excerpt. */
2783 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2784 struct lov_oinfo *loi,
2785 struct obd_io_group *oig, void *cookie,
2786 int cmd, obd_off off, int count,
2788 obd_flag async_flags)
2790 struct client_obd *cli = &exp->exp_obd->u.cli;
2791 struct osc_async_page *oap;
2792 struct loi_oap_pages *lop;
2796 oap = oap_from_cookie(cookie);
2798 RETURN(PTR_ERR(oap));
2800 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2803 if (!list_empty(&oap->oap_pending_item) ||
2804 !list_empty(&oap->oap_urgent_item) ||
2805 !list_empty(&oap->oap_rpc_item))
2809 loi = lsm->lsm_oinfo[0];
2811 client_obd_list_lock(&cli->cl_loi_list_lock);
2814 oap->oap_page_off = off;
2815 oap->oap_count = count;
2816 oap->oap_brw_flags = brw_flags;
2817 oap->oap_async_flags = async_flags;
2819 if (cmd & OBD_BRW_WRITE)
2820 lop = &loi->loi_write_lop;
2822 lop = &loi->loi_read_lop;
2824 list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2825 if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2827 rc = oig_add_one(oig, &oap->oap_occ);
2830 LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2831 oap, oap->oap_page, rc);
2833 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Moves every page queued on lop->lop_pending_group to the regular pending
 * queue: each oap is unlinked from the group list and handed to
 * osc_oap_to_pending(); loi_list_maint() is then called for the object.
 * The safe list iterator is used because entries are removed while walking.
 * The cmd argument is not referenced by the visible code. */
2838 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2839 struct loi_oap_pages *lop, int cmd)
2841 struct list_head *pos, *tmp;
2842 struct osc_async_page *oap;
2844 list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2845 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2846 list_del(&oap->oap_pending_item);
2847 osc_oap_to_pending(oap);
2849 loi_list_maint(cli, loi);
/* Starts io for pages previously queued with osc_queue_group_io(): under
 * cl_loi_list_lock, moves the group-pending writes and then reads of the
 * object to the regular pending queues via osc_group_to_pending() and calls
 * osc_check_rpcs().  The oig argument is not referenced by the visible code.
 * NOTE(review): the condition guarding loi = lsm->lsm_oinfo[0] and the
 * return statement are not visible in this excerpt. */
2852 static int osc_trigger_group_io(struct obd_export *exp,
2853 struct lov_stripe_md *lsm,
2854 struct lov_oinfo *loi,
2855 struct obd_io_group *oig)
2857 struct client_obd *cli = &exp->exp_obd->u.cli;
2861 loi = lsm->lsm_oinfo[0];
2863 client_obd_list_lock(&cli->cl_loi_list_lock);
2865 osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2866 osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2868 osc_check_rpcs(cli);
2869 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Detaches the async page identified by cookie from the osc queues.
 * After resolving the cookie and choosing the write or read lop from
 * oap_cmd, the following happens under cl_loi_list_lock:
 *  - a page that is on an rpc list cannot be torn down: rc = -EBUSY;
 *  - otherwise osc_exit_cache() and osc_wake_cache_waiters() are called;
 *  - the page is unlinked from the urgent list (clearing ASYNC_URGENT) and
 *    from the pending list (adjusting the pending count by -1 through
 *    lop_update_pending());
 *  - loi_list_maint() and cache_remove_extent() are called.
 * NOTE(review): the out label and the return statement are not visible in
 * this excerpt. */
2874 static int osc_teardown_async_page(struct obd_export *exp,
2875 struct lov_stripe_md *lsm,
2876 struct lov_oinfo *loi, void *cookie)
2878 struct client_obd *cli = &exp->exp_obd->u.cli;
2879 struct loi_oap_pages *lop;
2880 struct osc_async_page *oap;
2884 oap = oap_from_cookie(cookie);
2886 RETURN(PTR_ERR(oap));
2889 loi = lsm->lsm_oinfo[0];
2891 if (oap->oap_cmd & OBD_BRW_WRITE) {
2892 lop = &loi->loi_write_lop;
2894 lop = &loi->loi_read_lop;
2897 client_obd_list_lock(&cli->cl_loi_list_lock);
2899 if (!list_empty(&oap->oap_rpc_item))
2900 GOTO(out, rc = -EBUSY);
2902 osc_exit_cache(cli, oap, 0);
2903 osc_wake_cache_waiters(cli);
2905 if (!list_empty(&oap->oap_urgent_item)) {
2906 list_del_init(&oap->oap_urgent_item);
2907 oap->oap_async_flags &= ~ASYNC_URGENT;
2909 if (!list_empty(&oap->oap_pending_item)) {
2910 list_del_init(&oap->oap_pending_item);
2911 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2913 loi_list_maint(cli, loi);
2914 cache_remove_extent(cli->cl_cache, oap);
2916 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2918 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Blocking/cancel callback for extent locks (exported symbol).
 *  - A data pointer in the open range (0, 0x1000) is reported with
 *    LDLM_ERROR as bad data.
 *  - LDLM_CB_BLOCKING: converts the lock to a handle and cancels it with
 *    ldlm_cli_cancel(); a CERROR reports a failed cancel.
 *  - LDLM_CB_CANCELING: locks whose requested and granted modes differ are
 *    treated as never granted; otherwise the lock is dropped from the client
 *    cache with cache_remove_lock() and, when cl_ext_lock_cancel_cb is
 *    installed, that callback is invoked with the same arguments.
 * NOTE(review): the flag parameter line, the switch statement, the remaining
 * cache_remove_lock() arguments and all returns are not visible in this
 * excerpt. */
2922 int osc_extent_blocking_cb(struct ldlm_lock *lock,
2923 struct ldlm_lock_desc *new, void *data,
2926 struct lustre_handle lockh = { 0 };
2930 if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
2931 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
2936 case LDLM_CB_BLOCKING:
2937 ldlm_lock2handle(lock, &lockh);
2938 rc = ldlm_cli_cancel(&lockh);
2940 CERROR("ldlm_cli_cancel failed: %d\n", rc);
2942 case LDLM_CB_CANCELING: {
2944 ldlm_lock2handle(lock, &lockh);
2945 /* This lock wasn't granted, don't try to do anything */
2946 if (lock->l_req_mode != lock->l_granted_mode)
2949 cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache,
2952 if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb)
2953 lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb(
2954 lock, new, data,flag);
2963 EXPORT_SYMBOL(osc_extent_blocking_cb);
/* Resolves lockh to a lock and, under lock_res_and_lock(), stores data in
 * l_ast_data and copies the LDLM_FL_NO_LRU bit of flags into l_flags; the
 * lock reference taken by ldlm_handle2lock() is dropped with LDLM_LOCK_PUT.
 * On Linux kernel builds, replacing a different non-NULL l_ast_data is only
 * tolerated when the old inode has I_FREEING set: otherwise LDLM_ERROR is
 * logged and LASSERTF fires.
 * NOTE(review): the flags parameter line and the condition guarding the
 * CERROR (presumably a failed handle lookup) are not visible in this
 * excerpt. */
2965 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2968 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2971 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2974 lock_res_and_lock(lock);
2975 #if defined (__KERNEL__) && defined (__linux__)
2976 /* Liang XXX: Darwin and Winnt checking should be added */
2977 if (lock->l_ast_data && lock->l_ast_data != data) {
2978 struct inode *new_inode = data;
2979 struct inode *old_inode = lock->l_ast_data;
2980 if (!(old_inode->i_state & I_FREEING))
2981 LDLM_ERROR(lock, "inconsistent l_ast_data found");
2982 LASSERTF(old_inode->i_state & I_FREEING,
2983 "Found existing inode %p/%lu/%u state %lu in lock: "
2984 "setting data to %p/%lu/%u\n", old_inode,
2985 old_inode->i_ino, old_inode->i_generation,
2987 new_inode, new_inode->i_ino, new_inode->i_generation);
2990 lock->l_ast_data = data;
2991 lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2992 unlock_res_and_lock(lock);
2993 LDLM_LOCK_PUT(lock);
/* Runs the replace iterator, with data as its argument, over the locks of
 * the resource that belongs to lsm: the resource id is built from
 * lsm_object_id (name[0]) and lsm_object_gr (name[2]) and passed to
 * ldlm_resource_iterate() on the namespace of the export's obd.
 * NOTE(review): the return statement is not visible in this excerpt. */
2996 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2997 ldlm_iterator_t replace, void *data)
2999 struct ldlm_res_id res_id = { .name = {0} };
3000 struct obd_device *obd = class_exp2obd(exp);
3002 res_id.name[0] = lsm->lsm_object_id;
3003 res_id.name[2] = lsm->lsm_object_gr;
3005 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
/* OSC-side completion of a lock enqueue.
 *  - For rc == ELDLM_LOCK_ABORTED the ldlm_reply is fetched from the request
 *    capsule and a non-zero lock_policy_res1 replaces rc.
 *  - When the enqueue succeeded (rc == 0), or was aborted for an intent
 *    request, the size/blocks/mtime of the lvb of stripe 0 are logged.
 *  - cache_add_lock() registers the lock handle with the client cache.
 *  - The update callback oi_cb_up() is invoked with rc and its result
 *    becomes the new rc.
 * NOTE(review): the conditions guarding the reply lookup and
 * cache_add_lock(), and the return statement, are not visible in this
 * excerpt. */
3009 static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
3010 struct obd_info *oinfo, int intent, int rc)
3015 /* The request was created before ldlm_cli_enqueue call. */
3016 if (rc == ELDLM_LOCK_ABORTED) {
3017 struct ldlm_reply *rep;
3018 rep = req_capsule_server_get(&req->rq_pill,
3021 LASSERT(rep != NULL);
3022 if (rep->lock_policy_res1)
3023 rc = rep->lock_policy_res1;
3027 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3028 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3029 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
3030 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
3031 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
3035 cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh);
3037 /* Call the update callback. */
3038 rc = oinfo->oi_cb_up(oinfo, rc);
/* Reply handler for asynchronous enqueues (installed by osc_enqueue() as
 * rq_interpret_reply).  It pins the lock via ldlm_handle2lock(), completes
 * the ldlm part with ldlm_cli_enqueue_fini(), completes the osc part with
 * osc_enqueue_fini(), drops the enqueue reference with ldlm_lock_decref()
 * when the handle is in use and rc is ELDLM_OK, asserts that the lock lookup
 * succeeded and finally releases the pin with LDLM_LOCK_PUT().
 * NOTE(review): one ldlm_cli_enqueue_fini() argument line and the return
 * statement are not visible in this excerpt. */
3042 static int osc_enqueue_interpret(struct ptlrpc_request *req,
3043 struct osc_enqueue_args *aa, int rc)
3045 int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
3046 struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
3047 struct ldlm_lock *lock;
3049 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3051 lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
3053 /* Complete obtaining the lock procedure. */
3054 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3056 &aa->oa_oi->oi_flags,
3057 &lsm->lsm_oinfo[0]->loi_lvb,
3058 sizeof(lsm->lsm_oinfo[0]->loi_lvb),
3059 lustre_swab_ost_lvb,
3060 aa->oa_oi->oi_lockh, rc);
3062 /* Complete osc stuff. */
3063 rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc);
3065 /* Release the lock for async request. */
3066 if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
3067 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
3069 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3070 aa->oa_oi->oi_lockh, req, aa);
3071 LDLM_LOCK_PUT(lock);
3075 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3076 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3077 * other synchronous requests, however keeping some locks and trying to obtain
3078 * others may take a considerable amount of time in a case of ost failure; and
3079 * when other sync requests do not get released lock from a client, the client
3080 * is excluded from the cluster -- such scenarios make the life difficult, so
3081 * release locks just after they are obtained. */
/* Obtains an extent lock on the object described by oinfo->oi_md.
 * Visible flow:
 *  - builds the resource id from lsm_object_id / lsm_object_gr and widens
 *    the requested extent to page boundaries;
 *  - looks for an already granted lock with ldlm_lock_match(); for LCK_PR
 *    requests the searched mode is adjusted on a line not visible here (the
 *    comments indicate that an existing PW lock is acceptable too);
 *  - on a match, attaches ei_cbdata via osc_set_data_with_check(), fixes up
 *    the PR/PW references, reports ELDLM_OK through oi_cb_up() and, for
 *    async callers, drops the reference again;
 *  - otherwise allocates and prepares an RQF_LDLM_ENQUEUE_LVB request (under
 *    a condition not visible here), clears LDLM_FL_BLOCK_GRANTED and calls
 *    ldlm_cli_enqueue(); with a request set the reply is handled by
 *    osc_enqueue_interpret(), without one osc_enqueue_fini() is called
 *    directly and the request is released.
 * NOTE(review): many short lines (conditions, returns, labels) are missing
 * from this excerpt; the branch structure above is inferred from what is
 * visible and must be confirmed against the full source. */
3082 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3083 struct ldlm_enqueue_info *einfo,
3084 struct ptlrpc_request_set *rqset)
3086 struct ldlm_res_id res_id = { .name = {0} };
3087 struct obd_device *obd = exp->exp_obd;
3088 struct ptlrpc_request *req = NULL;
3089 int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
3094 res_id.name[0] = oinfo->oi_md->lsm_object_id;
3095 res_id.name[2] = oinfo->oi_md->lsm_object_gr;
3097 /* Filesystem lock extents are extended to page boundaries so that
3098 * dealing with the page cache is a little smoother. */
3099 oinfo->oi_policy.l_extent.start -=
3100 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
3101 oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
3103 if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
3106 /* Next, search for already existing extent locks that will cover us */
3107 /* If we're trying to read, we also search for an existing PW lock. The
3108 * VFS and page cache already protect us locally, so lots of readers/
3109 * writers can share a single PW lock.
3111 * There are problems with conversion deadlocks, so instead of
3112 * converting a read lock to a write lock, we'll just enqueue a new
3115 * At some point we should cancel the read lock instead of making them
3116 * send us a blocking callback, but there are problems with canceling
3117 * locks out from other users right now, too. */
3118 mode = einfo->ei_mode;
3119 if (einfo->ei_mode == LCK_PR)
3121 mode = ldlm_lock_match(obd->obd_namespace,
3122 oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
3123 einfo->ei_type, &oinfo->oi_policy, mode,
3126 /* addref the lock only if not async requests and PW lock is
3127 * matched whereas we asked for PR. */
3128 if (!rqset && einfo->ei_mode != mode)
3129 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
3130 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
3133 /* I would like to be able to ASSERT here that rss <=
3134 * kms, but I can't, for reasons which are explained in
3138 /* We already have a lock, and it's referenced */
3139 oinfo->oi_cb_up(oinfo, ELDLM_OK);
3141 /* For async requests, decref the lock. */
3142 if (einfo->ei_mode != mode)
3143 ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
3145 ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
3152 CFS_LIST_HEAD(cancels);
3153 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3154 &RQF_LDLM_ENQUEUE_LVB);
3158 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3162 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3163 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb));
3164 ptlrpc_request_set_replen(req);
3167 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3168 oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
3170 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
3171 &oinfo->oi_policy, &oinfo->oi_flags,
3172 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3173 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
3174 lustre_swab_ost_lvb, oinfo->oi_lockh,
3178 struct osc_enqueue_args *aa;
3179 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3180 aa = (struct osc_enqueue_args *)&req->rq_async_args;
3185 req->rq_interpret_reply = osc_enqueue_interpret;
3186 ptlrpc_set_add_req(rqset, req);
3187 } else if (intent) {
3188 ptlrpc_req_finished(req);
3193 rc = osc_enqueue_fini(obd, req, oinfo, intent, rc);
3195 ptlrpc_req_finished(req);
/* Looks for an existing granted lock covering policy on the object of lsm.
 * The resource id is built from lsm_object_id / lsm_object_gr, the extent in
 * *policy is widened in place to page boundaries, and ldlm_lock_match() is
 * called with LDLM_FL_LVB_READY added to the caller flags.  On a match the
 * callback data is attached with osc_set_data_with_check(); when the matched
 * mode differs from the requested one and LDLM_FL_TEST_LOCK is not set, the
 * PW reference is exchanged for a PR reference.
 * NOTE(review): the OBD_FAIL_OSC_MATCH action, the computation of the mode
 * passed to ldlm_lock_match(), the match test and the return statement are
 * not visible in this excerpt. */
3200 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3201 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3202 int *flags, void *data, struct lustre_handle *lockh)
3204 struct ldlm_res_id res_id = { .name = {0} };
3205 struct obd_device *obd = exp->exp_obd;
3206 int lflags = *flags;
3210 res_id.name[0] = lsm->lsm_object_id;
3211 res_id.name[2] = lsm->lsm_object_gr;
3213 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3216 /* Filesystem lock extents are extended to page boundaries so that
3217 * dealing with the page cache is a little smoother */
3218 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3219 policy->l_extent.end |= ~CFS_PAGE_MASK;
3221 /* Next, search for already existing extent locks that will cover us */
3222 /* If we're trying to read, we also search for an existing PW lock. The
3223 * VFS and page cache already protect us locally, so lots of readers/
3224 * writers can share a single PW lock. */
3228 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3229 &res_id, type, policy, rc, lockh);
3231 osc_set_data_with_check(lockh, data, lflags);
3232 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3233 ldlm_lock_addref(lockh, LCK_PR);
3234 ldlm_lock_decref(lockh, LCK_PW);
/* Drops one reference of the given mode on the lock behind lockh.  Group
 * locks (LCK_GROUP, marked unlikely) go through
 * ldlm_lock_decref_and_cancel(); the other call is a plain
 * ldlm_lock_decref().  exp and md are not referenced by the visible code.
 * NOTE(review): the else keyword and the return statement are not visible in
 * this excerpt. */
3241 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3242 __u32 mode, struct lustre_handle *lockh)
3246 if (unlikely(mode == LCK_GROUP))
3247 ldlm_lock_decref_and_cancel(lockh, mode);
3249 ldlm_lock_decref(lockh, mode);
/* Forwards to ldlm_cli_cancel_unused() on the namespace of the export's obd,
 * passing resp, flags and opaque.  A resource id is filled from
 * lsm_object_id / lsm_object_gr; resp starts out NULL.
 * NOTE(review): the opaque parameter line, the condition guarding the
 * resource id setup and the assignment of resp are not visible in this
 * excerpt; presumably resp points at res_id when lsm is given. */
3254 static int osc_cancel_unused(struct obd_export *exp,
3255 struct lov_stripe_md *lsm, int flags,
3258 struct obd_device *obd = class_exp2obd(exp);
3259 struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3262 res_id.name[0] = lsm->lsm_object_id;
3263 res_id.name[2] = lsm->lsm_object_gr;
3267 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
/* Forwards to ldlm_cli_join_lru() on the namespace of the export's obd,
 * passing resp and join.  A resource id is filled from lsm_object_id /
 * lsm_object_gr; resp starts out NULL.
 * NOTE(review): the condition guarding the resource id setup and the
 * assignment of resp are not visible in this excerpt; presumably resp points
 * at res_id when lsm is given. */
3270 static int osc_join_lru(struct obd_export *exp,
3271 struct lov_stripe_md *lsm, int join)
3273 struct obd_device *obd = class_exp2obd(exp);
3274 struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3277 res_id.name[0] = lsm->lsm_object_id;
3278 res_id.name[2] = lsm->lsm_object_gr;
3282 return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
/* Reply handler for osc_statfs_async(): fetches the obd_statfs from the
 * reply capsule (rc = -EPROTO on the error path), copies it into the
 * caller buffer aa->aa_oi->oi_osfs and reports the result through the
 * oi_cb_up() callback, whose return value becomes rc.
 * NOTE(review): the guarding conditions, the out label and the return
 * statement are not visible in this excerpt. */
3285 static int osc_statfs_interpret(struct ptlrpc_request *req,
3286 struct osc_async_args *aa, int rc)
3288 struct obd_statfs *msfs;
3294 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3296 GOTO(out, rc = -EPROTO);
3299 *aa->aa_oi->oi_osfs = *msfs;
3301 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Asynchronous statfs: allocates and packs an OST_STATFS request on the
 * client import (freeing it if packing fails), sets the reply length,
 * directs it to OST_CREATE_PORTAL, marks it no-resend/no-delay when
 * OBD_STATFS_NODELAY is set in oi_flags, installs osc_statfs_interpret() as
 * reply handler with its arguments stored in rq_async_args, and adds the
 * request to rqset.  max_age is not referenced by the visible code.
 * NOTE(review): the error checks after allocation and packing, the
 * assignment into aa and the return statements are not visible in this
 * excerpt. */
3305 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3306 __u64 max_age, struct ptlrpc_request_set *rqset)
3308 struct ptlrpc_request *req;
3309 struct osc_async_args *aa;
3313 /* We could possibly pass max_age in the request (as an absolute
3314 * timestamp or a "seconds.usec ago") so the target can avoid doing
3315 * extra calls into the filesystem if that isn't necessary (e.g.
3316 * during mount that would help a bit). Having relative timestamps
3317 * is not so great if request processing is slow, while absolute
3318 * timestamps are not ideal because they need time synchronization. */
3319 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3323 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3325 ptlrpc_request_free(req);
3328 ptlrpc_request_set_replen(req);
3329 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3330 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3331 /* procfs requests not want stat in wait for avoid deadlock */
3332 req->rq_no_resend = 1;
3333 req->rq_no_delay = 1;
3336 req->rq_interpret_reply = osc_statfs_interpret;
3337 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3338 aa = (struct osc_async_args *)&req->rq_async_args;
3341 ptlrpc_set_add_req(rqset, req);
/* Synchronous statfs: allocates and packs an OST_STATFS request on the
 * client import (freeing it if packing fails), sets the reply length,
 * directs it to OST_CREATE_PORTAL, marks it no-resend/no-delay when
 * OBD_STATFS_NODELAY is set in flags, waits for the reply with
 * ptlrpc_queue_wait() and reads the obd_statfs from the reply capsule
 * (rc = -EPROTO on the error path).  The request is released with
 * ptlrpc_req_finished().  max_age is not referenced by the visible code.
 * NOTE(review): the error checks, the copy into osfs, the out label and the
 * return statements are not visible in this excerpt. */
3345 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3346 __u64 max_age, __u32 flags)
3348 struct obd_statfs *msfs;
3349 struct ptlrpc_request *req;
3353 /* We could possibly pass max_age in the request (as an absolute
3354 * timestamp or a "seconds.usec ago") so the target can avoid doing
3355 * extra calls into the filesystem if that isn't necessary (e.g.
3356 * during mount that would help a bit). Having relative timestamps
3357 * is not so great if request processing is slow, while absolute
3358 * timestamps are not ideal because they need time synchronization. */
3359 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3363 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3365 ptlrpc_request_free(req);
3368 ptlrpc_request_set_replen(req);
3369 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3371 if (flags & OBD_STATFS_NODELAY) {
3372 /* procfs requests not want stat in wait for avoid deadlock */
3373 req->rq_no_resend = 1;
3374 req->rq_no_delay = 1;
3377 rc = ptlrpc_queue_wait(req);
3381 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3383 GOTO(out, rc = -EPROTO);
3390 ptlrpc_req_finished(req);
/* osc_getstripe(): copies the user lov_user_md header in, checks its magic
 * against LOV_USER_MAGIC, and returns single-stripe information to lump.
 * When the user asked for at least one stripe slot a buffer with room for
 * one lmm_objects entry is allocated and filled with the object id and
 * group; otherwise only the header size is used.  The header always gets
 * the object id, the object group and a stripe count of 1 before
 * copy_to_user(); the buffer is released with OBD_FREE.
 * NOTE(review): the error returns, the assignment of lumk on the no-slot
 * path and the condition guarding OBD_FREE are not visible in this excerpt.
 * The original header comment below is kept verbatim; it names the user
 * pointer lmmu while the parameter is lump. */
3394 /* Retrieve object striping information.
3396 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3397 * the maximum number of OST indices which will fit in the user buffer.
3398 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3400 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3402 struct lov_user_md lum, *lumk;
3403 int rc = 0, lum_size;
3409 if (copy_from_user(&lum, lump, sizeof(lum)))
3412 if (lum.lmm_magic != LOV_USER_MAGIC)
3415 if (lum.lmm_stripe_count > 0) {
3416 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3417 OBD_ALLOC(lumk, lum_size);
3421 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3422 lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
3424 lum_size = sizeof(lum);
3428 lumk->lmm_object_id = lsm->lsm_object_id;
3429 lumk->lmm_object_gr = lsm->lsm_object_gr;
3430 lumk->lmm_stripe_count = 1;
3432 if (copy_to_user(lump, lumk, lum_size))
3436 OBD_FREE(lumk, lum_size);
/* ioctl dispatcher for the OSC device.  A module reference is taken for the
 * duration of the call (try_module_get / module_put).  Visible commands:
 *  - OBD_IOC_LOV_GET_CONFIG: reads the ioctl data from uarg, requires
 *    inlbuf1 to hold a lov_desc and inlbuf2 to hold an obd_uuid, fills the
 *    descriptor as a single-target configuration carrying the uuid of this
 *    obd, and copies the buffer back to user space;
 *  - LL_IOC_LOV_SETSTRIPE: obd_alloc_memmd();
 *  - LL_IOC_LOV_GETSTRIPE: osc_getstripe();
 *  - OBD_IOC_CLIENT_RECOVER: ptlrpc_recover_import();
 *  - IOC_OSC_SET_ACTIVE: ptlrpc_set_import_active();
 *  - OBD_IOC_POLL_QUOTACHECK: lquota_poll_check();
 *  - anything else: logged and answered with -ENOTTY.
 * NOTE(review): the switch statement, several argument lines, the out label
 * and the return statements are not visible in this excerpt. */
3442 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3443 void *karg, void *uarg)
3445 struct obd_device *obd = exp->exp_obd;
3446 struct obd_ioctl_data *data = karg;
3450 if (!try_module_get(THIS_MODULE)) {
3451 CERROR("Can't get module. Is it alive?");
3455 case OBD_IOC_LOV_GET_CONFIG: {
3457 struct lov_desc *desc;
3458 struct obd_uuid uuid;
3462 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3463 GOTO(out, err = -EINVAL);
3465 data = (struct obd_ioctl_data *)buf;
3467 if (sizeof(*desc) > data->ioc_inllen1) {
3468 obd_ioctl_freedata(buf, len);
3469 GOTO(out, err = -EINVAL);
3472 if (data->ioc_inllen2 < sizeof(uuid)) {
3473 obd_ioctl_freedata(buf, len);
3474 GOTO(out, err = -EINVAL);
3477 desc = (struct lov_desc *)data->ioc_inlbuf1;
3478 desc->ld_tgt_count = 1;
3479 desc->ld_active_tgt_count = 1;
3480 desc->ld_default_stripe_count = 1;
3481 desc->ld_default_stripe_size = 0;
3482 desc->ld_default_stripe_offset = 0;
3483 desc->ld_pattern = 0;
3484 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3486 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3488 err = copy_to_user((void *)uarg, buf, len);
3491 obd_ioctl_freedata(buf, len);
3494 case LL_IOC_LOV_SETSTRIPE:
3495 err = obd_alloc_memmd(exp, karg);
3499 case LL_IOC_LOV_GETSTRIPE:
3500 err = osc_getstripe(karg, uarg);
3502 case OBD_IOC_CLIENT_RECOVER:
3503 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3508 case IOC_OSC_SET_ACTIVE:
3509 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3512 case OBD_IOC_POLL_QUOTACHECK:
3513 err = lquota_poll_check(quota_interface, exp,
3514 (struct if_quotacheck *)karg);
3517 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3518 cmd, cfs_curproc_comm());
3519 GOTO(out, err = -ENOTTY);
3522 module_put(THIS_MODULE);
/* Answers a get_info query identified by key.  Both vallen and val must be
 * provided.  Visible keys:
 *  - KEY_LOCK_TO_STRIPE: val is treated as a __u32 stripe number and *vallen
 *    is set to its size;
 *  - KEY_LAST_ID: an OST_GET_INFO request carrying the key is sent
 *    synchronously and the obd_id found in the reply is stored through val
 *    (rc = -EPROTO on the reply error path).
 * NOTE(review): the value written for KEY_LOCK_TO_STRIPE, the error checks,
 * the out label and the return statements are not visible in this
 * excerpt. */
3526 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3527 void *key, __u32 *vallen, void *val)
3530 if (!vallen || !val)
3533 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3534 __u32 *stripe = val;
3535 *vallen = sizeof(*stripe);
3538 } else if (KEY_IS(KEY_LAST_ID)) {
3539 struct ptlrpc_request *req;
3544 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3545 &RQF_OST_GET_INFO_LAST_ID);
3549 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3550 RCL_CLIENT, keylen);
3551 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3553 ptlrpc_request_free(req);
3557 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3558 memcpy(tmp, key, keylen);
3560 ptlrpc_request_set_replen(req);
3561 rc = ptlrpc_queue_wait(req);
3565 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3567 GOTO(out, rc = -EPROTO);
3569 *((obd_id *)val) = *reply;
3571 ptlrpc_req_finished(req);
/* Reply handler installed by osc_set_info_async() for KEY_MDS_CONN.  It
 * looks up the LLOG_MDS_OST_ORIG_CTXT context of the import's obd, connects
 * it with llog_initiator_connect() (a CERROR reports a failed connect),
 * releases the context, and then, under imp_lock, marks the import with
 * imp_server_timeout and imp_pingable.
 * NOTE(review): the remaining parameters, the conditions around the llog
 * calls and the return statements are not visible in this excerpt. */
3577 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3580 struct llog_ctxt *ctxt;
3581 struct obd_import *imp = req->rq_import;
3587 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3590 rc = llog_initiator_connect(ctxt);
3592 CERROR("cannot establish connection for "
3593 "ctxt %p: %d\n", ctxt, rc);
3596 llog_ctxt_put(ctxt);
3597 spin_lock(&imp->imp_lock);
3598 imp->imp_server_timeout = 1;
3599 imp->imp_pingable = 1;
3600 spin_unlock(&imp->imp_lock);
3601 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
/* Applies a set_info request identified by key.
 * Keys handled locally in the visible code:
 *  - KEY_NEXT_ID (val is an obd_id): oscc_next_id = value + 1;
 *  - KEY_UNLINKED: clears OSCC_FLAG_NOSPC under oscc_lock;
 *  - KEY_INIT_RECOV (val is an int): stored in imp_initial_recov under
 *    imp_lock;
 *  - KEY_CHECKSUM (val is an int): cl_checksum is set to 0 or 1;
 *  - KEY_FLUSH_CTX: sptlrpc_import_flush_my_ctx().
 * Every other key is packed, together with its value, into an OST_SET_INFO
 * request that is added to the request set; for KEY_MDS_CONN the group from
 * val is first recorded in the object creator state and
 * osc_setinfo_mds_conn_interpret() is installed as reply handler.
 * NOTE(review): the returns of the local branches, the size-mismatch
 * actions, the allocation/pack error checks and the final return are not
 * visible in this excerpt. */
3606 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3607 void *key, obd_count vallen, void *val,
3608 struct ptlrpc_request_set *set)
3610 struct ptlrpc_request *req;
3611 struct obd_device *obd = exp->exp_obd;
3612 struct obd_import *imp = class_exp2cliimp(exp);
3617 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3619 if (KEY_IS(KEY_NEXT_ID)) {
3620 if (vallen != sizeof(obd_id))
3624 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3625 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3626 exp->exp_obd->obd_name,
3627 obd->u.cli.cl_oscc.oscc_next_id);
3632 if (KEY_IS(KEY_UNLINKED)) {
3633 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3634 spin_lock(&oscc->oscc_lock);
3635 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3636 spin_unlock(&oscc->oscc_lock);
3640 if (KEY_IS(KEY_INIT_RECOV)) {
3641 if (vallen != sizeof(int))
3643 spin_lock(&imp->imp_lock);
3644 imp->imp_initial_recov = *(int *)val;
3645 spin_unlock(&imp->imp_lock);
3646 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3647 exp->exp_obd->obd_name,
3648 imp->imp_initial_recov);
3652 if (KEY_IS(KEY_CHECKSUM)) {
3653 if (vallen != sizeof(int))
3655 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3659 if (KEY_IS(KEY_FLUSH_CTX)) {
3660 sptlrpc_import_flush_my_ctx(imp);
3667 /* We pass all other commands directly to OST. Since nobody calls osc
3668 methods directly and everybody is supposed to go through LOV, we
3669 assume lov checked invalid values for us.
3670 The only recognised values so far are evict_by_nid and mds_conn.
3671 Even if something bad goes through, we'd get a -EINVAL from OST
3675 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
3679 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3680 RCL_CLIENT, keylen);
3681 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3682 RCL_CLIENT, vallen);
3683 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3685 ptlrpc_request_free(req);
3689 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3690 memcpy(tmp, key, keylen);
3691 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
3692 memcpy(tmp, val, vallen);
3694 if (KEY_IS(KEY_MDS_CONN)) {
3695 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3697 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3698 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3699 LASSERT(oscc->oscc_oa.o_gr > 0);
3700 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3703 ptlrpc_request_set_replen(req);
3704 ptlrpc_set_add_req(set, req);
3705 ptlrpc_check_set(set);
/* llog operations table handed to llog_setup() for LLOG_SIZE_REPL_CTXT in
 * osc_llog_init().  The visible initialiser sets lop_cancel to
 * llog_obd_repl_cancel (old GNU "field:" initialiser syntax).
 * NOTE(review): the closing line of the initialiser is not part of this
 * listing. */
3711 static struct llog_operations osc_size_repl_logops = {
3712 lop_cancel: llog_obd_repl_cancel
/* Operations table used for LLOG_MDS_OST_ORIG_CTXT.  It is file-scope and is
 * filled in on first use by osc_llog_init(). */
3715 static struct llog_operations osc_mds_ost_orig_logops;
/*
 * Set up the two llog contexts of an OSC device:
 *   - LLOG_MDS_OST_ORIG_CTXT, with catid->lci_logid and
 *     osc_mds_ost_orig_logops;
 *   - LLOG_SIZE_REPL_CTXT, with a NULL log id and osc_size_repl_logops.
 *
 * obd         device whose obd_olg receives both contexts
 * group       asserted to be OBD_LLOG_GROUP
 * tgt, count  forwarded unchanged to both llog_setup() calls
 * catid       supplies the log id for the first context; also logged
 * uuid        not referenced in the visible lines
 *
 * NOTE(review): this listing omits source lines (the embedded line numbers
 * skip), so the declaration of rc, the conditions guarding the CERROR()
 * calls below and the return statement are not visible here.
 */
3716 static int osc_llog_init(struct obd_device *obd, int group,
3717 struct obd_device *tgt, int count,
3718 struct llog_catid *catid, struct obd_uuid *uuid)
3722 LASSERT(group == OBD_LLOG_GROUP);
/* Lazy initialisation of the shared ops table: start from llog_lvfs_ops and
 * override setup/cleanup/add/connect.  lop_setup doubles as the "already
 * initialised" marker.
 * NOTE(review): the lock taken is per-device while the table is file-scope;
 * confirm two OSC devices cannot run this concurrently. */
3723 spin_lock(&obd->obd_dev_lock);
3724 if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3725 osc_mds_ost_orig_logops = llog_lvfs_ops;
3726 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3727 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3728 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3729 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3731 spin_unlock(&obd->obd_dev_lock);
/* First context: the MDS->OST originator log. */
3733 rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3734 &catid->lci_logid, &osc_mds_ost_orig_logops);
3736 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
/* Second context: the size replicator, which has no catalog id. */
3740 rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
3741 NULL, &osc_size_repl_logops);
3743 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
/* Diagnostic dump of the arguments and the log id; presumably only reached
 * when rc is non-zero -- the guard is on an omitted line, confirm. */
3746 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3747 obd->obd_name, tgt->obd_name, count, catid, rc);
3748 CERROR("logid "LPX64":0x%x\n",
3749 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
/*
 * Tear down the llog contexts of an OSC device: look up
 * LLOG_MDS_OST_ORIG_CTXT and LLOG_SIZE_REPL_CTXT on obd and call
 * llog_cleanup() on each, keeping the two results in rc and rc2.
 *
 * count is not referenced in the visible lines.
 *
 * NOTE(review): lines are omitted from this listing; whether ctxt is checked
 * for NULL before each llog_cleanup() and how rc and rc2 are combined into
 * the return value cannot be seen here.
 */
3754 static int osc_llog_finish(struct obd_device *obd, int count)
3756 struct llog_ctxt *ctxt;
3757 int rc = 0, rc2 = 0;
3760 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3762 rc = llog_cleanup(ctxt);
3764 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3766 rc2 = llog_cleanup(ctxt);
/*
 * o_reconnect method: when the caller passes connect data that has
 * OBD_CONNECT_GRANT set, fill in the grant to ask for.
 *
 * env, exp and cluuid are not referenced in the visible lines.
 *
 * NOTE(review): the declaration of lost_grant, the end of the if-block and
 * the return statement are on lines omitted from this listing.
 */
3773 static int osc_reconnect(const struct lu_env *env,
3774 struct obd_export *exp, struct obd_device *obd,
3775 struct obd_uuid *cluuid,
3776 struct obd_connect_data *data)
3778 struct client_obd *cli = &obd->u.cli;
3780 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
/* Under cl_loi_list_lock: request the currently available grant, or, if
 * that is zero (GNU "?:" form), (2 * cl_max_pages_per_rpc) shifted left by
 * CFS_PAGE_SHIFT.  The lost-grant counter is sampled for the debug message
 * and then reset to 0. */
3783 client_obd_list_lock(&cli->cl_loi_list_lock);
3784 data->ocd_grant = cli->cl_avail_grant ?:
3785 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3786 lost_grant = cli->cl_lost_grant;
3787 cli->cl_lost_grant = 0;
3788 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* cl_avail_grant is read here after the lock has been released; it is used
 * for logging only. */
3790 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3791 "cl_lost_grant: %ld\n", data->ocd_grant,
3792 cli->cl_avail_grant, lost_grant);
3793 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3794 " ocd_grant: %d\n", data->ocd_connect_flags,
3795 data->ocd_version, data->ocd_grant);
/*
 * o_disconnect method: sync the LLOG_SIZE_REPL_CTXT context when
 * cl_conn_count is 1 (presumably the last remaining connection -- confirm),
 * call llog_ctxt_put() on the context obtained at the top, then hand the
 * export to client_disconnect_export() and keep its result in rc.
 *
 * NOTE(review): lines are omitted from this listing; whether ctxt is checked
 * for NULL before llog_sync()/llog_ctxt_put(), and the declaration and
 * return of rc, are not visible here.
 */
3801 static int osc_disconnect(struct obd_export *exp)
3803 struct obd_device *obd = class_exp2obd(exp);
3804 struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3807 if (obd->u.cli.cl_conn_count == 1)
3808 /* flush any remaining cancel messages out to the target */
3809 llog_sync(ctxt, exp);
3811 llog_ctxt_put(ctxt);
3813 rc = client_disconnect_export(exp);
/*
 * o_import_event method: react to a state change of the import that belongs
 * to this OSC device (asserted: imp->imp_obd == obd).
 *
 * Handling per event, as far as the visible lines show:
 *   IMP_EVENT_DISCON      mark the object creator as recovering (only when
 *                         imp_server_timeout is set) and zero the available
 *                         and lost grant counters
 *   IMP_EVENT_INACTIVE    notify the observer with OBD_NOTIFY_INACTIVE
 *   IMP_EVENT_INVALIDATE  run osc_check_rpcs() and clean up the lock
 *                         namespace with LDLM_FL_LOCAL_ONLY
 *   IMP_EVENT_ACTIVE      clear OSCC_FLAG_NOSPC (only when
 *                         imp_server_timeout is set) and notify the
 *                         observer with OBD_NOTIFY_ACTIVE
 *   IMP_EVENT_OCD         apply the negotiated connect data and notify the
 *                         observer with OBD_NOTIFY_OCD
 *   anything else         logged as an unknown import event (presumably the
 *                         default label; it is on an omitted line)
 *
 * NOTE(review): this listing omits lines -- the switch statement, the
 * assignments to cli, the break statements, the closing braces of the
 * if-blocks and the return of rc are not visible, so the exact extent of
 * each conditional cannot be confirmed from here.
 */
3817 static int osc_import_event(struct obd_device *obd,
3818 struct obd_import *imp,
3819 enum obd_import_event event)
3821 struct client_obd *cli;
3825 LASSERT(imp->imp_obd == obd);
3828 case IMP_EVENT_DISCON: {
3829 /* Only do this on the MDS OSC's */
3830 if (imp->imp_server_timeout) {
3831 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
/* Flag the creator as recovering, under its own lock. */
3833 spin_lock(&oscc->oscc_lock);
3834 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3835 spin_unlock(&oscc->oscc_lock);
/* Forget both the available and the lost grant for this import. */
3838 client_obd_list_lock(&cli->cl_loi_list_lock);
3839 cli->cl_avail_grant = 0;
3840 cli->cl_lost_grant = 0;
3841 client_obd_list_unlock(&cli->cl_loi_list_lock);
3844 case IMP_EVENT_INACTIVE: {
3845 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3848 case IMP_EVENT_INVALIDATE: {
3849 struct ldlm_namespace *ns = obd->obd_namespace;
3853 client_obd_list_lock(&cli->cl_loi_list_lock);
3854 /* all pages go to failing rpcs due to the invalid import */
3855 osc_check_rpcs(cli);
3856 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Namespace cleanup is requested with LDLM_FL_LOCAL_ONLY. */
3858 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3862 case IMP_EVENT_ACTIVE: {
3863 /* Only do this on the MDS OSC's */
3864 if (imp->imp_server_timeout) {
3865 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
/* Clear the no-space flag on the creator, under its lock. */
3867 spin_lock(&oscc->oscc_lock);
3868 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3869 spin_unlock(&oscc->oscc_lock);
3871 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3874 case IMP_EVENT_OCD: {
3875 struct obd_connect_data *ocd = &imp->imp_connect_data;
/* Initialise the grant from the connect data when grants were negotiated. */
3877 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3878 osc_init_grant(&obd->u.cli, ocd);
/* Switch the request portal to OST_REQUEST_PORTAL when
 * OBD_CONNECT_REQPORTAL is set in the connect flags. */
3881 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3882 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3884 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3888 CERROR("Unknown import event %d\n", event);
/*
 * o_setup method for the OSC device type.  Visible steps, in order: take a
 * ptlrpcd reference, run the generic client_obd_setup(), register the
 * lprocfs entries, create the request pool on the import, and create the
 * page cache object stored in cli->cl_cache.
 *
 * obd   device being set up
 * lcfg  configuration record, forwarded to client_obd_setup()
 *
 * NOTE(review): this listing omits lines -- the declaration of rc, the
 * checks on rc after ptlrpcd_addref()/client_obd_setup(), the handling of a
 * failed cache_create() and the return statement are not visible here.
 */
3894 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3900 rc = ptlrpcd_addref();
3904 rc = client_obd_setup(obd, lcfg);
3908 struct lprocfs_static_vars lvars = { 0 };
3909 struct client_obd *cli = &obd->u.cli;
/* The extra lprocfs registrations happen only when lprocfs_obd_setup()
 * returned 0. */
3911 lprocfs_osc_init_vars(&lvars);
3912 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3913 lproc_osc_attach_seqstat(obd);
3914 sptlrpc_lprocfs_cliobd_attach(obd);
3915 ptlrpc_lprocfs_register_obd(obd);
3919 /* We need to allocate a few requests more, because
3920 brw_interpret tries to create new requests before freeing
3921 previous ones. Ideally we want to have 2x max_rpcs_in_flight
3922 reserved, but I am afraid that might be too much wasted RAM
3923 in fact, so 2 is just my guess and still should work. */
/* NOTE(review): an argument between the pool size and the populate callback
 * sits on an omitted line of this listing. */
3924 cli->cl_import->imp_rq_pool =
3925 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3927 ptlrpc_add_rqs_to_pool);
/* A NULL result from cache_create() is treated specially; the body of that
 * branch is not visible. */
3928 cli->cl_cache = cache_create(obd);
3929 if (!cli->cl_cache) {
/*
 * o_precleanup method: staged shutdown work, selected by stage.
 *
 *   OBD_CLEANUP_EARLY     deactivate the import and clear imp_pingable
 *   OBD_CLEANUP_EXPORTS   if an import is still attached, invalidate it,
 *                         free its request pool, destroy it and clear
 *                         cl_import
 *   OBD_CLEANUP_SELF_EXP  call obd_llog_finish(obd, 0)
 *   OBD_CLEANUP_OBD       no statements visible for this stage
 *
 * NOTE(review): this listing omits lines -- the switch statement, the break
 * statements, the condition guarding the CERROR() and the declaration and
 * return of rc are not visible here.
 */
3938 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3944 case OBD_CLEANUP_EARLY: {
3945 struct obd_import *imp;
3946 imp = obd->u.cli.cl_import;
3947 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3948 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3949 ptlrpc_deactivate_import(imp);
/* Clear imp_pingable under the import lock. */
3950 spin_lock(&imp->imp_lock);
3951 imp->imp_pingable = 0;
3952 spin_unlock(&imp->imp_lock);
3955 case OBD_CLEANUP_EXPORTS: {
3956 /* If we set up but never connected, the
3957 client import will not have been cleaned. */
3958 if (obd->u.cli.cl_import) {
3959 struct obd_import *imp;
3960 imp = obd->u.cli.cl_import;
3961 CDEBUG(D_CONFIG, "%s: client import never connected\n",
/* Release the import in this order: invalidate, free the request pool,
 * destroy the import, then clear the pointer held by the device. */
3963 ptlrpc_invalidate_import(imp);
3964 ptlrpc_free_rq_pool(imp->imp_rq_pool);
3965 class_destroy_import(imp);
3966 obd->u.cli.cl_import = NULL;
3970 case OBD_CLEANUP_SELF_EXP:
3971 rc = obd_llog_finish(obd, 0);
3973 CERROR("failed to cleanup llogging subsystems\n");
3975 case OBD_CLEANUP_OBD:
/*
 * o_cleanup method for the OSC device type.  Visible steps, in order:
 * unregister the lprocfs entries, mark the object creator as exiting, free
 * the quota cache, destroy the page cache held in cl_cache and run the
 * generic client_obd_cleanup(), whose result is stored in rc.
 *
 * NOTE(review): the declaration of rc and the statements after
 * client_obd_cleanup() (including the return) are on lines omitted from
 * this listing.
 */
3981 int osc_cleanup(struct obd_device *obd)
3983 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3987 ptlrpc_lprocfs_unregister_obd(obd);
3988 lprocfs_obd_cleanup(obd);
/* Under the creator lock: drop the recovering flag and set the exiting
 * flag. */
3990 spin_lock(&oscc->oscc_lock);
3991 oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3992 oscc->oscc_flags |= OSCC_FLAG_EXITING;
3993 spin_unlock(&oscc->oscc_lock);
3995 /* free memory of osc quota cache */
3996 lquota_cleanup(quota_interface, obd);
3998 cache_destroy(obd->u.cli.cl_cache);
3999 rc = client_obd_cleanup(obd);
/*
 * o_register_page_removal_cb method: forwards to
 * cache_add_extent_removal_cb() on the cl_cache of the export's device and
 * returns its result.
 *
 * NOTE(review): the tail of the call's argument list is on an omitted line
 * of this listing; pin_cb is presumably passed there -- confirm.
 */
4005 static int osc_register_page_removal_cb(struct obd_export *exp,
4006 obd_page_removal_cb_t func,
4007 obd_pin_extent_cb pin_cb)
4009 return cache_add_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func,
/*
 * o_unregister_page_removal_cb method: forwards func to
 * cache_del_extent_removal_cb() on the cl_cache of the export's device and
 * returns its result.
 */
4013 static int osc_unregister_page_removal_cb(struct obd_export *exp,
4014 obd_page_removal_cb_t func)
4016 return cache_del_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func);
/*
 * o_register_lock_cancel_cb method: store cb as the device's
 * cl_ext_lock_cancel_cb.  Only one callback can be installed: the slot is
 * asserted to be NULL beforehand.
 *
 * NOTE(review): the return statement is on an omitted line of this listing.
 */
4019 static int osc_register_lock_cancel_cb(struct obd_export *exp,
4020 obd_lock_cancel_cb cb)
4022 LASSERT(exp->exp_obd->u.cli.cl_ext_lock_cancel_cb == NULL);
4024 exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = cb;
/*
 * o_unregister_lock_cancel_cb method: clear the device's
 * cl_ext_lock_cancel_cb.  A cb that differs from the installed callback is
 * reported with CERROR().
 *
 * NOTE(review): lines are omitted from this listing; the rest of the
 * CERROR() call, whether the mismatch branch returns before the slot is
 * cleared, and the return values are not visible here.
 */
4028 static int osc_unregister_lock_cancel_cb(struct obd_export *exp,
4029 obd_lock_cancel_cb cb)
4031 if (exp->exp_obd->u.cli.cl_ext_lock_cancel_cb != cb) {
4032 CERROR("Unregistering cancel cb %p, while only %p was "
4034 exp->exp_obd->u.cli.cl_ext_lock_cancel_cb);
4038 exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = NULL;
/*
 * o_process_config method: dispatch a configuration record by its
 * lcfg_command.
 *
 *   LCFG_SPTLRPC_CONF  handled by sptlrpc_cliobd_process_config()
 *   other commands     presumably routed to class_process_proc_param() with
 *                      the PARAM_OSC prefix and the OSC lprocfs variables
 *                      (the label for that call is on an omitted line --
 *                      confirm)
 *
 * obd  device the record applies to
 * len  not referenced in the visible lines
 * buf  the record, used as a struct lustre_cfg
 *
 * NOTE(review): the declaration and return of rc and the remaining
 * arguments of class_process_proc_param() are on lines omitted from this
 * listing.
 */
4042 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4044 struct lustre_cfg *lcfg = buf;
4045 struct lprocfs_static_vars lvars = { 0 };
4048 lprocfs_osc_init_vars(&lvars);
4050 switch (lcfg->lcfg_command) {
4051 case LCFG_SPTLRPC_CONF:
4052 rc = sptlrpc_cliobd_process_config(obd, lcfg);
4055 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
/*
 * OBD method table of the OSC device type.  osc_init() passes it to
 * init_obd_quota_ops() and registers it with class_register_type() under
 * LUSTRE_OSC_NAME.
 *
 * Connection add/delete/connect use the generic client_* helpers; every
 * other visible slot points at an osc_* function.
 *
 * NOTE(review): the embedded line numbers skip inside this initialiser
 * (4084, 4095) and its closing line is absent, so the table has entries
 * that are not visible in this listing.
 */
4063 struct obd_ops osc_obd_ops = {
4064 .o_owner = THIS_MODULE,
/* device life cycle */
4065 .o_setup = osc_setup,
4066 .o_precleanup = osc_precleanup,
4067 .o_cleanup = osc_cleanup,
/* connection management */
4068 .o_add_conn = client_import_add_conn,
4069 .o_del_conn = client_import_del_conn,
4070 .o_connect = client_connect_import,
4071 .o_reconnect = osc_reconnect,
4072 .o_disconnect = osc_disconnect,
4073 .o_statfs = osc_statfs,
4074 .o_statfs_async = osc_statfs_async,
/* striping metadata and object operations */
4075 .o_packmd = osc_packmd,
4076 .o_unpackmd = osc_unpackmd,
4077 .o_precreate = osc_precreate,
4078 .o_create = osc_create,
4079 .o_destroy = osc_destroy,
4080 .o_getattr = osc_getattr,
4081 .o_getattr_async = osc_getattr_async,
4082 .o_setattr = osc_setattr,
4083 .o_setattr_async = osc_setattr_async,
/* bulk and asynchronous page I/O */
4085 .o_brw_async = osc_brw_async,
4086 .o_prep_async_page = osc_prep_async_page,
4087 .o_reget_short_lock = osc_reget_short_lock,
4088 .o_release_short_lock = osc_release_short_lock,
4089 .o_queue_async_io = osc_queue_async_io,
4090 .o_set_async_flags = osc_set_async_flags,
4091 .o_queue_group_io = osc_queue_group_io,
4092 .o_trigger_group_io = osc_trigger_group_io,
4093 .o_teardown_async_page = osc_teardown_async_page,
4094 .o_punch = osc_punch,
/* locking */
4096 .o_enqueue = osc_enqueue,
4097 .o_match = osc_match,
4098 .o_change_cbdata = osc_change_cbdata,
4099 .o_cancel = osc_cancel,
4100 .o_cancel_unused = osc_cancel_unused,
4101 .o_join_lru = osc_join_lru,
/* control, info, import events, llog and configuration */
4102 .o_iocontrol = osc_iocontrol,
4103 .o_get_info = osc_get_info,
4104 .o_set_info_async = osc_set_info_async,
4105 .o_import_event = osc_import_event,
4106 .o_llog_init = osc_llog_init,
4107 .o_llog_finish = osc_llog_finish,
4108 .o_process_config = osc_process_config,
/* callback registration */
4109 .o_register_page_removal_cb = osc_register_page_removal_cb,
4110 .o_unregister_page_removal_cb = osc_unregister_page_removal_cb,
4111 .o_register_lock_cancel_cb = osc_register_lock_cancel_cb,
4112 .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb,
/*
 * Module initialisation.  Visible steps, in order: fetch the OSC lprocfs
 * variable tables, request the "lquota" module, look up
 * osc_quota_interface and initialise it, let the quota interface fill in
 * its slots of osc_obd_ops, and register the OSC device type under
 * LUSTRE_OSC_NAME.
 *
 * NOTE(review): lines are omitted from this listing.  The final
 * PORTAL_SYMBOL_PUT() is presumably the failure path of
 * class_register_type() (the guard on rc is not visible -- confirm), and
 * the declaration and return of rc are not shown.
 */
4114 int __init osc_init(void)
4116 struct lprocfs_static_vars lvars = { 0 };
4120 lprocfs_osc_init_vars(&lvars);
/* The result of request_module() is not stored; quota_interface is passed
 * to lquota_init() and init_obd_quota_ops() without a NULL check at this
 * call site. */
4122 request_module("lquota");
4123 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4124 lquota_init(quota_interface);
4125 init_obd_quota_ops(quota_interface, &osc_obd_ops);
4127 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4128 LUSTRE_OSC_NAME, NULL);
/* Drop the symbol reference taken above, if one was obtained. */
4130 if (quota_interface)
4131 PORTAL_SYMBOL_PUT(osc_quota_interface);
/*
 * Module exit: shut down the quota interface, release the
 * osc_quota_interface symbol reference if one is held, and unregister the
 * OSC device type.
 */
4139 static void /*__exit*/ osc_exit(void)
4141 lquota_exit(quota_interface);
4142 if (quota_interface)
4143 PORTAL_SYMBOL_PUT(osc_quota_interface);
4145 class_unregister_type(LUSTRE_OSC_NAME);
/* Module metadata, followed by the cfs_module() declaration that names
 * osc_init and osc_exit as the module's init and exit functions. */
4148 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
4149 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4150 MODULE_LICENSE("GPL");
4152 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);