1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5 * Author Peter Braam <braam@clusterfs.com>
7 * This file is part of the Lustre file system, http://www.lustre.org
8 * Lustre is a trademark of Cluster File Systems, Inc.
10 * You may have signed or agreed to another license before downloading
11 * this software. If so, you are bound by the terms and conditions
12 * of that agreement, and the following does not apply to you. See the
13 * LICENSE file included with this distribution for more information.
15 * If you did not agree to a different license, then this copy of Lustre
16 * is open source software; you can redistribute it and/or modify it
17 * under the terms of version 2 of the GNU General Public License as
18 * published by the Free Software Foundation.
20 * In either case, Lustre is distributed in the hope that it will be
21 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23 * license text for more details.
25 * For testing and management it is treated as an obd_device,
26 * although it does not export a full OBD method table (the
27 * requests are coming in over the wire, so object target modules
28 * do not have a full method table.)
33 # define EXPORT_SYMTAB
35 #define DEBUG_SUBSYSTEM S_OSC
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
43 #include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
47 #include <obd_cksum.h>
55 #include <lustre_ha.h>
56 #include <lprocfs_status.h>
57 #include <lustre_log.h>
58 #include <lustre_debug.h>
59 #include <lustre_param.h>
60 #include "osc_internal.h"
62 static quota_interface_t *quota_interface = NULL;
63 extern quota_interface_t osc_quota_interface;
65 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
67 /* Pack OSC object metadata for disk storage (LE byte order).
 *
 * The packed form is a single struct lov_mds_md: *lmmp is released with
 * OBD_FREE or obtained with OBD_ALLOC using sizeof(**lmmp), and the object
 * id and group taken from @lsm are stored in it as little-endian values. */
68 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
69 struct lov_stripe_md *lsm)
/* the packed size is fixed: exactly one lov_mds_md */
74 lmm_size = sizeof(**lmmp);
79 OBD_FREE(*lmmp, lmm_size);
85 OBD_ALLOC(*lmmp, lmm_size);
/* a stripe md being packed must carry a non-zero object id and group */
91 LASSERT(lsm->lsm_object_id);
92 LASSERT(lsm->lsm_object_gr);
/* convert from CPU to little-endian byte order for storage */
93 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
94 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
100 /* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * @lmm is checked for a minimum length of sizeof(*lmm) and for a non-zero
 * object id.  *lsmp is a stripe md sized for one stripe by
 * lov_stripe_md_size(1) with a single, separately allocated lov_oinfo; it
 * receives the object id and group from @lmm converted to CPU byte order. */
101 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
102 struct lov_mds_md *lmm, int lmm_bytes)
/* a buffer shorter than one lov_mds_md cannot be unpacked */
108 if (lmm_bytes < sizeof (*lmm)) {
109 CERROR("lov_mds_md too small: %d, need %d\n",
110 lmm_bytes, (int)sizeof(*lmm));
113 /* XXX LOV_MAGIC etc check? */
/* an object id of zero is reported as an error */
115 if (lmm->lmm_object_id == 0) {
116 CERROR("lov_mds_md: zero lmm_object_id\n");
/* room for a stripe md describing exactly one stripe */
121 lsm_size = lov_stripe_md_size(1);
/* a stripe md exists but there is nothing to unpack: release its oinfo
 * first, then the stripe md itself */
125 if (*lsmp != NULL && lmm == NULL) {
126 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
127 OBD_FREE(*lsmp, lsm_size);
/* allocate the stripe md and its only oinfo; the stripe md is given back
 * when the oinfo allocation fails */
133 OBD_ALLOC(*lsmp, lsm_size);
136 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
137 if ((*lsmp)->lsm_oinfo[0] == NULL) {
138 OBD_FREE(*lsmp, lsm_size);
141 loi_init((*lsmp)->lsm_oinfo[0]);
145 /* XXX zero *lsmp? */
/* stored values are little-endian; both must be non-zero once converted */
146 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
147 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
148 LASSERT((*lsmp)->lsm_object_id);
149 LASSERT((*lsmp)->lsm_object_gr);
152 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/* Capability packing helper for OST requests.
 *
 * @capa is interpreted as a struct obd_capa.  The function looks up the
 * RMF_CAPA1 buffer of @req and sets OBD_MD_FLOSSCAPA in @body so that the
 * presence of a capability is recorded in o_valid. */
157 static inline void osc_pack_capa(struct ptlrpc_request *req,
158 struct ost_body *body, void *capa)
160 struct obd_capa *oc = (struct obd_capa *)capa;
161 struct lustre_capa *c;
/* client-side capability buffer of the request capsule */
166 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
169 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
170 DEBUG_CAPA(D_SEC, c, "pack");
/* Fill the RMF_OST_BODY of @req from @oinfo: the obdo is copied from
 * oinfo->oi_oa and oinfo->oi_capa is handed to osc_pack_capa(). */
173 static inline void osc_pack_req_body(struct ptlrpc_request *req,
174 struct obd_info *oinfo)
176 struct ost_body *body;
/* client-side body buffer of the request capsule */
178 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
/* structure copy of the caller's obdo into the request */
181 body->oa = *oinfo->oi_oa;
182 osc_pack_capa(req, body, oinfo->oi_capa);
/* Adjust the client-side size of capsule field @field in @req; the field
 * is shrunk to 0 bytes with req_capsule_set_size().
 * NOTE(review): presumably done only when no capability is passed, the
 * size otherwise being left at its precomputed value -- confirm. */
185 static inline void osc_set_capa_size(struct ptlrpc_request *req,
186 const struct req_msg_field *field,
190 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
192 /* it is already calculated as sizeof struct obd_capa */
/* Reply interpreter for an asynchronous getattr; installed as
 * rq_interpret_reply by osc_getattr_async().
 *
 * When the reply body unpacks, its obdo is copied into aa->aa_oi->oi_oa
 * and the block size is filled in locally.  The result code is then passed
 * to the caller-supplied oi_cb_up callback, whose return value becomes rc. */
196 static int osc_getattr_interpret(struct ptlrpc_request *req,
197 struct osc_async_args *aa, int rc)
199 struct ost_body *body;
/* reply body, obtained through lustre_swab_repbuf() with the ost_body
 * swabber */
205 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
206 lustre_swab_ost_body);
208 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
209 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
211 /* This should really be sent by the OST */
212 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
213 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
215 CDEBUG(D_INFO, "can't unpack ost_body\n");
/* mark every attribute in the caller's obdo as not valid */
217 aa->aa_oi->oi_oa->o_valid = 0;
/* hand the status to the upper layer */
220 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an asynchronous OST_GETATTR for the object described by @oinfo.
 *
 * The request is added to @set and its reply is processed by
 * osc_getattr_interpret(). */
224 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
225 struct ptlrpc_request_set *set)
227 struct ptlrpc_request *req;
228 struct osc_async_args *aa;
/* allocate a getattr request on the import behind @exp */
232 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
/* the capability field is sized before the request is packed */
236 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
237 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
/* release the request (presumably the pack-failure path) */
239 ptlrpc_request_free(req);
243 osc_pack_req_body(req, oinfo);
245 ptlrpc_request_set_replen(req);
246 req->rq_interpret_reply = osc_getattr_interpret;
/* the async args are stored inside the request; they must fit */
248 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
249 aa = (struct osc_async_args *)&req->rq_async_args;
252 ptlrpc_set_add_req(set, req);
/* Synchronous OST_GETATTR: the request is sent with ptlrpc_queue_wait()
 * and the obdo of the reply is copied into oinfo->oi_oa, after which the
 * block size is filled in locally. */
256 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
258 struct ptlrpc_request *req;
259 struct ost_body *body;
263 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
/* the capability field is sized before the request is packed */
267 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
268 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
/* release the request (presumably the pack-failure path) */
270 ptlrpc_request_free(req);
274 osc_pack_req_body(req, oinfo);
276 ptlrpc_request_set_replen(req);
/* send and block until the reply arrives */
278 rc = ptlrpc_queue_wait(req);
/* RMF_OST_BODY of the reply; the bail-out that follows uses -EPROTO */
282 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
284 GOTO(out, rc = -EPROTO);
286 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
287 *oinfo->oi_oa = body->oa;
289 /* This should really be sent by the OST */
290 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
291 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
/* drop the reference on the request */
295 ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR: the attributes in oinfo->oi_oa are sent with
 * ptlrpc_queue_wait() and the obdo of the reply is copied back into
 * oinfo->oi_oa. */
299 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
300 struct obd_trans_info *oti)
302 struct ptlrpc_request *req;
303 struct ost_body *body;
/* when a group is flagged as valid it has to be greater than zero */
307 LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
308 oinfo->oi_oa->o_gr > 0);
310 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
/* the capability field is sized before the request is packed */
314 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
315 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
/* release the request (presumably the pack-failure path) */
317 ptlrpc_request_free(req);
321 osc_pack_req_body(req, oinfo);
323 ptlrpc_request_set_replen(req);
/* send and block until the reply arrives */
326 rc = ptlrpc_queue_wait(req);
/* RMF_OST_BODY of the reply; the bail-out that follows uses -EPROTO */
330 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
332 GOTO(out, rc = -EPROTO);
334 *oinfo->oi_oa = body->oa;
/* drop the reference on the request */
338 ptlrpc_req_finished(req);
/* Reply interpreter for an asynchronous setattr; installed as
 * rq_interpret_reply by osc_setattr_async().
 *
 * Copies the obdo of the reply into aa->aa_oi->oi_oa and passes the result
 * code to the oi_cb_up callback, whose return value becomes rc. */
342 static int osc_setattr_interpret(struct ptlrpc_request *req,
343 struct osc_async_args *aa, int rc)
345 struct ost_body *body;
/* RMF_OST_BODY of the reply; the bail-out that follows uses -EPROTO */
351 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
353 GOTO(out, rc = -EPROTO);
355 *aa->aa_oi->oi_oa = body->oa;
357 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Asynchronous OST_SETATTR.
 *
 * Two ways of sending are present: the request is either handed to
 * ptlrpcd_add_req() without waiting for a response, or it is given
 * osc_setattr_interpret() as reply interpreter and added to @rqset.
 * With OBD_MD_FLCOOKIE set, the llog cookie from @oti is copied into the
 * obdo in @oinfo. */
361 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
362 struct obd_trans_info *oti,
363 struct ptlrpc_request_set *rqset)
365 struct ptlrpc_request *req;
366 struct osc_async_args *aa;
370 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
/* the capability field is sized before the request is packed */
374 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
375 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
/* release the request (presumably the pack-failure path) */
377 ptlrpc_request_free(req);
381 osc_pack_req_body(req, oinfo);
383 ptlrpc_request_set_replen(req);
/* oti is dereferenced here without a visible NULL check */
385 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
387 *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;
390 /* do mds to ost setattr asynchronously */
392 /* Do not wait for response. */
393 ptlrpcd_add_req(req);
395 req->rq_interpret_reply = osc_setattr_interpret;
/* the async args are stored inside the request; they must fit */
397 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
398 aa = (struct osc_async_args *)&req->rq_async_args;
401 ptlrpc_set_add_req(rqset, req);
/* Synchronously create an object on the OST with an OST_CREATE request.
 *
 * The reply is waited for with ptlrpc_queue_wait().  The object id and
 * group from @oa are recorded in the stripe md; the transno of the reply
 * and, when OBD_MD_FLCOOKIE is set, the llog cookie are stored in @oti.
 * The block size in @oa is filled in locally. */
407 int osc_real_create(struct obd_export *exp, struct obdo *oa,
408 struct lov_stripe_md **ea, struct obd_trans_info *oti)
410 struct ptlrpc_request *req;
411 struct ost_body *body;
412 struct lov_stripe_md *lsm;
/* obtain an in-memory stripe md */
421 rc = obd_alloc_memmd(exp, &lsm);
426 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
428 GOTO(out, rc = -ENOMEM);
430 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
432 ptlrpc_request_free(req);
436 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
440 ptlrpc_request_set_replen(req);
/* OBD_MD_FLINLINE is only expected together with OBD_FL_DELORPHAN */
442 if (oa->o_valid & OBD_MD_FLINLINE) {
443 LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
444 oa->o_flags == OBD_FL_DELORPHAN);
446 "delorphan from OST integration");
447 /* Don't resend the delorphan req */
448 req->rq_no_resend = req->rq_no_delay = 1;
/* send and block until the reply arrives */
451 rc = ptlrpc_queue_wait(req);
455 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
457 GOTO(out_req, rc = -EPROTO);
461 /* This should really be sent by the OST */
462 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
463 oa->o_valid |= OBD_MD_FLBLKSZ;
465 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
466 * have valid lsm_oinfo data structs, so don't go touching that.
467 * This needs to be fixed in a big way.
469 lsm->lsm_object_id = oa->o_id;
470 lsm->lsm_object_gr = oa->o_gr;
474 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
476 if (oa->o_valid & OBD_MD_FLCOOKIE) {
477 if (!oti->oti_logcookies)
478 oti_alloc_cookies(oti, 1);
479 *oti->oti_logcookies = *obdo_logcookie(oa);
483 CDEBUG(D_HA, "transno: "LPD64"\n",
484 lustre_msg_get_transno(req->rq_repmsg));
486 ptlrpc_req_finished(req);
489 obd_free_memmd(exp, &lsm);
/* Reply interpreter for a punch request; installed as rq_interpret_reply
 * by osc_punch().
 *
 * Copies the obdo of the reply into aa->aa_oi->oi_oa and passes the result
 * code to the oi_cb_up callback, whose return value becomes rc. */
493 static int osc_punch_interpret(struct ptlrpc_request *req,
494 struct osc_async_args *aa, int rc)
496 struct ost_body *body;
/* RMF_OST_BODY of the reply; the bail-out that follows uses -EPROTO */
502 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
504 GOTO(out, rc = -EPROTO);
506 *aa->aa_oi->oi_oa = body->oa;
508 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an asynchronous OST_PUNCH for the extent in oinfo->oi_policy.
 *
 * The extent start and end travel in the o_size and o_blocks fields of
 * the request obdo.  The request goes to OST_IO_PORTAL, is added to
 * @rqset, and its reply is processed by osc_punch_interpret(). */
512 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
513 struct obd_trans_info *oti,
514 struct ptlrpc_request_set *rqset)
516 struct ptlrpc_request *req;
517 struct osc_async_args *aa;
518 struct ost_body *body;
523 CDEBUG(D_INFO, "oa NULL\n");
527 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
/* the capability field is sized before the request is packed */
531 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
532 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
/* release the request (presumably the pack-failure path) */
534 ptlrpc_request_free(req);
537 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
538 osc_pack_req_body(req, oinfo);
540 /* overload the size and blocks fields in the oa with start/end */
541 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
543 body->oa.o_size = oinfo->oi_policy.l_extent.start;
544 body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
545 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
546 ptlrpc_request_set_replen(req);
549 req->rq_interpret_reply = osc_punch_interpret;
/* the async args are stored inside the request; they must fit */
550 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
551 aa = (struct osc_async_args *)&req->rq_async_args;
553 ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_SYNC for the byte range [@start, @end] of an object.
 *
 * The range travels in the o_size and o_blocks fields of the request
 * obdo; the request is sent with ptlrpc_queue_wait(). */
558 static int osc_sync(struct obd_export *exp, struct obdo *oa,
559 struct lov_stripe_md *md, obd_size start, obd_size end,
562 struct ptlrpc_request *req;
563 struct ost_body *body;
568 CDEBUG(D_INFO, "oa NULL\n");
572 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
/* the capability field is sized before the request is packed */
576 osc_set_capa_size(req, &RMF_CAPA1, capa);
577 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
/* release the request (presumably the pack-failure path) */
579 ptlrpc_request_free(req);
583 /* overload the size and blocks fields in the oa with start/end */
584 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
587 body->oa.o_size = start;
588 body->oa.o_blocks = end;
589 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
590 osc_pack_capa(req, body, capa);
592 ptlrpc_request_set_replen(req);
/* send and block until the reply arrives */
594 rc = ptlrpc_queue_wait(req);
/* RMF_OST_BODY of the reply; the bail-out that follows uses -EPROTO */
598 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
600 GOTO(out, rc = -EPROTO);
/* drop the reference on the request */
606 ptlrpc_req_finished(req);
610 /* Find and locally cancel the locks matching @mode in the resource
611 * identified by the object id and group of @oa. The locks found are added
612 * to the @cancels list. Returns the number of locks added to @cancels. */
613 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
614 struct list_head *cancels, ldlm_mode_t mode,
617 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
/* the resource name is built from the object id and the object group */
618 struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
619 struct ldlm_resource *res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
626 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
627 lock_flags, 0, NULL);
/* give back the reference taken by ldlm_resource_get() */
628 ldlm_resource_putref(res);
/* Reply interpreter for destroy requests; installed as rq_interpret_reply
 * by osc_destroy().
 *
 * Decrements the count of destroy RPCs in flight and signals
 * cl_destroy_waitq, on which osc_destroy() waits for a free slot. */
632 static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
635 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
637 atomic_dec(&cli->cl_destroy_in_flight);
638 cfs_waitq_signal(&cli->cl_destroy_waitq);
/* Try to reserve a slot for one more destroy RPC.
 *
 * cl_destroy_in_flight is incremented first; the reservation holds while
 * the new value does not exceed cl_max_rpcs_in_flight.  Otherwise the
 * increment is undone, and cl_destroy_waitq is signalled when the counter
 * is then found below the limit. */
642 static int osc_can_send_destroy(struct client_obd *cli)
644 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
645 cli->cl_max_rpcs_in_flight) {
646 /* The destroy request can be sent */
649 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
650 cli->cl_max_rpcs_in_flight) {
652 * The counter has been modified between the two atomic
655 cfs_waitq_signal(&cli->cl_destroy_waitq);
660 /* Destroy requests can be async always on the client, and we don't even really
661 * care about the return code since the client cannot do anything at all about
 * it.
663 * When the MDS is unlinking a filename, it saves the file objects into a
664 * recovery llog, and these object records are cancelled when the OST reports
665 * they were destroyed and sync'd to disk (i.e. transaction committed).
666 * If the client dies, or the OST is down when the object should be destroyed,
667 * the records are not cancelled, and when the OST reconnects to the MDS next,
668 * it will retrieve the llog unlink logs and then sends the log cancellation
669 * cookies to the MDS after committing destroy transactions. */
670 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
671 struct lov_stripe_md *ea, struct obd_trans_info *oti,
672 struct obd_export *md_export)
674 struct client_obd *cli = &exp->exp_obd->u.cli;
675 struct ptlrpc_request *req;
676 struct ost_body *body;
677 CFS_LIST_HEAD(cancels);
682 CDEBUG(D_INFO, "oa NULL\n");
/* collect the LCK_PW locks on the object, cancelled locally with
 * LDLM_FL_DISCARD_DATA */
686 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
687 LDLM_FL_DISCARD_DATA);
689 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* drop the collected locks again */
691 ldlm_lock_list_put(&cancels, l_bl_ast, count);
695 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
698 ptlrpc_request_free(req);
702 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
703 req->rq_interpret_reply = osc_destroy_interpret;
/* pass the llog cookie from @oti along in the obdo */
705 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
706 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
707 sizeof(*oti->oti_logcookies));
708 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
712 ptlrpc_request_set_replen(req);
/* no free slot: sleep on cl_destroy_waitq until one can be reserved */
714 if (!osc_can_send_destroy(cli)) {
715 struct l_wait_info lwi = { 0 };
718 * Wait until the number of on-going destroy RPCs drops
719 * under max_rpc_in_flight
721 l_wait_event_exclusive(cli->cl_destroy_waitq,
722 osc_can_send_destroy(cli), &lwi);
725 /* Do not wait for response */
726 ptlrpcd_add_req(req);
/* Report the cache and grant state of this client to the OST in @oa.
 *
 * Under cl_loi_list_lock, o_dirty, o_undirty, o_grant and o_dropped are
 * filled in from the client_obd counters and cl_lost_grant is reset. */
730 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
733 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
/* the caller must not have set these bits already */
735 LASSERT(!(oa->o_valid & bits));
738 client_obd_list_lock(&cli->cl_loi_list_lock);
739 oa->o_dirty = cli->cl_dirty;
/* the first three branches log inconsistent dirty accounting */
740 if (cli->cl_dirty > cli->cl_dirty_max) {
741 CERROR("dirty %lu > dirty_max %lu\n",
742 cli->cl_dirty, cli->cl_dirty_max);
744 } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
745 CERROR("dirty %d > system dirty_max %d\n",
746 atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
748 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
749 CERROR("dirty %lu - dirty_max %lu too big???\n",
750 cli->cl_dirty, cli->cl_dirty_max);
/* bytes covered by (cl_max_rpcs_in_flight + 1) RPCs of maximum size */
753 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
754 (cli->cl_max_rpcs_in_flight + 1);
/* announce the larger of the dirty limit and that in-flight volume */
755 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
757 oa->o_grant = cli->cl_avail_grant;
/* lost grant is reported once and then cleared */
758 oa->o_dropped = cli->cl_lost_grant;
759 cli->cl_lost_grant = 0;
760 client_obd_list_unlock(&cli->cl_loi_list_lock);
761 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
762 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
765 /* caller must hold loi_list_lock
 *
 * Charge one page of write grant to @pga: the global and per-client dirty
 * counters grow by one page, cl_avail_grant shrinks by CFS_PAGE_SIZE and
 * the page is flagged OBD_BRW_FROM_GRANT. */
766 static void osc_consume_write_grant(struct client_obd *cli,
767 struct brw_page *pga)
769 atomic_inc(&obd_dirty_pages);
770 cli->cl_dirty += CFS_PAGE_SIZE;
771 cli->cl_avail_grant -= CFS_PAGE_SIZE;
/* remembered so that osc_release_write_grant() can undo the charge */
772 pga->flag |= OBD_BRW_FROM_GRANT;
773 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
774 CFS_PAGE_SIZE, pga, pga->pg);
/* the available grant must never go negative */
775 LASSERT(cli->cl_avail_grant >= 0);
778 /* the companion to osc_consume_write_grant, called when a brw has completed.
779 * must be called with the loi lock held.
 *
 * Clears OBD_BRW_FROM_GRANT on @pga and takes the page out of the dirty
 * counters.  Grant that cannot be used any more is added to cl_lost_grant:
 * either a whole page, or for a short write the part of the page outside
 * the OST blocks touched by the write. */
780 static void osc_release_write_grant(struct client_obd *cli,
781 struct brw_page *pga, int sent)
/* fall back to 4096 when the OST block size is reported as zero */
783 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
/* OBD_BRW_FROM_GRANT marks pages charged by osc_consume_write_grant() */
786 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
791 pga->flag &= ~OBD_BRW_FROM_GRANT;
792 atomic_dec(&obd_dirty_pages);
793 cli->cl_dirty -= CFS_PAGE_SIZE;
795 cli->cl_lost_grant += CFS_PAGE_SIZE;
796 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
797 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
798 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
799 /* For short writes we shouldn't count parts of pages that
800 * span a whole block on the OST side, or our accounting goes
801 * wrong. Should match the code in filter_grant_check. */
/* offset of the write within its page */
802 int offset = pga->off & ~CFS_PAGE_MASK;
/* extend the byte count back to the start of the first block ... */
803 int count = pga->count + (offset & (blocksize - 1));
804 int end = (offset + pga->count) & (blocksize - 1);
/* ... and forward to the end of the last block */
806 count += blocksize - end;
808 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
809 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
810 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
811 cli->cl_avail_grant, cli->cl_dirty);
/* Sum of the cl_r_in_flight and cl_w_in_flight counters of @cli
 * (presumably the read and the write RPCs currently in flight). */
817 static unsigned long rpcs_in_flight(struct client_obd *cli)
819 return cli->cl_r_in_flight + cli->cl_w_in_flight;
822 /* caller must hold loi_list_lock
 *
 * Walk the list of cache waiters and wake them up.  A waiter taken off the
 * list either gets one page of write grant, or, when less than a page of
 * grant is available, is woken with ocw_rc set to -EDQUOT so that it does
 * sync IO. */
823 void osc_wake_cache_waiters(struct client_obd *cli)
825 struct list_head *l, *tmp;
826 struct osc_cache_waiter *ocw;
/* the _safe iterator is needed because entries are unlinked in the loop */
829 list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
830 /* if we can't dirty more, we must wait until some is written */
831 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
832 (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
833 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
834 "osc max %ld, sys max %d\n", cli->cl_dirty,
835 cli->cl_dirty_max, obd_max_dirty_pages);
839 /* if still dirty cache but no grant wait for pending RPCs that
840 * may yet return us some grant before doing sync writes */
841 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
842 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
843 cli->cl_w_in_flight);
/* take the waiter off the list before waking it */
847 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
848 list_del_init(&ocw->ocw_entry);
849 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
850 /* no more RPCs in flight to return grant, do sync IO */
851 ocw->ocw_rc = -EDQUOT;
852 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
854 osc_consume_write_grant(cli,
855 &ocw->ocw_oap->oap_brw_page);
858 cfs_waitq_signal(&ocw->ocw_waitq);
/* Initialise cl_avail_grant from the grant in the connect data @ocd.
 * The assignment is made under cl_loi_list_lock; the resulting grant is
 * asserted to be non-negative. */
864 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
866 client_obd_list_lock(&cli->cl_loi_list_lock);
867 cli->cl_avail_grant = ocd->ocd_grant;
868 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* the values are logged after the lock has been dropped */
870 CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
871 cli->cl_avail_grant, cli->cl_lost_grant);
872 LASSERT(cli->cl_avail_grant >= 0);
/* Add the grant carried in reply @body to cl_avail_grant.  The grant is
 * only taken when OBD_MD_FLGRANT is set in o_valid; the update is made
 * under cl_loi_list_lock. */
875 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
877 client_obd_list_lock(&cli->cl_loi_list_lock);
/* o_grant is logged even when OBD_MD_FLGRANT is not set */
878 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
879 if (body->oa.o_valid & OBD_MD_FLGRANT)
880 cli->cl_avail_grant += body->oa.o_grant;
881 /* waiters are woken in brw_interpret_oap */
882 client_obd_list_unlock(&cli->cl_loi_list_lock);
885 /* We assume that the reason this OSC got a short read is because it read
886 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
887 * via the LOV, and it _knows_ it's reading inside the file, it's just that
888 * this stripe never got written at or beyond this stripe offset yet.
 *
 * @nob_read bytes were actually read into the @page_count pages of @pga;
 * everything after them is filled with zeroes. */
889 static void handle_short_read(int nob_read, obd_count page_count,
890 struct brw_page **pga)
895 /* skip bytes read OK */
896 while (nob_read > 0) {
897 LASSERT (page_count > 0);
899 if (pga[i]->count > nob_read) {
900 /* EOF inside this page */
/* map the page and zero its tail from the first unread byte on */
901 ptr = cfs_kmap(pga[i]->pg) +
902 (pga[i]->off & ~CFS_PAGE_MASK);
903 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
904 cfs_kunmap(pga[i]->pg);
/* this page was read completely */
910 nob_read -= pga[i]->count;
915 /* zero remaining pages */
916 while (page_count-- > 0) {
917 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
918 memset(ptr, 0, pga[i]->count);
919 cfs_kunmap(pga[i]->pg);
/* Check the per-niobuf return codes of a BRW write reply.
 *
 * The reply buffer at REQ_REC_OFF + 1 holds @niocount return codes, which
 * are byte-swapped here when the reply message was swabbed.  A negative
 * code is returned to the caller as is, any other non-zero code is logged
 * as invalid, and the number of bytes moved by the bulk transfer is
 * compared with @requested_nob. */
924 static int check_write_rcs(struct ptlrpc_request *req,
925 int requested_nob, int niocount,
926 obd_count page_count, struct brw_page **pga)
930 /* return error if any niobuf was in error */
931 remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
932 sizeof(*remote_rcs) * niocount, NULL);
933 if (remote_rcs == NULL) {
934 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
/* no swabber was passed above, so the codes are swabbed one by one */
937 if (lustre_msg_swabbed(req->rq_repmsg))
938 for (i = 0; i < niocount; i++)
939 __swab32s(&remote_rcs[i]);
941 for (i = 0; i < niocount; i++) {
/* a negative code is the error of that niobuf */
942 if (remote_rcs[i] < 0)
943 return(remote_rcs[i]);
/* a positive code is not a valid reply */
945 if (remote_rcs[i] != 0) {
946 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
947 i, remote_rcs[i], req);
952 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
/* arguments follow the order of the format string: the transferred
 * count first, then the requested count */
953 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
954 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Tell whether two consecutive pages can share one niobuf.
 *
 * The result of the final test is whether @p2 starts exactly where @p1
 * ends.  Pages whose flags differ are examined first, and a difference in
 * anything but OBD_BRW_FROM_GRANT is logged with CERROR. */
961 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
963 if (p1->flag != p2->flag) {
964 unsigned mask = ~OBD_BRW_FROM_GRANT;
966 /* warn if we try to combine flags that we don't know to be
968 if ((p1->flag & mask) != (p2->flag & mask))
969 CERROR("is it ok to have flags 0x%x and 0x%x in the "
970 "same brw?\n", p1->flag, p2->flag);
974 return (p1->off + p1->count == p2->off);
/* Compute a checksum of type @cksum_type over the first @nob bytes held
 * in the @pg_count pages of @pga.
 *
 * @opc selects the fault-injection hook: for OST_READ the data of the
 * first page may be overwritten with "bad1" before it is checksummed, for
 * OST_WRITE only the resulting checksum is affected. */
977 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
978 struct brw_page **pga, int opc,
979 cksum_type_t cksum_type)
984 LASSERT (pg_count > 0);
985 cksum = init_checksum(cksum_type);
986 while (nob > 0 && pg_count > 0) {
987 unsigned char *ptr = cfs_kmap(pga[i]->pg);
988 int off = pga[i]->off & ~CFS_PAGE_MASK;
/* never checksum more than the bytes that remain */
989 int count = pga[i]->count > nob ? nob : pga[i]->count;
991 /* corrupt the data before we compute the checksum, to
992 * simulate an OST->client data error */
993 if (i == 0 && opc == OST_READ &&
994 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
995 memcpy(ptr + off, "bad1", min(4, nob));
/* fold this page into the running checksum */
996 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
997 cfs_kunmap(pga[i]->pg);
998 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1001 nob -= pga[i]->count;
1005 /* For sending we only compute the wrong checksum instead
1006 * of corrupting the data so it is still correct on a redo */
1007 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/* Build a bulk read/write (BRW) request for the @page_count pages in @pga.
 *
 * Write requests are allocated from the request pool of the import, other
 * requests with ptlrpc_request_alloc().  Pages that can_merge_pages()
 * accepts share one remote niobuf, so the niobuf count may be smaller than
 * @page_count.  A bulk descriptor is attached to the request, the body,
 * ioobj and niobufs are filled in, the cache state is announced with
 * osc_announce_cached(), and checksum flags are set when cl_checksum is
 * enabled and the bulk hash algorithm is BULK_HASH_ALG_NULL.  The request
 * parameters are stored in the async args kept inside the request. */
1013 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1014 struct lov_stripe_md *lsm, obd_count page_count,
1015 struct brw_page **pga,
1016 struct ptlrpc_request **reqp,
1017 struct obd_capa *ocapa)
1019 struct ptlrpc_request *req;
1020 struct ptlrpc_bulk_desc *desc;
1021 struct ost_body *body;
1022 struct obd_ioobj *ioobj;
1023 struct niobuf_remote *niobuf;
1024 int niocount, i, requested_nob, opc, rc;
1025 struct osc_brw_async_args *aa;
1026 struct req_capsule *pill;
/* fault-injection hooks for request preparation */
1029 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1030 RETURN(-ENOMEM); /* Recoverable */
1031 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1032 RETURN(-EINVAL); /* Fatal */
1034 if ((cmd & OBD_BRW_WRITE) != 0) {
1036 req = ptlrpc_request_alloc_pool(cli->cl_import,
1037 cli->cl_import->imp_rq_pool,
1041 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
/* count the niobufs: one for the first page plus one for every page that
 * cannot be merged with its predecessor */
1047 for (niocount = i = 1; i < page_count; i++) {
1048 if (!can_merge_pages(pga[i - 1], pga[i]))
1052 pill = &req->rq_pill;
1053 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1054 niocount * sizeof(*niobuf));
1055 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1057 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1059 ptlrpc_request_free(req);
1062 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
/* for a write the client is the source of a bulk GET, otherwise it is the
 * sink of a bulk PUT */
1064 if (opc == OST_WRITE)
1065 desc = ptlrpc_prep_bulk_imp(req, page_count,
1066 BULK_GET_SOURCE, OST_BULK_PORTAL);
1068 desc = ptlrpc_prep_bulk_imp(req, page_count,
1069 BULK_PUT_SINK, OST_BULK_PORTAL);
1072 GOTO(out, rc = -ENOMEM);
1073 /* NB request now owns desc and will free it when it gets freed */
1075 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1076 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1077 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1078 LASSERT(body && ioobj && niobuf);
1082 obdo_to_ioobj(oa, ioobj);
1083 ioobj->ioo_bufcnt = niocount;
1084 osc_pack_capa(req, body, ocapa);
1085 LASSERT (page_count > 0);
1086 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1087 struct brw_page *pg = pga[i];
/* there is no previous page on the first iteration; pga[-1] must not be
 * read.  Every use of pg_prev below is guarded by an i == 0 / i > 0 test */
1088 struct brw_page *pg_prev = i > 0 ? pga[i - 1] : NULL;
1090 LASSERT(pg->count > 0);
/* the data of a brw_page must not cross the end of its page */
1091 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1092 "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1093 pg->off, pg->count);
/* pages have to arrive sorted by strictly increasing offset */
1095 LASSERTF(i == 0 || pg->off > pg_prev->off,
1096 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1097 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1099 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1100 pg_prev->pg, page_private(pg_prev->pg),
1101 pg_prev->pg->index, pg_prev->off);
1103 LASSERTF(i == 0 || pg->off > pg_prev->off,
1104 "i %d p_c %u\n", i, page_count);
/* all pages of one request must agree on OBD_BRW_SRVLOCK */
1106 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1107 (pg->flag & OBD_BRW_SRVLOCK));
1109 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1111 requested_nob += pg->count;
/* a mergeable page extends a niobuf, any other page starts a new one */
1113 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1115 niobuf->len += pg->count;
1117 niobuf->offset = pg->off;
1118 niobuf->len = pg->count;
1119 niobuf->flags = pg->flag;
/* exactly niocount niobufs must have been filled in */
1123 LASSERT((void *)(niobuf - niocount) ==
1124 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1125 niocount * sizeof(*niobuf)));
1126 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1128 /* size[REQ_REC_OFF] still sizeof (*body) */
1129 if (opc == OST_WRITE) {
1130 if (unlikely(cli->cl_checksum) &&
1131 req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
1132 /* store cl_cksum_type in a local variable since
1133 * it can be changed via lprocfs */
1134 cksum_type_t cksum_type = cli->cl_cksum_type;
1136 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1137 oa->o_flags = body->oa.o_flags = 0;
1138 body->oa.o_flags |= cksum_type_pack(cksum_type);
1139 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1140 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1144 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1146 /* save this in 'oa', too, for later checking */
1147 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1148 oa->o_flags |= cksum_type_pack(cksum_type);
1150 /* clear out the checksum flag, in case this is a
1151 * resend but cl_checksum is no longer set. b=11238 */
1152 oa->o_valid &= ~OBD_MD_FLCKSUM;
1154 oa->o_cksum = body->oa.o_cksum;
1155 /* 1 RC per niobuf */
1156 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
1157 sizeof(__u32) * niocount);
1159 if (unlikely(cli->cl_checksum) &&
1160 req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
1161 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1162 body->oa.o_flags = 0;
1163 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1164 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1166 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
1167 /* 1 RC for the whole I/O */
1169 ptlrpc_request_set_replen(req);
/* the async args are stored inside the request; they must fit */
1171 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1172 aa = (struct osc_brw_async_args *)&req->rq_async_args;
1174 aa->aa_requested_nob = requested_nob;
1175 aa->aa_nio_count = niocount;
1176 aa->aa_page_count = page_count;
1180 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1186 ptlrpc_req_finished(req);
/* Diagnose a write whose checksum was reported differently by the server.
 *
 * Equal checksums are only logged as confirmed.  Otherwise the checksum
 * over the pages is computed again with the type found in the o_flags of
 * @oa (OBD_CKSUM_CRC32 when OBD_MD_FLFLAGS is not set), the three values
 * are compared to pick the most likely explanation, and the mismatch is
 * reported on the console and in the error log. */
1190 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1191 __u32 client_cksum, __u32 server_cksum, int nob,
1192 obd_count page_count, struct brw_page **pga,
1193 cksum_type_t client_cksum_type)
1197 cksum_type_t cksum_type;
1199 if (server_cksum == client_cksum) {
1200 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* checksum type taken from @oa, CRC32 when no flags are valid */
1204 if (oa->o_valid & OBD_MD_FLFLAGS)
1205 cksum_type = cksum_type_unpack(oa->o_flags);
1207 cksum_type = OBD_CKSUM_CRC32;
/* checksum of the pages as they are now */
1209 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
/* compare the original, the server's and the fresh checksum */
1212 if (cksum_type != client_cksum_type)
1213 msg = "the server did not use the checksum type specified in "
1214 "the original request - likely a protocol problem";
1215 else if (new_cksum == server_cksum)
1216 msg = "changed on the client after we checksummed it - "
1217 "likely false positive due to mmap IO (bug 11742)";
1218 else if (new_cksum == client_cksum)
1219 msg = "changed in transit before arrival at OST";
1221 msg = "changed in transit AND doesn't match the original - "
1222 "likely false positive due to mmap IO (bug 11742)";
1224 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1225 LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1226 "["LPU64"-"LPU64"]\n",
1227 msg, libcfs_nid2str(peer->nid),
1228 oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1229 oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1232 oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1234 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1235 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1236 "client csum now %x\n", client_cksum, client_cksum_type,
1237 server_cksum, cksum_type, new_cksum);
/* Complete a bulk read or write RPC once its reply is available: unpack the
 * ost_body from the reply, pass quota flags to lquota_setdq() for writes,
 * update the grant, and validate the transfer.  Writes are checked with
 * check_write_checksum() and check_write_rcs(); reads are checked against
 * the requested and transferred byte counts and, when the reply carries
 * OBD_MD_FLCKSUM, against a checksum recomputed over the received pages. */
1241 /* Note rc enters this function as number of bytes transferred */
1242 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1244 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1245 const lnet_process_id_t *peer =
1246 &req->rq_import->imp_connection->c_peer;
1247 struct client_obd *cli = aa->aa_cli;
1248 struct ost_body *body;
1249 __u32 client_cksum = 0;
/* -EDQUOT is the only negative rc that does not match this test */
1252 if (rc < 0 && rc != -EDQUOT)
1255 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1256 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1257 lustre_swab_ost_body);
1259 CDEBUG(D_INFO, "Can't unpack body\n");
1263 /* set/clear over quota flag for a uid/gid */
1264 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1265 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1266 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1267 body->oa.o_gid, body->oa.o_valid,
1273 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1274 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1276 osc_update_grant(cli, body);
1278 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1280 CERROR("Unexpected +ve rc %d\n", rc);
1283 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
/* the write checksum is verified only when the request carried a
 * non-zero checksum */
1285 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1286 check_write_checksum(&body->oa, peer, client_cksum,
1287 body->oa.o_cksum, aa->aa_requested_nob,
1288 aa->aa_page_count, aa->aa_ppga,
1289 cksum_type_unpack(aa->aa_oa->o_flags)))
1292 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1295 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1296 aa->aa_page_count, aa->aa_ppga);
1300 /* The rest of this function executes only for OST_READs */
1301 if (rc > aa->aa_requested_nob) {
1302 CERROR("Unexpected rc %d (%d requested)\n", rc,
1303 aa->aa_requested_nob);
1307 if (rc != req->rq_bulk->bd_nob_transferred) {
1308 CERROR ("Unexpected rc %d (%d transferred)\n",
1309 rc, req->rq_bulk->bd_nob_transferred);
1313 if (rc < aa->aa_requested_nob)
1314 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1316 if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
1318 GOTO(out, rc = -EAGAIN);
1320 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1321 static int cksum_counter;
1322 __u32 server_cksum = body->oa.o_cksum;
1325 cksum_type_t cksum_type;
/* use the checksum type from the reply flags when they are valid,
 * OBD_CKSUM_CRC32 is the other value assigned here */
1327 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1328 cksum_type = cksum_type_unpack(body->oa.o_flags);
1330 cksum_type = OBD_CKSUM_CRC32;
1331 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1332 aa->aa_ppga, OST_READ,
1335 if (peer->nid == req->rq_bulk->bd_sender) {
1339 router = libcfs_nid2str(req->rq_bulk->bd_sender);
/* a server checksum of ~0 on a non-empty read is reported as a
 * protocol error rather than as a mismatch */
1342 if (server_cksum == ~0 && rc > 0) {
1343 CERROR("Protocol error: server %s set the 'checksum' "
1344 "bit, but didn't send a checksum. Not fatal, "
1345 "but please tell CFS.\n",
1346 libcfs_nid2str(peer->nid));
1347 } else if (server_cksum != client_cksum) {
1348 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1349 "%s%s%s inum "LPU64"/"LPU64" object "
1350 LPU64"/"LPU64" extent "
1351 "["LPU64"-"LPU64"]\n",
1352 req->rq_import->imp_obd->obd_name,
1353 libcfs_nid2str(peer->nid),
1355 body->oa.o_valid & OBD_MD_FLFID ?
1356 body->oa.o_fid : (__u64)0,
1357 body->oa.o_valid & OBD_MD_FLFID ?
1358 body->oa.o_generation :(__u64)0,
1360 body->oa.o_valid & OBD_MD_FLGROUP ?
1361 body->oa.o_gr : (__u64)0,
1362 aa->aa_ppga[0]->off,
1363 aa->aa_ppga[aa->aa_page_count-1]->off +
1364 aa->aa_ppga[aa->aa_page_count-1]->count -
1366 CERROR("client %x, server %x, cksum_type %x\n",
1367 client_cksum, server_cksum, cksum_type);
1369 aa->aa_oa->o_cksum = client_cksum;
1373 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1376 } else if (unlikely(client_cksum)) {
1377 static int cksum_missed;
/* x & -x == x holds only when x has at most one bit set, so this
 * message is printed at power-of-two values of cksum_missed */
1380 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1381 CERROR("Checksum %u requested from %s but not sent\n",
1382 cksum_missed, libcfs_nid2str(peer->nid));
/* copy the obdo from the reply over the one referenced by aa_oa */
1388 *aa->aa_oa = body->oa;
/* Perform one bulk RPC synchronously: the request is built with
 * osc_brw_prep_request(), waited on with ptlrpc_queue_wait() and finished
 * with osc_brw_fini_request().  When the result is a recoverable error and
 * osc_should_resend() still allows it, the function sleeps for
 * cfs_time_seconds(resends) -- presumably before retrying; the retry jump
 * itself is not visible here, TODO confirm. */
1393 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1394 struct lov_stripe_md *lsm,
1395 obd_count page_count, struct brw_page **pga,
1396 struct obd_capa *ocapa)
1398 struct ptlrpc_request *req;
1402 struct l_wait_info lwi;
1406 cfs_waitq_init(&waitq);
1409 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1410 page_count, pga, &req, ocapa);
1414 rc = ptlrpc_queue_wait(req);
/* a timed-out request that is flagged rq_resend is released here */
1416 if (rc == -ETIMEDOUT && req->rq_resend) {
1417 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1418 ptlrpc_req_finished(req);
1422 rc = osc_brw_fini_request(req, rc);
1424 ptlrpc_req_finished(req);
1425 if (osc_recoverable_error(rc)) {
1427 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1428 CERROR("too many resend retries, returning error\n");
/* the wait condition is the constant 0, so the wait ends through the
 * lwi timeout (or, presumably, an interruption) only */
1432 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1433 l_wait_event(waitq, 0, &lwi);
/* Replace a bulk request that failed with a recoverable error.  A new
 * request is prepared over the same obdo and page array, inherits the
 * interpret callback, the async args and the oap list of the old request,
 * and is added to the request set of the old request.  Gives up with an
 * error message once osc_should_resend() refuses another attempt. */
1441 int osc_brw_redo_request(struct ptlrpc_request *request,
1442 struct osc_brw_async_args *aa)
1444 struct ptlrpc_request *new_req;
1445 struct ptlrpc_request_set *set = request->rq_set;
1446 struct osc_brw_async_args *new_aa;
1447 struct osc_async_page *oap;
1451 if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1452 CERROR("too many resend retries, returning error\n");
1456 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1458 body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1459 if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
1460 ocapa = lustre_unpack_capa(request->rq_reqmsg,
/* NOTE(review): ocapa is unpacked from the old request above, yet NULL is
 * passed as the capa argument below -- confirm this is intended */
1463 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1464 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1465 aa->aa_cli, aa->aa_oa,
1466 NULL /* lsm unused by osc currently */,
1467 aa->aa_page_count, aa->aa_ppga,
1468 &new_req, NULL /* ocapa */);
1472 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
/* an oap that was interrupted in the meantime makes the new request
 * unnecessary: it is released again under the list lock */
1474 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1475 if (oap->oap_request != NULL) {
1476 LASSERTF(request == oap->oap_request,
1477 "request %p != oap_request %p\n",
1478 request, oap->oap_request);
1479 if (oap->oap_interrupted) {
1480 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1481 ptlrpc_req_finished(new_req);
1486 /* New request takes over pga and oaps from old request.
1487 * Note that copying a list_head doesn't work, need to move it... */
1489 new_req->rq_interpret_reply = request->rq_interpret_reply;
1490 new_req->rq_async_args = request->rq_async_args;
/* rq_sent is set aa_resends seconds past the current time */
1491 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1493 new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
/* the struct copy above duplicated the list head by value; re-init it and
 * splice the entries over so the links point at the new head */
1495 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1496 list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1497 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
/* move the request reference held by an oap from the old request to the
 * new one */
1499 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1500 if (oap->oap_request) {
1501 ptlrpc_req_finished(oap->oap_request);
1502 oap->oap_request = ptlrpc_request_addref(new_req);
1506 /* use ptlrpc_set_add_req is safe because interpret functions work
1507 * in check_set context. only one way exist with access to request
1508 * from different thread got -EINTR - this way protected with
1509 * cl_loi_list_lock */
1510 ptlrpc_set_add_req(set, new_req);
1512 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1514 DEBUG_REQ(D_INFO, new_req, "new request");
/* Reply callback installed by async_internal().  Finishes the request with
 * osc_brw_fini_request() and hands recoverable errors to
 * osc_brw_redo_request().  The cleanup below decrements the matching
 * in-flight counter, releases the write grant of every page and frees the
 * page pointer array. */
1518 static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
1520 struct osc_brw_async_args *aa = data;
1524 rc = osc_brw_fini_request(req, rc);
1525 if (osc_recoverable_error(rc)) {
1526 rc = osc_brw_redo_request(req, aa);
/* counters and grants are protected by cl_loi_list_lock */
1531 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1532 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1533 aa->aa_cli->cl_w_in_flight--;
1535 aa->aa_cli->cl_r_in_flight--;
1536 for (i = 0; i < aa->aa_page_count; i++)
1537 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1538 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1540 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
/* Queue one bulk RPC on a request set without waiting for it.  For writes
 * a grant is consumed per page first (while cl_avail_grant covers a page).
 * The request is built by osc_brw_prep_request(), tallied in the lprocfs
 * histograms, given brw_interpret() as reply callback and added to set; the
 * in-flight counter for the direction is then incremented.  On the
 * alternative write path at the end, the grants are released again. */
1545 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1546 struct lov_stripe_md *lsm, obd_count page_count,
1547 struct brw_page **pga, struct ptlrpc_request_set *set,
1548 struct obd_capa *ocapa)
1550 struct ptlrpc_request *req;
1551 struct client_obd *cli = &exp->exp_obd->u.cli;
1553 struct osc_brw_async_args *aa;
1556 /* Consume write credits even if doing a sync write -
1557 * otherwise we may run out of space on OST due to grant. */
1558 if (cmd == OBD_BRW_WRITE) {
/* NOTE(review): spin_lock()/spin_unlock() are used on cl_loi_list_lock
 * here, while the rest of this function takes the same lock through
 * client_obd_list_lock()/client_obd_list_unlock() -- confirm that the two
 * forms are interchangeable */
1559 spin_lock(&cli->cl_loi_list_lock);
1560 for (i = 0; i < page_count; i++) {
1561 if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1562 osc_consume_write_grant(cli, pga[i]);
1564 spin_unlock(&cli->cl_loi_list_lock);
1567 rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
/* NOTE(review): req is dereferenced here before rc is visibly tested --
 * verify that req is valid when osc_brw_prep_request() fails */
1570 aa = (struct osc_brw_async_args *)&req->rq_async_args;
1571 if (cmd == OBD_BRW_READ) {
1572 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1573 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1574 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
1576 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1577 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1578 cli->cl_w_in_flight);
1579 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
1583 req->rq_interpret_reply = brw_interpret;
1584 ptlrpc_set_add_req(set, req);
1585 client_obd_list_lock(&cli->cl_loi_list_lock);
1586 if (cmd == OBD_BRW_READ)
1587 cli->cl_r_in_flight++;
1589 cli->cl_w_in_flight++;
1590 client_obd_list_unlock(&cli->cl_loi_list_lock);
1591 } else if (cmd == OBD_BRW_WRITE) {
/* give back the grants taken at the top of the function */
1592 client_obd_list_lock(&cli->cl_loi_list_lock);
1593 for (i = 0; i < page_count; i++)
1594 osc_release_write_grant(cli, pga[i], 0);
1595 client_obd_list_unlock(&cli->cl_loi_list_lock);
1601 /* ugh, we want disk allocation on the target to happen in offset order. we'll
1602 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1603 * fine for our small page arrays and doesn't require allocation. it's an
1604 * insertion sort that swaps elements that are strides apart, shrinking the
1605 * stride down until it's '1' and the array is sorted. */
/* Shell sort of num brw_page pointers, in place, into ascending ->off
 * order. */
1607 static void sort_brw_pages(struct brw_page **array, int num)
1610 struct brw_page *tmp;
/* grow the stride through 1, 4, 13, ... until it is no longer below num */
1614 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
/* insertion pass over elements that are stride apart */
1619 for (i = stride ; i < num ; i++) {
/* shift entries with a larger offset up by one stride */
1622 while (j >= stride && array[j - stride]->off > tmp->off) {
1623 array[j] = array[j - stride];
1628 } while (stride > 1);
/* Count how many leading entries of pg[] (at most pages, which must be
 * positive) form an unfragmented run.  The walk stops when the entries are
 * used up, at an entry that does not end on a page boundary, or before an
 * entry that does not start on one. */
1631 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1637 LASSERT (pages > 0);
/* offset of the entry within its page */
1638 offset = pg[i]->off & ~CFS_PAGE_MASK;
1642 if (pages == 0) /* that's all */
1645 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1646 return count; /* doesn't end on page boundary */
1649 offset = pg[i]->off & ~CFS_PAGE_MASK;
1650 if (offset != 0) /* doesn't start on page boundary */
/* Allocate an array of count brw_page pointers for the pages in pga.
 * Presumably slot i is pointed at element i of pga by the loop below (its
 * body is not visible here, TODO confirm).  The array is released with
 * osc_release_ppga(). */
1657 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1659 struct brw_page **ppga;
1662 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1666 for (i = 0; i < count; i++)
/* Free a page pointer array of count entries.  ppga must not be NULL, and
 * count has to match the count used at allocation because the size passed
 * to OBD_FREE() is derived from it. */
1671 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1673 LASSERT(ppga != NULL);
1674 OBD_FREE(ppga, sizeof(*ppga) * count);
/* Synchronous bulk I/O entry point.  With OBD_BRW_CHECK set, only the state
 * of the import is examined.  Otherwise the pages are sorted by offset and
 * sent in chunks through osc_brw_internal(); a chunk holds at most
 * cl_max_pages_per_rpc pages and is cut down further by
 * max_unfragmented_pages().  When more than one chunk is needed the obdo in
 * oinfo->oi_oa is saved before the first chunk and restored before each
 * later one. */
1677 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1678 obd_count page_count, struct brw_page *pga,
1679 struct obd_trans_info *oti)
1681 struct obdo *saved_oa = NULL;
1682 struct brw_page **ppga, **orig;
1683 struct obd_import *imp = class_exp2cliimp(exp);
1684 struct client_obd *cli = &imp->imp_obd->u.cli;
1685 int rc, page_count_orig;
1688 if (cmd & OBD_BRW_CHECK) {
1689 /* The caller just wants to know if there's a chance that this
1690 * I/O can succeed */
1692 if (imp == NULL || imp->imp_invalid)
1697 /* test_brw with a failed create can trip this, maybe others. */
1698 LASSERT(cli->cl_max_pages_per_rpc);
/* orig keeps the start of the array for the final release, ppga is
 * advanced chunk by chunk */
1702 orig = ppga = osc_build_ppga(pga, page_count);
1705 page_count_orig = page_count;
1707 sort_brw_pages(ppga, page_count);
1708 while (page_count) {
1709 obd_count pages_per_brw;
1711 if (page_count > cli->cl_max_pages_per_rpc)
1712 pages_per_brw = cli->cl_max_pages_per_rpc;
1714 pages_per_brw = page_count;
1716 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1718 if (saved_oa != NULL) {
1719 /* restore previously saved oa */
1720 *oinfo->oi_oa = *saved_oa;
1721 } else if (page_count > pages_per_brw) {
1722 /* save a copy of oa (brw will clobber it) */
1723 OBDO_ALLOC(saved_oa);
1724 if (saved_oa == NULL)
1725 GOTO(out, rc = -ENOMEM);
1726 *saved_oa = *oinfo->oi_oa;
1729 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1730 pages_per_brw, ppga, oinfo->oi_capa);
1735 page_count -= pages_per_brw;
1736 ppga += pages_per_brw;
/* release with the original pointer and count, not the advanced ones */
1740 osc_release_ppga(orig, page_count_orig);
1742 if (saved_oa != NULL)
1743 OBDO_FREE(saved_oa);
/* Asynchronous counterpart of osc_brw(): the sorted pages are cut into
 * chunks of at most cl_max_pages_per_rpc unfragmented pages and each chunk
 * is queued on set through async_internal().  Unless the whole I/O fits
 * into a single RPC, every chunk gets its own copy of the relevant slice of
 * the pointer array. */
1748 static int osc_brw_async(int cmd, struct obd_export *exp,
1749 struct obd_info *oinfo, obd_count page_count,
1750 struct brw_page *pga, struct obd_trans_info *oti,
1751 struct ptlrpc_request_set *set)
1753 struct brw_page **ppga, **orig;
1754 struct client_obd *cli = &exp->exp_obd->u.cli;
1755 int page_count_orig;
1759 if (cmd & OBD_BRW_CHECK) {
1760 struct obd_import *imp = class_exp2cliimp(exp);
1761 /* The caller just wants to know if there's a chance that this
1762 * I/O can succeed */
1764 if (imp == NULL || imp->imp_invalid)
1769 orig = ppga = osc_build_ppga(pga, page_count);
1772 page_count_orig = page_count;
1774 sort_brw_pages(ppga, page_count);
1775 while (page_count) {
1776 struct brw_page **copy;
1777 obd_count pages_per_brw;
1779 pages_per_brw = min_t(obd_count, page_count,
1780 cli->cl_max_pages_per_rpc);
1782 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1784 /* use ppga only if single RPC is going to fly */
/* the test is true for every chunk unless the first chunk covers the whole
 * original page count */
1785 if (pages_per_brw != page_count_orig || ppga != orig) {
1786 OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
1788 GOTO(out, rc = -ENOMEM);
1789 memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
1793 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1794 pages_per_brw, copy, set, oinfo->oi_capa);
1798 OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
1802 /* we passed it to async_internal() which is
1803 * now responsible for releasing memory */
1807 page_count -= pages_per_brw;
1808 ppga += pages_per_brw;
1812 osc_release_ppga(orig, page_count_orig);
1816 static void osc_check_rpcs(struct client_obd *cli);
1818 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1819 * the dirty accounting. Writeback completes or truncate happens before
1820 * writing starts. Must be called with the loi lock held. */
1821 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
/* the write grant of the page embedded in the oap is released; sent is
 * forwarded unchanged */
1824 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1828 /* This maintains the lists of pending pages to read/write for a given object
1829 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1830 * to quickly find objects that are ready to send an RPC. */
/* The checks below are made in this order: no pending pages, a missing or
 * invalid import, a non-empty urgent list, cache waiters (writes only), and
 * finally the pending count reaching the optimal RPC size. */
1831 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1837 if (lop->lop_num_pending == 0)
1840 /* if we have an invalid import we want to drain the queued pages
1841 * by forcing them through rpcs that immediately fail and complete
1842 * the pages. recovery relies on this to empty the queued pages
1843 * before canceling the locks and evicting down the llite pages */
1844 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1847 /* stream rpcs in queue order as long as there is an urgent page
1848 * queued. this is our cheap solution for good batching in the case
1849 * where writepage marks some random page in the middle of the file
1850 * as urgent because of, say, memory pressure */
1851 if (!list_empty(&lop->lop_urgent)) {
1852 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1855 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1856 optimal = cli->cl_max_pages_per_rpc;
1857 if (cmd & OBD_BRW_WRITE) {
1858 /* trigger a write rpc stream as long as there are dirtiers
1859 * waiting for space. as they're waiting, they're not going to
1860 * create more pages to coalesce with what's waiting.. */
1861 if (!list_empty(&cli->cl_cache_waiters)) {
1862 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1865 /* +16 to avoid triggering rpcs that would want to include pages
1866 * that are being queued but which can't be made ready until
1867 * the queuer finishes with the page. this is a wart for
1868 * llite::commit_write() */
1871 if (lop->lop_num_pending >= optimal)
/* Bring the list membership of item in line with should_be_on: an unlinked
 * item that should be listed is appended to the tail of list, a linked item
 * that should not be listed is unlinked and re-initialised.  Nothing
 * happens when the state already matches. */
1877 static void on_list(struct list_head *item, struct list_head *list,
1880 if (list_empty(item) && should_be_on)
1881 list_add_tail(item, list);
1882 else if (!list_empty(item) && !should_be_on)
1883 list_del_init(item);
1886 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1887 * can find pages to build into rpcs quickly */
1888 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
/* ready list: the write lop or the read lop would make an rpc right now */
1890 on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1891 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1892 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
/* write and read lists: the respective lop has any pending page at all */
1894 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1895 loi->loi_write_lop.lop_num_pending);
1897 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1898 loi->loi_read_lop.lop_num_pending);
/* Add delta (which may be negative) to the pending-page count of lop and to
 * a client-wide counter: cl_pending_w_pages when cmd has OBD_BRW_WRITE set,
 * or cl_pending_r_pages. */
1901 static void lop_update_pending(struct client_obd *cli,
1902 struct loi_oap_pages *lop, int cmd, int delta)
1904 lop->lop_num_pending += delta;
1905 if (cmd & OBD_BRW_WRITE)
1906 cli->cl_pending_w_pages += delta;
1908 cli->cl_pending_r_pages += delta;
1911 /* this is called when a sync waiter receives an interruption. Its job is to
1912 * get the caller woken as soon as possible. If its page hasn't been put in an
1913 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
1914 * desiring interruption which will forcefully complete the rpc once the rpc
1916 static void osc_occ_interrupted(struct oig_callback_context *occ)
1918 struct osc_async_page *oap;
1919 struct loi_oap_pages *lop;
1920 struct lov_oinfo *loi;
1923 /* XXX member_of() */
/* Recover the oap that embeds occ, then, under cl_loi_list_lock, mark it
 * interrupted.  An oap that already holds a request gets that request
 * marked interrupted and ptlrpcd woken; an oap that is still on the pending
 * list is dequeued and completed with -EINTR. */
1924 oap = list_entry(occ, struct osc_async_page, oap_occ);
1926 client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1928 oap->oap_interrupted = 1;
1930 /* ok, it's been put in an rpc. only one oap gets a request reference */
1931 if (oap->oap_request != NULL) {
1932 ptlrpc_mark_interrupted(oap->oap_request);
1933 ptlrpcd_wake(oap->oap_request);
1937 /* we don't get interruption callbacks until osc_trigger_group_io()
1938 * has been called and put the sync oaps in the pending/urgent lists.*/
1939 if (!list_empty(&oap->oap_pending_item)) {
1940 list_del_init(&oap->oap_pending_item);
1941 list_del_init(&oap->oap_urgent_item);
/* pick the lop that matches the direction of the page and take the page
 * out of its pending count */
1944 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1945 &loi->loi_write_lop : &loi->loi_read_lop;
1946 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1947 loi_list_maint(oap->oap_cli, oap->oap_loi);
1949 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
1950 oap->oap_oig = NULL;
1954 client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
1957 /* this is trying to propagate async writeback errors back up to the
1958 * application. As an async write fails we record the error code for later if
1959 * the app does an fsync. As long as errors persist we force future rpcs to be
1960 * sync so that the app can get a sync error and break the cycle of queueing
1961 * pages for which writeback will fail. */
1962 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
/* force_sync is switched on and ar_min_xid is taken from
 * ptlrpc_sample_next_xid() */
1969 ar->ar_force_sync = 1;
1970 ar->ar_min_xid = ptlrpc_sample_next_xid();
/* force_sync is switched off again once the xid passed in has reached
 * ar_min_xid */
1975 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1976 ar->ar_force_sync = 0;
/* Queue oap on the pending list of its object.  The write or read lop is
 * chosen from oap_cmd.  A page flagged ASYNC_URGENT is additionally put at
 * the head of lop_urgent.  The pending counters are raised by one. */
1979 static void osc_oap_to_pending(struct osc_async_page *oap)
1981 struct loi_oap_pages *lop;
1983 if (oap->oap_cmd & OBD_BRW_WRITE)
1984 lop = &oap->oap_loi->loi_write_lop;
1986 lop = &oap->oap_loi->loi_read_lop;
1988 if (oap->oap_async_flags & ASYNC_URGENT)
1989 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1990 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1991 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1994 /* this must be called holding the loi list lock to give coverage to exit_cache,
1995 * async_flag maintenance, and oap_request */
/* Complete one async page.  The request reference of the oap is dropped
 * (after its xid has been read), the async flags and the interrupted state
 * are cleared, and for writes rc is fed into osc_process_ar() for both the
 * client and the object.  On success with an obdo at hand, blocks, mtime,
 * atime and ctime are copied into the lvb of the object when the matching
 * valid bits are set.  The page is then completed through
 * oig_complete_one() or through the ap_completion() hook of the caller. */
1996 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1997 struct osc_async_page *oap, int sent, int rc)
2002 if (oap->oap_request != NULL) {
2003 xid = ptlrpc_req_xid(oap->oap_request);
2004 ptlrpc_req_finished(oap->oap_request);
2005 oap->oap_request = NULL;
2008 oap->oap_async_flags = 0;
2009 oap->oap_interrupted = 0;
2011 if (oap->oap_cmd & OBD_BRW_WRITE) {
2012 osc_process_ar(&cli->cl_ar, xid, rc);
2013 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2016 if (rc == 0 && oa != NULL) {
2017 if (oa->o_valid & OBD_MD_FLBLOCKS)
2018 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2019 if (oa->o_valid & OBD_MD_FLMTIME)
2020 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2021 if (oa->o_valid & OBD_MD_FLATIME)
2022 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2023 if (oa->o_valid & OBD_MD_FLCTIME)
2024 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
/* group I/O path: leave the cache accounting and complete the oig entry */
2028 osc_exit_cache(cli, oap, sent);
2029 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
2030 oap->oap_oig = NULL;
2035 rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
2036 oap->oap_cmd, oa, rc);
2038 /* ll_ap_completion (from llite) drops PG_locked. so, a new
2039 * I/O on the page could start, but OSC calls it under lock
2040 * and thus we can add oap back to pending safely */
2042 /* upper layer wants to leave the page on pending queue */
2043 osc_oap_to_pending(oap);
2045 osc_exit_cache(cli, oap, sent);
/* Reply callback installed by osc_send_oap_rpc().  Finishes the request and
 * hands recoverable errors to osc_brw_redo_request().  Under
 * cl_loi_list_lock it then lowers the in-flight counter, completes every
 * oap of the request, wakes cache waiters and looks for more RPCs to send.
 * Finally the obdo and the page pointer array of the request are freed. */
2049 static int brw_interpret_oap(struct ptlrpc_request *req, void *data, int rc)
2051 struct osc_async_page *oap, *tmp;
2052 struct osc_brw_async_args *aa = data;
2053 struct client_obd *cli;
2056 rc = osc_brw_fini_request(req, rc);
2057 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2058 if (osc_recoverable_error(rc)) {
2059 rc = osc_brw_redo_request(req, aa);
2066 client_obd_list_lock(&cli->cl_loi_list_lock);
2068 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2069 * is called so we know whether to go to sync BRWs or wait for more
2070 * RPCs to complete */
2071 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2072 cli->cl_w_in_flight--;
2074 cli->cl_r_in_flight--;
2076 /* the caller may re-use the oap after the completion call so
2077 * we need to clean it up a little */
/* the _safe iterator is needed because each oap is unlinked in the loop */
2078 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2079 list_del_init(&oap->oap_rpc_item);
2080 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
2083 osc_wake_cache_waiters(cli);
2084 osc_check_rpcs(cli);
2086 client_obd_list_unlock(&cli->cl_loi_list_lock);
2088 OBDO_FREE(aa->aa_oa);
2090 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
/* Build one bulk request from the oaps linked on rpc_list (which must not
 * be empty).  The brw_page of every oap is collected into a freshly
 * allocated pointer array, the obdo is filled and the capability looked up
 * through the caller ops taken from an oap, the pages are sorted and
 * osc_brw_prep_request() creates the request.  On success the oaps are
 * moved from rpc_list onto the aa_oaps list of the request.  Failures are
 * returned as ERR_PTR() values. */
2094 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
2095 struct list_head *rpc_list,
2096 int page_count, int cmd)
2098 struct ptlrpc_request *req;
2099 struct brw_page **pga = NULL;
2100 struct osc_brw_async_args *aa;
2101 struct obdo *oa = NULL;
2102 struct obd_async_page_ops *ops = NULL;
2103 void *caller_data = NULL;
2104 struct obd_capa *ocapa;
2105 struct osc_async_page *oap;
2109 LASSERT(!list_empty(rpc_list));
2111 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2113 RETURN(ERR_PTR(-ENOMEM));
2117 GOTO(out, req = ERR_PTR(-ENOMEM));
2120 list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2122 ops = oap->oap_caller_ops;
2123 caller_data = oap->oap_caller_data;
/* the file offset of the page is the object offset plus the offset
 * within the page */
2125 pga[i] = &oap->oap_brw_page;
2126 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2127 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2128 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2132 /* always get the data for the obdo for the rpc */
2133 LASSERT(ops != NULL);
2134 ops->ap_fill_obdo(caller_data, cmd, oa);
2135 ocapa = ops->ap_lookup_capa(caller_data, cmd);
2137 sort_brw_pages(pga, page_count);
2138 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2142 CERROR("prep_req failed: %d\n", rc);
2143 GOTO(out, req = ERR_PTR(rc));
2146 /* Need to update the timestamps after the request is built in case
2147 * we race with setattr (locally or in queue at OST). If OST gets
2148 * later setattr before earlier BRW (as determined by the request xid),
2149 * the OST will not use BRW timestamps. Sadly, there is no obvious
2150 * way to do this in a single call. bug 10150 */
2151 ops->ap_update_obdo(caller_data, cmd, oa,
2152 OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
/* compile-time check that the async args fit into rq_async_args */
2154 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2155 aa = (struct osc_brw_async_args *)&req->rq_async_args;
/* move the oaps onto the request and leave rpc_list empty */
2156 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2157 list_splice(rpc_list, &aa->aa_oaps);
2158 CFS_INIT_LIST_HEAD(rpc_list);
2165 OBD_FREE(pga, sizeof(*pga) * page_count);
2170 /* the loi lock is held across this function but it's allowed to release
2171 * and reacquire it during its work */
/* Gather pages from the pending list of lop into one RPC and pass it to
 * ptlrpcd.  Pages are taken in list order.  The checks in the loop cover a
 * SRVLOCK flag that differs from the first page, the result of
 * ap_make_ready(), a gap at the start or the end of a page, the
 * cl_max_pages_per_rpc limit and an end offset on a PTLRPC_MAX_BRW_SIZE
 * boundary.  The request is built by osc_build_req() with the list lock
 * dropped; if that fails, every gathered oap is completed with an error. */
2172 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2173 int cmd, struct loi_oap_pages *lop)
2175 struct ptlrpc_request *req;
2176 obd_count page_count = 0;
2177 struct osc_async_page *oap = NULL, *tmp;
2178 struct osc_brw_async_args *aa;
2179 struct obd_async_page_ops *ops;
2180 CFS_LIST_HEAD(rpc_list);
2181 unsigned int ending_offset;
2182 unsigned starting_offset = 0;
2186 /* first we find the pages we're allowed to work with */
2187 list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2189 ops = oap->oap_caller_ops;
2191 LASSERT(oap->oap_magic == OAP_MAGIC);
2193 if (page_count != 0 &&
2194 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2195 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2196 " oap %p, page %p, srvlock %u\n",
2197 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2200 /* in llite being 'ready' equates to the page being locked
2201 * until completion unlocks it. commit_write submits a page
2202 * as not ready because its unlock will happen unconditionally
2203 * as the call returns. if we race with commit_write giving
2204 * us that page we don't want to create a hole in the page
2205 * stream, so we stop and leave the rpc to be fired by
2206 * another dirtier or kupdated interval (the not ready page
2207 * will still be on the dirty list). we could call in
2208 * at the end of ll_file_write to process the queue again. */
2209 if (!(oap->oap_async_flags & ASYNC_READY)) {
2210 int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2212 CDEBUG(D_INODE, "oap %p page %p returned %d "
2213 "instead of ready\n", oap,
2217 /* llite is telling us that the page is still
2218 * in commit_write and that we should try
2219 * and put it in an rpc again later. we
2220 * break out of the loop so we don't create
2221 * a hole in the sequence of pages in the rpc
2226 /* the io isn't needed.. tell the checks
2227 * below to complete the rpc with EINTR */
2228 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2229 oap->oap_count = -EINTR;
2232 oap->oap_async_flags |= ASYNC_READY;
2235 LASSERTF(0, "oap %p page %p returned %d "
2236 "from make_ready\n", oap,
2244 * Page submitted for IO has to be locked. Either by
2245 * ->ap_make_ready() or by higher layers.
2247 * XXX nikita: this assertion should be adjusted when lustre
2248 * starts using PG_writeback for pages being written out.
2250 #if defined(__KERNEL__) && defined(__linux__)
2251 LASSERT(PageLocked(oap->oap_page));
2253 /* If there is a gap at the start of this page, it can't merge
2254 * with any previous page, so we'll hand the network a
2255 * "fragmented" page array that it can't transfer in 1 RDMA */
2256 if (page_count != 0 && oap->oap_page_off != 0)
2259 /* take the page out of our book-keeping */
2260 list_del_init(&oap->oap_pending_item);
2261 lop_update_pending(cli, lop, cmd, -1);
2262 list_del_init(&oap->oap_urgent_item);
/* offset of the first page within a PTLRPC_MAX_BRW_SIZE window; it is
 * used for the offset histograms further down */
2264 if (page_count == 0)
2265 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2266 (PTLRPC_MAX_BRW_SIZE - 1);
2268 /* ask the caller for the size of the io as the rpc leaves. */
2269 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2271 ops->ap_refresh_count(oap->oap_caller_data,cmd);
/* a count that is not positive completes the page at once, with the
 * count as its result */
2272 if (oap->oap_count <= 0) {
2273 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2275 osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2279 /* now put the page back in our accounting */
2280 list_add_tail(&oap->oap_rpc_item, &rpc_list);
/* the first page decides the SRVLOCK state that later pages must match */
2281 if (page_count == 0)
2282 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2283 if (++page_count >= cli->cl_max_pages_per_rpc)
2286 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2287 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2288 * have the same alignment as the initial writes that allocated
2289 * extents on the server. */
2290 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2291 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2292 if (ending_offset == 0)
2295 /* If there is a gap at the end of this page, it can't merge
2296 * with any subsequent pages, so we'll hand the network a
2297 * "fragmented" page array that it can't transfer in 1 RDMA */
2298 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2302 osc_wake_cache_waiters(cli);
2304 if (page_count == 0)
2307 loi_list_maint(cli, loi);
/* the list lock is dropped while the request is being built */
2309 client_obd_list_unlock(&cli->cl_loi_list_lock);
2311 req = osc_build_req(cli, &rpc_list, page_count, cmd);
2313 /* this should happen rarely and is pretty bad, it makes the
2314 * pending list not follow the dirty order */
2315 client_obd_list_lock(&cli->cl_loi_list_lock);
2316 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2317 list_del_init(&oap->oap_rpc_item);
2319 /* queued sync pages can be torn down while the pages
2320 * were between the pending list and the rpc */
2321 if (oap->oap_interrupted) {
2322 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2323 osc_ap_completion(cli, NULL, oap, 0,
2327 osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2329 loi_list_maint(cli, loi);
2330 RETURN(PTR_ERR(req));
2333 aa = (struct osc_brw_async_args *)&req->rq_async_args;
2335 if (cmd == OBD_BRW_READ) {
2336 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2337 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2338 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2339 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2340 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2342 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2343 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2344 cli->cl_w_in_flight);
2345 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2346 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2347 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2350 client_obd_list_lock(&cli->cl_loi_list_lock);
2352 if (cmd == OBD_BRW_READ)
2353 cli->cl_r_in_flight++;
2355 cli->cl_w_in_flight++;
2357 /* queued sync pages can be torn down while the pages
2358 * were between the pending list and the rpc */
2360 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2361 /* only one oap gets a request reference */
2364 if (oap->oap_interrupted && !req->rq_intr) {
2365 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2367 ptlrpc_mark_interrupted(req);
2371 tmp->oap_request = ptlrpc_request_addref(req);
2373 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2374 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
/* brw_interpret_oap() runs when the reply arrives */
2376 req->rq_interpret_reply = brw_interpret_oap;
2377 ptlrpcd_add_req(req);
/* Debug helper: logs at D_INODE whether the loi is linked through
 * loi_cli_item, plus the pending count and the non-empty state of the
 * urgent list for its write lop and its read lop, followed by the format
 * string STR supplied by the caller. */
2381 #define LOI_DEBUG(LOI, STR, args...) \
2382 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2383 !list_empty(&(LOI)->loi_cli_item), \
2384 (LOI)->loi_write_lop.lop_num_pending, \
2385 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2386 (LOI)->loi_read_lop.lop_num_pending, \
2387 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2390 /* This is called by osc_check_rpcs() to find which objects have pages that
2391 * we could be sending. These lists are maintained by lop_makes_rpc(). */
/* The lists are consulted in a fixed order: the ready list first, then the
 * write list when there are cache waiters, then the write and read lists
 * when the import is missing or invalid.  The first entry of the first
 * matching list is returned. */
2392 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2395 /* first return all objects which we already know to have
2396 * pages ready to be stuffed into rpcs */
2397 if (!list_empty(&cli->cl_loi_ready_list))
2398 RETURN(list_entry(cli->cl_loi_ready_list.next,
2399 struct lov_oinfo, loi_cli_item));
2401 /* then if we have cache waiters, return all objects with queued
2402 * writes. This is especially important when many small files
2403 * have filled up the cache and not been fired into rpcs because
2404 * they don't pass the nr_pending/object threshold */
2405 if (!list_empty(&cli->cl_cache_waiters) &&
2406 !list_empty(&cli->cl_loi_write_list))
2407 RETURN(list_entry(cli->cl_loi_write_list.next,
2408 struct lov_oinfo, loi_write_item));
2410 /* then return all queued objects when we have an invalid import
2411 * so that they get flushed */
2412 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2413 if (!list_empty(&cli->cl_loi_write_list))
2414 RETURN(list_entry(cli->cl_loi_write_list.next,
2415 struct lov_oinfo, loi_write_item));
2416 if (!list_empty(&cli->cl_loi_read_list))
2417 RETURN(list_entry(cli->cl_loi_read_list.next,
2418 struct lov_oinfo, loi_read_item));
2423 /* called with the loi list lock held */
/* Send RPCs for the objects returned by osc_next_loi(), as long as there
 * are such objects.  The in-flight count is compared against
 * cl_max_rpcs_in_flight on every pass.  For each object a write RPC and
 * then a read RPC is attempted when lop_makes_rpc() agrees; the object is
 * then taken off all three client lists and re-filed by loi_list_maint(). */
2424 static void osc_check_rpcs(struct client_obd *cli)
2426 struct lov_oinfo *loi;
2427 int rc = 0, race_counter = 0;
2430 while ((loi = osc_next_loi(cli)) != NULL) {
2431 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2433 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2436 /* attempt some read/write balancing by alternating between
2437 * reads and writes in an object. The makes_rpc checks here
2438 * would be redundant if we were getting read/write work items
2439 * instead of objects. we don't want send_oap_rpc to drain a
2440 * partial read pending queue when we're given this object to
2441 * do io on writes while there are cache waiters */
2442 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2443 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2444 &loi->loi_write_lop);
2452 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2453 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2454 &loi->loi_read_lop);
2463 /* attempt some inter-object balancing by issuing rpcs
2464 * for each object in turn */
2465 if (!list_empty(&loi->loi_cli_item))
2466 list_del_init(&loi->loi_cli_item);
2467 if (!list_empty(&loi->loi_write_item))
2468 list_del_init(&loi->loi_write_item);
2469 if (!list_empty(&loi->loi_read_item))
2470 list_del_init(&loi->loi_read_item);
2472 loi_list_maint(cli, loi);
2474 /* send_oap_rpc fails with 0 when make_ready tells it to
2475 * back off. llite's make_ready does this when it tries
2476 * to lock a page queued for write that is already locked.
2477 * we want to try sending rpcs from many objects, but we
2478 * don't want to spin failing with 0. */
2479 if (race_counter == 10)
/* Wait condition used by osc_enter_cache(): under cl_loi_list_lock it
 * evaluates to true when the waiter has been unlinked from its list
 * (ocw_entry is empty) or when no RPCs are in flight. */
2485 /* we're trying to queue a page in the osc so we're subject to the
2486 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2487 * If the osc's queued pages are already at that limit, then we want to sleep
2488 * until there is space in the osc's queue for us. We also may be waiting for
2489 * write credits from the OST if there are RPCs in flight that may return some
2490 * before we fall back to sync writes.
2492 * We need this to know our allocation was granted in the presence of signals */
2493 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2497 client_obd_list_lock(&cli->cl_loi_list_lock);
2498 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2499 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Visible flow: log the current dirty/grant counters; test the conditions
 * that force the caller to sync io; on the fast path (room under
 * cl_dirty_max and obd_max_dirty_pages, and at least one page of grant)
 * consume write grant for the page; otherwise, if write RPCs are in flight,
 * queue an osc_cache_waiter, kick osc_check_rpcs(), drop the list lock and
 * sleep until ocw_granted() holds, then retake the lock and unlink the
 * waiter if it is still queued.
 * NOTE(review): the return statements are not visible in this excerpt. */
2503 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2504 * grant or cache space. */
2505 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2506 struct osc_async_page *oap)
2508 struct osc_cache_waiter ocw;
2509 struct l_wait_info lwi = { 0 };
2513 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2514 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2515 cli->cl_dirty_max, obd_max_dirty_pages,
2516 cli->cl_lost_grant, cli->cl_avail_grant);
2518 /* force the caller to try sync io. this can jump the list
2519 * of queued writes and create a discontiguous rpc stream */
2520 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2521 loi->loi_ar.ar_force_sync)
2524 /* Hopefully normal case - cache space and write credits available */
2525 if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2526 (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2527 (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2528 /* account for ourselves */
2529 osc_consume_write_grant(cli, &oap->oap_brw_page);
2533 /* Make sure that there are write rpcs in flight to wait for. This
2534 * is a little silly as this object may not have any pending but
2535 * other objects sure might. */
2536 if (cli->cl_w_in_flight) {
2537 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2538 cfs_waitq_init(&ocw.ocw_waitq);
2542 loi_list_maint(cli, loi);
2543 osc_check_rpcs(cli);
2544 client_obd_list_unlock(&cli->cl_loi_list_lock);
2546 CDEBUG(D_CACHE, "sleeping for cache space\n");
2547 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2549 client_obd_list_lock(&cli->cl_loi_list_lock);
2550 if (!list_empty(&ocw.ocw_entry)) {
2551 list_del(&ocw.ocw_entry);
/* Initializes an osc_async_page for `page`: sets the magic, the owning
 * client_obd, the caller's ops/data, the page and its object offset, and
 * initializes the pending/urgent/rpc list heads and the interrupt callback.
 * One visible path returns size_round(sizeof(*oap)) instead.
 * NOTE(review): the condition guarding that return and the assignment of
 * `oap` are not visible here - presumably a size query by the caller. */
2560 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2561 struct lov_oinfo *loi, cfs_page_t *page,
2562 obd_off offset, struct obd_async_page_ops *ops,
2563 void *data, void **res)
2565 struct osc_async_page *oap;
2569 return size_round(sizeof(*oap));
2572 oap->oap_magic = OAP_MAGIC;
2573 oap->oap_cli = &exp->exp_obd->u.cli;
2576 oap->oap_caller_ops = ops;
2577 oap->oap_caller_data = data;
2579 oap->oap_page = page;
2580 oap->oap_obj_off = offset;
2582 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2583 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2584 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2586 oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2588 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
/* Interprets the opaque cookie as an osc_async_page and validates it:
 * yields ERR_PTR(-EINVAL) when oap_magic is not OAP_MAGIC. */
2592 struct osc_async_page *oap_from_cookie(void *cookie)
2594 struct osc_async_page *oap = cookie;
2595 if (oap->oap_magic != OAP_MAGIC)
2596 return ERR_PTR(-EINVAL);
/* Queues one async page for I/O. Visible steps: validate the cookie; check
 * the import and that the page is not already on a pending/urgent/rpc
 * list; for writes without OBD_BRW_NOQUOTA (when HAVE_QUOTA_SUPPORT is
 * defined) run the quota check; then, under cl_loi_list_lock, record
 * offset/count/flags, call osc_enter_cache() for writes, move the page to
 * the pending list and run osc_check_rpcs().
 * NOTE(review): the error returns of the early checks are not visible. */
2600 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2601 struct lov_oinfo *loi, void *cookie,
2602 int cmd, obd_off off, int count,
2603 obd_flag brw_flags, enum async_flags async_flags)
2605 struct client_obd *cli = &exp->exp_obd->u.cli;
2606 struct osc_async_page *oap;
2610 oap = oap_from_cookie(cookie);
2612 RETURN(PTR_ERR(oap));
2614 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2617 if (!list_empty(&oap->oap_pending_item) ||
2618 !list_empty(&oap->oap_urgent_item) ||
2619 !list_empty(&oap->oap_rpc_item))
2622 /* check if the file's owner/group is over quota */
2623 #ifdef HAVE_QUOTA_SUPPORT
2624 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2625 struct obd_async_page_ops *ops;
2632 ops = oap->oap_caller_ops;
2633 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2634 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2645 loi = lsm->lsm_oinfo[0];
2647 client_obd_list_lock(&cli->cl_loi_list_lock);
2650 oap->oap_page_off = off;
2651 oap->oap_count = count;
2652 oap->oap_brw_flags = brw_flags;
2653 oap->oap_async_flags = async_flags;
2655 if (cmd & OBD_BRW_WRITE) {
2656 rc = osc_enter_cache(cli, loi, oap);
2658 client_obd_list_unlock(&cli->cl_loi_list_lock);
2663 osc_oap_to_pending(oap);
2664 loi_list_maint(cli, loi);
2666 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2669 osc_check_rpcs(cli);
2670 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* True when `flag` is being newly set: no bit of `flag` is set in `was` and
 * at least one bit of `flag` is set in `now`.  For a single-bit flag this is
 * the same as (~was & now & flag), but this is more clear :)
 * Every argument is parenthesized so that compound expressions such as
 * (A | B) expand with the intended precedence. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
/* Adds async flags to an already-queued page. Visible steps: validate the
 * cookie; assert the page is not OST-locked and that this is not liblustre;
 * pick the write or read lop from oap_cmd; then, under cl_loi_list_lock,
 * fail with -EINVAL if the page is not on a pending list, set ASYNC_READY
 * when newly requested, and when ASYNC_URGENT is newly requested and the
 * page is not in an RPC, add it to the lop's urgent list and re-run
 * loi_list_maint(); finally call osc_check_rpcs(). */
2678 static int osc_set_async_flags(struct obd_export *exp,
2679 struct lov_stripe_md *lsm,
2680 struct lov_oinfo *loi, void *cookie,
2681 obd_flag async_flags)
2683 struct client_obd *cli = &exp->exp_obd->u.cli;
2684 struct loi_oap_pages *lop;
2685 struct osc_async_page *oap;
2689 oap = oap_from_cookie(cookie);
2691 RETURN(PTR_ERR(oap));
2694 * bug 7311: OST-side locking is only supported for liblustre for now
2695 * (and liblustre never calls obd_set_async_flags(). I hope.), generic
2696 * implementation has to handle case where OST-locked page was picked
2697 * up by, e.g., ->writepage().
2699 LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2700 LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2703 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2707 loi = lsm->lsm_oinfo[0];
2709 if (oap->oap_cmd & OBD_BRW_WRITE) {
2710 lop = &loi->loi_write_lop;
2712 lop = &loi->loi_read_lop;
2715 client_obd_list_lock(&cli->cl_loi_list_lock);
2717 if (list_empty(&oap->oap_pending_item))
2718 GOTO(out, rc = -EINVAL);
2720 if ((oap->oap_async_flags & async_flags) == async_flags)
2723 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2724 oap->oap_async_flags |= ASYNC_READY;
2726 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2727 if (list_empty(&oap->oap_rpc_item)) {
2728 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2729 loi_list_maint(cli, loi);
2733 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2734 oap->oap_async_flags);
2736 osc_check_rpcs(cli);
2737 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Queues a page on the object's group-pending list (lop_pending_group of the
 * write or read lop, chosen from cmd) rather than on the regular pending
 * list. Offset, count and flags are recorded under cl_loi_list_lock; for
 * ASYNC_GROUP_SYNC pages the page's oap_occ is added to the io group with
 * oig_add_one(). The page must not already be on a pending/urgent/rpc list.
 * NOTE(review): the error returns of the early checks are not visible. */
2741 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2742 struct lov_oinfo *loi,
2743 struct obd_io_group *oig, void *cookie,
2744 int cmd, obd_off off, int count,
2746 obd_flag async_flags)
2748 struct client_obd *cli = &exp->exp_obd->u.cli;
2749 struct osc_async_page *oap;
2750 struct loi_oap_pages *lop;
2754 oap = oap_from_cookie(cookie);
2756 RETURN(PTR_ERR(oap));
2758 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2761 if (!list_empty(&oap->oap_pending_item) ||
2762 !list_empty(&oap->oap_urgent_item) ||
2763 !list_empty(&oap->oap_rpc_item))
2767 loi = lsm->lsm_oinfo[0];
2769 client_obd_list_lock(&cli->cl_loi_list_lock);
2772 oap->oap_page_off = off;
2773 oap->oap_count = count;
2774 oap->oap_brw_flags = brw_flags;
2775 oap->oap_async_flags = async_flags;
2777 if (cmd & OBD_BRW_WRITE)
2778 lop = &loi->loi_write_lop;
2780 lop = &loi->loi_read_lop;
2782 list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2783 if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2785 rc = oig_add_one(oig, &oap->oap_occ);
2788 LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2789 oap, oap->oap_page, rc);
2791 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Moves every page on lop->lop_pending_group to the regular pending list
 * via osc_oap_to_pending(), then refreshes the object's list membership
 * with loi_list_maint(). The _safe iterator is used because each entry is
 * unlinked while walking the list. */
2796 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2797 struct loi_oap_pages *lop, int cmd)
2799 struct list_head *pos, *tmp;
2800 struct osc_async_page *oap;
2802 list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2803 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2804 list_del(&oap->oap_pending_item);
2805 osc_oap_to_pending(oap);
2807 loi_list_maint(cli, loi);
/* Releases pages previously queued on the group-pending lists: under
 * cl_loi_list_lock, moves the object's write and then read group-pending
 * pages to the pending lists and calls osc_check_rpcs(). */
2810 static int osc_trigger_group_io(struct obd_export *exp,
2811 struct lov_stripe_md *lsm,
2812 struct lov_oinfo *loi,
2813 struct obd_io_group *oig)
2815 struct client_obd *cli = &exp->exp_obd->u.cli;
2819 loi = lsm->lsm_oinfo[0];
2821 client_obd_list_lock(&cli->cl_loi_list_lock);
2823 osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2824 osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2826 osc_check_rpcs(cli);
2827 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Removes a page from the OSC queues. Under cl_loi_list_lock: fails with
 * -EBUSY while the page is part of an RPC (oap_rpc_item non-empty);
 * otherwise calls osc_exit_cache() and wakes cache waiters, unlinks the
 * page from the urgent list (clearing ASYNC_URGENT) and from the pending
 * list (decrementing the pending count through lop_update_pending()), and
 * re-runs loi_list_maint(). */
2832 static int osc_teardown_async_page(struct obd_export *exp,
2833 struct lov_stripe_md *lsm,
2834 struct lov_oinfo *loi, void *cookie)
2836 struct client_obd *cli = &exp->exp_obd->u.cli;
2837 struct loi_oap_pages *lop;
2838 struct osc_async_page *oap;
2842 oap = oap_from_cookie(cookie);
2844 RETURN(PTR_ERR(oap));
2847 loi = lsm->lsm_oinfo[0];
2849 if (oap->oap_cmd & OBD_BRW_WRITE) {
2850 lop = &loi->loi_write_lop;
2852 lop = &loi->loi_read_lop;
2855 client_obd_list_lock(&cli->cl_loi_list_lock);
2857 if (!list_empty(&oap->oap_rpc_item))
2858 GOTO(out, rc = -EBUSY);
2860 osc_exit_cache(cli, oap, 0);
2861 osc_wake_cache_waiters(cli);
2863 if (!list_empty(&oap->oap_urgent_item)) {
2864 list_del_init(&oap->oap_urgent_item);
2865 oap->oap_async_flags &= ~ASYNC_URGENT;
2867 if (!list_empty(&oap->oap_pending_item)) {
2868 list_del_init(&oap->oap_pending_item);
2869 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2871 loi_list_maint(cli, loi);
2873 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2875 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Resolves the lock handle and, under lock_res_and_lock(), stores `data` as
 * the lock's l_ast_data and ORs in the LDLM_FL_NO_LRU bit taken from flags.
 * On Linux kernel builds, when the lock already carries a different
 * l_ast_data, both pointers are treated as inodes and the old one is
 * asserted to be in I_FREEING state. The lock reference obtained from the
 * handle is dropped with LDLM_LOCK_PUT() at the end.
 * NOTE(review): the condition guarding the "client evicted?" error is not
 * visible here - presumably a failed handle lookup. */
2879 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2882 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2885 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2888 lock_res_and_lock(lock);
2889 #if defined (__KERNEL__) && defined (__linux__)
2890 /* Liang XXX: Darwin and Winnt checking should be added */
2891 if (lock->l_ast_data && lock->l_ast_data != data) {
2892 struct inode *new_inode = data;
2893 struct inode *old_inode = lock->l_ast_data;
2894 if (!(old_inode->i_state & I_FREEING))
2895 LDLM_ERROR(lock, "inconsistent l_ast_data found");
2896 LASSERTF(old_inode->i_state & I_FREEING,
2897 "Found existing inode %p/%lu/%u state %lu in lock: "
2898 "setting data to %p/%lu/%u\n", old_inode,
2899 old_inode->i_ino, old_inode->i_generation,
2901 new_inode, new_inode->i_ino, new_inode->i_generation);
2904 lock->l_ast_data = data;
2905 lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2906 unlock_res_and_lock(lock);
2907 LDLM_LOCK_PUT(lock);
/* Runs the `replace` iterator, with `data`, over the locks of the resource
 * identified by the stripe's object id (name[0]) and object group (name[2])
 * in this obd's namespace. */
2910 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2911 ldlm_iterator_t replace, void *data)
2913 struct ldlm_res_id res_id = { .name = {0} };
2914 struct obd_device *obd = class_exp2obd(exp);
2916 res_id.name[0] = lsm->lsm_object_id;
2917 res_id.name[2] = lsm->lsm_object_gr;
2919 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
/* Post-processing shared by the synchronous and asynchronous enqueue paths.
 * For ELDLM_LOCK_ABORTED the ldlm reply is read and a non-zero
 * lock_policy_res1 replaces rc; on success (or an aborted intent) the
 * kms/blocks/mtime from the first stripe's lvb are logged; finally rc is
 * passed through the caller's oi_cb_up() callback.
 * NOTE(review): part of the signature and any guard around the reply lookup
 * are not visible in this excerpt. */
2923 static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
2929 /* The request was created before ldlm_cli_enqueue call. */
2930 if (rc == ELDLM_LOCK_ABORTED) {
2931 struct ldlm_reply *rep;
2932 rep = req_capsule_server_get(&req->rq_pill,
2935 LASSERT(rep != NULL);
2936 if (rep->lock_policy_res1)
2937 rc = rep->lock_policy_res1;
2941 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2942 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2943 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
2944 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
2945 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
2948 /* Call the update callback. */
2949 rc = oinfo->oi_cb_up(oinfo, rc);
/* Reply handler for an asynchronous enqueue (osc_enqueue() installs it as
 * rq_interpret_reply). It looks the lock up from the handle, completes the
 * enqueue with ldlm_cli_enqueue_fini(), runs osc_enqueue_fini(), drops the
 * mode reference when the handle is in use and rc is ELDLM_OK, asserts the
 * earlier lookup succeeded, and finally puts the reference that lookup
 * took. */
2953 static int osc_enqueue_interpret(struct ptlrpc_request *req,
2954 struct osc_enqueue_args *aa, int rc)
2956 int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
2957 struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
2958 struct ldlm_lock *lock;
2960 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2962 lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
2964 /* Complete obtaining the lock procedure. */
2965 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2967 &aa->oa_oi->oi_flags,
2968 &lsm->lsm_oinfo[0]->loi_lvb,
2969 sizeof(lsm->lsm_oinfo[0]->loi_lvb),
2970 lustre_swab_ost_lvb,
2971 aa->oa_oi->oi_lockh, rc);
2973 /* Complete osc stuff. */
2974 rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
2976 /* Release the lock for async request. */
2977 if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
2978 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
2980 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2981 aa->oa_oi->oi_lockh, req, aa);
2982 LDLM_LOCK_PUT(lock);
/* Visible flow: build the resource id from the stripe's object id/group,
 * round the requested extent out to page boundaries, and first try
 * ldlm_lock_match() for an existing lock. On a match the callback data is
 * attached with osc_set_data_with_check() and oi_cb_up() is invoked with
 * ELDLM_OK. Otherwise a request may be allocated (RQF_LDLM_ENQUEUE_LVB) and
 * ldlm_cli_enqueue() is issued; with a request set the reply is handled by
 * osc_enqueue_interpret(), else osc_enqueue_fini() is called directly.
 * NOTE(review): several branch conditions and returns are not visible in
 * this excerpt. */
2986 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2987 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2988 * other synchronous requests, however keeping some locks and trying to obtain
2989 * others may take a considerable amount of time in a case of ost failure; and
2990 * when other sync requests do not get released lock from a client, the client
2991 * is excluded from the cluster -- such scenarios make the life difficult, so
2992 * release locks just after they are obtained. */
2993 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2994 struct ldlm_enqueue_info *einfo,
2995 struct ptlrpc_request_set *rqset)
2997 struct ldlm_res_id res_id = { .name = {0} };
2998 struct obd_device *obd = exp->exp_obd;
2999 struct ptlrpc_request *req = NULL;
3000 int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
3005 res_id.name[0] = oinfo->oi_md->lsm_object_id;
3006 res_id.name[2] = oinfo->oi_md->lsm_object_gr;
3008 /* Filesystem lock extents are extended to page boundaries so that
3009 * dealing with the page cache is a little smoother. */
3010 oinfo->oi_policy.l_extent.start -=
3011 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
3012 oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
3014 if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
3017 /* Next, search for already existing extent locks that will cover us */
3018 /* If we're trying to read, we also search for an existing PW lock. The
3019 * VFS and page cache already protect us locally, so lots of readers/
3020 * writers can share a single PW lock.
3022 * There are problems with conversion deadlocks, so instead of
3023 * converting a read lock to a write lock, we'll just enqueue a new
3026 * At some point we should cancel the read lock instead of making them
3027 * send us a blocking callback, but there are problems with canceling
3028 * locks out from other users right now, too. */
3029 mode = einfo->ei_mode;
3030 if (einfo->ei_mode == LCK_PR)
3032 mode = ldlm_lock_match(obd->obd_namespace,
3033 oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
3034 einfo->ei_type, &oinfo->oi_policy, mode,
3037 /* addref the lock only if not async requests and PW lock is
3038 * matched whereas we asked for PR. */
3039 if (!rqset && einfo->ei_mode != mode)
3040 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
3041 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
3044 /* I would like to be able to ASSERT here that rss <=
3045 * kms, but I can't, for reasons which are explained in
3049 /* We already have a lock, and it's referenced */
3050 oinfo->oi_cb_up(oinfo, ELDLM_OK);
3052 /* For async requests, decref the lock. */
3053 if (einfo->ei_mode != mode)
3054 ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
3056 ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
3063 CFS_LIST_HEAD(cancels);
3064 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3065 &RQF_LDLM_ENQUEUE_LVB);
3069 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3073 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3074 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb));
3075 ptlrpc_request_set_replen(req);
3078 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3079 oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
3081 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
3082 &oinfo->oi_policy, &oinfo->oi_flags,
3083 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3084 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
3085 lustre_swab_ost_lvb, oinfo->oi_lockh,
3089 struct osc_enqueue_args *aa;
3090 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3091 aa = (struct osc_enqueue_args *)&req->rq_async_args;
3096 req->rq_interpret_reply = osc_enqueue_interpret;
3097 ptlrpc_set_add_req(rqset, req);
3098 } else if (intent) {
3099 ptlrpc_req_finished(req);
3104 rc = osc_enqueue_fini(req, oinfo, intent, rc);
3106 ptlrpc_req_finished(req);
/* Looks for an already-held lock covering the page-aligned extent with
 * ldlm_lock_match(), adding LDLM_FL_LVB_READY to the caller's flags. After
 * the match the caller's data is attached with osc_set_data_with_check();
 * when the matched mode differs from the requested one and
 * LDLM_FL_TEST_LOCK is not set, a PR reference is taken and the PW
 * reference is dropped.
 * NOTE(review): the computation of the mode passed as `rc` and the return
 * statements are not visible in this excerpt. */
3111 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3112 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3113 int *flags, void *data, struct lustre_handle *lockh)
3115 struct ldlm_res_id res_id = { .name = {0} };
3116 struct obd_device *obd = exp->exp_obd;
3117 int lflags = *flags;
3121 res_id.name[0] = lsm->lsm_object_id;
3122 res_id.name[2] = lsm->lsm_object_gr;
3124 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3127 /* Filesystem lock extents are extended to page boundaries so that
3128 * dealing with the page cache is a little smoother */
3129 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3130 policy->l_extent.end |= ~CFS_PAGE_MASK;
3132 /* Next, search for already existing extent locks that will cover us */
3133 /* If we're trying to read, we also search for an existing PW lock. The
3134 * VFS and page cache already protect us locally, so lots of readers/
3135 * writers can share a single PW lock. */
3139 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3140 &res_id, type, policy, rc, lockh);
3142 osc_set_data_with_check(lockh, data, lflags);
3143 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3144 ldlm_lock_addref(lockh, LCK_PR);
3145 ldlm_lock_decref(lockh, LCK_PW);
/* Drops one reference of `mode` on the lock. LCK_GROUP locks go through
 * ldlm_lock_decref_and_cancel(); every other mode uses ldlm_lock_decref(). */
3152 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3153 __u32 mode, struct lustre_handle *lockh)
3157 if (unlikely(mode == LCK_GROUP))
3158 ldlm_lock_decref_and_cancel(lockh, mode);
3160 ldlm_lock_decref(lockh, mode);
/* Cancels unused locks in this obd's namespace through
 * ldlm_cli_cancel_unused(). The resource id is filled from the stripe's
 * object id (name[0]) and group (name[2]).
 * NOTE(review): `resp` starts as NULL; the lines that point it at res_id
 * (presumably only when lsm is given) are not visible here. */
3165 static int osc_cancel_unused(struct obd_export *exp,
3166 struct lov_stripe_md *lsm, int flags,
3169 struct obd_device *obd = class_exp2obd(exp);
3170 struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3173 res_id.name[0] = lsm->lsm_object_id;
3174 res_id.name[2] = lsm->lsm_object_gr;
3178 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
/* Forwards to ldlm_cli_join_lru() for this obd's namespace, passing `join`
 * through. The resource id is filled from the stripe's object id (name[0])
 * and group (name[2]).
 * NOTE(review): `resp` starts as NULL; the lines that point it at res_id
 * (presumably only when lsm is given) are not visible here. */
3181 static int osc_join_lru(struct obd_export *exp,
3182 struct lov_stripe_md *lsm, int join)
3184 struct obd_device *obd = class_exp2obd(exp);
3185 struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3188 res_id.name[0] = lsm->lsm_object_id;
3189 res_id.name[2] = lsm->lsm_object_gr;
3193 return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
/* Reply handler for osc_statfs_async(): fetches the obd_statfs from the
 * reply, copies it into the caller's oi_osfs, and passes rc through the
 * oi_cb_up() callback. -EPROTO is the error used on the visible failure
 * path.
 * NOTE(review): the condition guarding the -EPROTO path is not visible -
 * presumably a missing reply buffer. */
3196 static int osc_statfs_interpret(struct ptlrpc_request *req,
3197 struct osc_async_args *aa, int rc)
3199 struct obd_statfs *msfs;
3205 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3207 GOTO(out, rc = -EPROTO);
3210 *aa->aa_oi->oi_osfs = *msfs;
3212 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Asynchronous statfs: allocates and packs an OST_STATFS request, routes it
 * to OST_CREATE_PORTAL, marks it no-resend/no-delay when
 * OBD_STATFS_NODELAY is set in oi_flags, installs osc_statfs_interpret()
 * as the reply handler and adds the request to rqset. The request is freed
 * on the visible packing-failure path. */
3216 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3217 __u64 max_age, struct ptlrpc_request_set *rqset)
3219 struct ptlrpc_request *req;
3220 struct osc_async_args *aa;
3224 /* We could possibly pass max_age in the request (as an absolute
3225 * timestamp or a "seconds.usec ago") so the target can avoid doing
3226 * extra calls into the filesystem if that isn't necessary (e.g.
3227 * during mount that would help a bit). Having relative timestamps
3228 * is not so great if request processing is slow, while absolute
3229 * timestamps are not ideal because they need time synchronization. */
3230 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3234 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3236 ptlrpc_request_free(req);
3239 ptlrpc_request_set_replen(req);
3240 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3241 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3242 /* procfs requests must not block waiting on statfs, to avoid deadlock */
3243 req->rq_no_resend = 1;
3244 req->rq_no_delay = 1;
3247 req->rq_interpret_reply = osc_statfs_interpret;
3248 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3249 aa = (struct osc_async_args *)&req->rq_async_args;
3252 ptlrpc_set_add_req(rqset, req);
/* Synchronous statfs: allocates and packs an OST_STATFS request, routes it
 * to OST_CREATE_PORTAL, marks it no-resend/no-delay when
 * OBD_STATFS_NODELAY is set in flags, waits for the reply with
 * ptlrpc_queue_wait() and fetches the obd_statfs from it; the request is
 * released with ptlrpc_req_finished().
 * NOTE(review): the copy into `osfs` and the guard of the -EPROTO path are
 * not visible in this excerpt. */
3256 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3257 __u64 max_age, __u32 flags)
3259 struct obd_statfs *msfs;
3260 struct ptlrpc_request *req;
3264 /* We could possibly pass max_age in the request (as an absolute
3265 * timestamp or a "seconds.usec ago") so the target can avoid doing
3266 * extra calls into the filesystem if that isn't necessary (e.g.
3267 * during mount that would help a bit). Having relative timestamps
3268 * is not so great if request processing is slow, while absolute
3269 * timestamps are not ideal because they need time synchronization. */
3270 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3274 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3276 ptlrpc_request_free(req);
3279 ptlrpc_request_set_replen(req);
3280 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3282 if (flags & OBD_STATFS_NODELAY) {
3283 /* procfs requests must not block waiting on statfs, to avoid deadlock */
3284 req->rq_no_resend = 1;
3285 req->rq_no_delay = 1;
3288 rc = ptlrpc_queue_wait(req);
3292 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3294 GOTO(out, rc = -EPROTO);
3301 ptlrpc_req_finished(req);
3305 /* Retrieve object striping information.
3307 * @lump is a pointer to an in-core struct with lmm_ost_count indicating
3308 * the maximum number of OST indices which will fit in the user buffer.
3309 * lmm_magic must be LOV_USER_MAGIC (we only use 1 slot here).
/* Copies the caller's lov_user_md in from `lump`, checks its magic against
 * LOV_USER_MAGIC, and copies back a descriptor carrying the stripe's object
 * id/group with lmm_stripe_count set to 1. When the caller's
 * lmm_stripe_count is > 0, a temporary buffer with room for one
 * lmm_objects[] slot is allocated, filled with the object id/group, and
 * freed afterwards.
 * NOTE(review): the error returns and the non-allocating branch's setup of
 * `lumk` are not visible in this excerpt. */
3311 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3313 struct lov_user_md lum, *lumk;
3314 int rc = 0, lum_size;
3320 if (copy_from_user(&lum, lump, sizeof(lum)))
3323 if (lum.lmm_magic != LOV_USER_MAGIC)
3326 if (lum.lmm_stripe_count > 0) {
3327 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3328 OBD_ALLOC(lumk, lum_size);
3332 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3333 lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
3335 lum_size = sizeof(lum);
3339 lumk->lmm_object_id = lsm->lsm_object_id;
3340 lumk->lmm_object_gr = lsm->lsm_object_gr;
3341 lumk->lmm_stripe_count = 1;
3343 if (copy_to_user(lump, lumk, lum_size))
3347 OBD_FREE(lumk, lum_size);
/* ioctl dispatcher for the OSC. The module is pinned with try_module_get()
 * for the duration of the call and released with module_put(). Visible
 * cases:
 *  - OBD_IOC_LOV_GET_CONFIG: fetches the ioctl data, validates the two
 *    inline buffer lengths, fills a single-target lov_desc plus this obd's
 *    uuid, and copies the buffer back to user space;
 *  - LL_IOC_LOV_SETSTRIPE: obd_alloc_memmd();
 *  - LL_IOC_LOV_GETSTRIPE: osc_getstripe();
 *  - OBD_IOC_CLIENT_RECOVER: ptlrpc_recover_import();
 *  - IOC_OSC_SET_ACTIVE: ptlrpc_set_import_active();
 *  - OBD_IOC_POLL_QUOTACHECK: lquota_poll_check();
 *  - anything else: logged and rejected with -ENOTTY. */
3353 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3354 void *karg, void *uarg)
3356 struct obd_device *obd = exp->exp_obd;
3357 struct obd_ioctl_data *data = karg;
3361 if (!try_module_get(THIS_MODULE)) {
3362 CERROR("Can't get module. Is it alive?");
3366 case OBD_IOC_LOV_GET_CONFIG: {
3368 struct lov_desc *desc;
3369 struct obd_uuid uuid;
3373 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3374 GOTO(out, err = -EINVAL);
3376 data = (struct obd_ioctl_data *)buf;
3378 if (sizeof(*desc) > data->ioc_inllen1) {
3379 obd_ioctl_freedata(buf, len);
3380 GOTO(out, err = -EINVAL);
3383 if (data->ioc_inllen2 < sizeof(uuid)) {
3384 obd_ioctl_freedata(buf, len);
3385 GOTO(out, err = -EINVAL);
3388 desc = (struct lov_desc *)data->ioc_inlbuf1;
3389 desc->ld_tgt_count = 1;
3390 desc->ld_active_tgt_count = 1;
3391 desc->ld_default_stripe_count = 1;
3392 desc->ld_default_stripe_size = 0;
3393 desc->ld_default_stripe_offset = 0;
3394 desc->ld_pattern = 0;
3395 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3397 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3399 err = copy_to_user((void *)uarg, buf, len);
3402 obd_ioctl_freedata(buf, len);
3405 case LL_IOC_LOV_SETSTRIPE:
3406 err = obd_alloc_memmd(exp, karg);
3410 case LL_IOC_LOV_GETSTRIPE:
3411 err = osc_getstripe(karg, uarg);
3413 case OBD_IOC_CLIENT_RECOVER:
3414 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3419 case IOC_OSC_SET_ACTIVE:
3420 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3423 case OBD_IOC_POLL_QUOTACHECK:
3424 err = lquota_poll_check(quota_interface, exp,
3425 (struct if_quotacheck *)karg);
3428 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3429 cmd, cfs_curproc_comm());
3430 GOTO(out, err = -ENOTTY);
3433 module_put(THIS_MODULE);
/* Key/value query. Visible keys:
 *  - KEY_LOCK_TO_STRIPE: handled locally; *vallen is set to the size of a
 *    __u32 stripe index;
 *  - KEY_LAST_ID: sends an OST_GET_INFO request carrying the key, waits for
 *    the reply and copies the returned obd_id into `val`.
 * Both `vallen` and `val` are checked for NULL up front.
 * NOTE(review): the error returns, the stripe value stored for
 * KEY_LOCK_TO_STRIPE and the guard of the -EPROTO path are not visible. */
3437 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3438 void *key, __u32 *vallen, void *val)
3441 if (!vallen || !val)
3444 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3445 __u32 *stripe = val;
3446 *vallen = sizeof(*stripe);
3449 } else if (KEY_IS(KEY_LAST_ID)) {
3450 struct ptlrpc_request *req;
3455 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3456 &RQF_OST_GET_INFO_LAST_ID);
3460 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3461 RCL_CLIENT, keylen);
3462 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3464 ptlrpc_request_free(req);
3468 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3469 memcpy(tmp, key, keylen);
3471 ptlrpc_request_set_replen(req);
3472 rc = ptlrpc_queue_wait(req);
3476 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3478 GOTO(out, rc = -EPROTO);
3480 *((obd_id *)val) = *reply;
3482 ptlrpc_req_finished(req);
/* Reply handler that osc_set_info_async() installs for KEY_MDS_CONN. It
 * looks up the LLOG_MDS_OST_ORIG_CTXT context of the import's obd, connects
 * it with llog_initiator_connect() (logging a failure), drops the context
 * reference, and then, under imp_lock, marks the import as
 * server-timeout and pingable.
 * NOTE(review): the guards around the context use and the return value are
 * not visible in this excerpt. */
3488 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3491 struct llog_ctxt *ctxt;
3492 struct obd_import *imp = req->rq_import;
3498 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3501 rc = llog_initiator_connect(ctxt);
3503 CERROR("cannot establish connection for "
3504 "ctxt %p: %d\n", ctxt, rc);
3507 llog_ctxt_put(ctxt);
3508 spin_lock(&imp->imp_lock);
3509 imp->imp_server_timeout = 1;
3510 imp->imp_pingable = 1;
3511 spin_unlock(&imp->imp_lock);
3512 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
/* Key/value setter. Keys with a visible local handler:
 *  - KEY_NEXT_ID: vallen must be sizeof(obd_id); sets oscc_next_id to the
 *    given id + 1;
 *  - "unlinked": clears OSCC_FLAG_NOSPC under oscc_lock;
 *  - KEY_INIT_RECOV: vallen must be sizeof(int); sets imp_initial_recov
 *    under imp_lock;
 *  - "checksum": vallen must be sizeof(int); sets cl_checksum to 0 or 1;
 *  - KEY_FLUSH_CTX: sptlrpc_import_flush_my_ctx().
 * Any other key is packed, with its value, into an OST_SET_INFO request
 * that is added to `set`; for KEY_MDS_CONN the creator's group is recorded
 * first and osc_setinfo_mds_conn_interpret() is installed as the reply
 * handler.
 * NOTE(review): the return statements of the local handlers are not
 * visible in this excerpt. */
3517 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3518 void *key, obd_count vallen, void *val,
3519 struct ptlrpc_request_set *set)
3521 struct ptlrpc_request *req;
3522 struct obd_device *obd = exp->exp_obd;
3523 struct obd_import *imp = class_exp2cliimp(exp);
3528 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3530 if (KEY_IS(KEY_NEXT_ID)) {
3531 if (vallen != sizeof(obd_id))
3535 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3536 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3537 exp->exp_obd->obd_name,
3538 obd->u.cli.cl_oscc.oscc_next_id);
3543 if (KEY_IS("unlinked")) {
3544 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3545 spin_lock(&oscc->oscc_lock);
3546 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3547 spin_unlock(&oscc->oscc_lock);
3551 if (KEY_IS(KEY_INIT_RECOV)) {
3552 if (vallen != sizeof(int))
3554 spin_lock(&imp->imp_lock);
3555 imp->imp_initial_recov = *(int *)val;
3556 spin_unlock(&imp->imp_lock);
3557 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3558 exp->exp_obd->obd_name,
3559 imp->imp_initial_recov);
3563 if (KEY_IS("checksum")) {
3564 if (vallen != sizeof(int))
3566 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3570 if (KEY_IS(KEY_FLUSH_CTX)) {
3571 sptlrpc_import_flush_my_ctx(imp);
3578 /* We pass all other commands directly to OST. Since nobody calls osc
3579 methods directly and everybody is supposed to go through LOV, we
3580 assume lov checked invalid values for us.
3581 The only recognised values so far are evict_by_nid and mds_conn.
3582 Even if something bad goes through, we'd get a -EINVAL from OST
3586 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
3590 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3591 RCL_CLIENT, keylen);
3592 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3593 RCL_CLIENT, vallen);
3594 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3596 ptlrpc_request_free(req);
3600 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3601 memcpy(tmp, key, keylen);
3602 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
3603 memcpy(tmp, val, vallen);
3605 if (KEY_IS(KEY_MDS_CONN)) {
3606 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3608 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3609 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3610 LASSERT(oscc->oscc_oa.o_gr > 0);
3611 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3614 ptlrpc_request_set_replen(req);
3615 ptlrpc_set_add_req(set, req);
3616 ptlrpc_check_set(set);
/* llog operations that osc_llog_init() passes to llog_setup() for the
 * LLOG_SIZE_REPL_CTXT context; only lop_cancel is provided here. */
3622 static struct llog_operations osc_size_repl_logops = {
3623 lop_cancel: llog_obd_repl_cancel
/* Sets up the two llog contexts used by the OSC. Under obd_dev_lock the
 * shared osc_mds_ost_orig_logops table is initialized once (detected by
 * its lop_setup pointer): it starts as a copy of llog_lvfs_ops with the
 * setup/cleanup/add/connect entries replaced by the obd-origin variants.
 * Then LLOG_MDS_OST_ORIG_CTXT is set up with that table and the catalog's
 * log id, and LLOG_SIZE_REPL_CTXT with osc_size_repl_logops. Failures are
 * reported with CERROR, including the osc/target names and the log id.
 * NOTE(review): the rc checks that guard the error messages are not
 * visible in this excerpt. */
3626 static struct llog_operations osc_mds_ost_orig_logops;
3627 static int osc_llog_init(struct obd_device *obd, int group,
3628 struct obd_device *tgt, int count,
3629 struct llog_catid *catid, struct obd_uuid *uuid)
3633 LASSERT(group == OBD_LLOG_GROUP);
3634 spin_lock(&obd->obd_dev_lock);
3635 if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3636 osc_mds_ost_orig_logops = llog_lvfs_ops;
3637 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3638 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3639 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3640 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3642 spin_unlock(&obd->obd_dev_lock);
3644 rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3645 &catid->lci_logid, &osc_mds_ost_orig_logops);
3647 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3651 rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
3652 NULL, &osc_size_repl_logops);
3654 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3657 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3658 obd->obd_name, tgt->obd_name, count, catid, rc);
3659 CERROR("logid "LPX64":0x%x\n",
3660 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
/* Tears down the contexts created by osc_llog_init(): looks up
 * LLOG_MDS_OST_ORIG_CTXT and LLOG_SIZE_REPL_CTXT in turn and calls
 * llog_cleanup() on each, keeping the two results in rc and rc2.
 * NOTE(review): the guards around each cleanup and the way rc/rc2 are
 * combined into the return value are not visible in this excerpt. */
3665 static int osc_llog_finish(struct obd_device *obd, int count)
3667 struct llog_ctxt *ctxt;
3668 int rc = 0, rc2 = 0;
3671 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3673 rc = llog_cleanup(ctxt);
3675 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3677 rc2 = llog_cleanup(ctxt);
/* Prepares connect data for a reconnect. When the caller supplied `data`
 * with OBD_CONNECT_GRANT set, then under cl_loi_list_lock the requested
 * grant is set to the currently available grant or, if that is zero, to
 * two full RPCs worth of pages (2 * cl_max_pages_per_rpc pages in bytes),
 * and cl_lost_grant is captured and reset to 0. The resulting values are
 * logged. */
3684 static int osc_reconnect(const struct lu_env *env,
3685 struct obd_export *exp, struct obd_device *obd,
3686 struct obd_uuid *cluuid,
3687 struct obd_connect_data *data)
3689 struct client_obd *cli = &obd->u.cli;
3691 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3694 client_obd_list_lock(&cli->cl_loi_list_lock);
3695 data->ocd_grant = cli->cl_avail_grant ?:
3696 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3697 lost_grant = cli->cl_lost_grant;
3698 cli->cl_lost_grant = 0;
3699 client_obd_list_unlock(&cli->cl_loi_list_lock);
3701 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3702 "cl_lost_grant: %ld\n", data->ocd_grant,
3703 cli->cl_avail_grant, lost_grant);
3704 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3705 " ocd_grant: %d\n", data->ocd_connect_flags,
3706 data->ocd_version, data->ocd_grant);
/*
 * Disconnect an export (o_disconnect method).
 *
 * Takes the LLOG_SIZE_REPL_CTXT context of the export's device, syncs it
 * when cl_conn_count is exactly 1, drops the context reference, and then
 * calls client_disconnect_export(), whose result is stored in rc.
 *
 * NOTE(review): ctxt is passed to llog_sync() and llog_ctxt_put() without a
 * visible NULL check -- confirm both accept a NULL context.
 */
3712 static int osc_disconnect(struct obd_export *exp)
3714 struct obd_device *obd = class_exp2obd(exp);
3715 struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3718 if (obd->u.cli.cl_conn_count == 1)
3719 /* flush any remaining cancel messages out to the target */
3720 llog_sync(ctxt, exp);
3722 llog_ctxt_put(ctxt);
3724 rc = client_disconnect_export(exp);
/*
 * Import state-change handler (o_import_event method).
 *
 * obd    device owning the import; asserted to equal imp->imp_obd
 * imp    import the event refers to
 * event  one of the IMP_EVENT_* values handled by the cases below
 *
 * NOTE(review): cli is used by several cases but its assignment is not
 * among the statements shown; presumably it is set to &obd->u.cli before
 * use -- confirm.
 */
3728 static int osc_import_event(struct obd_device *obd,
3729 struct obd_import *imp,
3730 enum obd_import_event event)
3732 struct client_obd *cli;
3736 LASSERT(imp->imp_obd == obd);
/* Disconnect: when imp_server_timeout is set, mark the object creator as
 * recovering under oscc_lock. The available and lost grant counters are
 * zeroed under cl_loi_list_lock.
 * NOTE(review): whether the grant reset sits inside or outside the
 * imp_server_timeout test cannot be told from these lines -- confirm. */
3739 case IMP_EVENT_DISCON: {
3740 /* Only do this on the MDS OSC's */
3741 if (imp->imp_server_timeout) {
3742 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3744 spin_lock(&oscc->oscc_lock);
3745 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3746 spin_unlock(&oscc->oscc_lock);
3749 client_obd_list_lock(&cli->cl_loi_list_lock);
3750 cli->cl_avail_grant = 0;
3751 cli->cl_lost_grant = 0;
3752 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Inactive: forward OBD_NOTIFY_INACTIVE to the observer. */
3755 case IMP_EVENT_INACTIVE: {
3756 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
/* Invalidate: run osc_check_rpcs() under cl_loi_list_lock, then clean up
 * the device namespace with LDLM_FL_LOCAL_ONLY. */
3759 case IMP_EVENT_INVALIDATE: {
3760 struct ldlm_namespace *ns = obd->obd_namespace;
3764 client_obd_list_lock(&cli->cl_loi_list_lock);
3765 /* all pages go to failing rpcs due to the invalid import */
3766 osc_check_rpcs(cli);
3767 client_obd_list_unlock(&cli->cl_loi_list_lock);
3769 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
/* Active: when imp_server_timeout is set, clear OSCC_FLAG_NOSPC under
 * oscc_lock; then forward OBD_NOTIFY_ACTIVE to the observer. */
3773 case IMP_EVENT_ACTIVE: {
3774 /* Only do this on the MDS OSC's */
3775 if (imp->imp_server_timeout) {
3776 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3778 spin_lock(&oscc->oscc_lock);
3779 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3780 spin_unlock(&oscc->oscc_lock);
3782 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
/* Connect data available: initialise grant when OBD_CONNECT_GRANT was
 * negotiated, switch the request portal to OST_REQUEST_PORTAL when
 * OBD_CONNECT_REQPORTAL was negotiated, then forward OBD_NOTIFY_OCD. */
3785 case IMP_EVENT_OCD: {
3786 struct obd_connect_data *ocd = &imp->imp_connect_data;
3788 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3789 osc_init_grant(&obd->u.cli, ocd);
3792 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3793 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3795 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
/* NOTE(review): presumably the default branch for unrecognised events. */
3799 CERROR("Unknown import event %d\n", event);
/*
 * Device setup (o_setup method).
 *
 * Takes a ptlrpcd reference, runs the generic client_obd_setup() with the
 * supplied configuration record, registers the lprocfs entries and creates
 * the request pool of the import.
 *
 * NOTE(review): the checks on rc between these steps are not among the
 * statements shown; presumably each failure aborts setup -- confirm.
 */
3805 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3811 rc = ptlrpcd_addref();
3815 rc = client_obd_setup(obd, lcfg);
3819 struct lprocfs_static_vars lvars = { 0 };
3820 struct client_obd *cli = &obd->u.cli;
/* The extra lprocfs attachments are made only when lprocfs_obd_setup()
 * returns 0. */
3822 lprocfs_osc_init_vars(&lvars);
3823 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3824 lproc_osc_attach_seqstat(obd);
3825 sptlrpc_lprocfs_cliobd_attach(obd);
3826 ptlrpc_lprocfs_register_obd(obd);
3830 /* We need to allocate a few more requests, because
3831 brw_interpret_oap tries to create new requests before freeing
3832 previous ones. Ideally we want to have 2x max_rpcs_in_flight
3833 reserved, but I am afraid that might be too much wasted RAM
3834 in fact, so 2 is just my guess and still should work. */
3835 cli->cl_import->imp_rq_pool =
3836 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3838 ptlrpc_add_rqs_to_pool);
/*
 * Staged pre-cleanup (o_precleanup method); the work done depends on stage.
 *
 * OBD_CLEANUP_EARLY    deactivate the import and clear imp_pingable under
 *                      imp_lock
 * OBD_CLEANUP_EXPORTS  if cl_import is still set, invalidate it, free its
 *                      request pool, destroy it and reset cl_import to NULL
 * OBD_CLEANUP_SELF_EXP call obd_llog_finish(obd, 0)
 * OBD_CLEANUP_OBD      no statements shown for this stage
 */
3844 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3850 case OBD_CLEANUP_EARLY: {
3851 struct obd_import *imp;
3852 imp = obd->u.cli.cl_import;
3853 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3854 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3855 ptlrpc_deactivate_import(imp);
3856 spin_lock(&imp->imp_lock);
3857 imp->imp_pingable = 0;
3858 spin_unlock(&imp->imp_lock);
3861 case OBD_CLEANUP_EXPORTS: {
3862 /* If we set up but never connected, the
3863 client import will not have been cleaned. */
3864 if (obd->u.cli.cl_import) {
3865 struct obd_import *imp;
3866 imp = obd->u.cli.cl_import;
3867 CDEBUG(D_CONFIG, "%s: client import never connected\n",
3869 ptlrpc_invalidate_import(imp);
3870 ptlrpc_free_rq_pool(imp->imp_rq_pool);
3871 class_destroy_import(imp);
3872 obd->u.cli.cl_import = NULL;
3876 case OBD_CLEANUP_SELF_EXP:
3877 rc = obd_llog_finish(obd, 0);
/* NOTE(review): presumably logged only when rc is non-zero -- confirm. */
3879 CERROR("failed to cleanup llogging subsystems\n");
3881 case OBD_CLEANUP_OBD:
/*
 * Final device cleanup (o_cleanup method).
 *
 * Unregisters the lprocfs entries, flips the object-creator flags from
 * recovering to exiting under oscc_lock, releases the quota cache through
 * the quota interface and finishes with the generic client_obd_cleanup(),
 * whose result is stored in rc.
 */
3887 int osc_cleanup(struct obd_device *obd)
3889 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3893 ptlrpc_lprocfs_unregister_obd(obd);
3894 lprocfs_obd_cleanup(obd);
3896 spin_lock(&oscc->oscc_lock);
3897 oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3898 oscc->oscc_flags |= OSCC_FLAG_EXITING;
3899 spin_unlock(&oscc->oscc_lock);
3901 /* free memory of osc quota cache */
3902 lquota_cleanup(quota_interface, obd);
3904 rc = client_obd_cleanup(obd);
/*
 * Configuration record handler (o_process_config method).
 *
 * buf is interpreted as a struct lustre_cfg. LCFG_SPTLRPC_CONF records go
 * to sptlrpc_cliobd_process_config(); the other visible path hands the
 * record to class_process_proc_param() with the PARAM_OSC prefix and the
 * lprocfs variable table of this device type.
 * The len argument is not referenced by any statement shown below.
 *
 * NOTE(review): the class_process_proc_param() call is presumably the
 * default branch of the switch -- confirm.
 */
3910 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3912 struct lustre_cfg *lcfg = buf;
3913 struct lprocfs_static_vars lvars = { 0 };
3916 lprocfs_osc_init_vars(&lvars);
3918 switch (lcfg->lcfg_command) {
3919 case LCFG_SPTLRPC_CONF:
3920 rc = sptlrpc_cliobd_process_config(obd, lcfg);
3923 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
/*
 * OBD method table of the OSC device type; osc_init() registers it with
 * class_register_type() under LUSTRE_OSC_NAME. Connection add/del/connect
 * use the generic client_* helpers, every other slot points at an osc_*
 * function from this file.
 */
3931 struct obd_ops osc_obd_ops = {
3932 .o_owner = THIS_MODULE,
/* device lifecycle */
3933 .o_setup = osc_setup,
3934 .o_precleanup = osc_precleanup,
3935 .o_cleanup = osc_cleanup,
/* connection management */
3936 .o_add_conn = client_import_add_conn,
3937 .o_del_conn = client_import_del_conn,
3938 .o_connect = client_connect_import,
3939 .o_reconnect = osc_reconnect,
3940 .o_disconnect = osc_disconnect,
3941 .o_statfs = osc_statfs,
3942 .o_statfs_async = osc_statfs_async,
3943 .o_packmd = osc_packmd,
3944 .o_unpackmd = osc_unpackmd,
/* object create/destroy and attributes */
3945 .o_precreate = osc_precreate,
3946 .o_create = osc_create,
3947 .o_destroy = osc_destroy,
3948 .o_getattr = osc_getattr,
3949 .o_getattr_async = osc_getattr_async,
3950 .o_setattr = osc_setattr,
3951 .o_setattr_async = osc_setattr_async,
/* bulk and asynchronous page I/O */
3953 .o_brw_async = osc_brw_async,
3954 .o_prep_async_page = osc_prep_async_page,
3955 .o_queue_async_io = osc_queue_async_io,
3956 .o_set_async_flags = osc_set_async_flags,
3957 .o_queue_group_io = osc_queue_group_io,
3958 .o_trigger_group_io = osc_trigger_group_io,
3959 .o_teardown_async_page = osc_teardown_async_page,
3960 .o_punch = osc_punch,
/* locking */
3962 .o_enqueue = osc_enqueue,
3963 .o_match = osc_match,
3964 .o_change_cbdata = osc_change_cbdata,
3965 .o_cancel = osc_cancel,
3966 .o_cancel_unused = osc_cancel_unused,
3967 .o_join_lru = osc_join_lru,
/* control, import events, llog and configuration */
3968 .o_iocontrol = osc_iocontrol,
3969 .o_get_info = osc_get_info,
3970 .o_set_info_async = osc_set_info_async,
3971 .o_import_event = osc_import_event,
3972 .o_llog_init = osc_llog_init,
3973 .o_llog_finish = osc_llog_finish,
3974 .o_process_config = osc_process_config,
/*
 * Module initialisation.
 *
 * Prepares the lprocfs variable tables, requests the lquota module, looks
 * up osc_quota_interface and uses it to initialise quota support and to
 * install the quota methods into osc_obd_ops, then registers the OSC
 * device type under LUSTRE_OSC_NAME.
 *
 * NOTE(review): the final symbol put is presumably the error path taken
 * when class_register_type() fails -- confirm the guard on rc.
 */
3976 int __init osc_init(void)
3978 struct lprocfs_static_vars lvars = { 0 };
3982 lprocfs_osc_init_vars(&lvars);
/* NOTE(review): quota_interface is passed on without a visible NULL check;
 * presumably lquota_init() and init_obd_quota_ops() tolerate NULL. */
3984 request_module("lquota");
3985 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3986 lquota_init(quota_interface);
3987 init_obd_quota_ops(quota_interface, &osc_obd_ops);
3989 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3990 LUSTRE_OSC_NAME, NULL);
/* Drop the symbol reference only if one was actually obtained. */
3992 if (quota_interface)
3993 PORTAL_SYMBOL_PUT(osc_quota_interface);
/*
 * Module exit: shuts down quota support, releases the osc_quota_interface
 * symbol reference if one is held, and unregisters the OSC device type.
 */
4001 static void /*__exit*/ osc_exit(void)
4003 lquota_exit(quota_interface);
4004 if (quota_interface)
4005 PORTAL_SYMBOL_PUT(osc_quota_interface);
4007 class_unregister_type(LUSTRE_OSC_NAME);
/* Module metadata, followed by the cfs_module() declaration that names
 * osc_init and osc_exit as the entry and exit functions of the osc module. */
4010 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
4011 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4012 MODULE_LICENSE("GPL");
4014 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);