1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5 * Author Peter Braam <braam@clusterfs.com>
7 * This file is part of the Lustre file system, http://www.lustre.org
8 * Lustre is a trademark of Cluster File Systems, Inc.
10 * You may have signed or agreed to another license before downloading
11 * this software. If so, you are bound by the terms and conditions
12 * of that agreement, and the following does not apply to you. See the
13 * LICENSE file included with this distribution for more information.
15 * If you did not agree to a different license, then this copy of Lustre
16 * is open source software; you can redistribute it and/or modify it
17 * under the terms of version 2 of the GNU General Public License as
18 * published by the Free Software Foundation.
20 * In either case, Lustre is distributed in the hope that it will be
21 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23 * license text for more details.
25 * For testing and management it is treated as an obd_device,
26 * although it does not export a full OBD method table (the
27 * requests are coming in over the wire, so object target modules
28 * do not have a full method table.)
33 # define EXPORT_SYMTAB
35 #define DEBUG_SUBSYSTEM S_OSC
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
43 #include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include "osc_internal.h"
61 static quota_interface_t *quota_interface = NULL;
62 extern quota_interface_t osc_quota_interface;
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
66 /* Pack OSC object metadata for disk storage (LE byte order). */
67 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
68                       struct lov_stripe_md *lsm)
/* NOTE(review): elided view — intermediate lines (braces, RETURNs, branch
 * conditions) are missing; only visible statements are documented. */
/* wire/disk buffer is exactly one lov_mds_md */
73 lmm_size = sizeof(**lmmp);
/* presumably the free-request branch (lsm == NULL?) — confirm in full source */
78 OBD_FREE(*lmmp, lmm_size);
/* allocate the output buffer when caller passed *lmmp == NULL */
84 OBD_ALLOC(*lmmp, lmm_size);
/* both object id and group must be valid before packing */
90 LASSERT(lsm->lsm_object_id);
91 LASSERT(lsm->lsm_object_gr);
/* pack in little-endian (LE) byte order for disk/wire */
92 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
93 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
99 /* Unpack OSC object metadata from disk storage (LE byte order). */
100 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
101                         struct lov_mds_md *lmm, int lmm_bytes)
/* NOTE(review): elided view — only visible statements are documented.
 * Validates the on-disk lov_mds_md, then allocates/frees the in-memory
 * lov_stripe_md (*lsmp) and copies the LE fields to host order. */
/* reject short buffers before touching any field */
107 if (lmm_bytes < sizeof (*lmm)) {
108 CERROR("lov_mds_md too small: %d, need %d\n",
109 lmm_bytes, (int)sizeof(*lmm));
112 /* XXX LOV_MAGIC etc check? */
/* a zero object id is never valid on disk */
114 if (lmm->lmm_object_id == 0) {
115 CERROR("lov_mds_md: zero lmm_object_id\n");
/* single-stripe md: OSC always has exactly one stripe */
120 lsm_size = lov_stripe_md_size(1);
/* caller passed an existing *lsmp and no lmm: free-request path */
124 if (*lsmp != NULL && lmm == NULL) {
125 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
126 OBD_FREE(*lsmp, lsm_size);
/* allocation path: lsm plus its single oinfo */
132 OBD_ALLOC(*lsmp, lsm_size);
135 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
/* roll back the lsm allocation if the oinfo allocation failed */
136 if ((*lsmp)->lsm_oinfo[0] == NULL) {
137 OBD_FREE(*lsmp, lsm_size);
140 loi_init((*lsmp)->lsm_oinfo[0]);
144 /* XXX zero *lsmp? */
/* convert LE on-disk fields to host byte order */
145 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
146 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
147 LASSERT((*lsmp)->lsm_object_id);
148 LASSERT((*lsmp)->lsm_object_gr);
/* cap the stripe size at the protocol maximum */
151 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/* Copy an obd_capa (if any) into the request buffer at @offset and flag
 * its presence in the ost_body. NOTE(review): the NULL-capa early-return
 * and the capa_cpy call are in elided lines — confirm in full source. */
156 static inline void osc_pack_capa(struct ptlrpc_request *req, int offset,
157                                  struct ost_body *body, void *capa)
159 struct obd_capa *oc = (struct obd_capa *)capa;
160 struct lustre_capa *c;
/* locate the capa slot in the outgoing request message */
165 c = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*c));
/* tell the server a capability accompanies the oa */
168 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
169 DEBUG_CAPA(D_SEC, c, "pack");
/* Fill the ost_body record of an outgoing request from @oinfo: copy the
 * obdo and pack the capability (if any) into the following buffer slot. */
172 static inline void osc_pack_req_body(struct ptlrpc_request *req, int offset,
173                                      struct obd_info *oinfo)
175 struct ost_body *body;
177 body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body));
/* struct copy of the whole obdo into the wire body */
178 body->oa = *oinfo->oi_oa;
/* capa goes in the next request buffer (offset + 1) */
179 osc_pack_capa(req, offset + 1, body, oinfo->oi_capa);
/* Reply callback for async OST_GETATTR: unpack the ost_body, copy the
 * attributes back into the caller's obdo, then invoke the completion
 * callback. NOTE(review): the rc checks around these statements are in
 * elided lines. */
182 static int osc_getattr_interpret(struct ptlrpc_request *req,
183 struct osc_async_args *aa, int rc)
185 struct ost_body *body;
/* unpack (and byte-swap if needed) the reply body */
191 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
192 lustre_swab_ost_body);
194 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
195 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
197 /* This should really be sent by the OST */
198 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
199 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
/* unpack-failure path: invalidate the returned attributes */
201 CERROR("can't unpack ost_body\n");
203 aa->aa_oi->oi_oa->o_valid = 0;
/* notify the waiter; its return code supersedes ours */
206 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an asynchronous OST_GETATTR on @set; the reply is handled by
 * osc_getattr_interpret. NOTE(review): elided lines hide the NULL-req
 * check and the aa->aa_oi assignment. */
210 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
211 struct ptlrpc_request_set *set)
213 struct ptlrpc_request *req;
214 struct ost_body *body;
215 int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
216 struct osc_async_args *aa;
/* capa buffer is optional — size 0 when no capability is attached */
219 size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
220 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
221 OST_GETATTR, 3, size,NULL);
225 osc_pack_req_body(req, REQ_REC_OFF, oinfo);
227 ptlrpc_req_set_repsize(req, 2, size);
228 req->rq_interpret_reply = osc_getattr_interpret;
/* async args must fit inside the request's embedded storage */
230 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
231 aa = (struct osc_async_args *)&req->rq_async_args;
234 ptlrpc_set_add_req(set, req);
/* Synchronous OST_GETATTR: build the request, wait for the reply, and
 * copy the returned attributes into oinfo->oi_oa. NOTE(review): elided
 * lines hide error branches and RETURNs. */
238 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
240 struct ptlrpc_request *req;
241 struct ost_body *body;
242 int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
/* optional capability buffer */
245 size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
246 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
247 OST_GETATTR, 3, size, NULL);
251 osc_pack_req_body(req, REQ_REC_OFF, oinfo);
253 ptlrpc_req_set_repsize(req, 2, size);
/* block until the OST replies (or the RPC fails) */
255 rc = ptlrpc_queue_wait(req);
257 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
261 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
262 lustre_swab_ost_body);
264 CERROR ("can't unpack ost_body\n");
265 GOTO (out, rc = -EPROTO);
268 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
/* struct copy of the attributes back to the caller */
269 *oinfo->oi_oa = body->oa;
271 /* This should really be sent by the OST */
272 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
273 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
/* out: label target — release the request on all paths */
277 ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR: send the caller's obdo, wait, and copy back
 * the (possibly updated) attributes from the reply. */
281 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
282 struct obd_trans_info *oti)
284 struct ptlrpc_request *req;
285 struct ost_body *body;
286 int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
/* if a group is claimed valid it must be non-zero */
289 LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
290 oinfo->oi_oa->o_gr > 0);
291 size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
292 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
293 OST_SETATTR, 3, size, NULL);
297 osc_pack_req_body(req, REQ_REC_OFF, oinfo);
299 ptlrpc_req_set_repsize(req, 2, size);
301 rc = ptlrpc_queue_wait(req);
305 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
306 lustre_swab_ost_body);
/* NULL body means a malformed reply */
308 GOTO(out, rc = -EPROTO);
310 *oinfo->oi_oa = body->oa;
/* out: label target — release the request on all paths */
314 ptlrpc_req_finished(req);
/* Reply callback for async OST_SETATTR: unpack, copy attributes back,
 * then run the completion callback. */
318 static int osc_setattr_interpret(struct ptlrpc_request *req,
319 struct osc_async_args *aa, int rc)
321 struct ost_body *body;
327 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
328 lustre_swab_ost_body);
330 CERROR("can't unpack ost_body\n");
331 GOTO(out, rc = -EPROTO);
/* struct copy of reply attributes to the caller's obdo */
334 *aa->aa_oi->oi_oa = body->oa;
/* out: notify the waiter; its return code supersedes ours */
336 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Asynchronous OST_SETATTR. Two dispatch modes are visible: fire-and-
 * forget via ptlrpcd_add_req (MDS-originated setattr), or add to @rqset
 * with osc_setattr_interpret as the reply handler. NOTE(review): the
 * condition selecting between them is in elided lines. */
340 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
341 struct obd_trans_info *oti,
342 struct ptlrpc_request_set *rqset)
344 struct ptlrpc_request *req;
345 int size[3] = { sizeof(struct ptlrpc_body), sizeof(struct ost_body) };
346 struct osc_async_args *aa;
349 size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
350 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
351 OST_SETATTR, 3, size, NULL);
355 osc_pack_req_body(req, REQ_REC_OFF, oinfo);
/* carry the llog cancel cookie inside the obdo when requested */
356 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
358 *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;
361 ptlrpc_req_set_repsize(req, 2, size);
362 /* do mds to ost setattr asynchronously */
364 /* Do not wait for response. */
365 ptlrpcd_add_req(req);
/* rqset mode: interpret the reply when it arrives */
367 req->rq_interpret_reply = osc_setattr_interpret;
369 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
370 aa = (struct osc_async_args *)&req->rq_async_args;
373 ptlrpc_set_add_req(rqset, req);
/* Create an object on the OST (synchronous OST_CREATE). Allocates a
 * temporary single-stripe md, sends @oa, and on success records the new
 * object id/group in *ea's lsm and propagates transno/llog cookies via
 * @oti. NOTE(review): elided lines hide the error branches and the
 * *ea hand-off; confirm ownership of @lsm against the full source. */
379 int osc_real_create(struct obd_export *exp, struct obdo *oa,
380 struct lov_stripe_md **ea, struct obd_trans_info *oti)
382 struct ptlrpc_request *req;
383 struct ost_body *body;
384 struct lov_stripe_md *lsm;
385 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
/* scratch stripe md for the duration of the call */
393 rc = obd_alloc_memmd(exp, &lsm);
398 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
399 OST_CREATE, 2, size, NULL);
401 GOTO(out, rc = -ENOMEM);
403 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
406 ptlrpc_req_set_repsize(req, 2, size);
/* OBD_MD_FLINLINE is only used for the orphan-deletion request here */
407 if (oa->o_valid & OBD_MD_FLINLINE) {
408 LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
409 oa->o_flags == OBD_FL_DELORPHAN);
411 "delorphan from OST integration");
412 /* Don't resend the delorphan req */
413 req->rq_no_resend = req->rq_no_delay = 1;
416 rc = ptlrpc_queue_wait(req);
420 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
421 lustre_swab_ost_body);
423 CERROR ("can't unpack ost_body\n");
424 GOTO (out_req, rc = -EPROTO);
429 /* This should really be sent by the OST */
430 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
431 oa->o_valid |= OBD_MD_FLBLKSZ;
433 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
434 * have valid lsm_oinfo data structs, so don't go touching that.
435 * This needs to be fixed in a big way.
437 lsm->lsm_object_id = oa->o_id;
438 lsm->lsm_object_gr = oa->o_gr;
/* record the server-assigned transaction number for recovery */
442 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
444 if (oa->o_valid & OBD_MD_FLCOOKIE) {
445 if (!oti->oti_logcookies)
446 oti_alloc_cookies(oti, 1);
447 *oti->oti_logcookies = *obdo_logcookie(oa);
451 CDEBUG(D_HA, "transno: "LPD64"\n",
452 lustre_msg_get_transno(req->rq_repmsg));
/* out_req: release the RPC */
454 ptlrpc_req_finished(req);
/* out: free the scratch md on the error path */
457 obd_free_memmd(exp, &lsm);
/* Reply callback for async OST_PUNCH (truncate): unpack, copy attributes
 * back, then run the completion callback. */
461 static int osc_punch_interpret(struct ptlrpc_request *req,
462 struct osc_async_args *aa, int rc)
464 struct ost_body *body;
470 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
471 lustre_swab_ost_body);
473 CERROR ("can't unpack ost_body\n");
474 GOTO(out, rc = -EPROTO);
477 *aa->aa_oi->oi_oa = body->oa;
/* out: notify the waiter */
479 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Asynchronous OST_PUNCH: truncate the extent given by oi_policy.
 * The punch range rides in the oa's size/blocks fields (see comment
 * below). Completion runs osc_punch_interpret via @rqset. */
483 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
484 struct obd_trans_info *oti,
485 struct ptlrpc_request_set *rqset)
487 struct ptlrpc_request *req;
488 struct osc_async_args *aa;
489 struct ost_body *body;
490 int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
498 size[REQ_REC_OFF + 1] = oinfo->oi_capa? sizeof(struct lustre_capa) : 0;
499 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
500 OST_PUNCH, 3, size, NULL);
504 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
506 osc_pack_req_body(req, REQ_REC_OFF, oinfo);
507 /* overload the size and blocks fields in the oa with start/end */
508 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
509 body->oa.o_size = oinfo->oi_policy.l_extent.start;
510 body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
511 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
513 ptlrpc_req_set_repsize(req, 2, size);
515 req->rq_interpret_reply = osc_punch_interpret;
516 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
517 aa = (struct osc_async_args *)&req->rq_async_args;
519 ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_SYNC: ask the OST to flush [start, end] of the object
 * described by @oa to stable storage. The range rides in the oa's
 * size/blocks fields, same convention as osc_punch. */
524 static int osc_sync(struct obd_export *exp, struct obdo *oa,
525 struct lov_stripe_md *md, obd_size start, obd_size end,
528 struct ptlrpc_request *req;
529 struct ost_body *body;
530 int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
538 size[REQ_REC_OFF + 1] = capa ? sizeof(struct lustre_capa) : 0;
540 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
541 OST_SYNC, 3, size, NULL);
545 /* overload the size and blocks fields in the oa with start/end */
546 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
548 body->oa.o_size = start;
549 body->oa.o_blocks = end;
550 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
552 osc_pack_capa(req, REQ_REC_OFF + 1, body, capa);
554 ptlrpc_req_set_repsize(req, 2, size);
556 rc = ptlrpc_queue_wait(req);
560 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
561 lustre_swab_ost_body);
563 CERROR ("can't unpack ost_body\n");
564 GOTO (out, rc = -EPROTO);
/* out: release the request on all paths */
571 ptlrpc_req_finished(req);
575 /* Find and cancel locally locks matched by @mode in the resource found by
576 * @objid. Found locks are added into @cancel list. Returns the amount of
577 * locks added to @cancels list. */
578 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
579 struct list_head *cancels, ldlm_mode_t mode,
/* (see comment above) resolve the DLM resource for oa->o_id/o_gr and
 * collect locally cancellable locks of @mode into @cancels */
582 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
583 struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
584 struct ldlm_resource *res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
591 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
592 lock_flags, 0, NULL);
/* drop the reference taken by ldlm_resource_get */
593 ldlm_resource_putref(res);
597 /* Destroy requests can be async always on the client, and we don't even really
598 * care about the return code since the client cannot do anything at all about
600 * When the MDS is unlinking a filename, it saves the file objects into a
601 * recovery llog, and these object records are cancelled when the OST reports
602 * they were destroyed and sync'd to disk (i.e. transaction committed).
603 * If the client dies, or the OST is down when the object should be destroyed,
604 * the records are not cancelled, and when the OST reconnects to the MDS next,
605 * it will retrieve the llog unlink logs and then sends the log cancellation
606 * cookies to the MDS after committing destroy transactions. */
/* Asynchronous OST_DESTROY (see the block comment above): cancel local
 * PW locks on the object (discarding dirty data), embed any llog cancel
 * cookie, and hand the request to ptlrpcd without waiting. */
607 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
608 struct lov_stripe_md *ea, struct obd_trans_info *oti,
609 struct obd_export *md_export)
611 CFS_LIST_HEAD(cancels);
612 struct ptlrpc_request *req;
613 struct ost_body *body;
614 int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
615 int count, bufcount = 2;
/* cancel matching local locks; LDLM_FL_DISCARD_DATA drops dirty pages */
623 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
624 LDLM_FL_DISCARD_DATA);
/* piggy-back the cancels on the destroy when the server supports it */
625 if (exp_connect_cancelset(exp) && count)
627 req = ldlm_prep_elc_req(exp, LUSTRE_OST_VERSION, OST_DESTROY, bufcount,
628 size, REQ_REC_OFF + 1, 0, &cancels, count);
632 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
634 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
/* carry the unlink llog cookie so the MDS record can be cancelled */
635 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
636 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
637 sizeof(*oti->oti_logcookies));
640 ptlrpc_req_set_repsize(req, 2, size);
/* fire and forget — ptlrpcd handles the reply */
642 ptlrpcd_add_req(req);
/* Report client cache state (dirty/undirty/grant/dropped) to the server
 * inside @oa, under the loi list lock. Sanity-checks the dirty counters
 * and logs (but tolerates) inconsistencies. */
646 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
649 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
/* caller must not have already claimed these fields valid */
651 LASSERT(!(oa->o_valid & bits));
654 client_obd_list_lock(&cli->cl_loi_list_lock);
655 oa->o_dirty = cli->cl_dirty;
/* three defensive checks on the accounting invariants */
656 if (cli->cl_dirty > cli->cl_dirty_max) {
657 CERROR("dirty %lu > dirty_max %lu\n",
658 cli->cl_dirty, cli->cl_dirty_max);
660 } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
661 CERROR("dirty %d > system dirty_max %d\n",
662 atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
664 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
665 CERROR("dirty %lu - dirty_max %lu too big???\n",
666 cli->cl_dirty, cli->cl_dirty_max);
/* normal case: advertise how much more we could dirty */
669 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
670 (cli->cl_max_rpcs_in_flight + 1);
671 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
673 oa->o_grant = cli->cl_avail_grant;
674 oa->o_dropped = cli->cl_lost_grant;
/* lost grant is reported once, then reset */
675 cli->cl_lost_grant = 0;
676 client_obd_list_unlock(&cli->cl_loi_list_lock);
677 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
678 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
681 /* caller must hold loi_list_lock */
/* (caller must hold loi_list_lock — see comment above) Charge one page
 * of write grant: bump dirty counters, debit available grant, and mark
 * the page as covered by grant. */
682 static void osc_consume_write_grant(struct client_obd *cli,
683 struct brw_page *pga)
685 atomic_inc(&obd_dirty_pages);
686 cli->cl_dirty += CFS_PAGE_SIZE;
687 cli->cl_avail_grant -= CFS_PAGE_SIZE;
688 pga->flag |= OBD_BRW_FROM_GRANT;
689 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
690 CFS_PAGE_SIZE, pga, pga->pg);
/* grant must never go negative — caller checked before consuming */
691 LASSERT(cli->cl_avail_grant >= 0);
694 /* the companion to osc_consume_write_grant, called when a brw has completed.
695 * must be called with the loi lock held. */
/* (see comment above) Undo osc_consume_write_grant after a brw
 * completes. NOTE(review): elided lines hide the branch that decides
 * between the "lost grant" cases; only visible statements are noted. */
696 static void osc_release_write_grant(struct client_obd *cli,
697 struct brw_page *pga, int sent)
/* server block size; fall back to 4096 if the OST never reported one */
699 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
/* nothing to release if this page never consumed grant */
702 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
707 pga->flag &= ~OBD_BRW_FROM_GRANT;
708 atomic_dec(&obd_dirty_pages);
709 cli->cl_dirty -= CFS_PAGE_SIZE;
/* unsent page: its whole grant is lost and must be reported */
711 cli->cl_lost_grant += CFS_PAGE_SIZE;
712 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
713 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
714 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
715 /* For short writes we shouldn't count parts of pages that
716 * span a whole block on the OST side, or our accounting goes
717 * wrong. Should match the code in filter_grant_check. */
718 int offset = pga->off & ~CFS_PAGE_MASK;
719 int count = pga->count + (offset & (blocksize - 1));
720 int end = (offset + pga->count) & (blocksize - 1);
/* round the partial tail block up to a full block */
722 count += blocksize - end;
724 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
725 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
726 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
727 cli->cl_avail_grant, cli->cl_dirty);
/* Total BRW RPCs currently in flight: reads plus writes. */
733 static unsigned long rpcs_in_flight(struct client_obd *cli)
735 return cli->cl_r_in_flight + cli->cl_w_in_flight;
738 /* caller must hold loi_list_lock */
/* (caller must hold loi_list_lock — see comment above) Walk the cache
 * waiter list and wake each waiter, granting it write credit when
 * available or telling it to fall back to sync IO (-EDQUOT) when no
 * grant can arrive. Stops early when the dirty limits are hit. */
739 void osc_wake_cache_waiters(struct client_obd *cli)
741 struct list_head *l, *tmp;
742 struct osc_cache_waiter *ocw;
745 list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
746 /* if we can't dirty more, we must wait until some is written */
747 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
748 (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
749 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
750 "osc max %ld, sys max %d\n", cli->cl_dirty,
751 cli->cl_dirty_max, obd_max_dirty_pages);
755 /* if still dirty cache but no grant wait for pending RPCs that
756 * may yet return us some grant before doing sync writes */
757 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
758 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
759 cli->cl_w_in_flight);
763 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
764 list_del_init(&ocw->ocw_entry);
765 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
766 /* no more RPCs in flight to return grant, do sync IO */
767 ocw->ocw_rc = -EDQUOT;
768 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
/* else: grant available — charge it to the waiter's page */
770 osc_consume_write_grant(cli,
771 &ocw->ocw_oap->oap_brw_page);
774 cfs_waitq_signal(&ocw->ocw_waitq);
/* Initialize the available write grant from the server's connect data. */
780 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
782 client_obd_list_lock(&cli->cl_loi_list_lock);
783 cli->cl_avail_grant = ocd->ocd_grant;
784 client_obd_list_unlock(&cli->cl_loi_list_lock);
786 CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
787 cli->cl_avail_grant, cli->cl_lost_grant);
788 LASSERT(cli->cl_avail_grant >= 0);
/* Credit extra grant returned by the server in a BRW reply. */
791 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
793 client_obd_list_lock(&cli->cl_loi_list_lock);
794 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
/* only trust o_grant when the server flagged it valid */
795 if (body->oa.o_valid & OBD_MD_FLGRANT)
796 cli->cl_avail_grant += body->oa.o_grant;
797 /* waiters are woken in brw_interpret_oap */
798 client_obd_list_unlock(&cli->cl_loi_list_lock);
801 /* We assume that the reason this OSC got a short read is because it read
802 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
803 * via the LOV, and it _knows_ it's reading inside the file, it's just that
804 * this stripe never got written at or beyond this stripe offset yet. */
/* (see comment above) After a short read, zero-fill the unread tail:
 * zero the remainder of the page containing EOF, then zero every page
 * after it. NOTE(review): loop index advancement is in elided lines. */
805 static void handle_short_read(int nob_read, obd_count page_count,
806 struct brw_page **pga)
811 /* skip bytes read OK */
812 while (nob_read > 0) {
813 LASSERT (page_count > 0);
815 if (pga[i]->count > nob_read) {
816 /* EOF inside this page */
817 ptr = cfs_kmap(pga[i]->pg) +
818 (pga[i]->off & ~CFS_PAGE_MASK);
819 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
820 cfs_kunmap(pga[i]->pg);
/* whole page was read — consume it and continue */
826 nob_read -= pga[i]->count;
831 /* zero remaining pages */
832 while (page_count-- > 0) {
833 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
834 memset(ptr, 0, pga[i]->count);
835 cfs_kunmap(pga[i]->pg);
/* Validate the per-niobuf return-code vector in a BRW_WRITE reply and
 * verify the bulk byte count matches what was requested. Returns the
 * first negative rc found, or an error for a malformed vector. */
840 static int check_write_rcs(struct ptlrpc_request *req,
841 int requested_nob, int niocount,
842 obd_count page_count, struct brw_page **pga)
846 /* return error if any niobuf was in error */
847 remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
848 sizeof(*remote_rcs) * niocount, NULL);
849 if (remote_rcs == NULL) {
850 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
/* swab the raw __u32 vector ourselves when the reply was swabbed */
853 if (lustre_msg_swabbed(req->rq_repmsg))
854 for (i = 0; i < niocount; i++)
855 __swab32s(&remote_rcs[i]);
857 for (i = 0; i < niocount; i++) {
858 if (remote_rcs[i] < 0)
859 return(remote_rcs[i]);
/* positive rcs are protocol violations for writes */
861 if (remote_rcs[i] != 0) {
862 CERROR("rc[%d] invalid (%d) req %p\n",
863 i, remote_rcs[i], req);
868 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
869 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
870 requested_nob, req->rq_bulk->bd_nob_transferred);
/* Two brw_pages can share one niobuf iff they are byte-contiguous;
 * differing flags (beyond OBD_BRW_FROM_GRANT) draw a warning and
 * presumably prevent merging — the return on that path is elided. */
877 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
879 if (p1->flag != p2->flag) {
/* FROM_GRANT is client-side bookkeeping only — ignore it */
880 unsigned mask = ~OBD_BRW_FROM_GRANT;
882 /* warn if we try to combine flags that we don't know to be
884 if ((p1->flag & mask) != (p2->flag & mask))
885 CERROR("is it ok to have flags 0x%x and 0x%x in the "
886 "same brw?\n", p1->flag, p2->flag);
/* contiguity test: p2 starts exactly where p1 ends */
890 return (p1->off + p1->count == p2->off);
/* Compute a CRC32 over the first @nob bytes of the bulk page array.
 * Fault-injection hooks deliberately corrupt data (reads) or the
 * checksum (writes) to exercise checksum-mismatch handling. */
893 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
894 struct brw_page **pga, int opc)
899 LASSERT (pg_count > 0);
900 while (nob > 0 && pg_count > 0) {
901 char *ptr = cfs_kmap(pga[i]->pg);
902 int off = pga[i]->off & ~CFS_PAGE_MASK;
/* clamp the last page to the remaining byte count */
903 int count = pga[i]->count > nob ? nob : pga[i]->count;
905 /* corrupt the data before we compute the checksum, to
906 * simulate an OST->client data error */
907 if (i == 0 && opc == OST_READ &&
908 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
909 memcpy(ptr + off, "bad1", min(4, nob));
910 cksum = crc32_le(cksum, ptr + off, count);
911 cfs_kunmap(pga[i]->pg);
912 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
915 nob -= pga[i]->count;
919 /* For sending we only compute the wrong checksum instead
920 * of corrupting the data so it is still correct on a redo */
921 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/* Build a BRW (bulk read/write) RPC: count mergeable niobufs, size and
 * allocate the request (from the import pool for writes), set up the
 * bulk descriptor, pack body/ioobj/niobufs/capa, attach checksums, and
 * stash bookkeeping in the request's async args. NOTE(review): this is
 * an elided view — error paths, the read/write opc selection, and
 * several assignments are in missing lines. */
927 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
928 struct lov_stripe_md *lsm, obd_count page_count,
929 struct brw_page **pga,
930 struct ptlrpc_request **reqp,
931 struct obd_capa *ocapa)
933 struct ptlrpc_request *req;
934 struct ptlrpc_bulk_desc *desc;
935 struct ost_body *body;
936 struct obd_ioobj *ioobj;
937 struct niobuf_remote *niobuf;
938 int size[5] = { sizeof(struct ptlrpc_body), sizeof(*body) };
939 int niocount, i, requested_nob, opc, rc;
940 struct ptlrpc_request_pool *pool;
941 struct lustre_capa *capa;
942 struct osc_brw_async_args *aa;
/* fault-injection hooks for BRW request preparation */
945 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
946 RETURN(-ENOMEM); /* Recoverable */
947 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
948 RETURN(-EINVAL); /* Fatal */
/* writes draw from the preallocated import pool to survive OOM */
950 if ((cmd & OBD_BRW_WRITE) != 0) {
952 pool = cli->cl_import->imp_rq_pool;
/* one niobuf per run of contiguous, mergeable pages */
958 for (niocount = i = 1; i < page_count; i++) {
959 if (!can_merge_pages(pga[i - 1], pga[i]))
963 size[REQ_REC_OFF + 1] = sizeof(*ioobj);
964 size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
/* optional capability slot */
966 size[REQ_REC_OFF + 3] = sizeof(*capa);
968 req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 5,
969 size, NULL, pool, NULL);
973 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
/* bulk direction depends on opcode: server pulls for writes,
 * pushes for reads */
975 if (opc == OST_WRITE)
976 desc = ptlrpc_prep_bulk_imp (req, page_count,
977 BULK_GET_SOURCE, OST_BULK_PORTAL);
979 desc = ptlrpc_prep_bulk_imp (req, page_count,
980 BULK_PUT_SINK, OST_BULK_PORTAL);
982 GOTO(out, rc = -ENOMEM);
983 /* NB request now owns desc and will free it when it gets freed */
985 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
986 ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
987 niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
988 niocount * sizeof(*niobuf));
992 obdo_to_ioobj(oa, ioobj);
993 ioobj->ioo_bufcnt = niocount;
995 capa = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 3,
997 capa_cpy(capa, ocapa);
998 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
1001 LASSERT (page_count > 0);
/* walk the pages: register each with the bulk descriptor and either
 * extend the current niobuf or start a new one */
1002 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1003 struct brw_page *pg = pga[i];
1004 struct brw_page *pg_prev = pga[i - 1];
1006 LASSERT(pg->count > 0);
/* a brw_page never crosses a page boundary */
1007 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1008 "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1009 pg->off, pg->count);
/* pages must be strictly ascending by file offset */
1011 LASSERTF(i == 0 || pg->off > pg_prev->off,
1012 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1013 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1015 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1016 pg_prev->pg, page_private(pg_prev->pg),
1017 pg_prev->pg->index, pg_prev->off);
1019 LASSERTF(i == 0 || pg->off > pg_prev->off,
1020 "i %d p_c %u\n", i, page_count);
/* SRVLOCK must be uniform across the whole brw */
1022 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1023 (pg->flag & OBD_BRW_SRVLOCK));
1025 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1027 requested_nob += pg->count;
1029 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1031 niobuf->len += pg->count;
1033 niobuf->offset = pg->off;
1034 niobuf->len = pg->count;
1035 niobuf->flags = pg->flag;
/* we must have filled exactly niocount niobufs */
1039 LASSERT((void *)(niobuf - niocount) ==
1040 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1041 niocount * sizeof(*niobuf)));
1042 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1044 /* size[REQ_REC_OFF] still sizeof (*body) */
1045 if (opc == OST_WRITE) {
1046 if (unlikely(cli->cl_checksum)) {
1047 body->oa.o_valid |= OBD_MD_FLCKSUM;
1048 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1051 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1053 /* save this in 'oa', too, for later checking */
1054 oa->o_valid |= OBD_MD_FLCKSUM;
1056 /* clear out the checksum flag, in case this is a
1057 * resend but cl_checksum is no longer set. b=11238 */
1058 oa->o_valid &= ~OBD_MD_FLCKSUM;
1060 oa->o_cksum = body->oa.o_cksum;
1061 /* 1 RC per niobuf */
1062 size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
1063 ptlrpc_req_set_repsize(req, 3, size);
1065 if (unlikely(cli->cl_checksum))
1066 body->oa.o_valid |= OBD_MD_FLCKSUM;
1067 /* 1 RC for the whole I/O */
1068 ptlrpc_req_set_repsize(req, 2, size);
/* stash the brw bookkeeping for the reply interpreter */
1071 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1072 aa = (struct osc_brw_async_args *)&req->rq_async_args;
1074 aa->aa_requested_nob = requested_nob;
1075 aa->aa_nio_count = niocount;
1076 aa->aa_page_count = page_count;
1080 INIT_LIST_HEAD(&aa->aa_oaps);
/* out: error path — release the partially built request */
1086 ptlrpc_req_finished (req);
/* Diagnose a write-checksum mismatch: recompute the checksum over the
 * still-mapped pages and classify whether the data changed client-side
 * (mmap race), in transit, or both; log the verdict loudly. */
1090 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1091 __u32 client_cksum, __u32 server_cksum,
1092 int nob, obd_count page_count,
1093 struct brw_page **pga)
/* fast path: checksums agree, nothing to diagnose */
1098 if (server_cksum == client_cksum) {
1099 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* recompute over the current page contents */
1103 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE);
1105 if (new_cksum == server_cksum)
1106 msg = "changed on the client after we checksummed it - "
1107 "likely false positive due to mmap IO (bug 11742)";
1108 else if (new_cksum == client_cksum)
1109 msg = "changed in transit before arrival at OST";
1111 msg = "changed in transit AND doesn't match the original - "
1112 "likely false positive due to mmap IO (bug 11742)";
1114 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1115 LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1116 "["LPU64"-"LPU64"]\n",
1117 msg, libcfs_nid2str(peer->nid),
1118 oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1119 oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1122 oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1124 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1125 CERROR("original client csum %x, server csum %x, client csum now %x\n",
1126 client_cksum, server_cksum, new_cksum);
1130 /* Note rc enters this function as number of bytes transferred */
/* Finish a BRW RPC (rc enters as bytes transferred — see comment
 * above): unpack the reply, update quota flags and grant, then verify
 * the transfer — per-niobuf rcs and write checksums for OST_WRITE;
 * short-read handling and read checksums for OST_READ. NOTE(review):
 * elided lines hide several guard conditions and GOTO targets. */
1131 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1133 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1134 const lnet_process_id_t *peer =
1135 &req->rq_import->imp_connection->c_peer;
1136 struct client_obd *cli = aa->aa_cli;
1137 struct ost_body *body;
1138 __u32 client_cksum = 0;
/* -EDQUOT replies still carry a body we must process */
1141 if (rc < 0 && rc != -EDQUOT)
1144 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1145 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1146 lustre_swab_ost_body);
1148 CERROR ("Can't unpack body\n");
1152 /* set/clear over quota flag for a uid/gid */
1153 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1154 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1155 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1156 body->oa.o_gid, body->oa.o_valid,
/* remember the checksum we sent, before body->oa overwrites aa_oa */
1162 if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
1163 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1165 osc_update_grant(cli, body);
1167 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
/* writes must not report positive byte counts */
1169 CERROR ("Unexpected +ve rc %d\n", rc);
1172 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1174 if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
1176 check_write_checksum(&body->oa, peer, client_cksum,
1178 aa->aa_requested_nob,
1183 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1186 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1187 aa->aa_page_count, aa->aa_ppga);
1191 /* The rest of this function executes only for OST_READs */
1192 if (rc > aa->aa_requested_nob) {
1193 CERROR("Unexpected rc %d (%d requested)\n", rc,
1194 aa->aa_requested_nob);
1198 if (rc != req->rq_bulk->bd_nob_transferred) {
1199 CERROR ("Unexpected rc %d (%d transferred)\n",
1200 rc, req->rq_bulk->bd_nob_transferred);
/* short read: zero-fill the unread tail */
1204 if (rc < aa->aa_requested_nob)
1205 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1207 if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
1209 GOTO(out, rc = -EAGAIN);
1211 if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
1212 static int cksum_counter;
1213 __u32 server_cksum = body->oa.o_cksum;
1217 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1218 aa->aa_ppga, OST_READ);
/* identify any LNet router in the path for the error report */
1220 if (peer->nid == req->rq_bulk->bd_sender) {
1224 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1227 if (server_cksum == ~0 && rc > 0) {
1228 CERROR("Protocol error: server %s set the 'checksum' "
1229 "bit, but didn't send a checksum. Not fatal, "
1230 "but please tell CFS.\n",
1231 libcfs_nid2str(peer->nid));
1232 } else if (server_cksum != client_cksum) {
1233 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1234 "%s%s%s inum "LPU64"/"LPU64" object "
1235 LPU64"/"LPU64" extent "
1236 "["LPU64"-"LPU64"]\n",
1237 req->rq_import->imp_obd->obd_name,
1238 libcfs_nid2str(peer->nid),
1240 body->oa.o_valid & OBD_MD_FLFID ?
1241 body->oa.o_fid : (__u64)0,
1242 body->oa.o_valid & OBD_MD_FLFID ?
1243 body->oa.o_generation :(__u64)0,
1245 body->oa.o_valid & OBD_MD_FLGROUP ?
1246 body->oa.o_gr : (__u64)0,
1247 aa->aa_ppga[0]->off,
1248 aa->aa_ppga[aa->aa_page_count-1]->off +
1249 aa->aa_ppga[aa->aa_page_count-1]->count -
1251 CERROR("client %x, server %x\n",
1252 client_cksum, server_cksum);
1254 aa->aa_oa->o_cksum = client_cksum;
1258 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* we asked for a checksum but the server never sent one */
1261 } else if (unlikely(client_cksum)) {
1262 static int cksum_missed;
/* rate-limit: log only at power-of-two miss counts */
1265 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1266 CERROR("Checksum %u requested from %s but not sent\n",
1267 cksum_missed, libcfs_nid2str(peer->nid));
/* copy the server's final attributes back to the caller */
1273 *aa->aa_oa = body->oa;
/* Synchronous bulk read/write.  Builds one BRW request for @page_count
 * pages, queues it and waits for the reply.  A bulk timeout with
 * rq_resend set, or any other recoverable error, causes the request to
 * be rebuilt and resent until osc_should_resend() gives up; between
 * resends we back off with a timed wait that grows with the resend
 * count.  Returns 0 on success or a negative errno. */
1278 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1279 struct lov_stripe_md *lsm,
1280 obd_count page_count, struct brw_page **pga,
1281 struct obd_capa *ocapa)
1283 struct ptlrpc_request *req;
1287 struct l_wait_info lwi;
1291 cfs_waitq_init(&waitq);
1294 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1295 page_count, pga, &req, ocapa);
1299 rc = ptlrpc_queue_wait(req);
1301 if (rc == -ETIMEDOUT && req->rq_resend) {
1302 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1303 ptlrpc_req_finished(req);
1307 rc = osc_brw_fini_request(req, rc);
1309 ptlrpc_req_finished(req);
1310 if (osc_recoverable_error(rc)) {
1312 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1313 CERROR("too many resend retries, returning error\n");
         /* back off before resending; the wait time scales with the
          * number of resends already attempted */
1317 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1318 l_wait_event(waitq, 0, &lwi);
/* Rebuild a failed async BRW request and re-queue it on the original
 * request set.  The new request takes ownership of the page array and
 * the oap list of the old one; every oap that held a reference on the
 * old request is switched to reference the new request instead.  Gives
 * up (without resending) once osc_should_resend() says the resend
 * budget is exhausted, or if any oap was already interrupted. */
1326 int osc_brw_redo_request(struct ptlrpc_request *request,
1327 struct osc_brw_async_args *aa)
1329 struct ptlrpc_request *new_req;
1330 struct ptlrpc_request_set *set = request->rq_set;
1331 struct osc_brw_async_args *new_aa;
1332 struct osc_async_page *oap;
1336 if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1337 CERROR("too many resend retries, returning error\n");
1341 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1343 body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1344 if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
1345 ocapa = lustre_unpack_capa(request->rq_reqmsg,
     /* rebuild with the same direction (read/write) as the original */
1348 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1349 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1350 aa->aa_cli, aa->aa_oa,
1351 NULL /* lsm unused by osc currently */,
1352 aa->aa_page_count, aa->aa_ppga,
1353 &new_req, NULL /* ocapa */);
1357 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1359 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1360 if (oap->oap_request != NULL) {
1361 LASSERTF(request == oap->oap_request,
1362 "request %p != oap_request %p\n",
1363 request, oap->oap_request);
1364 if (oap->oap_interrupted) {
1365 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1366 ptlrpc_req_finished(new_req);
1371 /* New request takes over pga and oaps from old request.
1372 * Note that copying a list_head doesn't work, need to move it... */
1374 new_req->rq_interpret_reply = request->rq_interpret_reply;
1375 new_req->rq_async_args = request->rq_async_args;
     /* delay the resend; the delay grows with the resend count */
1376 new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;
1378 new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
1380 INIT_LIST_HEAD(&new_aa->aa_oaps);
1381 list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1382 INIT_LIST_HEAD(&aa->aa_oaps);
1384 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1385 if (oap->oap_request) {
1386 ptlrpc_req_finished(oap->oap_request);
1387 oap->oap_request = ptlrpc_request_addref(new_req);
1391 /* Calling ptlrpc_set_add_req() here is safe because interpret
1392 * functions run in check_set context.  The only path by which another
1393 * thread can reach this request and see -EINTR is protected by
1394 * cl_loi_list_lock, which we hold. */
1395 ptlrpc_set_add_req(set, new_req);
1397 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1399 DEBUG_REQ(D_INFO, new_req, "new request");
/* Completion callback for async BRWs issued through async_internal().
 * Finishes the request, resends it on recoverable errors, accumulates
 * the transferred byte count into the set's counter (when present),
 * drops the client's in-flight counter for the I/O direction, releases
 * the per-page write grants and frees the page array. */
1403 static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
1405 struct osc_brw_async_args *aa = data;
1410 rc = osc_brw_fini_request(req, rc);
1411 if (osc_recoverable_error(rc)) {
1412 rc = osc_brw_redo_request(req, aa);
1416 if ((rc >= 0) && req->rq_set && req->rq_set->set_countp)
1417 atomic_add(nob, (atomic_t *)req->rq_set->set_countp);
1419 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1420 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1421 aa->aa_cli->cl_w_in_flight--;
1423 aa->aa_cli->cl_r_in_flight--;
     /* release grant consumed in async_internal(); 'sent' == 1 here */
1424 for (i = 0; i < aa->aa_page_count; i++)
1425 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1426 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1428 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
/* Issue one BRW asynchronously on @set.  Consumes write grant for each
 * page up front (released either on completion in brw_interpret() or on
 * the error path below), records lprocfs read/write histograms and
 * bumps the in-flight counter for the I/O direction. */
1433 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1434 struct lov_stripe_md *lsm, obd_count page_count,
1435 struct brw_page **pga, struct ptlrpc_request_set *set,
1436 struct obd_capa *ocapa)
1438 struct ptlrpc_request *req;
1439 struct client_obd *cli = &exp->exp_obd->u.cli;
1441 struct osc_brw_async_args *aa;
1444 /* Consume write credits even if doing a sync write -
1445 * otherwise we may run out of space on OST due to grant. */
1446 if (cmd == OBD_BRW_WRITE) {
1447 spin_lock(&cli->cl_loi_list_lock);
1448 for (i = 0; i < page_count; i++) {
1449 if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1450 osc_consume_write_grant(cli, pga[i]);
1452 spin_unlock(&cli->cl_loi_list_lock);
1455 rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
1458 aa = (struct osc_brw_async_args *)&req->rq_async_args;
1459 if (cmd == OBD_BRW_READ) {
1460 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1461 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1462 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
1464 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1465 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1466 cli->cl_w_in_flight);
1467 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
1471 req->rq_interpret_reply = brw_interpret;
1472 ptlrpc_set_add_req(set, req);
1473 client_obd_list_lock(&cli->cl_loi_list_lock);
1474 if (cmd == OBD_BRW_READ)
1475 cli->cl_r_in_flight++;
1477 cli->cl_w_in_flight++;
1478 client_obd_list_unlock(&cli->cl_loi_list_lock);
     /* error path: give back the grant we consumed above */
1479 } else if (cmd == OBD_BRW_WRITE) {
1480 client_obd_list_lock(&cli->cl_loi_list_lock);
1481 for (i = 0; i < page_count; i++)
1482 osc_release_write_grant(cli, pga[i], 0);
1483 client_obd_list_unlock(&cli->cl_loi_list_lock);
1489 * We want disk allocation on the target to happen in offset order, so
1490 * we'll follow Sedgewick's advice and stick to the dead-simple
1491 * shellsort -- it'll do fine for our small page arrays and doesn't
1492 * require allocation.  It's an insertion sort that swaps elements that
1493 * are strides apart, shrinking the stride down until it's 1 and the
      * array is sorted.  Sorts @array ascending by brw_page::off.
1495 static void sort_brw_pages(struct brw_page **array, int num)
1498 struct brw_page *tmp;
     /* Knuth's 3h+1 stride sequence */
1502 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1507 for (i = stride ; i < num ; i++) {
1510 while (j >= stride && array[j - stride]->off > tmp->off) {
1511 array[j] = array[j - stride];
1516 } while (stride > 1);
/* Return how many leading pages of the (offset-sorted) array @pg can go
 * into one bulk transfer without creating a "fragmented" page array:
 * counting stops at the first page that does not end on a page boundary
 * or whose successor does not start on one.  @pages must be > 0. */
1519 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1525 LASSERT (pages > 0);
1526 offset = pg[i]->off & ~CFS_PAGE_MASK;
1530 if (pages == 0) /* that's all */
1533 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1534 return count; /* doesn't end on page boundary */
1537 offset = pg[i]->off & ~CFS_PAGE_MASK;
1538 if (offset != 0) /* doesn't start on page boundary */
/* Build an array of pointers to the @count elements of the flat
 * brw_page array @pga, so the pages can be sorted and split into RPCs
 * without moving the pages themselves.  Freed by osc_release_ppga(). */
1545 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1547 struct brw_page **ppga;
1550 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1554 for (i = 0; i < count; i++)
/* Free a pointer array allocated by osc_build_ppga() (the pointed-to
 * brw_pages are owned by the caller and are not freed here). */
1559 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1561 LASSERT(ppga != NULL);
1562 OBD_FREE(ppga, sizeof(*ppga) * count);
/* obd_brw entry point for the OSC: perform a synchronous bulk I/O over
 * @page_count pages, splitting it into multiple RPCs limited by
 * cl_max_pages_per_rpc and by fragmentation boundaries.  With
 * OBD_BRW_CHECK the caller only probes whether the import could accept
 * the I/O.  The oa is saved/restored across RPCs because each BRW
 * clobbers it. */
1565 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1566 obd_count page_count, struct brw_page *pga,
1567 struct obd_trans_info *oti)
1569 struct obdo *saved_oa = NULL;
1570 struct brw_page **ppga, **orig;
1571 struct obd_import *imp = class_exp2cliimp(exp);
1572 struct client_obd *cli = &imp->imp_obd->u.cli;
1573 int rc, page_count_orig;
1576 if (cmd & OBD_BRW_CHECK) {
1577 /* The caller just wants to know if there's a chance that this
1578 * I/O can succeed */
1580 if (imp == NULL || imp->imp_invalid)
1585 /* test_brw with a failed create can trip this, maybe others. */
1586 LASSERT(cli->cl_max_pages_per_rpc);
1590 orig = ppga = osc_build_ppga(pga, page_count);
1593 page_count_orig = page_count;
1595 sort_brw_pages(ppga, page_count);
1596 while (page_count) {
1597 obd_count pages_per_brw;
1599 if (page_count > cli->cl_max_pages_per_rpc)
1600 pages_per_brw = cli->cl_max_pages_per_rpc;
1602 pages_per_brw = page_count;
1604 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1606 if (saved_oa != NULL) {
1607 /* restore previously saved oa */
1608 *oinfo->oi_oa = *saved_oa;
1609 } else if (page_count > pages_per_brw) {
1610 /* save a copy of oa (brw will clobber it) */
1611 OBDO_ALLOC(saved_oa);
1612 if (saved_oa == NULL)
1613 GOTO(out, rc = -ENOMEM);
1614 *saved_oa = *oinfo->oi_oa;
1617 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1618 pages_per_brw, ppga, oinfo->oi_capa);
1623 page_count -= pages_per_brw;
1624 ppga += pages_per_brw;
     /* free the ppga built above; 'orig' still points at its start */
1628 osc_release_ppga(orig, page_count_orig);
1630 if (saved_oa != NULL)
1631 OBDO_FREE(saved_oa);
/* obd_brw_async entry point: like osc_brw() but queues each chunk on
 * @set via async_internal() instead of waiting for it.  Because the
 * chunks complete asynchronously, each one gets its own copy of the
 * page-pointer array (unless a single RPC covers the whole request, in
 * which case the original ppga is handed over directly). */
1636 static int osc_brw_async(int cmd, struct obd_export *exp,
1637 struct obd_info *oinfo, obd_count page_count,
1638 struct brw_page *pga, struct obd_trans_info *oti,
1639 struct ptlrpc_request_set *set)
1641 struct brw_page **ppga, **orig;
1642 struct client_obd *cli = &exp->exp_obd->u.cli;
1643 int page_count_orig;
1647 if (cmd & OBD_BRW_CHECK) {
1648 struct obd_import *imp = class_exp2cliimp(exp);
1649 /* The caller just wants to know if there's a chance that this
1650 * I/O can succeed */
1652 if (imp == NULL || imp->imp_invalid)
1657 orig = ppga = osc_build_ppga(pga, page_count);
1660 page_count_orig = page_count;
1662 sort_brw_pages(ppga, page_count);
1663 while (page_count) {
1664 struct brw_page **copy;
1665 obd_count pages_per_brw;
1667 pages_per_brw = min_t(obd_count, page_count,
1668 cli->cl_max_pages_per_rpc);
1670 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1672 /* use ppga only if single RPC is going to fly */
1673 if (pages_per_brw != page_count_orig || ppga != orig) {
1674 OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
1676 GOTO(out, rc = -ENOMEM);
1677 memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
1681 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1682 pages_per_brw, copy, set, oinfo->oi_capa);
         /* failure path: free the per-chunk copy ourselves */
1686 OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
1690 /* we passed it to async_internal() which is
1691 * now responsible for releasing memory */
1695 page_count -= pages_per_brw;
1696 ppga += pages_per_brw;
1700 osc_release_ppga(orig, page_count_orig);
1704 static void osc_check_rpcs(struct client_obd *cli);
1706 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1707 * the dirty accounting. Writeback completes or truncate happens before
1708 * writing starts. Must be called with the loi lock held. */
1709 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1712 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1716 /* This maintains the lists of pending pages to read/write for a given object
1717 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1718 * to quickly find objects that are ready to send an RPC. */
/* Decide whether @lop has enough (or urgent enough) pending pages for
 * @cmd to justify firing an RPC now.  Returns non-zero when an RPC
 * should be made. */
1719 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1725 if (lop->lop_num_pending == 0)
1728 /* if we have an invalid import we want to drain the queued pages
1729 * by forcing them through rpcs that immediately fail and complete
1730 * the pages. recovery relies on this to empty the queued pages
1731 * before canceling the locks and evicting down the llite pages */
1732 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1735 /* stream rpcs in queue order as long as there is an urgent page
1736 * queued. this is our cheap solution for good batching in the case
1737 * where writepage marks some random page in the middle of the file
1738 * as urgent because of, say, memory pressure */
1739 if (!list_empty(&lop->lop_urgent)) {
1740 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1743 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1744 optimal = cli->cl_max_pages_per_rpc;
1745 if (cmd & OBD_BRW_WRITE) {
1746 /* trigger a write rpc stream as long as there are dirtiers
1747 * waiting for space. as they're waiting, they're not going to
1748 * create more pages to coalesce with what's waiting.. */
1749 if (!list_empty(&cli->cl_cache_waiters)) {
1750 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1753 /* +16 to avoid triggering rpcs that would want to include pages
1754 * that are being queued but which can't be made ready until
1755 * the queuer finishes with the page. this is a wart for
1756 * llite::commit_write() */
1759 if (lop->lop_num_pending >= optimal)
/* Make @item's membership of @list agree with @should_be_on: add it to
 * the tail if it should be listed but isn't, remove it if the reverse.
 * No-op when membership already matches. */
1765 static void on_list(struct list_head *item, struct list_head *list,
1768 if (list_empty(item) && should_be_on)
1769 list_add_tail(item, list);
1770 else if (!list_empty(item) && !should_be_on)
1771 list_del_init(item);
1774 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1775 * can find pages to build into rpcs quickly */
1776 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
     /* ready list: object has enough pending work for an RPC right now */
1778 on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1779 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1780 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
     /* write/read lists: object has any pending pages of that kind */
1782 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1783 loi->loi_write_lop.lop_num_pending);
1785 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1786 loi->loi_read_lop.lop_num_pending);
/* Adjust the pending-page count of @lop by @delta and mirror the change
 * into the client-wide pending read or write page counter, chosen by
 * @cmd.  @delta may be negative. */
1789 static void lop_update_pending(struct client_obd *cli,
1790 struct loi_oap_pages *lop, int cmd, int delta)
1792 lop->lop_num_pending += delta;
1793 if (cmd & OBD_BRW_WRITE)
1794 cli->cl_pending_w_pages += delta;
1796 cli->cl_pending_r_pages += delta;
1799 /* this is called when a sync waiter receives an interruption. Its job is to
1800 * get the caller woken as soon as possible. If its page hasn't been put in an
1801 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
1802 * desiring interruption which will forcefully complete the rpc once the rpc
1804 static void osc_occ_interrupted(struct oig_callback_context *occ)
1806 struct osc_async_page *oap;
1807 struct loi_oap_pages *lop;
1808 struct lov_oinfo *loi;
1811 /* XXX member_of() */
1812 oap = list_entry(occ, struct osc_async_page, oap_occ);
1814 client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1816 oap->oap_interrupted = 1;
1818 /* ok, it's been put in an rpc. only one oap gets a request reference */
1819 if (oap->oap_request != NULL) {
1820 ptlrpc_mark_interrupted(oap->oap_request);
     /* wake ptlrpcd so it notices the interrupted request promptly */
1821 ptlrpcd_wake(oap->oap_request);
1825 /* we don't get interruption callbacks until osc_trigger_group_io()
1826 * has been called and put the sync oaps in the pending/urgent lists.*/
1827 if (!list_empty(&oap->oap_pending_item)) {
1828 list_del_init(&oap->oap_pending_item);
1829 list_del_init(&oap->oap_urgent_item);
1832 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1833 &loi->loi_write_lop : &loi->loi_read_lop;
1834 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1835 loi_list_maint(oap->oap_cli, oap->oap_loi);
         /* complete the group-I/O slot with -EINTR on behalf of the oap */
1837 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
1838 oap->oap_oig = NULL;
1842 client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
1845 /* this is trying to propagate async writeback errors back up to the
1846 * application. As an async write fails we record the error code for later if
1847 * the app does an fsync. As long as errors persist we force future rpcs to be
1848 * sync so that the app can get a sync error and break the cycle of queueing
1849 * pages for which writeback will fail. */
1850 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
     /* on error: force sync and remember the xid after which a clean
      * completion may lift the sync requirement again */
1857 ar->ar_force_sync = 1;
1858 ar->ar_min_xid = ptlrpc_sample_next_xid();
1863 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1864 ar->ar_force_sync = 0;
/* Queue @oap on its object's pending list for its I/O direction (and on
 * the urgent list too when flagged ASYNC_URGENT), updating the pending
 * page accounting.  Caller is expected to hold the loi list lock. */
1867 static void osc_oap_to_pending(struct osc_async_page *oap)
1869 struct loi_oap_pages *lop;
1871 if (oap->oap_cmd & OBD_BRW_WRITE)
1872 lop = &oap->oap_loi->loi_write_lop;
1874 lop = &oap->oap_loi->loi_read_lop;
1876 if (oap->oap_async_flags & ASYNC_URGENT)
1877 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1878 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1879 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1882 /* this must be called holding the loi list lock to give coverage to exit_cache,
1883 * async_flag maintenance, and oap_request */
/* Complete one async page: drop its request reference, propagate write
 * errors into the async-rc state, refresh the loi's cached lvb
 * attributes from @oa on success, and notify the caller through either
 * the group-I/O completion or the ap_completion callback. */
1884 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1885 struct osc_async_page *oap, int sent, int rc)
1890 if (oap->oap_request != NULL) {
1891 xid = ptlrpc_req_xid(oap->oap_request);
1892 ptlrpc_req_finished(oap->oap_request);
1893 oap->oap_request = NULL;
1896 oap->oap_async_flags = 0;
1897 oap->oap_interrupted = 0;
1899 if (oap->oap_cmd & OBD_BRW_WRITE) {
     /* record write result for later fsync error reporting */
1900 osc_process_ar(&cli->cl_ar, xid, rc);
1901 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
1904 if (rc == 0 && oa != NULL) {
1905 if (oa->o_valid & OBD_MD_FLBLOCKS)
1906 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
1907 if (oa->o_valid & OBD_MD_FLMTIME)
1908 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
1909 if (oa->o_valid & OBD_MD_FLATIME)
1910 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
1911 if (oa->o_valid & OBD_MD_FLCTIME)
1912 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
1916 osc_exit_cache(cli, oap, sent);
1917 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
1918 oap->oap_oig = NULL;
1923 rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
1924 oap->oap_cmd, oa, rc);
1926 /* ll_ap_completion (from llite) drops PG_locked. so, a new
1927 * I/O on the page could start, but OSC calls it under lock
1928 * and thus we can add oap back to pending safely */
1930 /* upper layer wants to leave the page on pending queue */
1931 osc_oap_to_pending(oap);
1933 osc_exit_cache(cli, oap, sent);
/* Completion callback for BRWs built from oap lists by
 * osc_send_oap_rpc().  Finishes the request (redoing it on recoverable
 * errors), decrements the in-flight counter, completes each oap, wakes
 * cache waiters and kicks osc_check_rpcs() to keep the pipeline full,
 * then frees the oa and the page array. */
1937 static int brw_interpret_oap(struct ptlrpc_request *req, void *data, int rc)
1939 struct osc_async_page *oap, *tmp;
1940 struct osc_brw_async_args *aa = data;
1941 struct client_obd *cli;
1944 rc = osc_brw_fini_request(req, rc);
1945 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1946 if (osc_recoverable_error(rc)) {
1947 rc = osc_brw_redo_request(req, aa);
1954 client_obd_list_lock(&cli->cl_loi_list_lock);
1956 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1957 * is called so we know whether to go to sync BRWs or wait for more
1958 * RPCs to complete */
1959 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1960 cli->cl_w_in_flight--;
1962 cli->cl_r_in_flight--;
1964 /* the caller may re-use the oap after the completion call so
1965 * we need to clean it up a little */
1966 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
1967 list_del_init(&oap->oap_rpc_item);
1968 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
1971 osc_wake_cache_waiters(cli);
1972 osc_check_rpcs(cli);
1974 client_obd_list_unlock(&cli->cl_loi_list_lock);
1976 OBDO_FREE(aa->aa_oa);
1978 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
/* Turn @rpc_list (a non-empty list of ready oaps) into a single BRW
 * ptlrpc request.  Allocates the pga and obdo, fills the obdo via the
 * caller-supplied ap_* ops, sorts the pages by offset, and splices the
 * oaps into the request's async args so the completion callback can
 * find them.  Returns the request or an ERR_PTR() on failure. */
1982 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
1983 struct list_head *rpc_list,
1984 int page_count, int cmd)
1986 struct ptlrpc_request *req;
1987 struct brw_page **pga = NULL;
1988 struct osc_brw_async_args *aa;
1989 struct obdo *oa = NULL;
1990 struct obd_async_page_ops *ops = NULL;
1991 void *caller_data = NULL;
1992 struct obd_capa *ocapa;
1993 struct osc_async_page *oap;
1997 LASSERT(!list_empty(rpc_list));
1999 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2001 RETURN(ERR_PTR(-ENOMEM));
2005 GOTO(out, req = ERR_PTR(-ENOMEM));
2008 list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2010 ops = oap->oap_caller_ops;
2011 caller_data = oap->oap_caller_data;
2013 pga[i] = &oap->oap_brw_page;
2014 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2015 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2016 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2020 /* always get the data for the obdo for the rpc */
2021 LASSERT(ops != NULL);
2022 ops->ap_fill_obdo(caller_data, cmd, oa);
2023 ocapa = ops->ap_lookup_capa(caller_data, cmd);
2025 sort_brw_pages(pga, page_count);
2026 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2030 CERROR("prep_req failed: %d\n", rc);
2031 GOTO(out, req = ERR_PTR(rc));
2034 /* Need to update the timestamps after the request is built in case
2035 * we race with setattr (locally or in queue at OST). If OST gets
2036 * later setattr before earlier BRW (as determined by the request xid),
2037 * the OST will not use BRW timestamps. Sadly, there is no obvious
2038 * way to do this in a single call. bug 10150 */
2039 ops->ap_update_obdo(caller_data, cmd, oa,
2040 OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
2042 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2043 aa = (struct osc_brw_async_args *)&req->rq_async_args;
     /* move (not copy) the oaps into the request's async args; a
      * list_head cannot simply be copied */
2044 INIT_LIST_HEAD(&aa->aa_oaps);
2045 list_splice(rpc_list, &aa->aa_oaps);
2046 INIT_LIST_HEAD(rpc_list);
2053 OBD_FREE(pga, sizeof(*pga) * page_count);
2058 /* the loi lock is held across this function but it's allowed to release
2059 * and reacquire it during its work */
/* Collect up to cl_max_pages_per_rpc ready pages from @lop into one BRW
 * request and hand it to ptlrpcd.  Pages are made ready through the
 * caller's ap_make_ready(); collection also stops at fragmentation or
 * PTLRPC_MAX_BRW_SIZE boundaries.  Returns 1 when an RPC was sent, 0
 * when make_ready told us to back off, or a negative errno. */
2060 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2061 int cmd, struct loi_oap_pages *lop)
2063 struct ptlrpc_request *req;
2064 obd_count page_count = 0;
2065 struct osc_async_page *oap = NULL, *tmp;
2066 struct osc_brw_async_args *aa;
2067 struct obd_async_page_ops *ops;
2068 CFS_LIST_HEAD(rpc_list);
2069 unsigned int ending_offset;
2070 unsigned starting_offset = 0;
2073 /* first we find the pages we're allowed to work with */
2074 list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2076 ops = oap->oap_caller_ops;
2078 LASSERT(oap->oap_magic == OAP_MAGIC);
2080 /* in llite being 'ready' equates to the page being locked
2081 * until completion unlocks it. commit_write submits a page
2082 * as not ready because its unlock will happen unconditionally
2083 * as the call returns. if we race with commit_write giving
2084 * us that page we dont' want to create a hole in the page
2085 * stream, so we stop and leave the rpc to be fired by
2086 * another dirtier or kupdated interval (the not ready page
2087 * will still be on the dirty list). we could call in
2088 * at the end of ll_file_write to process the queue again. */
2089 if (!(oap->oap_async_flags & ASYNC_READY)) {
2090 int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2092 CDEBUG(D_INODE, "oap %p page %p returned %d "
2093 "instead of ready\n", oap,
2097 /* llite is telling us that the page is still
2098 * in commit_write and that we should try
2099 * and put it in an rpc again later. we
2100 * break out of the loop so we don't create
2101 * a hole in the sequence of pages in the rpc
2106 /* the io isn't needed.. tell the checks
2107 * below to complete the rpc with EINTR */
2108 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2109 oap->oap_count = -EINTR;
2112 oap->oap_async_flags |= ASYNC_READY;
2115 LASSERTF(0, "oap %p page %p returned %d "
2116 "from make_ready\n", oap,
2124 * Page submitted for IO has to be locked. Either by
2125 * ->ap_make_ready() or by higher layers.
2127 * XXX nikita: this assertion should be adjusted when lustre
2128 * starts using PG_writeback for pages being written out.
2130 #if defined(__KERNEL__) && defined(__linux__)
2131 LASSERT(PageLocked(oap->oap_page));
2133 /* If there is a gap at the start of this page, it can't merge
2134 * with any previous page, so we'll hand the network a
2135 * "fragmented" page array that it can't transfer in 1 RDMA */
2136 if (page_count != 0 && oap->oap_page_off != 0)
2139 /* take the page out of our book-keeping */
2140 list_del_init(&oap->oap_pending_item);
2141 lop_update_pending(cli, lop, cmd, -1);
2142 list_del_init(&oap->oap_urgent_item);
2144 if (page_count == 0)
2145 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2146 (PTLRPC_MAX_BRW_SIZE - 1);
2148 /* ask the caller for the size of the io as the rpc leaves. */
2149 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2151 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2152 if (oap->oap_count <= 0) {
2153 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2155 osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2159 /* now put the page back in our accounting */
2160 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2161 if (++page_count >= cli->cl_max_pages_per_rpc)
2164 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2165 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2166 * have the same alignment as the initial writes that allocated
2167 * extents on the server. */
2168 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2169 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2170 if (ending_offset == 0)
2173 /* If there is a gap at the end of this page, it can't merge
2174 * with any subsequent pages, so we'll hand the network a
2175 * "fragmented" page array that it can't transfer in 1 RDMA */
2176 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2180 osc_wake_cache_waiters(cli);
2182 if (page_count == 0)
2185 loi_list_maint(cli, loi);
     /* drop the loi lock while building the request; reacquired below */
2187 client_obd_list_unlock(&cli->cl_loi_list_lock);
2189 req = osc_build_req(cli, &rpc_list, page_count, cmd);
2191 /* this should happen rarely and is pretty bad, it makes the
2192 * pending list not follow the dirty order */
2193 client_obd_list_lock(&cli->cl_loi_list_lock);
2194 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2195 list_del_init(&oap->oap_rpc_item);
2197 /* queued sync pages can be torn down while the pages
2198 * were between the pending list and the rpc */
2199 if (oap->oap_interrupted) {
2200 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2201 osc_ap_completion(cli, NULL, oap, 0,
2205 osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2207 loi_list_maint(cli, loi);
2208 RETURN(PTR_ERR(req));
2211 aa = (struct osc_brw_async_args *)&req->rq_async_args;
2213 if (cmd == OBD_BRW_READ) {
2214 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2215 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2216 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2217 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2218 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2220 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2221 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2222 cli->cl_w_in_flight);
2223 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2224 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2225 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2228 client_obd_list_lock(&cli->cl_loi_list_lock);
2230 if (cmd == OBD_BRW_READ)
2231 cli->cl_r_in_flight++;
2233 cli->cl_w_in_flight++;
2235 /* queued sync pages can be torn down while the pages
2236 * were between the pending list and the rpc */
2238 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2239 /* only one oap gets a request reference */
2242 if (oap->oap_interrupted && !req->rq_intr) {
2243 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2245 ptlrpc_mark_interrupted(req);
2249 tmp->oap_request = ptlrpc_request_addref(req);
2251 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2252 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2254 req->rq_interpret_reply = brw_interpret_oap;
2255 ptlrpcd_add_req(req);
/* Debug helper: dump a lov_oinfo's ready/pending/urgent state
 * (ready-list membership, pending and urgent counts for both the write
 * and read lop) followed by a caller-supplied format string. */
2259 #define LOI_DEBUG(LOI, STR, args...) \
2260 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2261 !list_empty(&(LOI)->loi_cli_item), \
2262 (LOI)->loi_write_lop.lop_num_pending, \
2263 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2264 (LOI)->loi_read_lop.lop_num_pending, \
2265 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2268 /* This is called by osc_check_rpcs() to find which objects have pages that
2269 * we could be sending. These lists are maintained by lop_makes_rpc(). */
/* Pick the next object to issue RPCs for, in priority order: ready
 * objects first, then (under cache-waiter pressure) any object with
 * queued writes, then anything queued at all when the import is
 * invalid so queued pages can drain.  Presumably returns NULL when
 * nothing qualifies -- the tail of the function is not shown here. */
2270 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2273 /* first return all objects which we already know to have
2274 * pages ready to be stuffed into rpcs */
2275 if (!list_empty(&cli->cl_loi_ready_list))
2276 RETURN(list_entry(cli->cl_loi_ready_list.next,
2277 struct lov_oinfo, loi_cli_item));
2279 /* then if we have cache waiters, return all objects with queued
2280 * writes. This is especially important when many small files
2281 * have filled up the cache and not been fired into rpcs because
2282 * they don't pass the nr_pending/object threshold */
2283 if (!list_empty(&cli->cl_cache_waiters) &&
2284 !list_empty(&cli->cl_loi_write_list))
2285 RETURN(list_entry(cli->cl_loi_write_list.next,
2286 struct lov_oinfo, loi_write_item));
2288 /* then return all queued objects when we have an invalid import
2289 * so that they get flushed */
2290 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2291 if (!list_empty(&cli->cl_loi_write_list))
2292 RETURN(list_entry(cli->cl_loi_write_list.next,
2293 struct lov_oinfo, loi_write_item));
2294 if (!list_empty(&cli->cl_loi_read_list))
2295 RETURN(list_entry(cli->cl_loi_read_list.next,
2296 struct lov_oinfo, loi_read_item));
2301 /* called with the loi list lock held */
/* Main RPC pump: while there are eligible objects and we are under the
 * cl_max_rpcs_in_flight cap, send read and write RPCs for each object
 * in turn, rotating objects to the back of their lists for fairness.
 * Backs off after repeated make_ready refusals (rc == 0) rather than
 * spinning. */
2302 static void osc_check_rpcs(struct client_obd *cli)
2304 struct lov_oinfo *loi;
2305 int rc = 0, race_counter = 0;
2308 while ((loi = osc_next_loi(cli)) != NULL) {
2309 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2311 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2314 /* attempt some read/write balancing by alternating between
2315 * reads and writes in an object. The makes_rpc checks here
2316 * would be redundant if we were getting read/write work items
2317 * instead of objects. we don't want send_oap_rpc to drain a
2318 * partial read pending queue when we're given this object to
2319 * do io on writes while there are cache waiters */
2320 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2321 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2322 &loi->loi_write_lop);
2330 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2331 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2332 &loi->loi_read_lop);
2341 /* attempt some inter-object balancing by issuing rpcs
2342 * for each object in turn */
2343 if (!list_empty(&loi->loi_cli_item))
2344 list_del_init(&loi->loi_cli_item);
2345 if (!list_empty(&loi->loi_write_item))
2346 list_del_init(&loi->loi_write_item);
2347 if (!list_empty(&loi->loi_read_item))
2348 list_del_init(&loi->loi_read_item);
2350 loi_list_maint(cli, loi);
2352 /* send_oap_rpc fails with 0 when make_ready tells it to
2353 * back off. llite's make_ready does this when it tries
2354 * to lock a page queued for write that is already locked.
2355 * we want to try sending rpcs from many objects, but we
2356 * don't want to spin failing with 0. */
2357 if (race_counter == 10)
2363 /* we're trying to queue a page in the osc so we're subject to the
2364 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2365 * If the osc's queued pages are already at that limit, then we want to sleep
2366 * until there is space in the osc's queue for us. We also may be waiting for
2367 * write credits from the OST if there are RPCs in flight that may return some
2368 * before we fall back to sync writes.
2370 * We need this to know our allocation was granted in the presence of signals */
2371 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2375 client_obd_list_lock(&cli->cl_loi_list_lock);
     /* granted once removed from the waiter list, or nothing in flight
      * that could ever grant us space */
2376 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2377 client_obd_list_unlock(&cli->cl_loi_list_lock);
2381 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2382 * grant or cache space. */
/* Reserve dirty-cache space and write grant for @oap before it is queued.
 * Caller holds cl_loi_list_lock; the lock is dropped and re-taken around the
 * l_wait_event() sleep when we must wait for cache space or grant.
 * NOTE(review): several statements are elided in this extract (ENTRY/RETURN
 * and some branches); comments below describe only what is visible. */
2383 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2384                            struct osc_async_page *oap)
2386         struct osc_cache_waiter ocw;
2387         struct l_wait_info lwi = { 0 };
2391         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2392                "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2393                cli->cl_dirty_max, obd_max_dirty_pages,
2394                cli->cl_lost_grant, cli->cl_avail_grant);
2396         /* force the caller to try sync io. this can jump the list
2397          * of queued writes and create a discontiguous rpc stream */
2398         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2399             loi->loi_ar.ar_force_sync)
2402         /* Hopefully normal case - cache space and write credits available */
2403         if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2404             (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2405             (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2406                 /* account for ourselves */
2407                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2411         /* Make sure that there are write rpcs in flight to wait for. This
2412          * is a little silly as this object may not have any pending but
2413          * other objects sure might. */
2414         if (cli->cl_w_in_flight) {
2415                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2416                 cfs_waitq_init(&ocw.ocw_waitq);
2420                 loi_list_maint(cli, loi);
2421                 osc_check_rpcs(cli);
                /* drop the list lock while sleeping; ocw_granted() re-takes
                 * it internally for its check */
2422                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2424                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2425                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2427                 client_obd_list_lock(&cli->cl_loi_list_lock);
                /* still on the waiters list => woken without a grant
                 * (interrupted or no RPCs in flight); unlink ourselves */
2428                 if (!list_empty(&ocw.ocw_entry)) {
2429                         list_del(&ocw.ocw_entry);
/* Allocate and initialize the per-page async-io cookie (osc_async_page) for
 * @page at file offset @offset, returning it to the caller through @res.
 * The caller's completion ops/data are recorded for later RPC callbacks.
 * NOTE(review): the allocation and the guard before the early size_round()
 * return are elided in this extract — presumably it reports the cookie size
 * when no result pointer is given; confirm against the full source. */
2438 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2439                         struct lov_oinfo *loi, cfs_page_t *page,
2440                         obd_off offset, struct obd_async_page_ops *ops,
2441                         void *data, void **res)
2443         struct osc_async_page *oap;
2447                 return size_round(sizeof(*oap));
2450         oap->oap_magic = OAP_MAGIC;
2451         oap->oap_cli = &exp->exp_obd->u.cli;
2454         oap->oap_caller_ops = ops;
2455         oap->oap_caller_data = data;
2457         oap->oap_page = page;
2458         oap->oap_obj_off = offset;
        /* not yet on any pending/urgent/rpc list */
2460         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2461         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2462         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2464         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2466         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
/* Convert an opaque cookie from osc_prep_async_page() back into its
 * osc_async_page, validating the magic; returns ERR_PTR(-EINVAL) on a
 * corrupt or foreign cookie. */
2470 struct osc_async_page *oap_from_cookie(void *cookie)
2472         struct osc_async_page *oap = cookie;
2473         if (oap->oap_magic != OAP_MAGIC)
2474                 return ERR_PTR(-EINVAL);
/* Queue one async page for read or write.  Validates the cookie, rejects
 * pages already on a pending/urgent/rpc list, performs a quota check for
 * non-NOQUOTA writes, reserves cache space via osc_enter_cache() for writes,
 * then moves the oap to the object's pending list and kicks osc_check_rpcs().
 * NOTE(review): some statements (ENTRY/RETURN, oa setup for the quota check)
 * are elided in this extract. */
2478 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2479                               struct lov_oinfo *loi, void *cookie,
2480                               int cmd, obd_off off, int count,
2481                               obd_flag brw_flags, enum async_flags async_flags)
2483         struct client_obd *cli = &exp->exp_obd->u.cli;
2484         struct osc_async_page *oap;
2488         oap = oap_from_cookie(cookie);
2490                 RETURN(PTR_ERR(oap));
        /* no import or an invalidated one: cannot do io */
2492         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
        /* refuse to double-queue a page that is already in play */
2495         if (!list_empty(&oap->oap_pending_item) ||
2496             !list_empty(&oap->oap_urgent_item) ||
2497             !list_empty(&oap->oap_rpc_item))
2500         /* check if the file's owner/group is over quota */
2501 #ifdef HAVE_QUOTA_SUPPORT
2502         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2503                 struct obd_async_page_ops *ops;
2510                 ops = oap->oap_caller_ops;
2511                 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2512                 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
        /* default to the object's first (only) stripe when none given */
2523                 loi = lsm->lsm_oinfo[0];
2525         client_obd_list_lock(&cli->cl_loi_list_lock);
2528         oap->oap_page_off = off;
2529         oap->oap_count = count;
2530         oap->oap_brw_flags = brw_flags;
2531         oap->oap_async_flags = async_flags;
2533         if (cmd & OBD_BRW_WRITE) {
2534                 rc = osc_enter_cache(cli, loi, oap);
2536                         client_obd_list_unlock(&cli->cl_loi_list_lock);
2541         osc_oap_to_pending(oap);
2542         loi_list_maint(cli, loi);
2544         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2547         osc_check_rpcs(cli);
2548         client_obd_list_unlock(&cli->cl_loi_list_lock);
2553 /* aka (~was & now & flag), but this is more clear :) */
2554 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
/* Update the async flags of an already-queued page.  Setting ASYNC_READY
 * just records the flag; setting ASYNC_URGENT additionally moves the oap
 * onto the lop urgent list (unless it is already part of an rpc) so that
 * osc_check_rpcs() will pick it up sooner.
 * NOTE(review): flag-assignment lines for the urgent case appear elided in
 * this extract. */
2556 static int osc_set_async_flags(struct obd_export *exp,
2557                                struct lov_stripe_md *lsm,
2558                                struct lov_oinfo *loi, void *cookie,
2559                                obd_flag async_flags)
2561         struct client_obd *cli = &exp->exp_obd->u.cli;
2562         struct loi_oap_pages *lop;
2563         struct osc_async_page *oap;
2567         oap = oap_from_cookie(cookie);
2569                 RETURN(PTR_ERR(oap));
2572          * bug 7311: OST-side locking is only supported for liblustre for now
2573          * (and liblustre never calls obd_set_async_flags(). I hope.), generic
2574          * implementation has to handle case where OST-locked page was picked
2575          * up by, e.g., ->writepage().
2577         LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2578         LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2581         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2585                 loi = lsm->lsm_oinfo[0];
        /* pick the read or write page list this oap belongs to */
2587         if (oap->oap_cmd & OBD_BRW_WRITE) {
2588                 lop = &loi->loi_write_lop;
2590                 lop = &loi->loi_read_lop;
2593         client_obd_list_lock(&cli->cl_loi_list_lock);
        /* the page must already be queued to have its flags changed */
2595         if (list_empty(&oap->oap_pending_item))
2596                 GOTO(out, rc = -EINVAL);
        /* no-op if every requested flag is already set */
2598         if ((oap->oap_async_flags & async_flags) == async_flags)
2601         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2602                 oap->oap_async_flags |= ASYNC_READY;
2604         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2605                 if (list_empty(&oap->oap_rpc_item)) {
2606                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2607                         loi_list_maint(cli, loi);
2611         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2612                   oap->oap_async_flags);
2614         osc_check_rpcs(cli);
2615         client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Queue one async page into an obd_io_group rather than the normal pending
 * lists: the oap goes on lop_pending_group and, for ASYNC_GROUP_SYNC pages,
 * registers its completion cookie with the group via oig_add_one().  The io
 * is not started here — see osc_trigger_group_io(). */
2619 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2620                               struct lov_oinfo *loi,
2621                               struct obd_io_group *oig, void *cookie,
2622                               int cmd, obd_off off, int count,
2624                               obd_flag async_flags)
2626         struct client_obd *cli = &exp->exp_obd->u.cli;
2627         struct osc_async_page *oap;
2628         struct loi_oap_pages *lop;
2632         oap = oap_from_cookie(cookie);
2634                 RETURN(PTR_ERR(oap));
2636         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
        /* refuse pages that are already queued somewhere */
2639         if (!list_empty(&oap->oap_pending_item) ||
2640             !list_empty(&oap->oap_urgent_item) ||
2641             !list_empty(&oap->oap_rpc_item))
2645                 loi = lsm->lsm_oinfo[0];
2647         client_obd_list_lock(&cli->cl_loi_list_lock);
2650         oap->oap_page_off = off;
2651         oap->oap_count = count;
2652         oap->oap_brw_flags = brw_flags;
2653         oap->oap_async_flags = async_flags;
2655         if (cmd & OBD_BRW_WRITE)
2656                 lop = &loi->loi_write_lop;
2658                 lop = &loi->loi_read_lop;
2660         list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2661         if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2663                 rc = oig_add_one(oig, &oap->oap_occ);
2666         LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2667                   oap, oap->oap_page, rc);
2669         client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Move every oap on @lop's group-pending list to the regular pending list
 * (via osc_oap_to_pending()) and refresh the object's position on the
 * client's io lists.  Used when a group io is triggered. */
2674 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2675                                  struct loi_oap_pages *lop, int cmd)
2677         struct list_head *pos, *tmp;
2678         struct osc_async_page *oap;
        /* _safe: each oap is unlinked inside the loop */
2680         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2681                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2682                 list_del(&oap->oap_pending_item);
2683                 osc_oap_to_pending(oap);
2685         loi_list_maint(cli, loi);
/* Start the io queued by osc_queue_group_io(): migrate both the read and
 * write group-pending lists to the regular pending lists and let
 * osc_check_rpcs() issue RPCs for them.  Holds cl_loi_list_lock across the
 * whole transfer. */
2688 static int osc_trigger_group_io(struct obd_export *exp,
2689                                 struct lov_stripe_md *lsm,
2690                                 struct lov_oinfo *loi,
2691                                 struct obd_io_group *oig)
2693         struct client_obd *cli = &exp->exp_obd->u.cli;
2697                 loi = lsm->lsm_oinfo[0];
2699         client_obd_list_lock(&cli->cl_loi_list_lock);
2701         osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2702         osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2704         osc_check_rpcs(cli);
2705         client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Undo osc_queue_async_io() for a page that has not yet entered an RPC:
 * release its cache/grant reservation, wake any cache waiters, and unlink it
 * from the urgent and pending lists (fixing the pending-page accounting).
 * Fails with -EBUSY if the page is already part of an in-flight RPC. */
2710 static int osc_teardown_async_page(struct obd_export *exp,
2711                                    struct lov_stripe_md *lsm,
2712                                    struct lov_oinfo *loi, void *cookie)
2714         struct client_obd *cli = &exp->exp_obd->u.cli;
2715         struct loi_oap_pages *lop;
2716         struct osc_async_page *oap;
2720         oap = oap_from_cookie(cookie);
2722                 RETURN(PTR_ERR(oap));
2725                 loi = lsm->lsm_oinfo[0];
2727         if (oap->oap_cmd & OBD_BRW_WRITE) {
2728                 lop = &loi->loi_write_lop;
2730                 lop = &loi->loi_read_lop;
2733         client_obd_list_lock(&cli->cl_loi_list_lock);
        /* part of an RPC already — cannot be torn down now */
2735         if (!list_empty(&oap->oap_rpc_item))
2736                 GOTO(out, rc = -EBUSY);
        /* give back the grant/dirty accounting this page consumed */
2738         osc_exit_cache(cli, oap, 0);
2739         osc_wake_cache_waiters(cli);
2741         if (!list_empty(&oap->oap_urgent_item)) {
2742                 list_del_init(&oap->oap_urgent_item);
2743                 oap->oap_async_flags &= ~ASYNC_URGENT;
2745         if (!list_empty(&oap->oap_pending_item)) {
2746                 list_del_init(&oap->oap_pending_item);
2747                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2749         loi_list_maint(cli, loi);
2751         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2753         client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Attach @data (an inode pointer on Linux) to the lock's l_ast_data, after
 * sanity-checking that any existing l_ast_data either matches or belongs to
 * an inode being freed (I_FREEING) — anything else indicates two live inodes
 * sharing a lock, which is a fatal inconsistency.  Also transfers the
 * LDLM_FL_NO_LRU bit from @flags to the lock. */
2757 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2760         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
        /* handle no longer resolves — lock is gone (eviction?) */
2763                 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2766         lock_res_and_lock(lock);
2767 #if defined (__KERNEL__) && defined (__linux__)
2768         /* Liang XXX: Darwin and Winnt checking should be added */
2769         if (lock->l_ast_data && lock->l_ast_data != data) {
2770                 struct inode *new_inode = data;
2771                 struct inode *old_inode = lock->l_ast_data;
2772                 if (!(old_inode->i_state & I_FREEING))
2773                         LDLM_ERROR(lock, "inconsistent l_ast_data found");
2774                 LASSERTF(old_inode->i_state & I_FREEING,
2775                          "Found existing inode %p/%lu/%u state %lu in lock: "
2776                          "setting data to %p/%lu/%u\n", old_inode,
2777                          old_inode->i_ino, old_inode->i_generation,
2779                          new_inode, new_inode->i_ino, new_inode->i_generation);
2782         lock->l_ast_data = data;
2783         lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2784         unlock_res_and_lock(lock);
2785         LDLM_LOCK_PUT(lock);
/* Iterate all DLM locks on this object's resource, applying @replace (an
 * ldlm iterator, typically used to swap the cached l_ast_data) with @data.
 * The resource id is built from the stripe's object id/group. */
2788 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2789                              ldlm_iterator_t replace, void *data)
2791         struct ldlm_res_id res_id = { .name = {0} };
2792         struct obd_device *obd = class_exp2obd(exp);
2794         res_id.name[0] = lsm->lsm_object_id;
2795         res_id.name[2] = lsm->lsm_object_gr;
2797         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
/* Common completion for osc_enqueue(): translate an ELDLM_LOCK_ABORTED
 * intent reply into the server's policy result, log the lvb attributes we
 * received on success, and invoke the caller's update callback with the
 * final status. */
2801 static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
2807         /* The request was created before ldlm_cli_enqueue call. */
2808         if (rc == ELDLM_LOCK_ABORTED) {
2809                 struct ldlm_reply *rep;
2811                 /* swabbed by ldlm_cli_enqueue() */
2812                 LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
2813                 rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
2815                 LASSERT(rep != NULL);
                /* aborted intent: the server's verdict lives in policy_res1 */
2816                 if (rep->lock_policy_res1)
2817                         rc = rep->lock_policy_res1;
2821         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2822                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2823                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
2824                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
2825                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
2828         /* Call the update callback. */
2829         rc = oinfo->oi_cb_up(oinfo, rc);
/* Async-rpc interpret callback for osc_enqueue(): finish the ldlm enqueue
 * (unpacking the lvb into the stripe), run osc_enqueue_fini(), and then
 * immediately drop the lock reference — async locks are released as soon as
 * they are obtained (see the comment above osc_enqueue()). */
2833 static int osc_enqueue_interpret(struct ptlrpc_request *req,
2834                                  struct osc_enqueue_args *aa, int rc)
2836         int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
2837         struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
2838         struct ldlm_lock *lock;
2840         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2842         lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
2844         /* Complete obtaining the lock procedure. */
2845         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2847                                    &aa->oa_oi->oi_flags,
2848                                    &lsm->lsm_oinfo[0]->loi_lvb,
2849                                    sizeof(lsm->lsm_oinfo[0]->loi_lvb),
2850                                    lustre_swab_ost_lvb,
2851                                    aa->oa_oi->oi_lockh, rc);
2853         /* Complete osc stuff. */
2854         rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
2856         /* Release the lock for async request. */
2857         if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
2858                 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
2860         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2861                  aa->oa_oi->oi_lockh, req, aa);
2862         LDLM_LOCK_PUT(lock);
2866 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2867  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2868  * other synchronous requests, however keeping some locks and trying to obtain
2869  * others may take a considerable amount of time in a case of ost failure; and
2870  * when other sync requests do not get released lock from a client, the client
2871  * is excluded from the cluster -- such scenarios make the life difficult, so
2872  * release locks just after they are obtained. */
/* Take an extent lock on the (single-striped) object described by
 * oinfo->oi_md.  First tries to match an already-granted lock (including a
 * PW lock when PR was asked for); otherwise builds and sends an ldlm enqueue
 * request, either asynchronously via @rqset (completed in
 * osc_enqueue_interpret()) or synchronously finishing with
 * osc_enqueue_fini().
 * NOTE(review): several statements (mode upgrade to LCK_PW, some branch
 * bodies, RETURNs) are elided in this extract. */
2873 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2874                        struct ldlm_enqueue_info *einfo,
2875                        struct ptlrpc_request_set *rqset)
2877         struct ldlm_res_id res_id = { .name = {0} };
2878         struct obd_device *obd = exp->exp_obd;
2879         struct ldlm_reply *rep;
2880         struct ptlrpc_request *req = NULL;
2881         int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
2886         res_id.name[0] = oinfo->oi_md->lsm_object_id;
2887         res_id.name[2] = oinfo->oi_md->lsm_object_gr;
2889         /* Filesystem lock extents are extended to page boundaries so that
2890          * dealing with the page cache is a little smoother. */
2891         oinfo->oi_policy.l_extent.start -=
2892                 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
2893         oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
2895         if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
2898         /* Next, search for already existing extent locks that will cover us */
2899         /* If we're trying to read, we also search for an existing PW lock. The
2900          * VFS and page cache already protect us locally, so lots of readers/
2901          * writers can share a single PW lock.
2903          * There are problems with conversion deadlocks, so instead of
2904          * converting a read lock to a write lock, we'll just enqueue a new
2907          * At some point we should cancel the read lock instead of making them
2908          * send us a blocking callback, but there are problems with canceling
2909          * locks out from other users right now, too. */
2910         mode = einfo->ei_mode;
2911         if (einfo->ei_mode == LCK_PR)
2913         mode = ldlm_lock_match(obd->obd_namespace,
2914                                oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
2915                                einfo->ei_type, &oinfo->oi_policy, mode,
2918                 /* addref the lock only if not async requests and PW lock is
2919                  * matched whereas we asked for PR. */
2920                 if (!rqset && einfo->ei_mode != mode)
2921                         ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
2922                 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
2925                         /* I would like to be able to ASSERT here that rss <=
2926                          * kms, but I can't, for reasons which are explained in
2930                 /* We already have a lock, and it's referenced */
2931                 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2933                 /* For async requests, decref the lock. */
2934                 if (einfo->ei_mode != mode)
2935                         ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
2937                         ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
        /* no match: build the enqueue request ourselves (intent case) */
2945                                 [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
2946                                 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
2947                                 [DLM_LOCKREQ_OFF + 1] = 0 };
2949                 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
2953                 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
2954                 size[DLM_REPLY_REC_OFF] =
2955                         sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
2956                 ptlrpc_req_set_repsize(req, 3, size);
2959         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2960         oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
2962         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
2963                               &oinfo->oi_policy, &oinfo->oi_flags,
2964                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2965                               sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
2966                               lustre_swab_ost_lvb, oinfo->oi_lockh,
2970                         struct osc_enqueue_args *aa;
2971                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2972                         aa = (struct osc_enqueue_args *)&req->rq_async_args;
2977                         req->rq_interpret_reply = osc_enqueue_interpret;
2978                         ptlrpc_set_add_req(rqset, req);
2979                 } else if (intent) {
2980                         ptlrpc_req_finished(req);
2985         rc = osc_enqueue_fini(req, oinfo, intent, rc);
2987                 ptlrpc_req_finished(req);
/* Match an already-granted extent lock covering @policy on the object.  Like
 * the match in osc_enqueue(), a PW lock satisfies a PR request; when that
 * happens (and this is not a TEST_LOCK probe) the reference is converted
 * from PW to PR.  Extents are rounded out to page boundaries first.
 * NOTE(review): the OBD_FAIL branch body and the mode-widening assignment
 * before ldlm_lock_match() are elided in this extract. */
2992 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
2993                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2994                      int *flags, void *data, struct lustre_handle *lockh)
2996         struct ldlm_res_id res_id = { .name = {0} };
2997         struct obd_device *obd = exp->exp_obd;
2998         int lflags = *flags;
3002         res_id.name[0] = lsm->lsm_object_id;
3003         res_id.name[2] = lsm->lsm_object_gr;
        /* fault-injection hook for testing the no-match path */
3005         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3008         /* Filesystem lock extents are extended to page boundaries so that
3009          * dealing with the page cache is a little smoother */
3010         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3011         policy->l_extent.end |= ~CFS_PAGE_MASK;
3013         /* Next, search for already existing extent locks that will cover us */
3014         /* If we're trying to read, we also search for an existing PW lock. The
3015          * VFS and page cache already protect us locally, so lots of readers/
3016          * writers can share a single PW lock. */
3020         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3021                              &res_id, type, policy, rc, lockh);
3023                 osc_set_data_with_check(lockh, data, lflags);
3024                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
                /* asked PR, matched PW: re-reference as PR */
3025                         ldlm_lock_addref(lockh, LCK_PR);
3026                         ldlm_lock_decref(lockh, LCK_PW);
/* Release one reference on an extent lock.  GROUP locks are additionally
 * cancelled outright (decref_and_cancel) because they are never kept in the
 * LRU; other modes just drop the reference. */
3033 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3034                       __u32 mode, struct lustre_handle *lockh)
3038         if (unlikely(mode == LCK_GROUP))
3039                 ldlm_lock_decref_and_cancel(lockh, mode);
3041                 ldlm_lock_decref(lockh, mode);
/* Cancel all unused DLM locks in this namespace, optionally restricted to a
 * single object's resource when @lsm is given (resp stays NULL otherwise —
 * the assignment is elided in this extract). */
3046 static int osc_cancel_unused(struct obd_export *exp,
3047                              struct lov_stripe_md *lsm, int flags,
3050         struct obd_device *obd = class_exp2obd(exp);
3051         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3054                 res_id.name[0] = lsm->lsm_object_id;
3055                 res_id.name[2] = lsm->lsm_object_gr;
3059         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
/* Move this object's locks into or out of the namespace LRU (@join != 0 to
 * join).  Mirrors osc_cancel_unused(): resp points at the object's resource
 * when @lsm is given. */
3062 static int osc_join_lru(struct obd_export *exp,
3063                         struct lov_stripe_md *lsm, int join)
3065         struct obd_device *obd = class_exp2obd(exp);
3066         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3069                 res_id.name[0] = lsm->lsm_object_id;
3070                 res_id.name[2] = lsm->lsm_object_gr;
3074         return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
/* Interpret callback for the async OST_STATFS rpc: unpack (and byte-swap if
 * needed) the obd_statfs reply into the caller's oi_osfs buffer, then invoke
 * the caller's completion callback with the final status. */
3077 static int osc_statfs_interpret(struct ptlrpc_request *req,
3078                                 struct osc_async_args *aa, int rc)
3080         struct obd_statfs *msfs;
3086         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3087                                   lustre_swab_obd_statfs);
3089                 CERROR("Can't unpack obd_statfs\n");
3090                 GOTO(out, rc = -EPROTO);
3093         memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
3095         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Fire an OST_STATFS rpc asynchronously via @rqset; the reply is handled by
 * osc_statfs_interpret() which fills oinfo->oi_osfs and runs oi_cb_up.
 * @max_age is currently unused by the wire protocol (see comment below). */
3099 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3100                             __u64 max_age, struct ptlrpc_request_set *rqset)
3102         struct ptlrpc_request *req;
3103         struct osc_async_args *aa;
3104         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
3107         /* We could possibly pass max_age in the request (as an absolute
3108          * timestamp or a "seconds.usec ago") so the target can avoid doing
3109          * extra calls into the filesystem if that isn't necessary (e.g.
3110          * during mount that would help a bit). Having relative timestamps
3111          * is not so great if request processing is slow, while absolute
3112          * timestamps are not ideal because they need time synchronization. */
3113         req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3114                               OST_STATFS, 1, NULL, NULL);
3118         ptlrpc_req_set_repsize(req, 2, size);
3119         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3121         req->rq_interpret_reply = osc_statfs_interpret;
3122         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3123         aa = (struct osc_async_args *)&req->rq_async_args;
3126         ptlrpc_set_add_req(rqset, req);
/* Synchronous statfs: send OST_STATFS, wait for the reply, unpack/swab the
 * obd_statfs payload and copy it into @osfs.  Synchronous counterpart of
 * osc_statfs_async(). */
3130 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3133         struct obd_statfs *msfs;
3134         struct ptlrpc_request *req;
3135         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
3138         /* We could possibly pass max_age in the request (as an absolute
3139          * timestamp or a "seconds.usec ago") so the target can avoid doing
3140          * extra calls into the filesystem if that isn't necessary (e.g.
3141          * during mount that would help a bit). Having relative timestamps
3142          * is not so great if request processing is slow, while absolute
3143          * timestamps are not ideal because they need time synchronization. */
3144         req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3145                               OST_STATFS, 1, NULL, NULL);
3149         ptlrpc_req_set_repsize(req, 2, size);
3150         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3152         rc = ptlrpc_queue_wait(req);
3156         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3157                                   lustre_swab_obd_statfs);
3159                 CERROR("Can't unpack obd_statfs\n");
3160                 GOTO(out, rc = -EPROTO);
3163         memcpy(osfs, msfs, sizeof(*osfs));
3167         ptlrpc_req_finished(req);
3171 /* Retrieve object striping information.
3173  * @lump is a pointer to an in-core struct with lmm_ost_count indicating
3174  * the maximum number of OST indices which will fit in the user buffer.
3175  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
/* Copy the (single-stripe) layout of @lsm out to the user-space lov_user_md
 * @lump: validates the user header's magic, optionally fills the one object
 * entry when the user asked for stripe objects, and always reports object
 * id/group and stripe_count == 1.
 * NOTE(review): error-return statements and the lumk=&lum fallback for the
 * no-objects case are elided in this extract. */
3177 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3179         struct lov_user_md lum, *lumk;
3180         int rc = 0, lum_size;
3186         if (copy_from_user(&lum, lump, sizeof(lum)))
3189         if (lum.lmm_magic != LOV_USER_MAGIC)
        /* caller left room for stripe objects: allocate header + 1 entry */
3192         if (lum.lmm_stripe_count > 0) {
3193                 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3194                 OBD_ALLOC(lumk, lum_size);
3198                 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3199                 lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
3201                 lum_size = sizeof(lum);
3205         lumk->lmm_object_id = lsm->lsm_object_id;
3206         lumk->lmm_object_gr = lsm->lsm_object_gr;
3207         lumk->lmm_stripe_count = 1;
3209         if (copy_to_user(lump, lumk, lum_size))
        /* only free when we allocated (stripe_count > 0 path) */
3213                 OBD_FREE(lumk, lum_size);
/* ioctl dispatcher for the OSC device.  Takes a module reference for the
 * duration of the call; supported commands are LOV config query, stripe
 * get/set, client recovery, import activation and quota-check polling.
 * NOTE(review): several break statements / RETURNs between cases are elided
 * in this extract. */
3219 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3220                          void *karg, void *uarg)
3222         struct obd_device *obd = exp->exp_obd;
3223         struct obd_ioctl_data *data = karg;
        /* pin the module so it cannot unload mid-ioctl */
3227         if (!try_module_get(THIS_MODULE)) {
3228                 CERROR("Can't get module. Is it alive?");
3232         case OBD_IOC_LOV_GET_CONFIG: {
3234                 struct lov_desc *desc;
3235                 struct obd_uuid uuid;
3239                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3240                         GOTO(out, err = -EINVAL);
3242                 data = (struct obd_ioctl_data *)buf;
        /* user buffers must be large enough for the desc and uuid */
3244                 if (sizeof(*desc) > data->ioc_inllen1) {
3245                         obd_ioctl_freedata(buf, len);
3246                         GOTO(out, err = -EINVAL);
3249                 if (data->ioc_inllen2 < sizeof(uuid)) {
3250                         obd_ioctl_freedata(buf, len);
3251                         GOTO(out, err = -EINVAL);
        /* an OSC presents itself as a 1-target "LOV" */
3254                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3255                 desc->ld_tgt_count = 1;
3256                 desc->ld_active_tgt_count = 1;
3257                 desc->ld_default_stripe_count = 1;
3258                 desc->ld_default_stripe_size = 0;
3259                 desc->ld_default_stripe_offset = 0;
3260                 desc->ld_pattern = 0;
3261                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3263                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3265                 err = copy_to_user((void *)uarg, buf, len);
3268                 obd_ioctl_freedata(buf, len);
3271         case LL_IOC_LOV_SETSTRIPE:
3272                 err = obd_alloc_memmd(exp, karg);
3276         case LL_IOC_LOV_GETSTRIPE:
3277                 err = osc_getstripe(karg, uarg);
3279         case OBD_IOC_CLIENT_RECOVER:
3280                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3285         case IOC_OSC_SET_ACTIVE:
3286                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3289         case OBD_IOC_POLL_QUOTACHECK:
3290                 err = lquota_poll_check(quota_interface, exp,
3291                                         (struct if_quotacheck *)karg);
3294                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3295                        cmd, cfs_curproc_comm());
3296                 GOTO(out, err = -ENOTTY);
3299         module_put(THIS_MODULE);
/* obd get_info handler.  "lock_to_stripe" answers locally (an OSC is a
 * single stripe); "last_id" issues a synchronous OST_GET_INFO rpc and
 * unpacks the returned obd_id into *val.
 * NOTE(review): some RETURNs and the *stripe assignment are elided in this
 * extract. */
3303 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3304                         void *key, __u32 *vallen, void *val)
3307         if (!vallen || !val)
3310         if (KEY_IS("lock_to_stripe")) {
3311                 __u32 *stripe = val;
3312                 *vallen = sizeof(*stripe);
3315         } else if (KEY_IS("last_id")) {
3316                 struct ptlrpc_request *req;
3318                 char *bufs[2] = { NULL, key };
3319                 int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };
3321                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3322                                       OST_GET_INFO, 2, size, bufs);
3326                 size[REPLY_REC_OFF] = *vallen;
3327                 ptlrpc_req_set_repsize(req, 2, size);
3328                 rc = ptlrpc_queue_wait(req);
3332                 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3333                                            lustre_swab_ost_last_id);
3334                 if (reply == NULL) {
3335                         CERROR("Can't unpack OST last ID\n");
3336                         GOTO(out, rc = -EPROTO);
3338                 *((obd_id *)val) = *reply;
3340                 ptlrpc_req_finished(req);
/* Interpret callback for the KEY_MDS_CONN set_info rpc (MDS->OST link):
 * connect the llog initiator for the MDS/OST originator context, then mark
 * the import as server-like (server_timeout) and pingable so the MDS OSC
 * keeps the OST connection alive. */
3346 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3349         struct llog_ctxt *ctxt;
3350         struct obd_import *imp = req->rq_import;
3356         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3359                 rc = llog_initiator_connect(ctxt);
3361                         CERROR("cannot establish connection for "
3362                                "ctxt %p: %d\n", ctxt, rc);
3365         spin_lock(&imp->imp_lock);
3366         imp->imp_server_timeout = 1;
3367         imp->imp_pingable = 1;
3368         spin_unlock(&imp->imp_lock);
3369         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
/* obd set_info handler.  A handful of keys are handled locally (next_id,
 * unlinked, init_recov, checksum, flush_ctx); anything else is forwarded to
 * the OST as an OST_SET_INFO rpc on @set.  KEY_MDS_CONN additionally records
 * the MDS group in the object creator and arranges llog/ping setup in the
 * rpc's interpret callback.
 * NOTE(review): some RETURN statements between key branches are elided in
 * this extract. */
3374 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3375                               void *key, obd_count vallen, void *val,
3376                               struct ptlrpc_request_set *set)
3378         struct ptlrpc_request *req;
3379         struct obd_device *obd = exp->exp_obd;
3380         struct obd_import *imp = class_exp2cliimp(exp);
3381         int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
3382         char *bufs[3] = { NULL, key, val };
3385         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3387         if (KEY_IS(KEY_NEXT_ID)) {
3388                 if (vallen != sizeof(obd_id))
3390                 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3391                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3392                        exp->exp_obd->obd_name,
3393                        obd->u.cli.cl_oscc.oscc_next_id);
        /* orphan cleanup finished: allow object creation again */
3398         if (KEY_IS("unlinked")) {
3399                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3400                 spin_lock(&oscc->oscc_lock);
3401                 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3402                 spin_unlock(&oscc->oscc_lock);
3406         if (KEY_IS(KEY_INIT_RECOV)) {
3407                 if (vallen != sizeof(int))
3409                 spin_lock(&imp->imp_lock);
3410                 imp->imp_initial_recov = *(int *)val;
3411                 spin_unlock(&imp->imp_lock);
3412                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3413                        exp->exp_obd->obd_name,
3414                        imp->imp_initial_recov);
3418         if (KEY_IS("checksum")) {
3419                 if (vallen != sizeof(int))
3421                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3425         if (KEY_IS(KEY_FLUSH_CTX)) {
3426                 sptlrpc_import_flush_my_ctx(imp);
3433         /* We pass all other commands directly to OST. Since nobody calls osc
3434            methods directly and everybody is supposed to go through LOV, we
3435            assume lov checked invalid values for us.
3436            The only recognised values so far are evict_by_nid and mds_conn.
3437            Even if something bad goes through, we'd get a -EINVAL from OST
3440         req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
3445         if (KEY_IS(KEY_MDS_CONN)) {
3446                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3448                 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3449                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3450                 LASSERT(oscc->oscc_oa.o_gr > 0);
3451                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3454         ptlrpc_req_set_repsize(req, 1, NULL);
3455         ptlrpc_set_add_req(set, req);
3456         ptlrpc_check_set(set);
/* llog ops for the size-replication context: only cancel is needed on the
 * client side, everything else stays default. */
3462 static struct llog_operations osc_size_repl_logops = {
3463         lop_cancel: llog_obd_repl_cancel
/* originator-side llog ops; populated lazily in osc_llog_init() from
 * llog_lvfs_ops with origin-specific setup/cleanup/add/connect hooks */
3466 static struct llog_operations osc_mds_ost_orig_logops;
/* Set up the two llog contexts an OSC uses: the MDS->OST originator context
 * (catalog @catid) and the size-replication context.  The originator ops
 * table is initialized exactly once, guarded by obd_dev_lock.
 * NOTE(review): the error-path labels/RETURNs between the llog_setup calls
 * are elided in this extract. */
3467 static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
3468                          struct obd_device *tgt, int count,
3469                          struct llog_catid *catid, struct obd_uuid *uuid)
        /* one-time lazy init of the shared originator ops table */
3474         spin_lock(&obd->obd_dev_lock);
3475         if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3476                 osc_mds_ost_orig_logops = llog_lvfs_ops;
3477                 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3478                 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3479                 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3480                 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3482         spin_unlock(&obd->obd_dev_lock);
3484         rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3485                         &catid->lci_logid, &osc_mds_ost_orig_logops);
3487                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3491         rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3492                         &osc_size_repl_logops);
3494                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3497                 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3498                        obd->obd_name, tgt->obd_name, count, catid, rc);
3499                 CERROR("logid "LPX64":0x%x\n",
3500                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
/* Tear down both llog contexts created by osc_llog_init(); the two cleanup
 * results are tracked separately (rc/rc2) so one failure does not mask the
 * other. */
3505 static int osc_llog_finish(struct obd_device *obd, int count)
3507         struct llog_ctxt *ctxt;
3508         int rc = 0, rc2 = 0;
3511         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3513                 rc = llog_cleanup(ctxt);
3515         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3517                 rc2 = llog_cleanup(ctxt);
/* Reconnect hook: when the server supports grants, report our current
 * available grant back in the connect data (falling back to two full RPCs'
 * worth when we have none) and fold any lost grant into the request, so the
 * server's accounting stays in sync across reconnects. */
3524 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3525                          struct obd_uuid *cluuid,
3526                          struct obd_connect_data *data)
3528         struct client_obd *cli = &obd->u.cli;
3530         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3533                 client_obd_list_lock(&cli->cl_loi_list_lock);
                /* ?: — keep cl_avail_grant if nonzero, else ask for 2 RPCs */
3534                 data->ocd_grant = cli->cl_avail_grant ?:
3535                                   2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3536                 lost_grant = cli->cl_lost_grant;
3537                 cli->cl_lost_grant = 0;
3538                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3540                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3541                        "cl_lost_grant: %ld\n", data->ocd_grant,
3542                        cli->cl_avail_grant, lost_grant);
3543                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3544                        " ocd_grant: %d\n", data->ocd_connect_flags,
3545                        data->ocd_version, data->ocd_grant);
/* Disconnect from the OST.  On the final disconnect (conn_count == 1) any
 * pending llog cancel records in the size-replication context are flushed to
 * the target first, then the generic client disconnect runs. */
3551 static int osc_disconnect(struct obd_export *exp)
3553         struct obd_device *obd = class_exp2obd(exp);
3554         struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3557         if (obd->u.cli.cl_conn_count == 1)
3558                 /* flush any remaining cancel messages out to the target */
3559                 llog_sync(ctxt, exp);
3561         rc = client_disconnect_export(exp);
/* React to import state changes: on disconnect, flag the creator as
 * recovering (MDS OSCs) and zero grant accounting; on invalidate, fail
 * queued pages and flush local DLM locks; on (re)activation, clear no-space
 * and notify the observer; on OCD, re-init grants and switch to the OST
 * request portal when supported.
 * NOTE(review): some break statements and the cli assignments are elided in
 * this extract. */
3565 static int osc_import_event(struct obd_device *obd,
3566                             struct obd_import *imp,
3567                             enum obd_import_event event)
3569         struct client_obd *cli;
3573         LASSERT(imp->imp_obd == obd);
3576         case IMP_EVENT_DISCON: {
3577                 /* Only do this on the MDS OSC's */
3578                 if (imp->imp_server_timeout) {
3579                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3581                         spin_lock(&oscc->oscc_lock);
3582                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3583                         spin_unlock(&oscc->oscc_lock);
                /* grants are meaningless while disconnected */
3586                 client_obd_list_lock(&cli->cl_loi_list_lock);
3587                 cli->cl_avail_grant = 0;
3588                 cli->cl_lost_grant = 0;
3589                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3592         case IMP_EVENT_INACTIVE: {
3593                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3596         case IMP_EVENT_INVALIDATE: {
3597                 struct ldlm_namespace *ns = obd->obd_namespace;
3601                 client_obd_list_lock(&cli->cl_loi_list_lock);
3602                 /* all pages go to failing rpcs due to the invalid import */
3603                 osc_check_rpcs(cli);
3604                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3606                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3610         case IMP_EVENT_ACTIVE: {
3611                 /* Only do this on the MDS OSC's */
3612                 if (imp->imp_server_timeout) {
3613                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3615                         spin_lock(&oscc->oscc_lock);
3616                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3617                         spin_unlock(&oscc->oscc_lock);
3619                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3622         case IMP_EVENT_OCD: {
3623                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3625                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3626                         osc_init_grant(&obd->u.cli, ocd);
3629                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3630                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3632                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3636                 CERROR("Unknown import event %d\n", event);
/* osc_setup(): set up an OSC obd device from a config record -- take a
 * ptlrpcd reference, run the generic client setup, register /proc entries
 * and pre-allocate a small request pool for BRW RPCs.
 * NOTE(review): the braces, "rc" declaration, error checks between the
 * visible calls and the return were lost in extraction; comments describe
 * only the visible statements. */
3642 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
/* Take a reference on the shared ptlrpcd daemon threads. */
3648 rc = ptlrpcd_addref();
/* Generic client-side obd setup (creates the import, etc). */
3652 rc = client_obd_setup(obd, lcfg);
3656 struct lprocfs_static_vars lvars = { 0 };
3657 struct client_obd *cli = &obd->u.cli;
/* Register /proc variables, sequence stats and RPC stats for this obd. */
3659 lprocfs_osc_init_vars(&lvars);
3660 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3661 lproc_osc_attach_seqstat(obd);
3662 ptlrpc_lprocfs_register_obd(obd);
3666 /* We need to allocate a few requests more, because
3667 brw_interpret_oap tries to create new requests before freeing
3668 previous ones. Ideally we want to have 2x max_rpcs_in_flight
3669 reserved, but I afraid that might be too much wasted RAM
3670 in fact, so 2 is just my guess and still should work. */
3671 cli->cl_import->imp_rq_pool =
3672 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3674 ptlrpc_add_rqs_to_pool);
/* osc_precleanup(): staged pre-cleanup of the OSC device, driven by the
 * generic obd cleanup state machine (stage selects what to tear down).
 * NOTE(review): the enclosing "switch (stage)", the "break;" lines and the
 * return are not visible in this extract; comments cover only visible
 * statements. */
3680 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3686 case OBD_CLEANUP_EARLY: {
3687 struct obd_import *imp;
3688 imp = obd->u.cli.cl_import;
3689 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3690 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3691 ptlrpc_deactivate_import(imp);
/* Stop the pinger from touching this import while it shuts down. */
3692 spin_lock(&imp->imp_lock);
3693 imp->imp_pingable = 0;
3694 spin_unlock(&imp->imp_lock);
3697 case OBD_CLEANUP_EXPORTS: {
3698 /* If we set up but never connected, the
3699 client import will not have been cleaned. */
3700 if (obd->u.cli.cl_import) {
3701 struct obd_import *imp;
3702 imp = obd->u.cli.cl_import;
3703 CDEBUG(D_CONFIG, "%s: client import never connected\n",
/* Fail outstanding requests, free the request pool allocated in
 * osc_setup(), then release the import itself. */
3705 ptlrpc_invalidate_import(imp);
3706 ptlrpc_free_rq_pool(imp->imp_rq_pool);
3707 class_destroy_import(imp);
3708 obd->u.cli.cl_import = NULL;
3712 case OBD_CLEANUP_SELF_EXP:
/* Shut down the llog subsystem for this obd; log but tolerate failure. */
3713 rc = obd_llog_finish(obd, 0);
3715 CERROR("failed to cleanup llogging subsystems\n");
3717 case OBD_CLEANUP_OBD:
/* osc_cleanup(): final teardown of the OSC device -- undo the /proc
 * registration from osc_setup(), mark the object creator as exiting,
 * release the quota cache and run the generic client cleanup.
 * NOTE(review): "rc" declaration, braces and return are not visible in
 * this extract. */
3723 int osc_cleanup(struct obd_device *obd)
3725 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
/* Remove the /proc entries registered during setup. */
3729 ptlrpc_lprocfs_unregister_obd(obd);
3730 lprocfs_obd_cleanup(obd);
/* Tell the object creator we are exiting (and no longer recovering). */
3732 spin_lock(&oscc->oscc_lock);
3733 oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3734 oscc->oscc_flags |= OSCC_FLAG_EXITING;
3735 spin_unlock(&oscc->oscc_lock);
3737 /* free memory of osc quota cache */
3738 lquota_cleanup(quota_interface, obd);
/* Generic client-side obd teardown. */
3740 rc = client_obd_cleanup(obd);
/* osc_process_config(): apply a configuration-log record to this OSC by
 * mapping PARAM_OSC settings onto the device's /proc variables.
 * NOTE(review): braces, "rc" declaration and return are not visible in
 * this extract. */
3746 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3748 struct lustre_cfg *lcfg = buf;
3749 struct lprocfs_static_vars lvars = { 0 };
/* Resolve the proc variable table, then let the class layer set the param. */
3752 lprocfs_osc_init_vars(&lvars);
3754 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
/* obd method table for the OSC device type.  Lifecycle, connection and
 * statfs entries mostly delegate to the generic client_* helpers; object,
 * async-page, lock and llog entries are the OSC implementations defined
 * earlier in this file. */
3758 struct obd_ops osc_obd_ops = {
3759 .o_owner = THIS_MODULE,
3760 .o_setup = osc_setup,
3761 .o_precleanup = osc_precleanup,
3762 .o_cleanup = osc_cleanup,
3763 .o_add_conn = client_import_add_conn,
3764 .o_del_conn = client_import_del_conn,
3765 .o_connect = client_connect_import,
3766 .o_reconnect = osc_reconnect,
3767 .o_disconnect = osc_disconnect,
3768 .o_statfs = osc_statfs,
3769 .o_statfs_async = osc_statfs_async,
3770 .o_packmd = osc_packmd,
3771 .o_unpackmd = osc_unpackmd,
3772 .o_precreate = osc_precreate,
3773 .o_create = osc_create,
3774 .o_destroy = osc_destroy,
3775 .o_getattr = osc_getattr,
3776 .o_getattr_async = osc_getattr_async,
3777 .o_setattr = osc_setattr,
3778 .o_setattr_async = osc_setattr_async,
3780 .o_brw_async = osc_brw_async,
3781 .o_prep_async_page = osc_prep_async_page,
3782 .o_queue_async_io = osc_queue_async_io,
3783 .o_set_async_flags = osc_set_async_flags,
3784 .o_queue_group_io = osc_queue_group_io,
3785 .o_trigger_group_io = osc_trigger_group_io,
3786 .o_teardown_async_page = osc_teardown_async_page,
3787 .o_punch = osc_punch,
3789 .o_enqueue = osc_enqueue,
3790 .o_match = osc_match,
3791 .o_change_cbdata = osc_change_cbdata,
3792 .o_cancel = osc_cancel,
3793 .o_cancel_unused = osc_cancel_unused,
3794 .o_join_lru = osc_join_lru,
3795 .o_iocontrol = osc_iocontrol,
3796 .o_get_info = osc_get_info,
3797 .o_set_info_async = osc_set_info_async,
3798 .o_import_event = osc_import_event,
3799 .o_llog_init = osc_llog_init,
3800 .o_llog_finish = osc_llog_finish,
3801 .o_process_config = osc_process_config,
/* osc_init(): module entry point -- pull in optional quota support, wire
 * its ops into osc_obd_ops, and register the OSC obd type.
 * NOTE(review): the branch structure around class_register_type() is not
 * fully visible here; the PORTAL_SYMBOL_PUT below is presumably on the
 * registration-failure path only -- confirm against the full source. */
3803 int __init osc_init(void)
3805 struct lprocfs_static_vars lvars = { 0 };
3809 lprocfs_osc_init_vars(&lvars);
/* Load the (optional) quota module and grab its interface symbol;
 * lquota_* helpers tolerate a NULL interface. */
3811 request_module("lquota");
3812 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3813 lquota_init(quota_interface);
3814 init_obd_quota_ops(quota_interface, &osc_obd_ops);
/* Register the "osc" device type with the class driver. */
3816 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3817 LUSTRE_OSC_NAME, NULL);
3819 if (quota_interface)
3820 PORTAL_SYMBOL_PUT(osc_quota_interface);
/* osc_exit(): module exit -- shut down quota support, drop the quota
 * interface symbol reference taken in osc_init(), and unregister the
 * OSC obd type. */
3828 static void /*__exit*/ osc_exit(void)
3830 lquota_exit(quota_interface);
/* Only drop the symbol reference if PORTAL_SYMBOL_GET succeeded. */
3831 if (quota_interface)
3832 PORTAL_SYMBOL_PUT(osc_quota_interface);
3834 class_unregister_type(LUSTRE_OSC_NAME);
/* Standard kernel-module metadata and init/exit registration (cfs_module
 * is the libcfs portability wrapper around module_init/module_exit). */
3837 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3838 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3839 MODULE_LICENSE("GPL");
3841 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);