1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5 * Author Peter Braam <braam@clusterfs.com>
7 * This file is part of the Lustre file system, http://www.lustre.org
8 * Lustre is a trademark of Cluster File Systems, Inc.
10 * You may have signed or agreed to another license before downloading
11 * this software. If so, you are bound by the terms and conditions
12 * of that agreement, and the following does not apply to you. See the
13 * LICENSE file included with this distribution for more information.
15 * If you did not agree to a different license, then this copy of Lustre
16 * is open source software; you can redistribute it and/or modify it
17 * under the terms of version 2 of the GNU General Public License as
18 * published by the Free Software Foundation.
20 * In either case, Lustre is distributed in the hope that it will be
21 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23 * license text for more details.
25 * For testing and management it is treated as an obd_device,
26 * although it does not export a full OBD method table (the
27 * requests are coming in over the wire, so object target modules
28 * do not have a full method table.)
33 # define EXPORT_SYMTAB
35 #define DEBUG_SUBSYSTEM S_OSC
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
43 #include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include "osc_internal.h"
/* Hook table for the quota module; osc_quota_interface is defined in the
 * quota code and presumably bound to quota_interface at module init time --
 * the registration site is not visible in this chunk. */
61 static quota_interface_t *quota_interface = NULL;
62 extern quota_interface_t osc_quota_interface;
/* Forward declaration: releases a brw_page pointer array (definition appears
 * later in the file, outside this view). */
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
66 /* Pack OSC object metadata for disk storage (LE byte order). */
/* NOTE(review): several lines are elided in this listing.  The visible
 * contract: *lmmp may be freed (lsm teardown path) or allocated, then the
 * object id/group from @lsm are stored little-endian into the wire/disk
 * structure.  Return value and error paths are not visible -- confirm
 * against the complete source. */
67 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
68 struct lov_stripe_md *lsm)
73 lmm_size = sizeof(**lmmp);
78 OBD_FREE(*lmmp, lmm_size);
84 OBD_ALLOC(*lmmp, lmm_size);
/* A valid lsm must carry a non-zero object id and group before packing. */
90 LASSERT(lsm->lsm_object_id);
91 LASSERT(lsm->lsm_object_gr);
92 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
93 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
99 /* Unpack OSC object metadata from disk storage (LE byte order). */
/* Validates @lmm (size and non-zero object id), then frees or allocates the
 * single-stripe *lsmp as requested and copies the id/group out of the
 * little-endian wire form.  NOTE(review): branch structure is partially
 * elided in this listing; return codes are not visible here. */
100 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
101 struct lov_mds_md *lmm, int lmm_bytes)
/* Reject a truncated lov_mds_md before touching any of its fields. */
107 if (lmm_bytes < sizeof (*lmm)) {
108 CERROR("lov_mds_md too small: %d, need %d\n",
109 lmm_bytes, (int)sizeof(*lmm));
112 /* XXX LOV_MAGIC etc check? */
114 if (lmm->lmm_object_id == 0) {
115 CERROR("lov_mds_md: zero lmm_object_id\n");
/* OSC is always a single stripe from its own point of view. */
120 lsm_size = lov_stripe_md_size(1);
/* Teardown path: caller passes lmm == NULL to free an existing *lsmp. */
124 if (*lsmp != NULL && lmm == NULL) {
125 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
126 OBD_FREE(*lsmp, lsm_size);
132 OBD_ALLOC(*lsmp, lsm_size);
135 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
/* Unwind the lsm allocation if the oinfo allocation fails. */
136 if ((*lsmp)->lsm_oinfo[0] == NULL) {
137 OBD_FREE(*lsmp, lsm_size);
140 loi_init((*lsmp)->lsm_oinfo[0]);
144 /* XXX zero *lsmp? */
145 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
146 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
147 LASSERT((*lsmp)->lsm_object_id);
148 LASSERT((*lsmp)->lsm_object_gr);
151 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/* Copy a capability (@capa, an obd_capa) into request buffer @offset and
 * flag its presence in @body.  NOTE(review): the NULL-capa early return and
 * the capa_cpy() call appear to be elided from this listing. */
156 static inline void osc_pack_capa(struct ptlrpc_request *req, int offset,
157 struct ost_body *body, void *capa)
159 struct obd_capa *oc = (struct obd_capa *)capa;
160 struct lustre_capa *c;
165 c = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*c));
/* Tell the OST that a capability accompanies this request. */
168 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
169 DEBUG_CAPA(D_SEC, c, "pack");
/* Fill the ost_body at request buffer @offset from @oinfo: copy the obdo by
 * value and pack the capability (if any) into the following buffer slot. */
172 static inline void osc_pack_req_body(struct ptlrpc_request *req, int offset,
173 struct obd_info *oinfo)
175 struct ost_body *body;
177 body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body));
178 body->oa = *oinfo->oi_oa;
179 osc_pack_capa(req, offset + 1, body, oinfo->oi_capa);
/* Reply callback for async OST_GETATTR: unpack the ost_body, copy the
 * returned attributes into the caller's obdo, then invoke the caller's
 * completion callback oi_cb_up with the final rc.  NOTE(review): the
 * success/failure branch structure is partially elided in this listing. */
182 static int osc_getattr_interpret(struct ptlrpc_request *req,
183 struct osc_async_args *aa, int rc)
185 struct ost_body *body;
191 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
192 lustre_swab_ost_body);
194 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
195 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
197 /* This should really be sent by the OST */
198 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
199 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
/* Unpack failure path: invalidate the obdo so callers don't trust it. */
201 CERROR("can't unpack ost_body\n");
203 aa->aa_oi->oi_oa->o_valid = 0;
206 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an OST_GETATTR RPC on @set without waiting; the reply is handled by
 * osc_getattr_interpret().  The optional capability occupies the buffer
 * after the ost_body (size 0 when absent). */
210 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
211 struct ptlrpc_request_set *set)
213 struct ptlrpc_request *req;
214 struct ost_body *body;
215 int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
216 struct osc_async_args *aa;
219 size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
220 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
221 OST_GETATTR, 3, size,NULL);
225 osc_pack_req_body(req, REQ_REC_OFF, oinfo);
227 ptlrpc_req_set_repsize(req, 2, size);
228 req->rq_interpret_reply = osc_getattr_interpret;
/* Async args live inside the request; assert they fit. */
230 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
231 aa = (struct osc_async_args *)&req->rq_async_args;
234 ptlrpc_set_add_req(set, req);
/* Synchronous OST_GETATTR: build the request, queue and wait, then copy the
 * returned attributes into oinfo->oi_oa.  NOTE(review): the error-check
 * lines after prep/queue are elided in this listing. */
238 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
240 struct ptlrpc_request *req;
241 struct ost_body *body;
242 int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
245 size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
246 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
247 OST_GETATTR, 3, size, NULL);
251 osc_pack_req_body(req, REQ_REC_OFF, oinfo);
253 ptlrpc_req_set_repsize(req, 2, size);
255 rc = ptlrpc_queue_wait(req);
257 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
261 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
262 lustre_swab_ost_body);
264 CERROR ("can't unpack ost_body\n");
265 GOTO (out, rc = -EPROTO);
268 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
269 *oinfo->oi_oa = body->oa;
271 /* This should really be sent by the OST */
272 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
273 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
277 ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR: send the attributes in @oinfo to the OST and
 * copy the (possibly updated) attributes back on success. */
281 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
282 struct obd_trans_info *oti)
284 struct ptlrpc_request *req;
285 struct ost_body *body;
286 int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
/* If a group is claimed valid it must be a real (non-zero) group. */
289 LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
290 oinfo->oi_oa->o_gr > 0);
291 size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
292 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
293 OST_SETATTR, 3, size, NULL);
297 osc_pack_req_body(req, REQ_REC_OFF, oinfo);
299 ptlrpc_req_set_repsize(req, 2, size);
301 rc = ptlrpc_queue_wait(req);
305 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
306 lustre_swab_ost_body);
308 GOTO(out, rc = -EPROTO);
310 *oinfo->oi_oa = body->oa;
314 ptlrpc_req_finished(req);
/* Reply callback for async OST_SETATTR: unpack the reply body, copy the
 * returned obdo back, then run the caller's completion callback. */
318 static int osc_setattr_interpret(struct ptlrpc_request *req,
319 struct osc_async_args *aa, int rc)
321 struct ost_body *body;
327 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
328 lustre_swab_ost_body);
330 CERROR("can't unpack ost_body\n");
331 GOTO(out, rc = -EPROTO);
334 *aa->aa_oi->oi_oa = body->oa;
336 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Asynchronous OST_SETATTR.  Two modes are visible: fire-and-forget via
 * ptlrpcd_add_req(), or add to @rqset with osc_setattr_interpret as the
 * reply handler.  NOTE(review): the condition selecting between the two
 * paths is elided in this listing -- confirm against the full source. */
340 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
341 struct obd_trans_info *oti,
342 struct ptlrpc_request_set *rqset)
344 struct ptlrpc_request *req;
345 int size[3] = { sizeof(struct ptlrpc_body), sizeof(struct ost_body) };
346 struct osc_async_args *aa;
349 size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
350 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
351 OST_SETATTR, 3, size, NULL);
355 osc_pack_req_body(req, REQ_REC_OFF, oinfo);
/* Carry the MDS llog cookie along so the OST can cancel the unlink record. */
356 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
358 *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;
361 ptlrpc_req_set_repsize(req, 2, size);
362 /* do MDS-to-OST setattr asynchronously */
364 /* Do not wait for response. */
365 ptlrpcd_add_req(req);
367 req->rq_interpret_reply = osc_setattr_interpret;
369 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
370 aa = (struct osc_async_args *)&req->rq_async_args;
373 ptlrpc_set_add_req(rqset, req);
/* Create an object on the OST (synchronous OST_CREATE).
 * Allocates a temporary single-stripe lsm, sends the obdo, and on success
 * records the new object id/group in the lsm and the transno/log cookie in
 * @oti.  The OBD_MD_FLINLINE+DELORPHAN combination marks an orphan-cleanup
 * request, which must not be resent.  NOTE(review): error-check lines after
 * prep/queue and the *ea assignment are elided in this listing. */
379 int osc_real_create(struct obd_export *exp, struct obdo *oa,
380 struct lov_stripe_md **ea, struct obd_trans_info *oti)
382 struct ptlrpc_request *req;
383 struct ost_body *body;
384 struct lov_stripe_md *lsm;
385 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
393 rc = obd_alloc_memmd(exp, &lsm);
398 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
399 OST_CREATE, 2, size, NULL);
401 GOTO(out, rc = -ENOMEM);
403 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
406 ptlrpc_req_set_repsize(req, 2, size);
407 if (oa->o_valid & OBD_MD_FLINLINE) {
408 LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
409 oa->o_flags == OBD_FL_DELORPHAN);
411 "delorphan from OST integration");
412 /* Don't resend the delorphan req */
413 req->rq_no_resend = req->rq_no_delay = 1;
416 rc = ptlrpc_queue_wait(req);
420 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
421 lustre_swab_ost_body);
423 CERROR ("can't unpack ost_body\n");
424 GOTO (out_req, rc = -EPROTO);
429 /* This should really be sent by the OST */
430 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
431 oa->o_valid |= OBD_MD_FLBLKSZ;
433 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
434 * have valid lsm_oinfo data structs, so don't go touching that.
435 * This needs to be fixed in a big way.
437 lsm->lsm_object_id = oa->o_id;
438 lsm->lsm_object_gr = oa->o_gr;
442 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
444 if (oa->o_valid & OBD_MD_FLCOOKIE) {
445 if (!oti->oti_logcookies)
446 oti_alloc_cookies(oti, 1);
447 *oti->oti_logcookies = *obdo_logcookie(oa);
451 CDEBUG(D_HA, "transno: "LPD64"\n",
452 lustre_msg_get_transno(req->rq_repmsg));
454 ptlrpc_req_finished(req);
/* Error path: release the lsm we allocated above. */
457 obd_free_memmd(exp, &lsm);
/* Reply callback for async OST_PUNCH (truncate): unpack the reply body,
 * copy the returned obdo back, then run the caller's completion callback. */
461 static int osc_punch_interpret(struct ptlrpc_request *req,
462 struct osc_async_args *aa, int rc)
464 struct ost_body *body;
470 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
471 lustre_swab_ost_body);
473 CERROR ("can't unpack ost_body\n");
474 GOTO(out, rc = -EPROTO);
477 *aa->aa_oi->oi_oa = body->oa;
479 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Asynchronous OST_PUNCH (truncate an extent on the OST).  The punch range
 * is smuggled in the oa's size/blocks fields (start/end), a long-standing
 * wire-protocol convention noted below. */
483 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
484 struct obd_trans_info *oti,
485 struct ptlrpc_request_set *rqset)
487 struct ptlrpc_request *req;
488 struct osc_async_args *aa;
489 struct ost_body *body;
490 int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
498 size[REQ_REC_OFF + 1] = oinfo->oi_capa? sizeof(struct lustre_capa) : 0;
499 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
500 OST_PUNCH, 3, size, NULL);
504 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
506 osc_pack_req_body(req, REQ_REC_OFF, oinfo);
507 /* overload the size and blocks fields in the oa with start/end */
508 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
509 body->oa.o_size = oinfo->oi_policy.l_extent.start;
510 body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
511 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
513 ptlrpc_req_set_repsize(req, 2, size);
515 req->rq_interpret_reply = osc_punch_interpret;
516 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
517 aa = (struct osc_async_args *)&req->rq_async_args;
519 ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_SYNC: ask the OST to flush [start, end] of the object to
 * stable storage.  Like punch, the range rides in the oa size/blocks
 * fields.  NOTE(review): error checks after prep/queue are elided here. */
524 static int osc_sync(struct obd_export *exp, struct obdo *oa,
525 struct lov_stripe_md *md, obd_size start, obd_size end,
528 struct ptlrpc_request *req;
529 struct ost_body *body;
530 int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
538 size[REQ_REC_OFF + 1] = capa ? sizeof(struct lustre_capa) : 0;
540 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
541 OST_SYNC, 3, size, NULL);
545 /* overload the size and blocks fields in the oa with start/end */
546 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
548 body->oa.o_size = start;
549 body->oa.o_blocks = end;
550 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
552 osc_pack_capa(req, REQ_REC_OFF + 1, body, capa);
554 ptlrpc_req_set_repsize(req, 2, size);
556 rc = ptlrpc_queue_wait(req);
560 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
561 lustre_swab_ost_body);
563 CERROR ("can't unpack ost_body\n");
564 GOTO (out, rc = -EPROTO);
571 ptlrpc_req_finished(req);
575 /* Find and cancel locally locks matched by @mode in the resource found by
576 * @objid. Found locks are added into @cancel list. Returns the number of
577 * locks added to @cancels list. */
/* NOTE(review): the NULL-resource early return is elided in this listing. */
578 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
579 struct list_head *cancels, ldlm_mode_t mode,
/* Resource name is built from the object id and group. */
582 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
583 struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
584 struct ldlm_resource *res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
591 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
592 lock_flags, 0, NULL);
593 ldlm_resource_putref(res);
/* Reply callback for OST_DESTROY: drop the in-flight destroy count and wake
 * anyone throttled in osc_destroy() waiting for a free slot. */
597 static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
600 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
602 atomic_dec(&cli->cl_destroy_in_flight);
603 cfs_waitq_signal(&cli->cl_destroy_waitq);
/* Try to reserve a destroy-RPC slot (bounded by cl_max_rpcs_in_flight).
 * Increments the in-flight counter optimistically; on failure it decrements
 * again and, if another thread raced the counter down meanwhile, re-signals
 * the waitqueue so no waiter is lost.  NOTE(review): the return statements
 * are elided in this listing -- presumably returns nonzero on success. */
607 static int osc_can_send_destroy(struct client_obd *cli)
609 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
610 cli->cl_max_rpcs_in_flight) {
611 /* The destroy request can be sent */
/* Over the limit: undo the increment. */
614 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
615 cli->cl_max_rpcs_in_flight) {
617 * The counter has been modified between the two atomic
620 cfs_waitq_signal(&cli->cl_destroy_waitq);
625 /* Destroy requests can be async always on the client, and we don't even really
626 * care about the return code since the client cannot do anything at all about
628 * When the MDS is unlinking a filename, it saves the file objects into a
629 * recovery llog, and these object records are cancelled when the OST reports
630 * they were destroyed and sync'd to disk (i.e. transaction committed).
631 * If the client dies, or the OST is down when the object should be destroyed,
632 * the records are not cancelled, and when the OST reconnects to the MDS next,
633 * it will retrieve the llog unlink logs and then send the log cancellation
634 * cookies to the MDS after committing destroy transactions. */
635 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
636 struct lov_stripe_md *ea, struct obd_trans_info *oti,
637 struct obd_export *md_export)
639 CFS_LIST_HEAD(cancels);
640 struct ptlrpc_request *req;
641 struct ost_body *body;
642 int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
643 int count, bufcount = 2;
644 struct client_obd *cli = &exp->exp_obd->u.cli;
/* Cancel our own PW locks first so the data is discarded, and piggyback the
 * cancels on the destroy request when the server supports ELC. */
652 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
653 LDLM_FL_DISCARD_DATA);
654 if (exp_connect_cancelset(exp) && count)
656 req = ldlm_prep_elc_req(exp, LUSTRE_OST_VERSION, OST_DESTROY, bufcount,
657 size, REQ_REC_OFF + 1, 0, &cancels, count);
661 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
662 req->rq_interpret_reply = osc_destroy_interpret;
664 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
/* Forward the MDS unlink-llog cookie so the OST can cancel the record. */
665 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
666 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
667 sizeof(*oti->oti_logcookies));
670 ptlrpc_req_set_repsize(req, 2, size);
/* Throttle: don't let destroys exceed max_rpcs_in_flight. */
672 if (!osc_can_send_destroy(cli)) {
673 struct l_wait_info lwi = { 0 };
676 * Wait until the number of on-going destroy RPCs drops
677 * under max_rpc_in_flight
679 l_wait_event_exclusive(cli->cl_destroy_waitq,
680 osc_can_send_destroy(cli), &lwi);
683 /* Do not wait for response */
684 ptlrpcd_add_req(req);
/* Fill the dirty/undirty/grant/dropped accounting fields of @oa so the OST
 * can manage space grants.  Runs under cl_loi_list_lock; the CERROR branches
 * flag accounting inconsistencies rather than failing.  NOTE(review): the
 * lines setting oa->o_valid |= bits appear to be elided in this listing. */
688 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
691 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
693 LASSERT(!(oa->o_valid & bits));
696 client_obd_list_lock(&cli->cl_loi_list_lock);
697 oa->o_dirty = cli->cl_dirty;
698 if (cli->cl_dirty > cli->cl_dirty_max) {
699 CERROR("dirty %lu > dirty_max %lu\n",
700 cli->cl_dirty, cli->cl_dirty_max);
702 } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
703 CERROR("dirty %d > system dirty_max %d\n",
704 atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
706 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
707 CERROR("dirty %lu - dirty_max %lu too big???\n",
708 cli->cl_dirty, cli->cl_dirty_max);
/* Normal case: advertise how much more we could dirty, capped by both the
 * per-OSC dirty limit and the data we could have in flight. */
711 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
712 (cli->cl_max_rpcs_in_flight + 1);
713 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
715 oa->o_grant = cli->cl_avail_grant;
716 oa->o_dropped = cli->cl_lost_grant;
717 cli->cl_lost_grant = 0;
718 client_obd_list_unlock(&cli->cl_loi_list_lock);
719 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
720 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
723 /* caller must hold loi_list_lock */
/* Account one page of dirty data against the available grant: bump the
 * global and per-client dirty counters, consume one page of grant, and mark
 * the page as covered by grant so the release path knows to credit it. */
724 static void osc_consume_write_grant(struct client_obd *cli,
725 struct brw_page *pga)
727 atomic_inc(&obd_dirty_pages);
728 cli->cl_dirty += CFS_PAGE_SIZE;
729 cli->cl_avail_grant -= CFS_PAGE_SIZE;
730 pga->flag |= OBD_BRW_FROM_GRANT;
731 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
732 CFS_PAGE_SIZE, pga, pga->pg);
733 LASSERT(cli->cl_avail_grant >= 0);
736 /* the companion to osc_consume_write_grant, called when a brw has completed.
737 * must be called with the loi lock held. */
/* Undo the grant accounting for one page.  For pages never sent (or short
 * writes that straddle server blocks) part of the grant is recorded in
 * cl_lost_grant so the next announce returns it to the OST.  NOTE(review):
 * some branch lines are elided in this listing. */
738 static void osc_release_write_grant(struct client_obd *cli,
739 struct brw_page *pga, int sent)
741 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
/* Pages not covered by grant were never accounted; nothing to release. */
744 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
749 pga->flag &= ~OBD_BRW_FROM_GRANT;
750 atomic_dec(&obd_dirty_pages);
751 cli->cl_dirty -= CFS_PAGE_SIZE;
753 cli->cl_lost_grant += CFS_PAGE_SIZE;
754 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
755 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
756 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
757 /* For short writes we shouldn't count parts of pages that
758 * span a whole block on the OST side, or our accounting goes
759 * wrong. Should match the code in filter_grant_check. */
760 int offset = pga->off & ~CFS_PAGE_MASK;
761 int count = pga->count + (offset & (blocksize - 1));
762 int end = (offset + pga->count) & (blocksize - 1);
764 count += blocksize - end;
766 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
767 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
768 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
769 cli->cl_avail_grant, cli->cl_dirty);
/* Total BRW RPCs currently in flight for this client (reads + writes). */
775 static unsigned long rpcs_in_flight(struct client_obd *cli)
777 return cli->cl_r_in_flight + cli->cl_w_in_flight;
780 /* caller must hold loi_list_lock */
/* Walk the cache-waiter list and wake waiters that can now proceed: either
 * grant a page of dirty budget, or (if no grant can arrive) wake them with
 * -EDQUOT so they fall back to sync I/O.  Stops while dirty limits are hit
 * or while in-flight writes may still return grant. */
781 void osc_wake_cache_waiters(struct client_obd *cli)
783 struct list_head *l, *tmp;
784 struct osc_cache_waiter *ocw;
787 list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
788 /* if we can't dirty more, we must wait until some is written */
789 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
790 (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
791 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
792 "osc max %ld, sys max %d\n", cli->cl_dirty,
793 cli->cl_dirty_max, obd_max_dirty_pages);
797 /* if still dirty cache but no grant wait for pending RPCs that
798 * may yet return us some grant before doing sync writes */
799 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
800 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
801 cli->cl_w_in_flight);
805 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
806 list_del_init(&ocw->ocw_entry);
807 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
808 /* no more RPCs in flight to return grant, do sync IO */
809 ocw->ocw_rc = -EDQUOT;
810 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
/* Grant available: charge the waiter's page against it before waking. */
812 osc_consume_write_grant(cli,
813 &ocw->ocw_oap->oap_brw_page);
816 cfs_waitq_signal(&ocw->ocw_waitq);
/* Initialize the client's available grant from the grant the server handed
 * back in the connect data. */
822 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
824 client_obd_list_lock(&cli->cl_loi_list_lock);
825 cli->cl_avail_grant = ocd->ocd_grant;
826 client_obd_list_unlock(&cli->cl_loi_list_lock);
828 CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
829 cli->cl_avail_grant, cli->cl_lost_grant);
830 LASSERT(cli->cl_avail_grant >= 0);
/* Credit any extra grant returned in a BRW reply to the client's pool.
 * Waiters are not woken here (see comment below). */
833 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
835 client_obd_list_lock(&cli->cl_loi_list_lock);
836 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
837 if (body->oa.o_valid & OBD_MD_FLGRANT)
838 cli->cl_avail_grant += body->oa.o_grant;
839 /* waiters are woken in brw_interpret_oap */
840 client_obd_list_unlock(&cli->cl_loi_list_lock);
843 /* We assume that the reason this OSC got a short read is because it read
844 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
845 * via the LOV, and it _knows_ it's reading inside the file, it's just that
846 * this stripe never got written at or beyond this stripe offset yet. */
/* Zero-fill the unread tail of the page array: skip fully-read pages, zero
 * the tail of the page containing EOF, then zero all remaining pages. */
847 static void handle_short_read(int nob_read, obd_count page_count,
848 struct brw_page **pga)
853 /* skip bytes read OK */
854 while (nob_read > 0) {
855 LASSERT (page_count > 0);
857 if (pga[i]->count > nob_read) {
858 /* EOF inside this page */
859 ptr = cfs_kmap(pga[i]->pg) +
860 (pga[i]->off & ~CFS_PAGE_MASK);
861 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
862 cfs_kunmap(pga[i]->pg);
868 nob_read -= pga[i]->count;
873 /* zero remaining pages */
874 while (page_count-- > 0) {
875 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
876 memset(ptr, 0, pga[i]->count);
877 cfs_kunmap(pga[i]->pg);
/* Validate a BRW_WRITE reply: the per-niobuf rc vector must be present,
 * contain no errors (negative values are returned to the caller) and no
 * unexpected non-zero values, and the bulk must have transferred exactly
 * the number of bytes requested. */
882 static int check_write_rcs(struct ptlrpc_request *req,
883 int requested_nob, int niocount,
884 obd_count page_count, struct brw_page **pga)
888 /* return error if any niobuf was in error */
889 remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
890 sizeof(*remote_rcs) * niocount, NULL);
891 if (remote_rcs == NULL) {
892 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
/* The rc vector has no swabber registered; byte-swap by hand if needed. */
895 if (lustre_msg_swabbed(req->rq_repmsg))
896 for (i = 0; i < niocount; i++)
897 __swab32s(&remote_rcs[i]);
899 for (i = 0; i < niocount; i++) {
900 if (remote_rcs[i] < 0)
901 return(remote_rcs[i]);
903 if (remote_rcs[i] != 0) {
904 CERROR("rc[%d] invalid (%d) req %p\n",
905 i, remote_rcs[i], req);
910 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
911 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
912 requested_nob, req->rq_bulk->bd_nob_transferred);
/* Two brw_pages can share one niobuf iff their flags match (ignoring the
 * client-only OBD_BRW_FROM_GRANT bit) and they are file-contiguous. */
919 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
921 if (p1->flag != p2->flag) {
922 unsigned mask = ~OBD_BRW_FROM_GRANT;
924 /* warn if we try to combine flags that we don't know to be
926 if ((p1->flag & mask) != (p2->flag & mask))
927 CERROR("is it ok to have flags 0x%x and 0x%x in the "
928 "same brw?\n", p1->flag, p2->flag);
932 return (p1->off + p1->count == p2->off);
/* CRC32 checksum over up to @nob bytes of the page array, mapping each page
 * in turn.  Includes fault-injection hooks: corrupt received data before
 * checksumming (read), or return a deliberately wrong checksum (write). */
935 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
936 struct brw_page **pga, int opc)
941 LASSERT (pg_count > 0);
942 while (nob > 0 && pg_count > 0) {
943 unsigned char *ptr = cfs_kmap(pga[i]->pg);
944 int off = pga[i]->off & ~CFS_PAGE_MASK;
945 int count = pga[i]->count > nob ? nob : pga[i]->count;
947 /* corrupt the data before we compute the checksum, to
948 * simulate an OST->client data error */
949 if (i == 0 && opc == OST_READ &&
950 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
951 memcpy(ptr + off, "bad1", min(4, nob));
952 cksum = crc32_le(cksum, ptr + off, count);
953 cfs_kunmap(pga[i]->pg);
954 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
957 nob -= pga[i]->count;
961 /* For sending we only compute the wrong checksum instead
962 * of corrupting the data so it is still correct on a redo */
963 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/* Build a complete OST_READ/OST_WRITE RPC for @page_count pages:
 * - coalesce file-contiguous pages with equal flags into niobufs,
 * - allocate the request (writes draw from the import's emergency pool),
 * - attach the bulk descriptor (GET_SOURCE for write, PUT_SINK for read),
 * - pack obdo, ioobj, niobuf array and optional capability,
 * - announce dirty/grant accounting and optionally checksum the data.
 * On success *reqp is set; the request owns the bulk descriptor.
 * NOTE(review): several error checks and the pool-selection else-branch are
 * elided in this listing -- confirm against the full source. */
969 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
970 struct lov_stripe_md *lsm, obd_count page_count,
971 struct brw_page **pga,
972 struct ptlrpc_request **reqp,
973 struct obd_capa *ocapa)
975 struct ptlrpc_request *req;
976 struct ptlrpc_bulk_desc *desc;
977 struct ost_body *body;
978 struct obd_ioobj *ioobj;
979 struct niobuf_remote *niobuf;
980 int size[5] = { sizeof(struct ptlrpc_body), sizeof(*body) };
981 int niocount, i, requested_nob, opc, rc;
982 struct ptlrpc_request_pool *pool;
983 struct lustre_capa *capa;
984 struct osc_brw_async_args *aa;
/* Fault injection: simulate recoverable and fatal prep failures. */
987 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
988 RETURN(-ENOMEM); /* Recoverable */
989 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
990 RETURN(-EINVAL); /* Fatal */
992 if ((cmd & OBD_BRW_WRITE) != 0) {
994 pool = cli->cl_import->imp_rq_pool;
/* Count niobufs: adjacent mergeable pages share one niobuf. */
1000 for (niocount = i = 1; i < page_count; i++) {
1001 if (!can_merge_pages(pga[i - 1], pga[i]))
1005 size[REQ_REC_OFF + 1] = sizeof(*ioobj);
1006 size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
1008 size[REQ_REC_OFF + 3] = sizeof(*capa);
1010 req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 5,
1011 size, NULL, pool, NULL);
1015 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1017 if (opc == OST_WRITE)
1018 desc = ptlrpc_prep_bulk_imp (req, page_count,
1019 BULK_GET_SOURCE, OST_BULK_PORTAL);
1021 desc = ptlrpc_prep_bulk_imp (req, page_count,
1022 BULK_PUT_SINK, OST_BULK_PORTAL);
1024 GOTO(out, rc = -ENOMEM);
1025 /* NB request now owns desc and will free it when it gets freed */
1027 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1028 ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
1029 niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1030 niocount * sizeof(*niobuf));
1034 obdo_to_ioobj(oa, ioobj);
1035 ioobj->ioo_bufcnt = niocount;
1037 capa = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 3,
1039 capa_cpy(capa, ocapa);
1040 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
1043 LASSERT (page_count > 0);
1044 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1045 struct brw_page *pg = pga[i];
1046 struct brw_page *pg_prev = pga[i - 1];
/* Each page's extent must lie within one page and come strictly after
 * the previous page's offset (pages are sorted, non-overlapping). */
1048 LASSERT(pg->count > 0);
1049 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1050 "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1051 pg->off, pg->count);
1053 LASSERTF(i == 0 || pg->off > pg_prev->off,
1054 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1055 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1057 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1058 pg_prev->pg, page_private(pg_prev->pg),
1059 pg_prev->pg->index, pg_prev->off);
1061 LASSERTF(i == 0 || pg->off > pg_prev->off,
1062 "i %d p_c %u\n", i, page_count);
/* SRVLOCK must be all-or-nothing across the whole RPC. */
1064 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1065 (pg->flag & OBD_BRW_SRVLOCK));
1067 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1069 requested_nob += pg->count;
/* Extend the current niobuf when mergeable, else start a new one. */
1071 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1073 niobuf->len += pg->count;
1075 niobuf->offset = pg->off;
1076 niobuf->len = pg->count;
1077 niobuf->flags = pg->flag;
/* Sanity: we must have filled exactly niocount niobufs. */
1081 LASSERT((void *)(niobuf - niocount) ==
1082 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1083 niocount * sizeof(*niobuf)));
1084 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1086 /* size[REQ_REC_OFF] still sizeof (*body) */
1087 if (opc == OST_WRITE) {
1088 if (unlikely(cli->cl_checksum)) {
1089 body->oa.o_valid |= OBD_MD_FLCKSUM;
1090 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1093 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1095 /* save this in 'oa', too, for later checking */
1096 oa->o_valid |= OBD_MD_FLCKSUM;
1098 /* clear out the checksum flag, in case this is a
1099 * resend but cl_checksum is no longer set. b=11238 */
1100 oa->o_valid &= ~OBD_MD_FLCKSUM;
1102 oa->o_cksum = body->oa.o_cksum;
1103 /* 1 RC per niobuf */
1104 size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
1105 ptlrpc_req_set_repsize(req, 3, size);
1107 if (unlikely(cli->cl_checksum))
1108 body->oa.o_valid |= OBD_MD_FLCKSUM;
1109 /* 1 RC for the whole I/O */
1110 ptlrpc_req_set_repsize(req, 2, size);
1113 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1114 aa = (struct osc_brw_async_args *)&req->rq_async_args;
1116 aa->aa_requested_nob = requested_nob;
1117 aa->aa_nio_count = niocount;
1118 aa->aa_page_count = page_count;
1122 INIT_LIST_HEAD(&aa->aa_oaps);
/* Error path: drop our ref so the request (and the bulk desc) is freed. */
1128 ptlrpc_req_finished (req);
/* Diagnose a write-checksum mismatch reported by the OST: recompute the
 * checksum over the local pages and classify whether the data changed on
 * the client after checksumming (likely mmap), in transit, or both, then
 * log a console error with the object/extent identification. */
1132 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1133 __u32 client_cksum, __u32 server_cksum,
1134 int nob, obd_count page_count,
1135 struct brw_page **pga)
1140 if (server_cksum == client_cksum) {
1141 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1145 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE);
1147 if (new_cksum == server_cksum)
1148 msg = "changed on the client after we checksummed it - "
1149 "likely false positive due to mmap IO (bug 11742)";
1150 else if (new_cksum == client_cksum)
1151 msg = "changed in transit before arrival at OST";
1153 msg = "changed in transit AND doesn't match the original - "
1154 "likely false positive due to mmap IO (bug 11742)";
1156 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1157 LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1158 "["LPU64"-"LPU64"]\n",
1159 msg, libcfs_nid2str(peer->nid),
1160 oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1161 oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1164 oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1166 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1167 CERROR("original client csum %x, server csum %x, client csum now %x\n",
1168 client_cksum, server_cksum, new_cksum);
1172 /* Note rc enters this function as number of bytes transferred */
/* Post-process a completed BRW RPC.  For writes: update quota flags and
 * grant, verify the client-vs-server checksum, and validate the per-niobuf
 * rc vector.  For reads: validate the transferred byte count, zero-fill any
 * short read, and verify the data checksum if the server sent one.
 * NOTE(review): some branch/return lines are elided in this listing. */
1173 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1175 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1176 const lnet_process_id_t *peer =
1177 &req->rq_import->imp_connection->c_peer;
1178 struct client_obd *cli = aa->aa_cli;
1179 struct ost_body *body;
1180 __u32 client_cksum = 0;
/* -EDQUOT still carries a reply body we need to process (quota flags). */
1183 if (rc < 0 && rc != -EDQUOT)
1186 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1187 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1188 lustre_swab_ost_body);
1190 CERROR ("Can't unpack body\n");
1194 /* set/clear over quota flag for a uid/gid */
1195 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1196 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1197 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1198 body->oa.o_gid, body->oa.o_valid,
1204 if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
1205 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1207 osc_update_grant(cli, body);
1209 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
/* Writes return 0 on success, never a byte count. */
1211 CERROR ("Unexpected +ve rc %d\n", rc);
1214 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1216 if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
1218 check_write_checksum(&body->oa, peer, client_cksum,
1220 aa->aa_requested_nob,
1225 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1228 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1229 aa->aa_page_count, aa->aa_ppga);
1233 /* The rest of this function executes only for OST_READs */
1234 if (rc > aa->aa_requested_nob) {
1235 CERROR("Unexpected rc %d (%d requested)\n", rc,
1236 aa->aa_requested_nob);
1240 if (rc != req->rq_bulk->bd_nob_transferred) {
1241 CERROR ("Unexpected rc %d (%d transferred)\n",
1242 rc, req->rq_bulk->bd_nob_transferred);
1246 if (rc < aa->aa_requested_nob)
1247 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1249 if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
1251 GOTO(out, rc = -EAGAIN);
1253 if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
1254 static int cksum_counter;
1255 __u32 server_cksum = body->oa.o_cksum;
1259 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1260 aa->aa_ppga, OST_READ);
/* If the bulk came straight from the server there was no router. */
1262 if (peer->nid == req->rq_bulk->bd_sender) {
1266 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1269 if (server_cksum == ~0 && rc > 0) {
1270 CERROR("Protocol error: server %s set the 'checksum' "
1271 "bit, but didn't send a checksum. Not fatal, "
1272 "but please tell CFS.\n",
1273 libcfs_nid2str(peer->nid));
1274 } else if (server_cksum != client_cksum) {
1275 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1276 "%s%s%s inum "LPU64"/"LPU64" object "
1277 LPU64"/"LPU64" extent "
1278 "["LPU64"-"LPU64"]\n",
1279 req->rq_import->imp_obd->obd_name,
1280 libcfs_nid2str(peer->nid),
1282 body->oa.o_valid & OBD_MD_FLFID ?
1283 body->oa.o_fid : (__u64)0,
1284 body->oa.o_valid & OBD_MD_FLFID ?
1285 body->oa.o_generation :(__u64)0,
1287 body->oa.o_valid & OBD_MD_FLGROUP ?
1288 body->oa.o_gr : (__u64)0,
1289 aa->aa_ppga[0]->off,
1290 aa->aa_ppga[aa->aa_page_count-1]->off +
1291 aa->aa_ppga[aa->aa_page_count-1]->count -
1293 CERROR("client %x, server %x\n",
1294 client_cksum, server_cksum);
1296 aa->aa_oa->o_cksum = client_cksum;
1300 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* We asked for a checksum but the server didn't send one. */
1303 } else if (unlikely(client_cksum)) {
1304 static int cksum_missed;
/* Rate-limit: log only when the miss count is a power of two. */
1307 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1308 CERROR("Checksum %u requested from %s but not sent\n",
1309 cksum_missed, libcfs_nid2str(peer->nid));
/* Propagate the server's view of the object attributes back. */
1315 *aa->aa_oa = body->oa;
/* Synchronous bulk BRW: build one read/write request, queue it and wait,
 * then finish it.  Bulk timeouts and other recoverable errors trigger a
 * resend, with a delay that grows with the resend count and a cap
 * enforced by osc_should_resend(). */
1320 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1321 struct lov_stripe_md *lsm,
1322 obd_count page_count, struct brw_page **pga,
1323 struct obd_capa *ocapa)
1325 struct ptlrpc_request *req;
1329 struct l_wait_info lwi;
1333 cfs_waitq_init(&waitq);
1336 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1337 page_count, pga, &req, ocapa);
1341 rc = ptlrpc_queue_wait(req);
/* Bulk timed out but the request layer says it may be resent:
 * drop this request and loop around to build a fresh one. */
1343 if (rc == -ETIMEDOUT && req->rq_resend) {
1344 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1345 ptlrpc_req_finished(req);
1349 rc = osc_brw_fini_request(req, rc);
1351 ptlrpc_req_finished(req);
1352 if (osc_recoverable_error(rc)) {
1354 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1355 CERROR("too many resend retries, returning error\n");
/* l_wait_event() on an always-false condition with a timeout acts
 * as an interruptible sleep of 'resends' seconds before retrying. */
1359 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1360 l_wait_event(waitq, 0, &lwi);
/* Rebuild and requeue a BRW request that failed with a recoverable error.
 * A new request is prepared from the async args of the old one; the oap
 * list and request references are transferred to the new request under
 * cl_loi_list_lock.  Called from the brw interpret callbacks. */
1368 int osc_brw_redo_request(struct ptlrpc_request *request,
1369 struct osc_brw_async_args *aa)
1371 struct ptlrpc_request *new_req;
1372 struct ptlrpc_request_set *set = request->rq_set;
1373 struct osc_brw_async_args *new_aa;
1374 struct osc_async_page *oap;
1378 if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1379 CERROR("too many resend retries, returning error\n");
1383 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
/* Recover the capa from the original request body, if one was packed. */
1385 body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1386 if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
1387 ocapa = lustre_unpack_capa(request->rq_reqmsg,
1390 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1391 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1392 aa->aa_cli, aa->aa_oa,
1393 NULL /* lsm unused by osc currently */,
1394 aa->aa_page_count, aa->aa_ppga,
1395 &new_req, NULL /* ocapa */);
1399 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
/* If any oap in this batch was interrupted, abandon the redo: the
 * caller is bailing out, so drop the new request and give up. */
1401 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1402 if (oap->oap_request != NULL) {
1403 LASSERTF(request == oap->oap_request,
1404 "request %p != oap_request %p\n",
1405 request, oap->oap_request);
1406 if (oap->oap_interrupted) {
1407 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1408 ptlrpc_req_finished(new_req);
1413 /* New request takes over pga and oaps from old request.
1414 * Note that copying a list_head doesn't work, need to move it... */
1416 new_req->rq_interpret_reply = request->rq_interpret_reply;
1417 new_req->rq_async_args = request->rq_async_args;
/* Delay the resend proportionally to how many times we've retried. */
1418 new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;
1420 new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
1422 INIT_LIST_HEAD(&new_aa->aa_oaps);
1423 list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1424 INIT_LIST_HEAD(&aa->aa_oaps);
/* Swap each oap's request reference from the old request to the new. */
1426 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1427 if (oap->oap_request) {
1428 ptlrpc_req_finished(oap->oap_request);
1429 oap->oap_request = ptlrpc_request_addref(new_req);
1433 /* Adding to the set with ptlrpc_set_add_req() is safe here because
1434 * interpret functions run in check_set context; the only path by which
1435 * another thread can reach this request after -EINTR is protected by
1436 * cl_loi_list_lock. */
1437 ptlrpc_set_add_req(set, new_req);
1439 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1441 DEBUG_REQ(D_INFO, new_req, "new request");
/* Interpret callback for async BRW requests queued by async_internal().
 * Finishes the request, attempts a redo on recoverable errors, then
 * updates in-flight counters and releases grant and the page array. */
1445 static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
1447 struct osc_brw_async_args *aa = data;
1451 rc = osc_brw_fini_request(req, rc);
1452 if (osc_recoverable_error(rc)) {
1453 rc = osc_brw_redo_request(req, aa);
/* Decrement the read/write in-flight counter under the loi list lock
 * and give back the write grant consumed for each page. */
1458 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1459 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1460 aa->aa_cli->cl_w_in_flight--;
1462 aa->aa_cli->cl_r_in_flight--;
1463 for (i = 0; i < aa->aa_page_count; i++)
1464 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1465 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1467 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
/* Queue an asynchronous BRW on the given request set.  Write grant is
 * consumed up front (and released again on failure); lprocfs statistics
 * are updated and brw_interpret() is installed as the completion hook. */
1472 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1473 struct lov_stripe_md *lsm, obd_count page_count,
1474 struct brw_page **pga, struct ptlrpc_request_set *set,
1475 struct obd_capa *ocapa)
1477 struct ptlrpc_request *req;
1478 struct client_obd *cli = &exp->exp_obd->u.cli;
1480 struct osc_brw_async_args *aa;
1483 /* Consume write credits even if doing a sync write -
1484 * otherwise we may run out of space on OST due to grant. */
1485 if (cmd == OBD_BRW_WRITE) {
1486 spin_lock(&cli->cl_loi_list_lock);
1487 for (i = 0; i < page_count; i++) {
1488 if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1489 osc_consume_write_grant(cli, pga[i]);
1491 spin_unlock(&cli->cl_loi_list_lock);
1494 rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
1497 aa = (struct osc_brw_async_args *)&req->rq_async_args;
/* Record per-RPC page-count and concurrency histograms for lprocfs. */
1498 if (cmd == OBD_BRW_READ) {
1499 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1500 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1501 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
1503 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1504 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1505 cli->cl_w_in_flight);
1506 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
1510 req->rq_interpret_reply = brw_interpret;
1511 ptlrpc_set_add_req(set, req);
1512 client_obd_list_lock(&cli->cl_loi_list_lock);
1513 if (cmd == OBD_BRW_READ)
1514 cli->cl_r_in_flight++;
1516 cli->cl_w_in_flight++;
1517 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Request build failed for a write: return the grant taken above. */
1518 } else if (cmd == OBD_BRW_WRITE) {
1519 client_obd_list_lock(&cli->cl_loi_list_lock);
1520 for (i = 0; i < page_count; i++)
1521 osc_release_write_grant(cli, pga[i], 0);
1522 client_obd_list_unlock(&cli->cl_loi_list_lock);
1528 * ugh, we want disk allocation on the target to happen in offset order.  We'll
1529 * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
1530 * fine for our small page arrays and doesn't require allocation.  It's an
1531 * insertion sort that swaps elements that are strides apart, shrinking the
1532 * stride down until it's '1' and the array is sorted.
1534 static void sort_brw_pages(struct brw_page **array, int num)
1537 struct brw_page *tmp;
/* Grow the stride through the 1, 4, 13, 40, ... (3h+1) sequence. */
1541 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1546 for (i = stride ; i < num ; i++) {
/* Shift stride-spaced elements up until tmp's slot is found. */
1549 while (j >= stride && array[j - stride]->off > tmp->off) {
1550 array[j] = array[j - stride];
1555 } while (stride > 1);
/* Return how many leading pages of 'pg' form a single unfragmented
 * (page-aligned, gap-free) run that the bulk layer can move in one RDMA.
 * A page that does not end on a page boundary terminates the run, and a
 * following page that does not start on a boundary cannot be merged. */
1558 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1564 LASSERT (pages > 0);
1565 offset = pg[i]->off & ~CFS_PAGE_MASK;
1569 if (pages == 0) /* that's all */
1572 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1573 return count; /* doesn't end on page boundary */
1576 offset = pg[i]->off & ~CFS_PAGE_MASK;
1577 if (offset != 0) /* doesn't start on page boundary */
/* Build an array of pointers into the caller's contiguous brw_page
 * array so the pages can be sorted/split without moving the originals.
 * Caller must free the array with osc_release_ppga(). */
1584 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1586 struct brw_page **ppga;
1589 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1593 for (i = 0; i < count; i++)
/* Free a pointer array built by osc_build_ppga().  'count' must match
 * the count passed at allocation time (OBD_FREE needs the exact size). */
1598 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1600 LASSERT(ppga != NULL);
1601 OBD_FREE(ppga, sizeof(*ppga) * count);
/* obd_brw entry point: synchronous bulk I/O.  Sorts the pages by offset,
 * then issues one osc_brw_internal() per chunk of at most
 * cl_max_pages_per_rpc unfragmented pages.  Because the OST clobbers the
 * obdo on each RPC, a saved copy is restored between chunks. */
1604 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1605 obd_count page_count, struct brw_page *pga,
1606 struct obd_trans_info *oti)
1608 struct obdo *saved_oa = NULL;
1609 struct brw_page **ppga, **orig;
1610 struct obd_import *imp = class_exp2cliimp(exp);
1611 struct client_obd *cli = &imp->imp_obd->u.cli;
1612 int rc, page_count_orig;
1615 if (cmd & OBD_BRW_CHECK) {
1616 /* The caller just wants to know if there's a chance that this
1617 * I/O can succeed */
1619 if (imp == NULL || imp->imp_invalid)
1624 /* test_brw with a failed create can trip this, maybe others. */
1625 LASSERT(cli->cl_max_pages_per_rpc);
1629 orig = ppga = osc_build_ppga(pga, page_count);
/* Keep the original array base and count for the final release;
 * ppga/page_count are advanced as chunks are consumed. */
1632 page_count_orig = page_count;
1634 sort_brw_pages(ppga, page_count);
1635 while (page_count) {
1636 obd_count pages_per_brw;
1638 if (page_count > cli->cl_max_pages_per_rpc)
1639 pages_per_brw = cli->cl_max_pages_per_rpc;
1641 pages_per_brw = page_count;
/* Shrink the chunk so it never spans a fragmented page run. */
1643 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1645 if (saved_oa != NULL) {
1646 /* restore previously saved oa */
1647 *oinfo->oi_oa = *saved_oa;
1648 } else if (page_count > pages_per_brw) {
1649 /* save a copy of oa (brw will clobber it) */
1650 OBDO_ALLOC(saved_oa);
1651 if (saved_oa == NULL)
1652 GOTO(out, rc = -ENOMEM);
1653 *saved_oa = *oinfo->oi_oa;
1656 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1657 pages_per_brw, ppga, oinfo->oi_capa);
1662 page_count -= pages_per_brw;
1663 ppga += pages_per_brw;
1667 osc_release_ppga(orig, page_count_orig);
1669 if (saved_oa != NULL)
1670 OBDO_FREE(saved_oa);
/* obd_brw_async entry point: like osc_brw() but queues each chunk on the
 * caller's request set via async_internal() instead of waiting.  When the
 * I/O must be split into several RPCs, each chunk gets its own copy of
 * the page-pointer array since async_internal() takes ownership of it. */
1675 static int osc_brw_async(int cmd, struct obd_export *exp,
1676 struct obd_info *oinfo, obd_count page_count,
1677 struct brw_page *pga, struct obd_trans_info *oti,
1678 struct ptlrpc_request_set *set)
1680 struct brw_page **ppga, **orig;
1681 struct client_obd *cli = &exp->exp_obd->u.cli;
1682 int page_count_orig;
1686 if (cmd & OBD_BRW_CHECK) {
1687 struct obd_import *imp = class_exp2cliimp(exp);
1688 /* The caller just wants to know if there's a chance that this
1689 * I/O can succeed */
1691 if (imp == NULL || imp->imp_invalid)
1696 orig = ppga = osc_build_ppga(pga, page_count);
1699 page_count_orig = page_count;
1701 sort_brw_pages(ppga, page_count);
1702 while (page_count) {
1703 struct brw_page **copy;
1704 obd_count pages_per_brw;
1706 pages_per_brw = min_t(obd_count, page_count,
1707 cli->cl_max_pages_per_rpc);
/* Shrink the chunk so it never spans a fragmented page run. */
1709 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1711 /* use ppga only if single RPC is going to fly */
1712 if (pages_per_brw != page_count_orig || ppga != orig) {
1713 OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
1715 GOTO(out, rc = -ENOMEM);
1716 memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
1720 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1721 pages_per_brw, copy, set, oinfo->oi_capa);
/* On failure we still own 'copy' and must free it ourselves. */
1725 OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
1729 /* we passed it to async_internal() which is
1730 * now responsible for releasing memory */
1734 page_count -= pages_per_brw;
1735 ppga += pages_per_brw;
1739 osc_release_ppga(orig, page_count_orig);
1743 static void osc_check_rpcs(struct client_obd *cli);
1745 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1746 * the dirty accounting. Writeback completes or truncate happens before
1747 * writing starts. Must be called with the loi lock held. */
1748 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
/* 'sent' indicates whether the page actually went over the wire, which
 * affects how the grant is returned by osc_release_write_grant(). */
1751 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1755 /* This maintains the lists of pending pages to read/write for a given object
1756 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1757 * to quickly find objects that are ready to send an RPC. */
/* Returns non-zero when this lop has enough (or urgent enough) pending
 * pages to justify firing an RPC for the given cmd direction. */
1758 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1764 if (lop->lop_num_pending == 0)
1767 /* if we have an invalid import we want to drain the queued pages
1768 * by forcing them through rpcs that immediately fail and complete
1769 * the pages. recovery relies on this to empty the queued pages
1770 * before canceling the locks and evicting down the llite pages */
1771 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1774 /* stream rpcs in queue order as long as as there is an urgent page
1775 * queued. this is our cheap solution for good batching in the case
1776 * where writepage marks some random page in the middle of the file
1777 * as urgent because of, say, memory pressure */
1778 if (!list_empty(&lop->lop_urgent)) {
1779 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1782 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1783 optimal = cli->cl_max_pages_per_rpc;
1784 if (cmd & OBD_BRW_WRITE) {
1785 /* trigger a write rpc stream as long as there are dirtiers
1786 * waiting for space. as they're waiting, they're not going to
1787 * create more pages to coallesce with what's waiting.. */
1788 if (!list_empty(&cli->cl_cache_waiters)) {
1789 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1792 /* +16 to avoid triggering rpcs that would want to include pages
1793 * that are being queued but which can't be made ready until
1794 * the queuer finishes with the page. this is a wart for
1795 * llite::commit_write() */
1798 if (lop->lop_num_pending >= optimal)
/* Add 'item' to 'list' or remove it, so that its membership matches the
 * boolean 'should_be_on'.  A no-op when the state already agrees. */
1804 static void on_list(struct list_head *item, struct list_head *list,
1807 if (list_empty(item) && should_be_on)
1808 list_add_tail(item, list);
1809 else if (!list_empty(item) && !should_be_on)
1810 list_del_init(item);
1813 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1814 * can find pages to build into rpcs quickly */
1815 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
/* Ready list: the object can fire an RPC in either direction now. */
1817 on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1818 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1819 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
/* Write/read lists: the object merely has pending pages queued. */
1821 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1822 loi->loi_write_lop.lop_num_pending);
1824 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1825 loi->loi_read_lop.lop_num_pending);
/* Adjust the pending-page count on a lop by 'delta' (may be negative)
 * and mirror the change into the client-wide read/write counters. */
1828 static void lop_update_pending(struct client_obd *cli,
1829 struct loi_oap_pages *lop, int cmd, int delta)
1831 lop->lop_num_pending += delta;
1832 if (cmd & OBD_BRW_WRITE)
1833 cli->cl_pending_w_pages += delta;
1835 cli->cl_pending_r_pages += delta;
1838 /* this is called when a sync waiter receives an interruption. Its job is to
1839 * get the caller woken as soon as possible. If its page hasn't been put in an
1840 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
1841 * desiring interruption which will forcefully complete the rpc once the rpc
1843 static void osc_occ_interrupted(struct oig_callback_context *occ)
1845 struct osc_async_page *oap;
1846 struct loi_oap_pages *lop;
1847 struct lov_oinfo *loi;
1850 /* XXX member_of() */
/* Recover the oap from its embedded callback context. */
1851 oap = list_entry(occ, struct osc_async_page, oap_occ);
1853 client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1855 oap->oap_interrupted = 1;
1857 /* ok, it's been put in an rpc. only one oap gets a request reference */
1858 if (oap->oap_request != NULL) {
1859 ptlrpc_mark_interrupted(oap->oap_request);
/* Kick ptlrpcd so it notices the interrupted request promptly. */
1860 ptlrpcd_wake(oap->oap_request);
1864 /* we don't get interruption callbacks until osc_trigger_group_io()
1865 * has been called and put the sync oaps in the pending/urgent lists.*/
1866 if (!list_empty(&oap->oap_pending_item)) {
1867 list_del_init(&oap->oap_pending_item);
1868 list_del_init(&oap->oap_urgent_item);
1871 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1872 &loi->loi_write_lop : &loi->loi_read_lop;
1873 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1874 loi_list_maint(oap->oap_cli, oap->oap_loi);
/* Complete the group-I/O slot with -EINTR so the waiter wakes up. */
1876 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
1877 oap->oap_oig = NULL;
1881 client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
1884 /* this is trying to propogate async writeback errors back up to the
1885 * application. As an async write fails we record the error code for later if
1886 * the app does an fsync. As long as errors persist we force future rpcs to be
1887 * sync so that the app can get a sync error and break the cycle of queueing
1888 * pages for which writeback will fail. */
1889 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
/* Failure path: remember we must force sync and record the xid floor
 * below which completions may still carry the old error. */
1896 ar->ar_force_sync = 1;
1897 ar->ar_min_xid = ptlrpc_sample_next_xid();
/* Success at or past the recorded xid clears the force-sync state. */
1902 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1903 ar->ar_force_sync = 0;
/* Queue an oap on its object's read or write pending list (and the urgent
 * list when flagged), updating the pending-page accounting. */
1906 static void osc_oap_to_pending(struct osc_async_page *oap)
1908 struct loi_oap_pages *lop;
1910 if (oap->oap_cmd & OBD_BRW_WRITE)
1911 lop = &oap->oap_loi->loi_write_lop;
1913 lop = &oap->oap_loi->loi_read_lop;
1915 if (oap->oap_async_flags & ASYNC_URGENT)
1916 list_add(&oap->oap_urgent_item, &lop->lop_urgent)
1917 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1918 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1921 /* this must be called holding the loi list lock to give coverage to exit_cache,
1922 * async_flag maintenance, and oap_request */
1923 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1924 struct osc_async_page *oap, int sent, int rc)
/* Drop the oap's reference on the request; keep its xid for the
 * async-error bookkeeping below. */
1929 if (oap->oap_request != NULL) {
1930 xid = ptlrpc_req_xid(oap->oap_request);
1931 ptlrpc_req_finished(oap->oap_request);
1932 oap->oap_request = NULL;
1935 oap->oap_async_flags = 0;
1936 oap->oap_interrupted = 0;
1938 if (oap->oap_cmd & OBD_BRW_WRITE) {
1939 osc_process_ar(&cli->cl_ar, xid, rc);
1940 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
/* On success, fold any server-returned size/time attributes into the
 * cached lock value block for this object. */
1943 if (rc == 0 && oa != NULL) {
1944 if (oa->o_valid & OBD_MD_FLBLOCKS)
1945 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
1946 if (oa->o_valid & OBD_MD_FLMTIME)
1947 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
1948 if (oa->o_valid & OBD_MD_FLATIME)
1949 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
1950 if (oa->o_valid & OBD_MD_FLCTIME)
1951 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
/* Group-I/O page: leave exit_cache/oig completion to the group path. */
1955 osc_exit_cache(cli, oap, sent);
1956 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
1957 oap->oap_oig = NULL;
1962 rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
1963 oap->oap_cmd, oa, rc);
1965 /* ll_ap_completion (from llite) drops PG_locked. so, a new
1966 * I/O on the page could start, but OSC calls it under lock
1967 * and thus we can add oap back to pending safely */
1969 /* upper layer wants to leave the page on pending queue */
1970 osc_oap_to_pending(oap);
1972 osc_exit_cache(cli, oap, sent);
/* Interpret callback for BRW requests built by osc_send_oap_rpc().
 * Finishes the request (redoing it on recoverable errors), completes each
 * oap in the batch, then wakes cache waiters and re-runs the RPC engine. */
1976 static int brw_interpret_oap(struct ptlrpc_request *req, void *data, int rc)
1978 struct osc_async_page *oap, *tmp;
1979 struct osc_brw_async_args *aa = data;
1980 struct client_obd *cli;
1983 rc = osc_brw_fini_request(req, rc);
1984 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1985 if (osc_recoverable_error(rc)) {
1986 rc = osc_brw_redo_request(req, aa);
1993 client_obd_list_lock(&cli->cl_loi_list_lock);
1995 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1996 * is called so we know whether to go to sync BRWs or wait for more
1997 * RPCs to complete */
1998 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1999 cli->cl_w_in_flight--;
2001 cli->cl_r_in_flight--;
2003 /* the caller may re-use the oap after the completion call so
2004 * we need to clean it up a little */
2005 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2006 list_del_init(&oap->oap_rpc_item);
2007 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
2010 osc_wake_cache_waiters(cli);
2011 osc_check_rpcs(cli);
2013 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* The obdo and page array were allocated by osc_build_req(). */
2015 OBDO_FREE(aa->aa_oa);
2017 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
/* Build a BRW ptlrpc request from a list of ready oaps: collect their
 * brw_pages into a sorted pga, fill the obdo and capa from the caller
 * ops, and splice the oap list into the request's async args.  Returns
 * the request or an ERR_PTR; on success the rpc_list is emptied. */
2021 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
2022 struct list_head *rpc_list,
2023 int page_count, int cmd)
2025 struct ptlrpc_request *req;
2026 struct brw_page **pga = NULL;
2027 struct osc_brw_async_args *aa;
2028 struct obdo *oa = NULL;
2029 struct obd_async_page_ops *ops = NULL;
2030 void *caller_data = NULL;
2031 struct obd_capa *ocapa;
2032 struct osc_async_page *oap;
2036 LASSERT(!list_empty(rpc_list));
2038 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2040 RETURN(ERR_PTR(-ENOMEM));
2044 GOTO(out, req = ERR_PTR(-ENOMEM));
/* Point each pga slot at the oap's embedded brw_page and compute its
 * absolute file offset. */
2047 list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2049 ops = oap->oap_caller_ops;
2050 caller_data = oap->oap_caller_data;
2052 pga[i] = &oap->oap_brw_page;
2053 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2054 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2055 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2059 /* always get the data for the obdo for the rpc */
2060 LASSERT(ops != NULL);
2061 ops->ap_fill_obdo(caller_data, cmd, oa);
2062 ocapa = ops->ap_lookup_capa(caller_data, cmd);
2064 sort_brw_pages(pga, page_count);
2065 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2069 CERROR("prep_req failed: %d\n", rc);
2070 GOTO(out, req = ERR_PTR(rc));
2073 /* Need to update the timestamps after the request is built in case
2074 * we race with setattr (locally or in queue at OST). If OST gets
2075 * later setattr before earlier BRW (as determined by the request xid),
2076 * the OST will not use BRW timestamps. Sadly, there is no obvious
2077 * way to do this in a single call. bug 10150 */
2078 ops->ap_update_obdo(caller_data, cmd, oa,
2079 OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
/* Async args live inside the request; verify they fit at compile time. */
2081 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2082 aa = (struct osc_brw_async_args *)&req->rq_async_args;
2083 INIT_LIST_HEAD(&aa->aa_oaps);
2084 list_splice(rpc_list, &aa->aa_oaps);
2085 INIT_LIST_HEAD(rpc_list);
2092 OBD_FREE(pga, sizeof(*pga) * page_count);
2097 /* the loi lock is held across this function but it's allowed to release
2098 * and reacquire it during its work */
/* Gather ready pages from 'lop' into one BRW RPC and hand it to ptlrpcd.
 * Returns the number of pages sent, 0 when make_ready asked us to back
 * off, or a negative errno. */
2099 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2100 int cmd, struct loi_oap_pages *lop)
2102 struct ptlrpc_request *req;
2103 obd_count page_count = 0;
2104 struct osc_async_page *oap = NULL, *tmp;
2105 struct osc_brw_async_args *aa;
2106 struct obd_async_page_ops *ops;
2107 CFS_LIST_HEAD(rpc_list);
2108 unsigned int ending_offset;
2109 unsigned starting_offset = 0;
2112 /* first we find the pages we're allowed to work with */
2113 list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2115 ops = oap->oap_caller_ops;
2117 LASSERT(oap->oap_magic == OAP_MAGIC);
2119 /* in llite being 'ready' equates to the page being locked
2120 * until completion unlocks it. commit_write submits a page
2121 * as not ready because its unlock will happen unconditionally
2122 * as the call returns. if we race with commit_write giving
2123 * us that page we dont' want to create a hole in the page
2124 * stream, so we stop and leave the rpc to be fired by
2125 * another dirtier or kupdated interval (the not ready page
2126 * will still be on the dirty list). we could call in
2127 * at the end of ll_file_write to process the queue again. */
2128 if (!(oap->oap_async_flags & ASYNC_READY)) {
2129 int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2131 CDEBUG(D_INODE, "oap %p page %p returned %d "
2132 "instead of ready\n", oap,
2136 /* llite is telling us that the page is still
2137 * in commit_write and that we should try
2138 * and put it in an rpc again later. we
2139 * break out of the loop so we don't create
2140 * a hole in the sequence of pages in the rpc
2145 /* the io isn't needed.. tell the checks
2146 * below to complete the rpc with EINTR */
2147 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2148 oap->oap_count = -EINTR;
2151 oap->oap_async_flags |= ASYNC_READY;
/* Any other ap_make_ready() result is a caller bug. */
2154 LASSERTF(0, "oap %p page %p returned %d "
2155 "from make_ready\n", oap,
2163 * Page submitted for IO has to be locked. Either by
2164 * ->ap_make_ready() or by higher layers.
2166 * XXX nikita: this assertion should be adjusted when lustre
2167 * starts using PG_writeback for pages being written out.
2169 #if defined(__KERNEL__) && defined(__linux__)
2170 LASSERT(PageLocked(oap->oap_page));
2172 /* If there is a gap at the start of this page, it can't merge
2173 * with any previous page, so we'll hand the network a
2174 * "fragmented" page array that it can't transfer in 1 RDMA */
2175 if (page_count != 0 && oap->oap_page_off != 0)
2178 /* take the page out of our book-keeping */
2179 list_del_init(&oap->oap_pending_item);
2180 lop_update_pending(cli, lop, cmd, -1);
2181 list_del_init(&oap->oap_urgent_item);
/* Record where this RPC starts, for the offset histogram below. */
2183 if (page_count == 0)
2184 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2185 (PTLRPC_MAX_BRW_SIZE - 1);
2187 /* ask the caller for the size of the io as the rpc leaves. */
2188 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2190 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2191 if (oap->oap_count <= 0) {
2192 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2194 osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2198 /* now put the page back in our accounting */
2199 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2200 if (++page_count >= cli->cl_max_pages_per_rpc)
2203 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2204 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2205 * have the same alignment as the initial writes that allocated
2206 * extents on the server. */
2207 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2208 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2209 if (ending_offset == 0)
2212 /* If there is a gap at the end of this page, it can't merge
2213 * with any subsequent pages, so we'll hand the network a
2214 * "fragmented" page array that it can't transfer in 1 RDMA */
2215 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2219 osc_wake_cache_waiters(cli);
2221 if (page_count == 0)
2224 loi_list_maint(cli, loi);
/* Drop the loi lock while building the request; it is retaken below. */
2226 client_obd_list_unlock(&cli->cl_loi_list_lock);
2228 req = osc_build_req(cli, &rpc_list, page_count, cmd);
2230 /* this should happen rarely and is pretty bad, it makes the
2231 * pending list not follow the dirty order */
2232 client_obd_list_lock(&cli->cl_loi_list_lock);
2233 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2234 list_del_init(&oap->oap_rpc_item);
2236 /* queued sync pages can be torn down while the pages
2237 * were between the pending list and the rpc */
2238 if (oap->oap_interrupted) {
2239 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2240 osc_ap_completion(cli, NULL, oap, 0,
2244 osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2246 loi_list_maint(cli, loi);
2247 RETURN(PTR_ERR(req));
2250 aa = (struct osc_brw_async_args *)&req->rq_async_args;
/* lprocfs accounting: page count, concurrency, and start offset. */
2252 if (cmd == OBD_BRW_READ) {
2253 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2254 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2255 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2256 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2257 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2259 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2260 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2261 cli->cl_w_in_flight);
2262 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2263 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2264 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2267 client_obd_list_lock(&cli->cl_loi_list_lock);
2269 if (cmd == OBD_BRW_READ)
2270 cli->cl_r_in_flight++;
2272 cli->cl_w_in_flight++;
2274 /* queued sync pages can be torn down while the pages
2275 * were between the pending list and the rpc */
2277 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2278 /* only one oap gets a request reference */
2281 if (oap->oap_interrupted && !req->rq_intr) {
2282 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2284 ptlrpc_mark_interrupted(req);
2288 tmp->oap_request = ptlrpc_request_addref(req);
2290 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2291 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2293 req->rq_interpret_reply = brw_interpret_oap;
2294 ptlrpcd_add_req(req);
/* Trace helper: dump an loi's readiness flag and its pending/urgent
 * read and write page state, followed by a caller-supplied message. */
2298 #define LOI_DEBUG(LOI, STR, args...) \
2299 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2300 !list_empty(&(LOI)->loi_cli_item), \
2301 (LOI)->loi_write_lop.lop_num_pending, \
2302 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2303 (LOI)->loi_read_lop.lop_num_pending, \
2304 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2307 /* This is called by osc_check_rpcs() to find which objects have pages that
2308 * we could be sending. These lists are maintained by lop_makes_rpc(). */
2309 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2312 /* first return all objects which we already know to have
2313 * pages ready to be stuffed into rpcs */
2314 if (!list_empty(&cli->cl_loi_ready_list))
2315 RETURN(list_entry(cli->cl_loi_ready_list.next,
2316 struct lov_oinfo, loi_cli_item));
2318 /* then if we have cache waiters, return all objects with queued
2319 * writes. This is especially important when many small files
2320 * have filled up the cache and not been fired into rpcs because
2321 * they don't pass the nr_pending/object threshhold */
2322 if (!list_empty(&cli->cl_cache_waiters) &&
2323 !list_empty(&cli->cl_loi_write_list))
2324 RETURN(list_entry(cli->cl_loi_write_list.next,
2325 struct lov_oinfo, loi_write_item));
2327 /* then return all queued objects when we have an invalid import
2328 * so that they get flushed */
2329 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2330 if (!list_empty(&cli->cl_loi_write_list))
2331 RETURN(list_entry(cli->cl_loi_write_list.next,
2332 struct lov_oinfo, loi_write_item));
2333 if (!list_empty(&cli->cl_loi_read_list))
2334 RETURN(list_entry(cli->cl_loi_read_list.next,
2335 struct lov_oinfo, loi_read_item));
2340 /* called with the loi list lock held */
/* RPC engine: keep firing BRW RPCs from ready objects until the
 * max-in-flight limit is reached or nothing is ready to send. */
2341 static void osc_check_rpcs(struct client_obd *cli)
2343 struct lov_oinfo *loi;
2344 int rc = 0, race_counter = 0;
2347 while ((loi = osc_next_loi(cli)) != NULL) {
2348 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2350 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2353 /* attempt some read/write balancing by alternating between
2354 * reads and writes in an object. The makes_rpc checks here
2355 * would be redundant if we were getting read/write work items
2356 * instead of objects. we don't want send_oap_rpc to drain a
2357 * partial read pending queue when we're given this object to
2358 * do io on writes while there are cache waiters */
2359 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2360 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2361 &loi->loi_write_lop);
2369 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2370 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2371 &loi->loi_read_lop);
2380 /* attempt some inter-object balancing by issueing rpcs
2381 * for each object in turn */
2382 if (!list_empty(&loi->loi_cli_item))
2383 list_del_init(&loi->loi_cli_item);
2384 if (!list_empty(&loi->loi_write_item))
2385 list_del_init(&loi->loi_write_item);
2386 if (!list_empty(&loi->loi_read_item))
2387 list_del_init(&loi->loi_read_item);
2389 loi_list_maint(cli, loi);
2391 /* send_oap_rpc fails with 0 when make_ready tells it to
2392 * back off. llite's make_ready does this when it tries
2393 * to lock a page queued for write that is already locked.
2394 * we want to try sending rpcs from many objects, but we
2395 * don't want to spin failing with 0. */
2396 if (race_counter == 10)
2402 /* we're trying to queue a page in the osc so we're subject to the
2403 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2404 * If the osc's queued pages are already at that limit, then we want to sleep
2405 * until there is space in the osc's queue for us. We also may be waiting for
2406 * write credits from the OST if there are RPCs in flight that may return some
2407 * before we fall back to sync writes.
2409 * We need this know our allocation was granted in the presence of signals */
/*
 * Wakeup predicate for the l_wait_event() in osc_enter_cache(): true once
 * the waiter has been removed from the cache-waiter list (grant arrived)
 * or there are no RPCs in flight left that could ever grant it.
 * Takes cl_loi_list_lock only to read consistent state.
 */
2410 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2414 client_obd_list_lock(&cli->cl_loi_list_lock);
2415 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2416 client_obd_list_unlock(&cli->cl_loi_list_lock);
2420 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2421 * grant or cache space. */
/*
 * Account one page of dirty cache for @oap, or block until space/grant
 * becomes available.  Returns an error to force the caller into sync I/O
 * when caching is disabled or impossible (elided RETURN lines not shown).
 */
2422 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2423 struct osc_async_page *oap)
2425 struct osc_cache_waiter ocw;
2426 struct l_wait_info lwi = { 0 };
2430 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2431 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2432 cli->cl_dirty_max, obd_max_dirty_pages,
2433 cli->cl_lost_grant, cli->cl_avail_grant);
2435 /* force the caller to try sync io. this can jump the list
2436 * of queued writes and create a discontiguous rpc stream */
2437 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2438 loi->loi_ar.ar_force_sync)
2441 /* Hopefully normal case - cache space and write credits available */
2442 if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2443 (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2444 (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2445 /* account for ourselves */
2446 osc_consume_write_grant(cli, &oap->oap_brw_page);
2450 /* Make sure that there are write rpcs in flight to wait for. This
2451 * is a little silly as this object may not have any pending but
2452 * other objects sure might. */
2453 if (cli->cl_w_in_flight) {
/* queue ourselves as a cache waiter; a completing write RPC wakes us */
2454 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2455 cfs_waitq_init(&ocw.ocw_waitq);
2459 loi_list_maint(cli, loi);
2460 osc_check_rpcs(cli);
/* drop the lock while sleeping; ocw_granted() re-takes it to check */
2461 client_obd_list_unlock(&cli->cl_loi_list_lock);
2463 CDEBUG(D_CACHE, "sleeping for cache space\n");
2464 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2466 client_obd_list_lock(&cli->cl_loi_list_lock);
/* still on the waiter list => we were woken without a grant */
2467 if (!list_empty(&ocw.ocw_entry)) {
2468 list_del(&ocw.ocw_entry);
/*
 * Initialize the per-page osc_async_page bookkeeping for @page at object
 * offset @offset.  With *res == NULL the caller is asking for the size of
 * the structure to allocate (size_round(sizeof(*oap)) is returned).
 * NOTE(review): elided lines include the *res assignment and RETURN.
 */
2477 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2478 struct lov_oinfo *loi, cfs_page_t *page,
2479 obd_off offset, struct obd_async_page_ops *ops,
2480 void *data, void **res)
2482 struct osc_async_page *oap;
2486 return size_round(sizeof(*oap));
/* magic lets oap_from_cookie() validate opaque cookies later */
2489 oap->oap_magic = OAP_MAGIC;
2490 oap->oap_cli = &exp->exp_obd->u.cli;
2493 oap->oap_caller_ops = ops;
2494 oap->oap_caller_data = data;
2496 oap->oap_page = page;
2497 oap->oap_obj_off = offset;
2499 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2500 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2501 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2503 oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2505 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
/*
 * Convert an opaque page cookie back into its osc_async_page, validating
 * the magic set by osc_prep_async_page(); returns ERR_PTR(-EINVAL) on a
 * bad cookie.
 */
2509 struct osc_async_page *oap_from_cookie(void *cookie)
2511 struct osc_async_page *oap = cookie;
2512 if (oap->oap_magic != OAP_MAGIC)
2513 return ERR_PTR(-EINVAL);
/*
 * Queue one async page for read or write.  Validates the cookie, rejects
 * pages already on a list, checks quota for writes, accounts dirty cache
 * (osc_enter_cache) for writes, then moves the oap to the pending lists
 * and kicks RPC generation.
 */
2517 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2518 struct lov_oinfo *loi, void *cookie,
2519 int cmd, obd_off off, int count,
2520 obd_flag brw_flags, enum async_flags async_flags)
2522 struct client_obd *cli = &exp->exp_obd->u.cli;
2523 struct osc_async_page *oap;
2527 oap = oap_from_cookie(cookie);
2529 RETURN(PTR_ERR(oap));
/* no import or an invalid one: cannot queue I/O at all */
2531 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
/* a page may only be queued once */
2534 if (!list_empty(&oap->oap_pending_item) ||
2535 !list_empty(&oap->oap_urgent_item) ||
2536 !list_empty(&oap->oap_rpc_item))
2539 /* check if the file's owner/group is over quota */
2540 #ifdef HAVE_QUOTA_SUPPORT
2541 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2542 struct obd_async_page_ops *ops;
2549 ops = oap->oap_caller_ops;
2550 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2551 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
/* default loi: single-stripe objects keep state in slot 0 */
2562 loi = lsm->lsm_oinfo[0];
2564 client_obd_list_lock(&cli->cl_loi_list_lock);
2567 oap->oap_page_off = off;
2568 oap->oap_count = count;
2569 oap->oap_brw_flags = brw_flags;
2570 oap->oap_async_flags = async_flags;
2572 if (cmd & OBD_BRW_WRITE) {
/* may drop/retake the list lock while waiting for cache space */
2573 rc = osc_enter_cache(cli, loi, oap);
2575 client_obd_list_unlock(&cli->cl_loi_list_lock);
2580 osc_oap_to_pending(oap);
2581 loi_list_maint(cli, loi);
2583 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2586 osc_check_rpcs(cli);
2587 client_obd_list_unlock(&cli->cl_loi_list_lock);
2592 /* aka (~was & now & flag), but this is more clear :) */
2593 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
/*
 * Raise async flags (ASYNC_READY / ASYNC_URGENT) on a queued page.
 * Newly-urgent pages are moved onto the lop_urgent list so the next
 * osc_check_rpcs() pass can pick them up.  Flags are only ever set
 * here, never cleared.
 */
2595 static int osc_set_async_flags(struct obd_export *exp,
2596 struct lov_stripe_md *lsm,
2597 struct lov_oinfo *loi, void *cookie,
2598 obd_flag async_flags)
2600 struct client_obd *cli = &exp->exp_obd->u.cli;
2601 struct loi_oap_pages *lop;
2602 struct osc_async_page *oap;
2606 oap = oap_from_cookie(cookie);
2608 RETURN(PTR_ERR(oap));
2611 * bug 7311: OST-side locking is only supported for liblustre for now
2612 * (and liblustre never calls obd_set_async_flags(). I hope.), generic
2613 * implementation has to handle case where OST-locked page was picked
2614 * up by, e.g., ->writepage().
2616 LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2617 LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2620 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2624 loi = lsm->lsm_oinfo[0];
/* pick the read or write page list matching the page's command */
2626 if (oap->oap_cmd & OBD_BRW_WRITE) {
2627 lop = &loi->loi_write_lop;
2629 lop = &loi->loi_read_lop;
2632 client_obd_list_lock(&cli->cl_loi_list_lock);
/* the page must still be pending to have its flags changed */
2634 if (list_empty(&oap->oap_pending_item))
2635 GOTO(out, rc = -EINVAL);
/* nothing new to set: all requested flags are already present */
2637 if ((oap->oap_async_flags & async_flags) == async_flags)
2640 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2641 oap->oap_async_flags |= ASYNC_READY;
2643 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
/* only promote to urgent if not already part of an in-flight RPC */
2644 if (list_empty(&oap->oap_rpc_item)) {
2645 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2646 loi_list_maint(cli, loi);
2650 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2651 oap->oap_async_flags);
2653 osc_check_rpcs(cli);
2654 client_obd_list_unlock(&cli->cl_loi_list_lock);
/*
 * Queue a page onto an obd_io_group: the page goes on lop_pending_group
 * (not the normal pending list) and is not issued until
 * osc_trigger_group_io() moves the whole group to pending at once.
 */
2658 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2659 struct lov_oinfo *loi,
2660 struct obd_io_group *oig, void *cookie,
2661 int cmd, obd_off off, int count,
2663 obd_flag async_flags)
2665 struct client_obd *cli = &exp->exp_obd->u.cli;
2666 struct osc_async_page *oap;
2667 struct loi_oap_pages *lop;
2671 oap = oap_from_cookie(cookie);
2673 RETURN(PTR_ERR(oap));
2675 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
/* page must not already be queued anywhere */
2678 if (!list_empty(&oap->oap_pending_item) ||
2679 !list_empty(&oap->oap_urgent_item) ||
2680 !list_empty(&oap->oap_rpc_item))
2684 loi = lsm->lsm_oinfo[0];
2686 client_obd_list_lock(&cli->cl_loi_list_lock);
2689 oap->oap_page_off = off;
2690 oap->oap_count = count;
2691 oap->oap_brw_flags = brw_flags;
2692 oap->oap_async_flags = async_flags;
2694 if (cmd & OBD_BRW_WRITE)
2695 lop = &loi->loi_write_lop;
2697 lop = &loi->loi_read_lop;
2699 list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
/* synchronous group members register for completion/interrupt callbacks */
2700 if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2702 rc = oig_add_one(oig, &oap->oap_occ);
2705 LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2706 oap, oap->oap_page, rc);
2708 client_obd_list_unlock(&cli->cl_loi_list_lock);
/*
 * Move every page parked on @lop's group-pending list onto the regular
 * pending lists (via osc_oap_to_pending) so they become eligible for RPCs,
 * then refresh the client-level list state.  Called under the loi list lock
 * from osc_trigger_group_io().
 */
2713 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2714 struct loi_oap_pages *lop, int cmd)
2716 struct list_head *pos, *tmp;
2717 struct osc_async_page *oap;
/* _safe variant: each oap is unlinked from the list as we walk it */
2719 list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2720 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2721 list_del(&oap->oap_pending_item);
2722 osc_oap_to_pending(oap);
2724 loi_list_maint(cli, loi);
/*
 * Release a previously built I/O group: move both its read and write
 * group-pending pages to the live pending lists and kick osc_check_rpcs()
 * to start issuing them.
 */
2727 static int osc_trigger_group_io(struct obd_export *exp,
2728 struct lov_stripe_md *lsm,
2729 struct lov_oinfo *loi,
2730 struct obd_io_group *oig)
2732 struct client_obd *cli = &exp->exp_obd->u.cli;
2736 loi = lsm->lsm_oinfo[0];
2738 client_obd_list_lock(&cli->cl_loi_list_lock);
2740 osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2741 osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2743 osc_check_rpcs(cli);
2744 client_obd_list_unlock(&cli->cl_loi_list_lock);
/*
 * Undo osc_prep/queue for a page: release its cache/grant accounting and
 * remove it from the urgent/pending lists.  Fails with -EBUSY if the page
 * is already part of an in-flight RPC.
 */
2749 static int osc_teardown_async_page(struct obd_export *exp,
2750 struct lov_stripe_md *lsm,
2751 struct lov_oinfo *loi, void *cookie)
2753 struct client_obd *cli = &exp->exp_obd->u.cli;
2754 struct loi_oap_pages *lop;
2755 struct osc_async_page *oap;
2759 oap = oap_from_cookie(cookie);
2761 RETURN(PTR_ERR(oap));
2764 loi = lsm->lsm_oinfo[0];
2766 if (oap->oap_cmd & OBD_BRW_WRITE) {
2767 lop = &loi->loi_write_lop;
2769 lop = &loi->loi_read_lop;
2772 client_obd_list_lock(&cli->cl_loi_list_lock);
/* cannot tear down a page that an RPC currently owns */
2774 if (!list_empty(&oap->oap_rpc_item))
2775 GOTO(out, rc = -EBUSY);
/* give back grant/dirty accounting and let any cache waiters retry */
2777 osc_exit_cache(cli, oap, 0);
2778 osc_wake_cache_waiters(cli);
2780 if (!list_empty(&oap->oap_urgent_item)) {
2781 list_del_init(&oap->oap_urgent_item);
2782 oap->oap_async_flags &= ~ASYNC_URGENT;
2784 if (!list_empty(&oap->oap_pending_item)) {
2785 list_del_init(&oap->oap_pending_item);
2786 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2788 loi_list_maint(cli, loi);
2790 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2792 client_obd_list_unlock(&cli->cl_loi_list_lock);
/*
 * Attach @data (the inode, on Linux) as l_ast_data of the DLM lock behind
 * @lockh, asserting that any previously attached inode is being freed.
 * Also propagates LDLM_FL_NO_LRU from @flags onto the lock.
 */
2796 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2799 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
/* handle no longer resolves to a lock: most likely an eviction */
2802 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2805 lock_res_and_lock(lock);
2806 #if defined (__KERNEL__) && defined (__linux__)
2807 /* Liang XXX: Darwin and Winnt checking should be added */
2808 if (lock->l_ast_data && lock->l_ast_data != data) {
2809 struct inode *new_inode = data;
2810 struct inode *old_inode = lock->l_ast_data;
/* replacing a live (not I_FREEING) inode indicates a bug */
2811 if (!(old_inode->i_state & I_FREEING))
2812 LDLM_ERROR(lock, "inconsistent l_ast_data found");
2813 LASSERTF(old_inode->i_state & I_FREEING,
2814 "Found existing inode %p/%lu/%u state %lu in lock: "
2815 "setting data to %p/%lu/%u\n", old_inode,
2816 old_inode->i_ino, old_inode->i_generation,
2818 new_inode, new_inode->i_ino, new_inode->i_generation);
2821 lock->l_ast_data = data;
2822 lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2823 unlock_res_and_lock(lock);
/* drop the reference taken by ldlm_handle2lock() */
2824 LDLM_LOCK_PUT(lock);
/*
 * Apply @replace to every DLM lock on the resource named by the stripe's
 * object id/group (used to swap or clear l_ast_data on all matching locks).
 */
2827 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2828 ldlm_iterator_t replace, void *data)
2830 struct ldlm_res_id res_id = { .name = {0} };
2831 struct obd_device *obd = class_exp2obd(exp);
2833 res_id.name[0] = lsm->lsm_object_id;
2834 res_id.name[2] = lsm->lsm_object_gr;
2836 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
/*
 * Common completion for osc_enqueue(): translate an intent-aborted enqueue
 * into the server's policy result, log the returned LVB (size/blocks/mtime),
 * and invoke the caller's update callback with the final rc.
 */
2840 static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
2846 /* The request was created before ldlm_cli_enqueue call. */
2847 if (rc == ELDLM_LOCK_ABORTED) {
2848 struct ldlm_reply *rep;
2850 /* swabbed by ldlm_cli_enqueue() */
2851 LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
2852 rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
2854 LASSERT(rep != NULL);
/* server's intent verdict overrides the ABORTED status */
2855 if (rep->lock_policy_res1)
2856 rc = rep->lock_policy_res1;
2860 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2861 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2862 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
2863 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
2864 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
2867 /* Call the update callback. */
2868 rc = oinfo->oi_cb_up(oinfo, rc);
/*
 * Async-enqueue interpret callback: finish the DLM enqueue (unpacking the
 * LVB into the stripe's loi_lvb), run osc_enqueue_fini(), then immediately
 * drop the lock reference — async locks are released as soon as obtained
 * (see the comment above osc_enqueue()).
 */
2872 static int osc_enqueue_interpret(struct ptlrpc_request *req,
2873 struct osc_enqueue_args *aa, int rc)
2875 int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
2876 struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
2877 struct ldlm_lock *lock;
2879 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2881 lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
2883 /* Complete obtaining the lock procedure. */
2884 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2886 &aa->oa_oi->oi_flags,
2887 &lsm->lsm_oinfo[0]->loi_lvb,
2888 sizeof(lsm->lsm_oinfo[0]->loi_lvb),
2889 lustre_swab_ost_lvb,
2890 aa->oa_oi->oi_lockh, rc);
2892 /* Complete osc stuff. */
2893 rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
2895 /* Release the lock for async request. */
2896 if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
2897 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
2899 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2900 aa->oa_oi->oi_lockh, req, aa);
2901 LDLM_LOCK_PUT(lock);
2905 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2906 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2907 * other synchronous requests, however keeping some locks and trying to obtain
2908 * others may take a considerable amount of time in a case of ost failure; and
2909 * when other sync requests do not get released lock from a client, the client
2910 * is excluded from the cluster -- such scenarious make the life difficult, so
2911 * release locks just after they are obtained. */
/*
 * Enqueue an extent lock for the stripe described by oinfo->oi_md: first
 * try to match an existing local lock (PR requests may match PW), else
 * build and send an LDLM enqueue, synchronously or via @rqset.
 * NOTE(review): elided lines hide several RETURN paths and the lock-match
 * branch structure; comments below follow only the visible code.
 */
2912 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2913 struct ldlm_enqueue_info *einfo,
2914 struct ptlrpc_request_set *rqset)
2916 struct ldlm_res_id res_id = { .name = {0} };
2917 struct obd_device *obd = exp->exp_obd;
2918 struct ldlm_reply *rep;
2919 struct ptlrpc_request *req = NULL;
2920 int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
2925 res_id.name[0] = oinfo->oi_md->lsm_object_id;
2926 res_id.name[2] = oinfo->oi_md->lsm_object_gr;
2928 /* Filesystem lock extents are extended to page boundaries so that
2929 * dealing with the page cache is a little smoother. */
2930 oinfo->oi_policy.l_extent.start -=
2931 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
2932 oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
2934 if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
2937 /* Next, search for already existing extent locks that will cover us */
2938 /* If we're trying to read, we also search for an existing PW lock. The
2939 * VFS and page cache already protect us locally, so lots of readers/
2940 * writers can share a single PW lock.
2942 * There are problems with conversion deadlocks, so instead of
2943 * converting a read lock to a write lock, we'll just enqueue a new
2946 * At some point we should cancel the read lock instead of making them
2947 * send us a blocking callback, but there are problems with canceling
2948 * locks out from other users right now, too. */
2949 mode = einfo->ei_mode;
2950 if (einfo->ei_mode == LCK_PR)
2952 mode = ldlm_lock_match(obd->obd_namespace,
2953 oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
2954 einfo->ei_type, &oinfo->oi_policy, mode,
2957 /* addref the lock only if not async requests and PW lock is
2958 * matched whereas we asked for PR. */
2959 if (!rqset && einfo->ei_mode != mode)
2960 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR)
2961 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
2964 /* I would like to be able to ASSERT here that rss <=
2965 * kms, but I can't, for reasons which are explained in
2969 /* We already have a lock, and it's referenced */
2970 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2972 /* For async requests, decref the lock. */
2973 if (einfo->ei_mode != mode)
2974 ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
2976 ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
/* no match: build the enqueue request ourselves */
2984 [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
2985 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
2986 [DLM_LOCKREQ_OFF + 1] = 0 };
2988 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
2992 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
2993 size[DLM_REPLY_REC_OFF] =
2994 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
2995 ptlrpc_req_set_repsize(req, 3, size);
2998 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2999 oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
3001 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
3002 &oinfo->oi_policy, &oinfo->oi_flags,
3003 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3004 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
3005 lustre_swab_ost_lvb, oinfo->oi_lockh,
/* async path: hand completion to osc_enqueue_interpret via the set */
3009 struct osc_enqueue_args *aa;
3010 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3011 aa = (struct osc_enqueue_args *)&req->rq_async_args;
3016 req->rq_interpret_reply = osc_enqueue_interpret;
3017 ptlrpc_set_add_req(rqset, req);
3018 } else if (intent) {
3019 ptlrpc_req_finished(req);
3024 rc = osc_enqueue_fini(req, oinfo, intent, rc);
3026 ptlrpc_req_finished(req);
/*
 * Match an existing extent lock covering @policy on the stripe's resource.
 * Read requests may also match an existing PW lock (a PR reference is then
 * taken and the matched PW ref dropped, unless LDLM_FL_TEST_LOCK).
 */
3031 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3032 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3033 int *flags, void *data, struct lustre_handle *lockh)
3035 struct ldlm_res_id res_id = { .name = {0} };
3036 struct obd_device *obd = exp->exp_obd;
3037 int lflags = *flags;
3041 res_id.name[0] = lsm->lsm_object_id;
3042 res_id.name[2] = lsm->lsm_object_gr;
/* fault-injection hook for testing the no-match path */
3044 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3047 /* Filesystem lock extents are extended to page boundaries so that
3048 * dealing with the page cache is a little smoother */
3049 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3050 policy->l_extent.end |= ~CFS_PAGE_MASK;
3052 /* Next, search for already existing extent locks that will cover us */
3053 /* If we're trying to read, we also search for an existing PW lock. The
3054 * VFS and page cache already protect us locally, so lots of readers/
3055 * writers can share a single PW lock. */
3059 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3060 &res_id, type, policy, rc, lockh);
3062 osc_set_data_with_check(lockh, data, lflags);
/* matched PW while asking for PR: re-reference as PR, drop the PW ref */
3063 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3064 ldlm_lock_addref(lockh, LCK_PR);
3065 ldlm_lock_decref(lockh, LCK_PW);
/*
 * Drop one reference on @lockh; GROUP locks are cancelled outright since
 * they are not kept in the LRU.
 */
3072 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3073 __u32 mode, struct lustre_handle *lockh)
3077 if (unlikely(mode == LCK_GROUP))
3078 ldlm_lock_decref_and_cancel(lockh, mode);
3080 ldlm_lock_decref(lockh, mode);
/*
 * Cancel unused locks in this namespace; with a non-NULL @lsm only the
 * locks on that object's resource (resp is set on an elided line).
 */
3085 static int osc_cancel_unused(struct obd_export *exp,
3086 struct lov_stripe_md *lsm, int flags,
3089 struct obd_device *obd = class_exp2obd(exp);
3090 struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3093 res_id.name[0] = lsm->lsm_object_id;
3094 res_id.name[2] = lsm->lsm_object_gr;
3098 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
/*
 * Add to / remove from the lock LRU the locks of one object's resource
 * (mirrors osc_cancel_unused's resource-id handling; resp set on an
 * elided line).
 */
3101 static int osc_join_lru(struct obd_export *exp,
3102 struct lov_stripe_md *lsm, int join)
3104 struct obd_device *obd = class_exp2obd(exp);
3105 struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3108 res_id.name[0] = lsm->lsm_object_id;
3109 res_id.name[2] = lsm->lsm_object_gr;
3113 return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
/*
 * Interpret callback for the async OST_STATFS request: unpack/swab the
 * obd_statfs reply into the caller's oi_osfs and run the update callback.
 */
3116 static int osc_statfs_interpret(struct ptlrpc_request *req,
3117 struct osc_async_args *aa, int rc)
3119 struct obd_statfs *msfs;
3125 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3126 lustre_swab_obd_statfs);
3128 CERROR("Can't unpack obd_statfs\n");
3129 GOTO(out, rc = -EPROTO);
3132 memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
3134 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Fire an OST_STATFS request asynchronously via @rqset; the reply is
 * handled by osc_statfs_interpret().  @max_age is currently unused on
 * the wire (see comment below).
 */
3138 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3139 __u64 max_age, struct ptlrpc_request_set *rqset)
3141 struct ptlrpc_request *req;
3142 struct osc_async_args *aa;
3143 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
3146 /* We could possibly pass max_age in the request (as an absolute
3147 * timestamp or a "seconds.usec ago") so the target can avoid doing
3148 * extra calls into the filesystem if that isn't necessary (e.g.
3149 * during mount that would help a bit). Having relative timestamps
3150 * is not so great if request processing is slow, while absolute
3151 * timestamps are not ideal because they need time synchronization. */
3152 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3153 OST_STATFS, 1, NULL, NULL);
3157 ptlrpc_req_set_repsize(req, 2, size);
3158 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3160 req->rq_interpret_reply = osc_statfs_interpret;
3161 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3162 aa = (struct osc_async_args *)&req->rq_async_args;
3165 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous OST_STATFS: send the request, wait, unpack/swab the reply
 * into @osfs.  Same max_age caveat as osc_statfs_async().
 */
3169 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3172 struct obd_statfs *msfs;
3173 struct ptlrpc_request *req;
3174 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
3177 /* We could possibly pass max_age in the request (as an absolute
3178 * timestamp or a "seconds.usec ago") so the target can avoid doing
3179 * extra calls into the filesystem if that isn't necessary (e.g.
3180 * during mount that would help a bit). Having relative timestamps
3181 * is not so great if request processing is slow, while absolute
3182 * timestamps are not ideal because they need time synchronization. */
3183 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3184 OST_STATFS, 1, NULL, NULL);
3188 ptlrpc_req_set_repsize(req, 2, size);
3189 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3191 rc = ptlrpc_queue_wait(req);
3195 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3196 lustre_swab_obd_statfs);
3198 CERROR("Can't unpack obd_statfs\n");
3199 GOTO(out, rc = -EPROTO);
3202 memcpy(osfs, msfs, sizeof(*osfs));
3206 ptlrpc_req_finished(req);
3210 /* Retrieve object striping information.
3212 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3213 * the maximum number of OST indices which will fit in the user buffer.
3214 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3216 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3218 struct lov_user_md lum, *lumk;
3219 int rc = 0, lum_size;
/* copy the user's header in to learn how much it can take back */
3225 if (copy_from_user(&lum, lump, sizeof(lum)))
3228 if (lum.lmm_magic != LOV_USER_MAGIC)
/* user asked for objects: allocate room for header + one object entry */
3231 if (lum.lmm_stripe_count > 0) {
3232 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3233 OBD_ALLOC(lumk, lum_size);
3237 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3238 lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
3240 lum_size = sizeof(lum);
/* OSC is always exactly one stripe */
3244 lumk->lmm_object_id = lsm->lsm_object_id;
3245 lumk->lmm_object_gr = lsm->lsm_object_gr;
3246 lumk->lmm_stripe_count = 1;
3248 if (copy_to_user(lump, lumk, lum_size))
/* NOTE(review): elided guard presumably skips the free when lumk == &lum */
3252 OBD_FREE(lumk, lum_size);
/*
 * OSC ioctl dispatcher.  Pins the module for the duration of the call
 * (try_module_get/module_put) and switches on @cmd.
 */
3258 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3259 void *karg, void *uarg)
3261 struct obd_device *obd = exp->exp_obd;
3262 struct obd_ioctl_data *data = karg;
3266 if (!try_module_get(THIS_MODULE)) {
3267 CERROR("Can't get module. Is it alive?");
3271 case OBD_IOC_LOV_GET_CONFIG: {
3273 struct lov_desc *desc;
3274 struct obd_uuid uuid;
3278 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3279 GOTO(out, err = -EINVAL);
3281 data = (struct obd_ioctl_data *)buf;
/* both inline buffers must be large enough for desc and uuid */
3283 if (sizeof(*desc) > data->ioc_inllen1) {
3284 obd_ioctl_freedata(buf, len);
3285 GOTO(out, err = -EINVAL);
3288 if (data->ioc_inllen2 < sizeof(uuid)) {
3289 obd_ioctl_freedata(buf, len);
3290 GOTO(out, err = -EINVAL);
/* present this OSC as a trivial one-target LOV */
3293 desc = (struct lov_desc *)data->ioc_inlbuf1;
3294 desc->ld_tgt_count = 1;
3295 desc->ld_active_tgt_count = 1;
3296 desc->ld_default_stripe_count = 1;
3297 desc->ld_default_stripe_size = 0;
3298 desc->ld_default_stripe_offset = 0;
3299 desc->ld_pattern = 0;
3300 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3302 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3304 err = copy_to_user((void *)uarg, buf, len);
3307 obd_ioctl_freedata(buf, len);
3310 case LL_IOC_LOV_SETSTRIPE:
3311 err = obd_alloc_memmd(exp, karg);
3315 case LL_IOC_LOV_GETSTRIPE:
3316 err = osc_getstripe(karg, uarg);
3318 case OBD_IOC_CLIENT_RECOVER:
3319 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3324 case IOC_OSC_SET_ACTIVE:
3325 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3328 case OBD_IOC_POLL_QUOTACHECK:
3329 err = lquota_poll_check(quota_interface, exp,
3330 (struct if_quotacheck *)karg);
3333 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3334 cmd, cfs_curproc_comm());
3335 GOTO(out, err = -ENOTTY);
/* release the module reference taken on entry */
3338 module_put(THIS_MODULE);
/*
 * obd_get_info handler: "lock_to_stripe" is answered locally (always
 * stripe 0 for an OSC); "last_id" queries the OST via OST_GET_INFO.
 */
3342 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3343 void *key, __u32 *vallen, void *val)
3346 if (!vallen || !val)
3349 if (KEY_IS("lock_to_stripe")) {
3350 __u32 *stripe = val;
3351 *vallen = sizeof(*stripe);
3354 } else if (KEY_IS("last_id")) {
3355 struct ptlrpc_request *req;
3357 char *bufs[2] = { NULL, key };
3358 int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };
3360 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3361 OST_GET_INFO, 2, size, bufs);
3365 size[REPLY_REC_OFF] = *vallen;
3366 ptlrpc_req_set_repsize(req, 2, size);
3367 rc = ptlrpc_queue_wait(req);
3371 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3372 lustre_swab_ost_last_id);
3373 if (reply == NULL) {
3374 CERROR("Can't unpack OST last ID\n");
3375 GOTO(out, rc = -EPROTO);
3377 *((obd_id *)val) = *reply;
3379 ptlrpc_req_finished(req);
/*
 * Interpret callback for the mds_conn OST_SET_INFO request: connect the
 * llog initiator for the MDS-OST originator context and mark the import
 * as server-timeout/pingable so this MDS OSC pings its OST.
 */
3385 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3388 struct llog_ctxt *ctxt;
3389 struct obd_import *imp = req->rq_import;
3395 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3398 rc = llog_initiator_connect(ctxt);
3400 CERROR("cannot establish connection for "
3401 "ctxt %p: %d\n", ctxt, rc);
/* imp_lock guards the import flags we flip here */
3404 spin_lock(&imp->imp_lock);
3405 imp->imp_server_timeout = 1;
3406 imp->imp_pingable = 1;
3407 spin_unlock(&imp->imp_lock);
3408 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
/*
 * obd_set_info_async handler.  Several keys are absorbed locally
 * (next_id, unlinked, init_recov, checksum, flush_ctx); everything else
 * is forwarded to the OST as an OST_SET_INFO request on @set.
 */
3413 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3414 void *key, obd_count vallen, void *val,
3415 struct ptlrpc_request_set *set)
3417 struct ptlrpc_request *req;
3418 struct obd_device *obd = exp->exp_obd;
3419 struct obd_import *imp = class_exp2cliimp(exp);
3420 int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
3421 char *bufs[3] = { NULL, key, val };
3424 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
/* local: seed the object creator's next object id */
3426 if (KEY_IS(KEY_NEXT_ID)) {
3427 if (vallen != sizeof(obd_id))
3429 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3430 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3431 exp->exp_obd->obd_name,
3432 obd->u.cli.cl_oscc.oscc_next_id);
/* local: an unlink freed space, clear the creator's no-space flag */
3437 if (KEY_IS("unlinked")) {
3438 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3439 spin_lock(&oscc->oscc_lock);
3440 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3441 spin_unlock(&oscc->oscc_lock);
3445 if (KEY_IS(KEY_INIT_RECOV)) {
3446 if (vallen != sizeof(int))
3448 spin_lock(&imp->imp_lock);
3449 imp->imp_initial_recov = *(int *)val;
3450 spin_unlock(&imp->imp_lock);
3451 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3452 exp->exp_obd->obd_name,
3453 imp->imp_initial_recov);
3457 if (KEY_IS("checksum")) {
3458 if (vallen != sizeof(int))
3460 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3464 if (KEY_IS(KEY_FLUSH_CTX)) {
3465 sptlrpc_import_flush_my_ctx(imp);
3472 /* We pass all other commands directly to OST. Since nobody calls osc
3473 methods directly and everybody is supposed to go through LOV, we
3474 assume lov checked invalid values for us.
3475 The only recognised values so far are evict_by_nid and mds_conn.
3476 Even if something bad goes through, we'd get a -EINVAL from OST
3479 req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
/* mds_conn: record the object group and finish setup in the interpret */
3484 if (KEY_IS(KEY_MDS_CONN)) {
3485 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3487 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3488 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3489 LASSERT(oscc->oscc_oa.o_gr > 0);
3490 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3493 ptlrpc_req_set_repsize(req, 1, NULL);
3494 ptlrpc_set_add_req(set, req);
3495 ptlrpc_check_set(set);
/* llog ops for the size-replication context: cancel only, via the
 * generic replicator cancel handler. */
3501 static struct llog_operations osc_size_repl_logops = {
3502 lop_cancel: llog_obd_repl_cancel
/* filled in lazily by osc_llog_init() from llog_lvfs_ops */
3505 static struct llog_operations osc_mds_ost_orig_logops;
/*
 * Set up the two llog contexts used by an MDS-side OSC: the MDS-OST
 * originator context (lazily patching osc_mds_ost_orig_logops from
 * llog_lvfs_ops, guarded by obd_dev_lock) and the size-replication
 * context.
 */
3506 static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
3507 struct obd_device *tgt, int count,
3508 struct llog_catid *catid, struct obd_uuid *uuid)
3513 spin_lock(&obd->obd_dev_lock);
/* one-time init: detect "not yet patched" via lop_setup */
3514 if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3515 osc_mds_ost_orig_logops = llog_lvfs_ops;
3516 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3517 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3518 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3519 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3521 spin_unlock(&obd->obd_dev_lock);
3523 rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3524 &catid->lci_logid, &osc_mds_ost_orig_logops);
3526 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3530 rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3531 &osc_size_repl_logops);
3533 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3536 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3537 obd->obd_name, tgt->obd_name, count, catid, rc);
3538 CERROR("logid "LPX64":0x%x\n",
3539 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
/*
 * Tear down both llog contexts created in osc_llog_init(); both cleanups
 * are attempted independently (rc/rc2) so one failure does not skip the
 * other.
 */
3544 static int osc_llog_finish(struct obd_device *obd, int count)
3546 struct llog_ctxt *ctxt;
3547 int rc = 0, rc2 = 0;
3550 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3552 rc = llog_cleanup(ctxt);
3554 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3556 rc2 = llog_cleanup(ctxt);
/*
 * On reconnect, tell the server how much grant to restore: the current
 * available grant, or (if none) two max-sized RPCs' worth.  Lost grant
 * is reported once and then zeroed, all under the loi list lock.
 */
3563 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3564 struct obd_uuid *cluuid,
3565 struct obd_connect_data *data)
3567 struct client_obd *cli = &obd->u.cli;
3569 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3572 client_obd_list_lock(&cli->cl_loi_list_lock);
3573 data->ocd_grant = cli->cl_avail_grant ?:
3574 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3575 lost_grant = cli->cl_lost_grant;
3576 cli->cl_lost_grant = 0;
3577 client_obd_list_unlock(&cli->cl_loi_list_lock);
3579 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3580 "cl_lost_grant: %ld\n", data->ocd_grant,
3581 cli->cl_avail_grant, lost_grant);
3582 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3583 " ocd_grant: %d\n", data->ocd_connect_flags,
3584 data->ocd_version, data->ocd_grant);
/*
 * Disconnect the export; on the last connection, first flush pending
 * llog cancels (size-replication context) out to the target.
 */
3590 static int osc_disconnect(struct obd_export *exp)
3592 struct obd_device *obd = class_exp2obd(exp);
3593 struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3596 if (obd->u.cli.cl_conn_count == 1)
3597 /* flush any remaining cancel messages out to the target */
3598 llog_sync(ctxt, exp);
3600 rc = client_disconnect_export(exp);
/*
 * React to import state transitions: reset grant on disconnect, flush
 * pages and locks on invalidation, clear no-space on (re)activation, and
 * apply connect-data (grant, request portal) on OCD.  Observer devices
 * are notified of active/inactive/OCD transitions.
 */
3604 static int osc_import_event(struct obd_device *obd,
3605 struct obd_import *imp,
3606 enum obd_import_event event)
3608 struct client_obd *cli;
3612 LASSERT(imp->imp_obd == obd);
3615 case IMP_EVENT_DISCON: {
3616 /* Only do this on the MDS OSC's */
3617 if (imp->imp_server_timeout) {
3618 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3620 spin_lock(&oscc->oscc_lock);
3621 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3622 spin_unlock(&oscc->oscc_lock);
/* grant is meaningless across a disconnect; start from zero */
3625 client_obd_list_lock(&cli->cl_loi_list_lock);
3626 cli->cl_avail_grant = 0;
3627 cli->cl_lost_grant = 0;
3628 client_obd_list_unlock(&cli->cl_loi_list_lock);
3631 case IMP_EVENT_INACTIVE: {
3632 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3635 case IMP_EVENT_INVALIDATE: {
3636 struct ldlm_namespace *ns = obd->obd_namespace;
3640 client_obd_list_lock(&cli->cl_loi_list_lock);
3641 /* all pages go to failing rpcs due to the invalid import */
3642 osc_check_rpcs(cli);
3643 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* drop all locks locally; server-side state is gone anyway */
3645 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3649 case IMP_EVENT_ACTIVE: {
3650 /* Only do this on the MDS OSC's */
3651 if (imp->imp_server_timeout) {
3652 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3654 spin_lock(&oscc->oscc_lock);
3655 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3656 spin_unlock(&oscc->oscc_lock);
3658 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3661 case IMP_EVENT_OCD: {
3662 struct obd_connect_data *ocd = &imp->imp_connect_data;
3664 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3665 osc_init_grant(&obd->u.cli, ocd);
3668 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3669 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3671 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3675 CERROR("Unknown import event %d\n", event);
/* Set up an OSC obd device from its configuration record.
 *
 * Takes a reference on the ptlrpcd threads, performs the generic client
 * setup (import creation etc.), registers /proc entries, and attaches a
 * pre-allocated request pool to the import so writeback can always make
 * forward progress even under memory pressure.
 *
 * NOTE(review): error-handling branches and the return are elided in
 * this view — confirm ptlrpcd_decref on failure in the full source. */
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
        rc = ptlrpcd_addref();

        rc = client_obd_setup(obd, lcfg);
        struct lprocfs_static_vars lvars = { 0 };
        struct client_obd *cli = &obd->u.cli;

        /* Register the OSC's /proc counters and RPC statistics. */
        lprocfs_osc_init_vars(&lvars);
        if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
                lproc_osc_attach_seqstat(obd);
                ptlrpc_lprocfs_register_obd(obd);

        /* We need to allocate a few requests more, because
           brw_interpret_oap tries to create new requests before freeing
           previous ones. Ideally we want to have 2x max_rpcs_in_flight
           reserved, but I afraid that might be too much wasted RAM
           in fact, so 2 is just my guess and still should work. */
        cli->cl_import->imp_rq_pool =
                ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
                                    ptlrpc_add_rqs_to_pool);
/* Staged pre-cleanup of the OSC before final osc_cleanup().
 *
 * Per stage:
 *   EARLY     - deactivate the import and stop pinging it so no new RPCs
 *               (e.g. an in-flight mds_lov_synchronize) are started.
 *   EXPORTS   - if the device was set up but never connected, the import
 *               was never torn down by disconnect: invalidate it, free
 *               its request pool and destroy it here.
 *   SELF_EXP  - shut down the llog subsystems.
 *   OBD       - nothing extra.
 *
 * NOTE(review): the switch header, break lines and return are elided in
 * this view. */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                spin_lock(&imp->imp_lock);
                imp->imp_pingable = 0;
                spin_unlock(&imp->imp_lock);

        case OBD_CLEANUP_EXPORTS: {
                /* If we set up but never connected, the
                   client import will not have been cleaned. */
                if (obd->u.cli.cl_import) {
                        struct obd_import *imp;
                        imp = obd->u.cli.cl_import;
                        CDEBUG(D_CONFIG, "%s: client import never connected\n",
                        ptlrpc_invalidate_import(imp);
                        ptlrpc_free_rq_pool(imp->imp_rq_pool);
                        class_destroy_import(imp);
                        /* Clear the pointer so cleanup is idempotent. */
                        obd->u.cli.cl_import = NULL;

        case OBD_CLEANUP_SELF_EXP:
                rc = obd_llog_finish(obd, 0);
                CERROR("failed to cleanup llogging subsystems\n");

        case OBD_CLEANUP_OBD:
/* Final teardown of an OSC obd device.
 *
 * Unregisters /proc entries, marks the object creator as exiting so no
 * further precreates are issued, releases the per-device quota cache,
 * and finishes with the generic client cleanup.
 *
 * NOTE(review): the rc declaration and return are elided in this view. */
int osc_cleanup(struct obd_device *obd)
        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

        ptlrpc_lprocfs_unregister_obd(obd);
        lprocfs_obd_cleanup(obd);

        /* Tell the creator thread it is going away for good. */
        spin_lock(&oscc->oscc_lock);
        oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
        oscc->oscc_flags |= OSCC_FLAG_EXITING;
        spin_unlock(&oscc->oscc_lock);

        /* free memory of osc quota cache */
        lquota_cleanup(quota_interface, obd);

        rc = client_obd_cleanup(obd);
/* Apply a runtime configuration record (lctl conf_param) to this OSC.
 *
 * Delegates to the generic proc-parameter parser with the OSC variable
 * table; 'buf' carries a struct lustre_cfg.
 *
 * NOTE(review): the rc declaration and return are elided in this view. */
static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
        struct lustre_cfg *lcfg = buf;
        struct lprocfs_static_vars lvars = { 0 };

        lprocfs_osc_init_vars(&lvars);

        rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
/* OBD method table for the OSC: entry points the generic obd layer and
 * upper layers (LOV, llite) use to drive this client.  Connection
 * handling is mostly shared with the generic client code; object,
 * async-page, lock and llog methods are OSC-specific. */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        /* Device lifecycle. */
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        /* Connection management (generic client helpers where possible). */
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        /* Object metadata pack/unpack and object lifecycle. */
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        .o_precreate            = osc_precreate,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        /* Bulk I/O and the async page machinery. */
        .o_brw_async            = osc_brw_async,
        .o_prep_async_page      = osc_prep_async_page,
        .o_queue_async_io       = osc_queue_async_io,
        .o_set_async_flags      = osc_set_async_flags,
        .o_queue_group_io       = osc_queue_group_io,
        .o_trigger_group_io     = osc_trigger_group_io,
        .o_teardown_async_page  = osc_teardown_async_page,
        .o_punch                = osc_punch,
        /* DLM lock handling. */
        .o_enqueue              = osc_enqueue,
        .o_match                = osc_match,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_join_lru             = osc_join_lru,
        /* Control, info and logging. */
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
/* Module initialization: hook up the optional quota interface and
 * register the OSC obd type with the class driver.
 *
 * The quota module is loaded on demand; if it is absent,
 * quota_interface stays NULL and the lquota_* wrappers degrade to
 * no-ops.  On registration failure the quota symbol reference is
 * dropped again.
 *
 * NOTE(review): the rc declaration, failure branch and return are
 * elided in this view. */
int __init osc_init(void)
        struct lprocfs_static_vars lvars = { 0 };

        lprocfs_osc_init_vars(&lvars);

        /* Best-effort: quota support is optional. */
        request_module("lquota");
        quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
        lquota_init(quota_interface);
        init_obd_quota_ops(quota_interface, &osc_obd_ops);

        rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
                                 LUSTRE_OSC_NAME, NULL);

        /* Registration failed: release the quota module reference. */
        if (quota_interface)
                PORTAL_SYMBOL_PUT(osc_quota_interface);
/* Module teardown: release the quota interface (mirror of osc_init)
 * and unregister the OSC obd type. */
static void /*__exit*/ osc_exit(void)
        lquota_exit(quota_interface);
        if (quota_interface)
                PORTAL_SYMBOL_PUT(osc_quota_interface);

        class_unregister_type(LUSTRE_OSC_NAME);
/* Kernel module metadata and the portable (kernel/liblustre)
 * init/exit registration. */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);