1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5 * Author Peter Braam <braam@clusterfs.com>
7 * This file is part of the Lustre file system, http://www.lustre.org
8 * Lustre is a trademark of Cluster File Systems, Inc.
10 * You may have signed or agreed to another license before downloading
11 * this software. If so, you are bound by the terms and conditions
12 * of that agreement, and the following does not apply to you. See the
13 * LICENSE file included with this distribution for more information.
15 * If you did not agree to a different license, then this copy of Lustre
16 * is open source software; you can redistribute it and/or modify it
17 * under the terms of version 2 of the GNU General Public License as
18 * published by the Free Software Foundation.
20 * In either case, Lustre is distributed in the hope that it will be
21 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23 * license text for more details.
25 * For testing and management it is treated as an obd_device,
26 * although it does not export a full OBD method table (the
27 * requests are coming in over the wire, so object target modules
28 * do not have a full method table.)
33 # define EXPORT_SYMTAB
35 #define DEBUG_SUBSYSTEM S_OSC
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
43 #include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include "osc_internal.h"
61 static quota_interface_t *quota_interface = NULL;
62 extern quota_interface_t osc_quota_interface;
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
66 /* Pack OSC object metadata for disk storage (LE byte order). */
67 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
68 struct lov_stripe_md *lsm)
73 lmm_size = sizeof(**lmmp);
78 OBD_FREE(*lmmp, lmm_size);
84 OBD_ALLOC(*lmmp, lmm_size);
90 LASSERT(lsm->lsm_object_id);
91 LASSERT(lsm->lsm_object_gr);
92 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
93 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
99 /* Unpack OSC object metadata from disk storage (LE byte order). */
100 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
101 struct lov_mds_md *lmm, int lmm_bytes)
107 if (lmm_bytes < sizeof (*lmm)) {
108 CERROR("lov_mds_md too small: %d, need %d\n",
109 lmm_bytes, (int)sizeof(*lmm));
112 /* XXX LOV_MAGIC etc check? */
114 if (lmm->lmm_object_id == 0) {
115 CERROR("lov_mds_md: zero lmm_object_id\n");
120 lsm_size = lov_stripe_md_size(1);
124 if (*lsmp != NULL && lmm == NULL) {
125 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
126 OBD_FREE(*lsmp, lsm_size);
132 OBD_ALLOC(*lsmp, lsm_size);
135 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
136 if ((*lsmp)->lsm_oinfo[0] == NULL) {
137 OBD_FREE(*lsmp, lsm_size);
140 loi_init((*lsmp)->lsm_oinfo[0]);
144 /* XXX zero *lsmp? */
145 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
146 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
147 LASSERT((*lsmp)->lsm_object_id);
148 LASSERT((*lsmp)->lsm_object_gr);
151 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
156 static inline void osc_pack_capa(struct ptlrpc_request *req, int offset,
157 struct ost_body *body, void *capa)
159 struct obd_capa *oc = (struct obd_capa *)capa;
160 struct lustre_capa *c;
165 c = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*c));
168 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
169 DEBUG_CAPA(D_SEC, c, "pack");
172 static inline void osc_pack_req_body(struct ptlrpc_request *req, int offset,
173 struct obd_info *oinfo)
175 struct ost_body *body;
177 body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body));
178 body->oa = *oinfo->oi_oa;
179 osc_pack_capa(req, offset + 1, body, oinfo->oi_capa);
182 static int osc_getattr_interpret(struct ptlrpc_request *req,
183 struct osc_async_args *aa, int rc)
185 struct ost_body *body;
191 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
192 lustre_swab_ost_body);
194 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
195 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
197 /* This should really be sent by the OST */
198 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
199 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
201 CERROR("can't unpack ost_body\n");
203 aa->aa_oi->oi_oa->o_valid = 0;
206 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
210 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
211 struct ptlrpc_request_set *set)
213 struct ptlrpc_request *req;
214 struct ost_body *body;
215 int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
216 struct osc_async_args *aa;
219 size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
220 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
221 OST_GETATTR, 3, size,NULL);
225 osc_pack_req_body(req, REQ_REC_OFF, oinfo);
227 ptlrpc_req_set_repsize(req, 2, size);
228 req->rq_interpret_reply = osc_getattr_interpret;
230 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
231 aa = (struct osc_async_args *)&req->rq_async_args;
234 ptlrpc_set_add_req(set, req);
238 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
240 struct ptlrpc_request *req;
241 struct ost_body *body;
242 int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
245 size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
246 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
247 OST_GETATTR, 3, size, NULL);
251 osc_pack_req_body(req, REQ_REC_OFF, oinfo);
253 ptlrpc_req_set_repsize(req, 2, size);
255 rc = ptlrpc_queue_wait(req);
257 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
261 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
262 lustre_swab_ost_body);
264 CERROR ("can't unpack ost_body\n");
265 GOTO (out, rc = -EPROTO);
268 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
269 *oinfo->oi_oa = body->oa;
271 /* This should really be sent by the OST */
272 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
273 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
277 ptlrpc_req_finished(req);
281 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
282 struct obd_trans_info *oti)
284 struct ptlrpc_request *req;
285 struct ost_body *body;
286 int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
289 LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
290 oinfo->oi_oa->o_gr > 0);
291 size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
292 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
293 OST_SETATTR, 3, size, NULL);
297 osc_pack_req_body(req, REQ_REC_OFF, oinfo);
299 ptlrpc_req_set_repsize(req, 2, size);
301 rc = ptlrpc_queue_wait(req);
305 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
306 lustre_swab_ost_body);
308 GOTO(out, rc = -EPROTO);
310 *oinfo->oi_oa = body->oa;
314 ptlrpc_req_finished(req);
318 static int osc_setattr_interpret(struct ptlrpc_request *req,
319 struct osc_async_args *aa, int rc)
321 struct ost_body *body;
327 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
328 lustre_swab_ost_body);
330 CERROR("can't unpack ost_body\n");
331 GOTO(out, rc = -EPROTO);
334 *aa->aa_oi->oi_oa = body->oa;
336 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
340 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
341 struct obd_trans_info *oti,
342 struct ptlrpc_request_set *rqset)
344 struct ptlrpc_request *req;
345 int size[3] = { sizeof(struct ptlrpc_body), sizeof(struct ost_body) };
346 struct osc_async_args *aa;
349 size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
350 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
351 OST_SETATTR, 3, size, NULL);
355 osc_pack_req_body(req, REQ_REC_OFF, oinfo);
356 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
358 *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;
361 ptlrpc_req_set_repsize(req, 2, size);
362 /* do mds to ost setattr asynchronouly */
364 /* Do not wait for response. */
365 ptlrpcd_add_req(req);
367 req->rq_interpret_reply = osc_setattr_interpret;
369 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
370 aa = (struct osc_async_args *)&req->rq_async_args;
373 ptlrpc_set_add_req(rqset, req);
379 int osc_real_create(struct obd_export *exp, struct obdo *oa,
380 struct lov_stripe_md **ea, struct obd_trans_info *oti)
382 struct ptlrpc_request *req;
383 struct ost_body *body;
384 struct lov_stripe_md *lsm;
385 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
393 rc = obd_alloc_memmd(exp, &lsm);
398 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
399 OST_CREATE, 2, size, NULL);
401 GOTO(out, rc = -ENOMEM);
403 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
406 ptlrpc_req_set_repsize(req, 2, size);
407 if (oa->o_valid & OBD_MD_FLINLINE) {
408 LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
409 oa->o_flags == OBD_FL_DELORPHAN);
411 "delorphan from OST integration");
412 /* Don't resend the delorphan req */
413 req->rq_no_resend = req->rq_no_delay = 1;
416 rc = ptlrpc_queue_wait(req);
420 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
421 lustre_swab_ost_body);
423 CERROR ("can't unpack ost_body\n");
424 GOTO (out_req, rc = -EPROTO);
429 /* This should really be sent by the OST */
430 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
431 oa->o_valid |= OBD_MD_FLBLKSZ;
433 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
434 * have valid lsm_oinfo data structs, so don't go touching that.
435 * This needs to be fixed in a big way.
437 lsm->lsm_object_id = oa->o_id;
438 lsm->lsm_object_gr = oa->o_gr;
442 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
444 if (oa->o_valid & OBD_MD_FLCOOKIE) {
445 if (!oti->oti_logcookies)
446 oti_alloc_cookies(oti, 1);
447 *oti->oti_logcookies = *obdo_logcookie(oa);
451 CDEBUG(D_HA, "transno: "LPD64"\n",
452 lustre_msg_get_transno(req->rq_repmsg));
455 ptlrpc_req_finished(req);
458 obd_free_memmd(exp, &lsm);
462 static int osc_punch_interpret(struct ptlrpc_request *req,
463 struct osc_async_args *aa, int rc)
465 struct ost_body *body;
471 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
472 lustre_swab_ost_body);
474 CERROR ("can't unpack ost_body\n");
475 GOTO(out, rc = -EPROTO);
478 *aa->aa_oi->oi_oa = body->oa;
480 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
484 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
485 struct obd_trans_info *oti,
486 struct ptlrpc_request_set *rqset)
488 struct ptlrpc_request *req;
489 struct osc_async_args *aa;
490 struct ost_body *body;
491 int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
499 size[REQ_REC_OFF + 1] = oinfo->oi_capa? sizeof(struct lustre_capa) : 0;
500 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
501 OST_PUNCH, 3, size, NULL);
505 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
507 osc_pack_req_body(req, REQ_REC_OFF, oinfo);
508 /* overload the size and blocks fields in the oa with start/end */
509 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
510 body->oa.o_size = oinfo->oi_policy.l_extent.start;
511 body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
512 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
514 ptlrpc_req_set_repsize(req, 2, size);
516 req->rq_interpret_reply = osc_punch_interpret;
517 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
518 aa = (struct osc_async_args *)&req->rq_async_args;
520 ptlrpc_set_add_req(rqset, req);
525 static int osc_sync(struct obd_export *exp, struct obdo *oa,
526 struct lov_stripe_md *md, obd_size start, obd_size end,
529 struct ptlrpc_request *req;
530 struct ost_body *body;
531 int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
539 size[REQ_REC_OFF + 1] = capa ? sizeof(struct lustre_capa) : 0;
541 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
542 OST_SYNC, 3, size, NULL);
546 /* overload the size and blocks fields in the oa with start/end */
547 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
549 body->oa.o_size = start;
550 body->oa.o_blocks = end;
551 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
553 osc_pack_capa(req, REQ_REC_OFF + 1, body, capa);
555 ptlrpc_req_set_repsize(req, 2, size);
557 rc = ptlrpc_queue_wait(req);
561 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
562 lustre_swab_ost_body);
564 CERROR ("can't unpack ost_body\n");
565 GOTO (out, rc = -EPROTO);
572 ptlrpc_req_finished(req);
576 /* Destroy requests can be async always on the client, and we don't even really
577 * care about the return code since the client cannot do anything at all about a failed destroy.
579 * When the MDS is unlinking a filename, it saves the file objects into a
580 * recovery llog, and these object records are cancelled when the OST reports
581 * they were destroyed and sync'd to disk (i.e. transaction committed).
582 * If the client dies, or the OST is down when the object should be destroyed,
583 * the records are not cancelled, and when the OST reconnects to the MDS next,
584 * it will retrieve the llog unlink logs and then sends the log cancellation
585 * cookies to the MDS after committing destroy transactions. */
586 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
587 struct lov_stripe_md *ea, struct obd_trans_info *oti,
588 struct obd_export *md_export)
590 struct ptlrpc_request *req;
591 struct ost_body *body;
592 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
600 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
601 OST_DESTROY, 2, size, NULL);
605 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
607 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
608 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
609 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
610 sizeof(*oti->oti_logcookies));
613 ptlrpc_req_set_repsize(req, 2, size);
615 ptlrpcd_add_req(req);
619 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
622 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
624 LASSERT(!(oa->o_valid & bits));
627 client_obd_list_lock(&cli->cl_loi_list_lock);
628 oa->o_dirty = cli->cl_dirty;
629 if (cli->cl_dirty > cli->cl_dirty_max) {
630 CERROR("dirty %lu > dirty_max %lu\n",
631 cli->cl_dirty, cli->cl_dirty_max);
633 } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
634 CERROR("dirty %d > system dirty_max %d\n",
635 atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
637 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
638 CERROR("dirty %lu - dirty_max %lu too big???\n",
639 cli->cl_dirty, cli->cl_dirty_max);
642 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
643 (cli->cl_max_rpcs_in_flight + 1);
644 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
646 oa->o_grant = cli->cl_avail_grant;
647 oa->o_dropped = cli->cl_lost_grant;
648 cli->cl_lost_grant = 0;
649 client_obd_list_unlock(&cli->cl_loi_list_lock);
650 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
651 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
654 /* caller must hold loi_list_lock */
655 static void osc_consume_write_grant(struct client_obd *cli,
656 struct brw_page *pga)
658 atomic_inc(&obd_dirty_pages);
659 cli->cl_dirty += CFS_PAGE_SIZE;
660 cli->cl_avail_grant -= CFS_PAGE_SIZE;
661 pga->flag |= OBD_BRW_FROM_GRANT;
662 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
663 CFS_PAGE_SIZE, pga, pga->pg);
664 LASSERT(cli->cl_avail_grant >= 0);
667 /* the companion to osc_consume_write_grant, called when a brw has completed.
668 * must be called with the loi lock held. */
669 static void osc_release_write_grant(struct client_obd *cli,
670 struct brw_page *pga, int sent)
672 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
675 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
680 pga->flag &= ~OBD_BRW_FROM_GRANT;
681 atomic_dec(&obd_dirty_pages);
682 cli->cl_dirty -= CFS_PAGE_SIZE;
684 cli->cl_lost_grant += CFS_PAGE_SIZE;
685 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
686 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
687 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
688 /* For short writes we shouldn't count parts of pages that
689 * span a whole block on the OST side, or our accounting goes
690 * wrong. Should match the code in filter_grant_check. */
691 int offset = pga->off & ~CFS_PAGE_MASK;
692 int count = pga->count + (offset & (blocksize - 1));
693 int end = (offset + pga->count) & (blocksize - 1);
695 count += blocksize - end;
697 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
698 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
699 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
700 cli->cl_avail_grant, cli->cl_dirty);
706 static unsigned long rpcs_in_flight(struct client_obd *cli)
708 return cli->cl_r_in_flight + cli->cl_w_in_flight;
711 /* caller must hold loi_list_lock */
712 void osc_wake_cache_waiters(struct client_obd *cli)
714 struct list_head *l, *tmp;
715 struct osc_cache_waiter *ocw;
718 list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
719 /* if we can't dirty more, we must wait until some is written */
720 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
721 (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
722 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
723 "osc max %ld, sys max %d\n", cli->cl_dirty,
724 cli->cl_dirty_max, obd_max_dirty_pages);
728 /* if still dirty cache but no grant wait for pending RPCs that
729 * may yet return us some grant before doing sync writes */
730 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
731 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
732 cli->cl_w_in_flight);
736 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
737 list_del_init(&ocw->ocw_entry);
738 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
739 /* no more RPCs in flight to return grant, do sync IO */
740 ocw->ocw_rc = -EDQUOT;
741 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
743 osc_consume_write_grant(cli,
744 &ocw->ocw_oap->oap_brw_page);
747 cfs_waitq_signal(&ocw->ocw_waitq);
753 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
755 client_obd_list_lock(&cli->cl_loi_list_lock);
756 cli->cl_avail_grant = ocd->ocd_grant;
757 client_obd_list_unlock(&cli->cl_loi_list_lock);
759 CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
760 cli->cl_avail_grant, cli->cl_lost_grant);
761 LASSERT(cli->cl_avail_grant >= 0);
764 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
766 client_obd_list_lock(&cli->cl_loi_list_lock);
767 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
768 cli->cl_avail_grant += body->oa.o_grant;
769 /* waiters are woken in brw_interpret_oap */
770 client_obd_list_unlock(&cli->cl_loi_list_lock);
773 /* We assume that the reason this OSC got a short read is because it read
774 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
775 * via the LOV, and it _knows_ it's reading inside the file, it's just that
776 * this stripe never got written at or beyond this stripe offset yet. */
777 static void handle_short_read(int nob_read, obd_count page_count,
778 struct brw_page **pga)
783 /* skip bytes read OK */
784 while (nob_read > 0) {
785 LASSERT (page_count > 0);
787 if (pga[i]->count > nob_read) {
788 /* EOF inside this page */
789 ptr = cfs_kmap(pga[i]->pg) +
790 (pga[i]->off & ~CFS_PAGE_MASK);
791 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
792 cfs_kunmap(pga[i]->pg);
798 nob_read -= pga[i]->count;
803 /* zero remaining pages */
804 while (page_count-- > 0) {
805 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
806 memset(ptr, 0, pga[i]->count);
807 cfs_kunmap(pga[i]->pg);
812 static int check_write_rcs(struct ptlrpc_request *req,
813 int requested_nob, int niocount,
814 obd_count page_count, struct brw_page **pga)
818 /* return error if any niobuf was in error */
819 remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
820 sizeof(*remote_rcs) * niocount, NULL);
821 if (remote_rcs == NULL) {
822 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
825 if (lustre_msg_swabbed(req->rq_repmsg))
826 for (i = 0; i < niocount; i++)
827 __swab32s(&remote_rcs[i]);
829 for (i = 0; i < niocount; i++) {
830 if (remote_rcs[i] < 0)
831 return(remote_rcs[i]);
833 if (remote_rcs[i] != 0) {
834 CERROR("rc[%d] invalid (%d) req %p\n",
835 i, remote_rcs[i], req);
840 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
841 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
842 requested_nob, req->rq_bulk->bd_nob_transferred);
849 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
851 if (p1->flag != p2->flag) {
852 unsigned mask = ~OBD_BRW_FROM_GRANT;
854 /* warn if we try to combine flags that we don't know to be
856 if ((p1->flag & mask) != (p2->flag & mask))
857 CERROR("is it ok to have flags 0x%x and 0x%x in the "
858 "same brw?\n", p1->flag, p2->flag);
862 return (p1->off + p1->count == p2->off);
865 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
866 struct brw_page **pga)
871 LASSERT (pg_count > 0);
872 while (nob > 0 && pg_count > 0) {
873 char *ptr = cfs_kmap(pga[i]->pg);
874 int off = pga[i]->off & ~CFS_PAGE_MASK;
875 int count = pga[i]->count > nob ? nob : pga[i]->count;
877 /* corrupt the data before we compute the checksum, to
878 * simulate an OST->client data error */
880 OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
881 memcpy(ptr + off, "bad1", min(4, nob));
882 cksum = crc32_le(cksum, ptr + off, count);
883 cfs_kunmap(pga[i]->pg);
884 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
887 nob -= pga[i]->count;
891 /* For sending we only compute the wrong checksum instead
892 * of corrupting the data so it is still correct on a redo */
893 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
899 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
900 struct lov_stripe_md *lsm, obd_count page_count,
901 struct brw_page **pga,
902 struct ptlrpc_request **reqp,
903 struct obd_capa *ocapa)
905 struct ptlrpc_request *req;
906 struct ptlrpc_bulk_desc *desc;
907 struct ost_body *body;
908 struct obd_ioobj *ioobj;
909 struct niobuf_remote *niobuf;
910 int size[5] = { sizeof(struct ptlrpc_body), sizeof(*body) };
911 int niocount, i, requested_nob, opc, rc;
912 struct ptlrpc_request_pool *pool;
913 struct lustre_capa *capa;
914 struct osc_brw_async_args *aa;
917 if ((cmd & OBD_BRW_WRITE) != 0) {
919 pool = cli->cl_import->imp_rq_pool;
925 for (niocount = i = 1; i < page_count; i++) {
926 if (!can_merge_pages(pga[i - 1], pga[i]))
930 size[REQ_REC_OFF + 1] = sizeof(*ioobj);
931 size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
933 size[REQ_REC_OFF + 3] = sizeof(*capa);
935 OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM);
936 req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 5,
937 size, NULL, pool, NULL);
941 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
943 if (opc == OST_WRITE)
944 desc = ptlrpc_prep_bulk_imp (req, page_count,
945 BULK_GET_SOURCE, OST_BULK_PORTAL);
947 desc = ptlrpc_prep_bulk_imp (req, page_count,
948 BULK_PUT_SINK, OST_BULK_PORTAL);
950 GOTO(out, rc = -ENOMEM);
951 /* NB request now owns desc and will free it when it gets freed */
953 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
954 ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
955 niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
956 niocount * sizeof(*niobuf));
960 obdo_to_ioobj(oa, ioobj);
961 ioobj->ioo_bufcnt = niocount;
963 capa = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 3,
965 capa_cpy(capa, ocapa);
966 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
969 LASSERT (page_count > 0);
970 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
971 struct brw_page *pg = pga[i];
972 struct brw_page *pg_prev = pga[i - 1];
974 LASSERT(pg->count > 0);
975 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
976 "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
979 LASSERTF(i == 0 || pg->off > pg_prev->off,
980 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
981 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
983 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
984 pg_prev->pg, page_private(pg_prev->pg),
985 pg_prev->pg->index, pg_prev->off);
987 LASSERTF(i == 0 || pg->off > pg_prev->off,
988 "i %d p_c %u\n", i, page_count);
990 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
991 (pg->flag & OBD_BRW_SRVLOCK));
993 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
995 requested_nob += pg->count;
997 if (i > 0 && can_merge_pages(pg_prev, pg)) {
999 niobuf->len += pg->count;
1001 niobuf->offset = pg->off;
1002 niobuf->len = pg->count;
1003 niobuf->flags = pg->flag;
1007 LASSERT((void *)(niobuf - niocount) ==
1008 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1009 niocount * sizeof(*niobuf)));
1010 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1012 /* size[REQ_REC_OFF] still sizeof (*body) */
1013 if (opc == OST_WRITE) {
1014 if (unlikely(cli->cl_checksum)) {
1015 body->oa.o_valid |= OBD_MD_FLCKSUM;
1016 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1018 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1020 /* save this in 'oa', too, for later checking */
1021 oa->o_valid |= OBD_MD_FLCKSUM;
1023 /* clear out the checksum flag, in case this is a
1024 * resend but cl_checksum is no longer set. b=11238 */
1025 oa->o_valid &= ~OBD_MD_FLCKSUM;
1027 oa->o_cksum = body->oa.o_cksum;
1028 /* 1 RC per niobuf */
1029 size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
1030 ptlrpc_req_set_repsize(req, 3, size);
1032 if (unlikely(cli->cl_checksum))
1033 body->oa.o_valid |= OBD_MD_FLCKSUM;
1034 /* 1 RC for the whole I/O */
1035 ptlrpc_req_set_repsize(req, 2, size);
1038 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1039 aa = (struct osc_brw_async_args *)&req->rq_async_args;
1041 aa->aa_requested_nob = requested_nob;
1042 aa->aa_nio_count = niocount;
1043 aa->aa_page_count = page_count;
1044 aa->aa_retries = 5; /*retry for checksum errors; lprocfs? */
1047 INIT_LIST_HEAD(&aa->aa_oaps);
1053 ptlrpc_req_finished (req);
1057 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1058 __u32 client_cksum, __u32 server_cksum,
1059 int nob, obd_count page_count,
1060 struct brw_page **pga)
1065 if (server_cksum == client_cksum) {
1066 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1070 new_cksum = osc_checksum_bulk(nob, page_count, pga);
1072 if (new_cksum == server_cksum)
1073 msg = "changed on the client after we checksummed it - "
1074 "likely false positive due to mmap IO (bug 11742)";
1075 else if (new_cksum == client_cksum)
1076 msg = "changed in transit before arrival at OST";
1078 msg = "changed in transit AND doesn't match the original - "
1079 "likely false positive due to mmap IO (bug 11742)";
1081 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1082 LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1083 "["LPU64"-"LPU64"]\n",
1084 msg, libcfs_nid2str(peer->nid),
1085 oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1086 oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1089 oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1091 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1092 CERROR("original client csum %x, server csum %x, client csum now %x\n",
1093 client_cksum, server_cksum, new_cksum);
1097 /* Note rc enters this function as number of bytes transferred */
1098 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1100 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1101 const lnet_process_id_t *peer =
1102 &req->rq_import->imp_connection->c_peer;
1103 struct client_obd *cli = aa->aa_cli;
1104 struct ost_body *body;
1105 __u32 client_cksum = 0;
1108 if (rc < 0 && rc != -EDQUOT)
1111 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1112 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1113 lustre_swab_ost_body);
1115 CERROR ("Can't unpack body\n");
1119 /* set/clear over quota flag for a uid/gid */
1120 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1121 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1122 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1123 body->oa.o_gid, body->oa.o_valid,
1129 if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
1130 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1132 osc_update_grant(cli, body);
1134 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1136 CERROR ("Unexpected +ve rc %d\n", rc);
1139 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1141 if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
1143 check_write_checksum(&body->oa, peer, client_cksum,
1145 aa->aa_requested_nob,
1150 sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk);
1152 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1153 aa->aa_page_count, aa->aa_ppga);
1157 /* The rest of this function executes only for OST_READs */
1158 if (rc > aa->aa_requested_nob) {
1159 CERROR("Unexpected rc %d (%d requested)\n", rc,
1160 aa->aa_requested_nob);
1164 if (rc != req->rq_bulk->bd_nob_transferred) {
1165 CERROR ("Unexpected rc %d (%d transferred)\n",
1166 rc, req->rq_bulk->bd_nob_transferred);
1170 if (rc < aa->aa_requested_nob)
1171 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1173 if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
1174 static int cksum_counter;
1175 __u32 server_cksum = body->oa.o_cksum;
1179 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1182 if (peer->nid == req->rq_bulk->bd_sender) {
1186 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1189 if (server_cksum == ~0 && rc > 0) {
1190 CERROR("Protocol error: server %s set the 'checksum' "
1191 "bit, but didn't send a checksum. Not fatal, "
1192 "but please tell CFS.\n",
1193 libcfs_nid2str(peer->nid));
1194 } else if (server_cksum != client_cksum) {
1195 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1196 "%s%s%s inum "LPU64"/"LPU64" object "
1197 LPU64"/"LPU64" extent "
1198 "["LPU64"-"LPU64"]\n",
1199 req->rq_import->imp_obd->obd_name,
1200 libcfs_nid2str(peer->nid),
1202 body->oa.o_valid & OBD_MD_FLFID ?
1203 body->oa.o_fid : (__u64)0,
1204 body->oa.o_valid & OBD_MD_FLFID ?
1205 body->oa.o_generation :(__u64)0,
1207 body->oa.o_valid & OBD_MD_FLGROUP ?
1208 body->oa.o_gr : (__u64)0,
1209 aa->aa_ppga[0]->off,
1210 aa->aa_ppga[aa->aa_page_count-1]->off +
1211 aa->aa_ppga[aa->aa_page_count-1]->count -
1213 CERROR("client %x, server %x\n",
1214 client_cksum, server_cksum);
1216 aa->aa_oa->o_cksum = client_cksum;
1220 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1223 } else if (unlikely(client_cksum)) {
1224 static int cksum_missed;
1227 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1228 CERROR("Checksum %u requested from %s but not sent\n",
1229 cksum_missed, libcfs_nid2str(peer->nid));
1235 *aa->aa_oa = body->oa;
1237 sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count, aa->aa_ppga);
/* Issue one synchronous bulk read/write RPC and wait for its completion.
 * Builds the BRW request, queues it with ptlrpc_queue_wait(), then
 * post-processes the reply through osc_brw_fini_request().  Retries on
 * bulk timeout (bounded by 'retries') and again when fini returns
 * -EAGAIN (checksum mismatch).
 * NOTE(review): interior lines are elided in this view; the retry-loop
 * framing and return path are not fully visible — confirm in full file. */
1242 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1243 struct lov_stripe_md *lsm,
1244 obd_count page_count, struct brw_page **pga,
1245 struct obd_capa *ocapa)
1247 struct ptlrpc_request *req;
1248 int rc, retries = 5; /* lprocfs? */
1252 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1253 page_count, pga, &req, ocapa);
         /* Send and block until the reply (or an error) arrives. */
1257 rc = ptlrpc_queue_wait(req);
         /* Bulk transfer timed out but the request is resendable: drop this
          * request instance; presumably the loop rebuilds and retries. */
1259 if (rc == -ETIMEDOUT && req->rq_resend) {
1260 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1261 ptlrpc_req_finished(req);
1265 rc = osc_brw_fini_request(req, rc);
1267 ptlrpc_req_finished(req);
         /* -EAGAIN from fini (e.g. checksum error) => redo the whole BRW. */
1268 if (rc == -EAGAIN) {
/* Rebuild and resubmit a BRW request that failed with a checksum error.
 * A fresh request is prepared from the async args of the failed one; the
 * page array (ppga) and the oap list are handed over to the new request,
 * which is then added to the original request set.  Gives up once
 * aa_retries is exhausted.
 * Returns 0 on success (new request queued) or a negative errno.
 * NOTE(review): lines are elided here; error paths after prep_request are
 * not fully visible. */
1276 int osc_brw_redo_request(struct ptlrpc_request *req,
1277 struct osc_brw_async_args *aa)
1279 struct ptlrpc_request *new_req;
1280 struct ptlrpc_request_set *set = req->rq_set;
1281 struct osc_brw_async_args *new_aa;
1282 struct osc_async_page *oap;
         /* Bound the number of checksum retries. */
1286 if (aa->aa_retries-- <= 0) {
1287 CERROR("too many checksum retries, returning error\n");
1291 DEBUG_REQ(D_ERROR, req, "redo for checksum error");
         /* Sanity-check that every oap holding a request reference points at
          * the request being redone, and propagate interruption state. */
1292 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1293 if (oap->oap_request != NULL) {
1294 LASSERTF(req == oap->oap_request,
1295 "request %p != oap_request %p\n",
1296 req, oap->oap_request);
1297 if (oap->oap_interrupted) {
1298 ptlrpc_mark_interrupted(oap->oap_request);
1306 /* TODO-MERGE: and where to get ocapa?? */
1307 rc = osc_brw_prep_request(lustre_msg_get_opc(req->rq_reqmsg) ==
1308 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1309 aa->aa_cli, aa->aa_oa,
1310 NULL /* lsm unused by osc currently */,
1311 aa->aa_page_count, aa->aa_ppga, &new_req,
1316 /* New request takes over pga and oaps from old request.
1317 * Note that copying a list_head doesn't work, need to move it... */
1318 new_req->rq_interpret_reply = req->rq_interpret_reply;
1319 new_req->rq_async_args = req->rq_async_args;
1320 new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
1321 INIT_LIST_HEAD(&new_aa->aa_oaps);
1322 list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1323 INIT_LIST_HEAD(&aa->aa_oaps);
         /* Move each oap's request reference from the old request to the new
          * one; the old reference is dropped first. */
1325 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1326 if (oap->oap_request) {
1327 ptlrpc_req_finished(oap->oap_request);
1328 oap->oap_request = ptlrpc_request_addref(new_req);
1332 ptlrpc_set_add_req(set, new_req);
/* Reply-interpret callback for BRWs submitted via async_internal().
 * Finalizes the request, redoes it on -EAGAIN (checksum retry), then
 * releases the write grants consumed for each page and frees the ppga
 * array.  On success the transferred byte count is accumulated into the
 * request set's counter, if one is attached.
 * NOTE(review): 'nob' and 'i' are declared in elided lines. */
1337 static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
1339 struct osc_brw_async_args *aa = data;
1344 rc = osc_brw_fini_request(req, rc);
1345 if (rc == -EAGAIN) {
1346 rc = osc_brw_redo_request(req, aa);
         /* Account transferred bytes into the set's user-supplied counter. */
1350 if ((rc >= 0) && req->rq_set && req->rq_set->set_countp)
1351 atomic_add(nob, (atomic_t *)req->rq_set->set_countp);
         /* Return the write grant taken for each page in async_internal();
          * loi list lock protects grant accounting. */
1353 spin_lock(&aa->aa_cli->cl_loi_list_lock);
1354 for (i = 0; i < aa->aa_page_count; i++)
1355 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1356 spin_unlock(&aa->aa_cli->cl_loi_list_lock);
1358 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
/* Submit a bulk read/write asynchronously into the caller's request set.
 * For writes, grant is consumed up-front (per page) so the OST-side grant
 * accounting stays correct even for sync-style writes; if request
 * preparation fails (the elided branch between 1384 and 1389), the grant
 * is released again.  On success the request's interpret callback is set
 * to brw_interpret and the request is added to 'set'.
 * NOTE(review): the success/failure branching around 1387-1389 is partly
 * elided — confirm against the full file. */
1363 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1364 struct lov_stripe_md *lsm, obd_count page_count,
1365 struct brw_page **pga, struct ptlrpc_request_set *set,
1366 struct obd_capa *ocapa)
1368 struct ptlrpc_request *req;
1369 struct client_obd *cli = &exp->exp_obd->u.cli;
1373 /* Consume write credits even if doing a sync write -
1374 * otherwise we may run out of space on OST due to grant. */
1375 if (cmd == OBD_BRW_WRITE) {
1376 spin_lock(&cli->cl_loi_list_lock);
1377 for (i = 0; i < page_count; i++) {
                         /* Only take grant while some is actually available. */
1378 if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1379 osc_consume_write_grant(cli, pga[i]);
1381 spin_unlock(&cli->cl_loi_list_lock);
1384 rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
1387 req->rq_interpret_reply = brw_interpret;
1388 ptlrpc_set_add_req(set, req);
         /* Prep failed for a write: give the consumed grant back. */
1389 } else if (cmd == OBD_BRW_WRITE) {
1390 spin_lock(&cli->cl_loi_list_lock);
1391 for (i = 0; i < page_count; i++)
1392 osc_release_write_grant(cli, pga[i], 0);
1393 spin_unlock(&cli->cl_loi_list_lock);
1399 * ugh, we want disk allocation on the target to happen in offset order. we'll
1400 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1401 * fine for our small page arrays and doesn't require allocation. it's an
1402 * insertion sort that swaps elements that are strides apart, shrinking the
1403 * stride down until it's '1' and the array is sorted.
/* Shellsort the brw_page pointer array in-place by ascending file offset
 * (->off).  In-place, no allocation; fine for the small arrays used per
 * RPC.  NOTE(review): the stride-shrink loop body is partly elided. */
1405 static void sort_brw_pages(struct brw_page **array, int num)
1408 struct brw_page *tmp;
         /* Grow the stride via the 3h+1 sequence, then (in elided code)
          * shrink it back down, insertion-sorting at each stride. */
1412 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1417 for (i = stride ; i < num ; i++) {
1420 while (j >= stride && array[j - stride]->off > tmp->off) {
1421 array[j] = array[j - stride];
1426 } while (stride > 1);
/* Return how many leading pages of 'pg' form a single unfragmented
 * (page-aligned, contiguous within pages) run that can go into one RDMA:
 * the run ends at the first page that doesn't end on a page boundary or
 * whose successor doesn't start on one.
 * NOTE(review): 'i'/'count' bookkeeping lines are elided in this view. */
1429 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1435 LASSERT (pages > 0);
1436 offset = pg[i]->off & ~CFS_PAGE_MASK;
1440 if (pages == 0) /* that's all */
1443 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1444 return count; /* doesn't end on page boundary */
1447 offset = pg[i]->off & ~CFS_PAGE_MASK;
1448 if (offset != 0) /* doesn't start on page boundary */
/* Allocate a pointer array (ppga) referencing the 'count' brw_page
 * entries of the flat array 'pga', so the pages can be sorted and split
 * into RPC-sized chunks without moving the originals.  Returns NULL on
 * allocation failure; caller frees with osc_release_ppga(). */
1455 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1457 struct brw_page **ppga;
1460 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1464 for (i = 0; i < count; i++)
/* Free a pointer array built by osc_build_ppga() (the pointed-to
 * brw_pages themselves are owned elsewhere). */
1469 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1471 LASSERT(ppga != NULL);
1472 OBD_FREE(ppga, sizeof(*ppga) * count);
/* Synchronous BRW entry point (obd method).  Sorts the pages by offset,
 * then issues them in chunks of at most cl_max_pages_per_rpc unfragmented
 * pages via osc_brw_internal(), restoring a saved copy of the obdo
 * between chunks because each BRW clobbers it.  OBD_BRW_CHECK only asks
 * whether I/O could succeed (import valid) without doing any.
 * NOTE(review): several return/cleanup lines are elided in this view. */
1475 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1476 obd_count page_count, struct brw_page *pga,
1477 struct obd_trans_info *oti)
1479 struct obdo *saved_oa = NULL;
1480 struct brw_page **ppga, **orig;
1481 struct obd_import *imp = class_exp2cliimp(exp);
1482 struct client_obd *cli = &imp->imp_obd->u.cli;
1483 int rc, page_count_orig;
1486 if (cmd & OBD_BRW_CHECK) {
1487 /* The caller just wants to know if there's a chance that this
1488 * I/O can succeed */
1490 if (imp == NULL || imp->imp_invalid)
1495 /* test_brw with a failed create can trip this, maybe others. */
1496 LASSERT(cli->cl_max_pages_per_rpc);
1500 orig = ppga = osc_build_ppga(pga, page_count);
         /* Remember the original count: 'ppga'/'page_count' are consumed as
          * chunks are sent, but the whole array is released at the end. */
1503 page_count_orig = page_count;
1505 sort_brw_pages(ppga, page_count);
1506 while (page_count) {
1507 obd_count pages_per_brw;
1509 if (page_count > cli->cl_max_pages_per_rpc)
1510 pages_per_brw = cli->cl_max_pages_per_rpc;
1512 pages_per_brw = page_count;
                 /* Shrink the chunk so it transfers in one unfragmented RDMA. */
1514 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1516 if (saved_oa != NULL) {
1517 /* restore previously saved oa */
1518 *oinfo->oi_oa = *saved_oa;
1519 } else if (page_count > pages_per_brw) {
1520 /* save a copy of oa (brw will clobber it) */
1521 OBDO_ALLOC(saved_oa);
1522 if (saved_oa == NULL)
1523 GOTO(out, rc = -ENOMEM);
1524 *saved_oa = *oinfo->oi_oa;
1527 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1528 pages_per_brw, ppga, oinfo->oi_capa);
1533 page_count -= pages_per_brw;
1534 ppga += pages_per_brw;
1538 osc_release_ppga(orig, page_count_orig);
1540 if (saved_oa != NULL)
1541 OBDO_FREE(saved_oa);
/* Asynchronous BRW entry point (obd method).  Like osc_brw() but each
 * chunk goes through async_internal() into the caller's request set.
 * When more than one RPC will fly, each chunk gets its own copy of the
 * ppga slice, since async_internal()'s completion path frees the array it
 * was given; the single-RPC fast path hands over 'ppga' itself.
 * NOTE(review): loop-exit and chunk-failure framing is partly elided. */
1546 static int osc_brw_async(int cmd, struct obd_export *exp,
1547 struct obd_info *oinfo, obd_count page_count,
1548 struct brw_page *pga, struct obd_trans_info *oti,
1549 struct ptlrpc_request_set *set)
1551 struct brw_page **ppga, **orig;
1552 struct client_obd *cli = &exp->exp_obd->u.cli;
1553 int page_count_orig;
1557 if (cmd & OBD_BRW_CHECK) {
1558 struct obd_import *imp = class_exp2cliimp(exp);
1559 /* The caller just wants to know if there's a chance that this
1560 * I/O can succeed */
1562 if (imp == NULL || imp->imp_invalid)
1567 orig = ppga = osc_build_ppga(pga, page_count);
1570 page_count_orig = page_count;
1572 sort_brw_pages(ppga, page_count);
1573 while (page_count) {
1574 struct brw_page **copy;
1575 obd_count pages_per_brw;
1577 pages_per_brw = min_t(obd_count, page_count,
1578 cli->cl_max_pages_per_rpc);
1580 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1582 /* use ppga only if single RPC is going to fly */
1583 if (pages_per_brw != page_count_orig || ppga != orig) {
1584 OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
1586 GOTO(out, rc = -ENOMEM);
1587 memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
1591 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1592 pages_per_brw, copy, set, oinfo->oi_capa);
                 /* On failure the copy was not handed over; free it here. */
1596 OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
1600 /* we passed it to async_internal() which is
1601 * now responsible for releasing memory */
1605 page_count -= pages_per_brw;
1606 ppga += pages_per_brw;
1610 osc_release_ppga(orig, page_count_orig);
1614 static void osc_check_rpcs(struct client_obd *cli);
1616 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1617 * the dirty accounting. Writeback completes or truncate happens before
1618 * writing starts. Must be called with the loi lock held. */
/* See the comment above: undo dirty accounting for @oap when it leaves
 * the cache (writeback done or truncated).  Caller holds the loi lock. */
1619 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1622 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1626 /* This maintains the lists of pending pages to read/write for a given object
1627 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1628 * to quickly find objects that are ready to send an RPC. */
/* Decide whether this lop (per-object read or write page list) has enough
 * pending work to justify firing an RPC now.  Returns non-zero (in elided
 * RETURN lines) when: there are pending pages and the import is invalid
 * (drain), an urgent page is queued, cache waiters exist (writes), or the
 * pending count reaches the 'optimal' RPC size. */
1629 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1635 if (lop->lop_num_pending == 0)
1638 /* if we have an invalid import we want to drain the queued pages
1639 * by forcing them through rpcs that immediately fail and complete
1640 * the pages. recovery relies on this to empty the queued pages
1641 * before canceling the locks and evicting down the llite pages */
1642 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1645 /* stream rpcs in queue order as long as there is an urgent page
1646 * queued. this is our cheap solution for good batching in the case
1647 * where writepage marks some random page in the middle of the file
1648 * as urgent because of, say, memory pressure */
1649 if (!list_empty(&lop->lop_urgent)) {
1650 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1653 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1654 optimal = cli->cl_max_pages_per_rpc;
1655 if (cmd & OBD_BRW_WRITE) {
1656 /* trigger a write rpc stream as long as there are dirtiers
1657 * waiting for space. as they're waiting, they're not going to
1658 * create more pages to coalesce with what's waiting.. */
1659 if (!list_empty(&cli->cl_cache_waiters)) {
1660 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1663 /* +16 to avoid triggering rpcs that would want to include pages
1664 * that are being queued but which can't be made ready until
1665 * the queuer finishes with the page. this is a wart for
1666 * llite::commit_write() */
1669 if (lop->lop_num_pending >= optimal)
/* Idempotently add @item to @list or remove it, so that its membership
 * matches the boolean condition (should_be_on, declared in an elided
 * parameter line). */
1675 static void on_list(struct list_head *item, struct list_head *list,
1678 if (list_empty(item) && should_be_on)
1679 list_add_tail(item, list);
1680 else if (!list_empty(item) && !should_be_on)
1681 list_del_init(item);
1684 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1685 * can find pages to build into rpcs quickly */
/* Re-evaluate which of the client's three loi lists (ready / has-writes /
 * has-reads) this object should be on, based on lop_makes_rpc() and the
 * pending counts.  See the invariant comment above. */
1686 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1688 on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1689 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1690 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1692 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1693 loi->loi_write_lop.lop_num_pending);
1695 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1696 loi->loi_read_lop.lop_num_pending);
/* Adjust the per-lop pending-page count by @delta and mirror the change
 * into the client-wide pending read/write page counters. */
1699 static void lop_update_pending(struct client_obd *cli,
1700 struct loi_oap_pages *lop, int cmd, int delta)
1702 lop->lop_num_pending += delta;
1703 if (cmd & OBD_BRW_WRITE)
1704 cli->cl_pending_w_pages += delta;
1706 cli->cl_pending_r_pages += delta;
1709 /* this is called when a sync waiter receives an interruption. Its job is to
1710 * get the caller woken as soon as possible. If its page hasn't been put in an
1711 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
1712 * desiring interruption which will forcefully complete the rpc once the rpc
/* Interruption callback for a sync (group-I/O) waiter; see comment above.
 * If the oap is already in an RPC, mark that RPC interrupted and kick
 * ptlrpcd; otherwise dequeue the page immediately, fix up the pending
 * accounting, and complete the group-I/O slot with -EINTR.
 * Runs under the client's loi list lock (taken here). */
1714 static void osc_occ_interrupted(struct oig_callback_context *occ)
1716 struct osc_async_page *oap;
1717 struct loi_oap_pages *lop;
1718 struct lov_oinfo *loi;
1721 /* XXX member_of() */
1722 oap = list_entry(occ, struct osc_async_page, oap_occ);
1724 client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1726 oap->oap_interrupted = 1;
1728 /* ok, it's been put in an rpc. only one oap gets a request reference */
1729 if (oap->oap_request != NULL) {
1730 ptlrpc_mark_interrupted(oap->oap_request);
1731 ptlrpcd_wake(oap->oap_request);
1735 /* we don't get interruption callbacks until osc_trigger_group_io()
1736 * has been called and put the sync oaps in the pending/urgent lists.*/
1737 if (!list_empty(&oap->oap_pending_item)) {
1738 list_del_init(&oap->oap_pending_item);
1739 list_del_init(&oap->oap_urgent_item);
1742 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1743 &loi->loi_write_lop : &loi->loi_read_lop;
1744 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1745 loi_list_maint(oap->oap_cli, oap->oap_loi);
                 /* Complete this page's slot in the group I/O with -EINTR. */
1747 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
1748 oap->oap_oig = NULL;
1752 client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
1755 /* this is trying to propagate async writeback errors back up to the
1756 * application. As an async write fails we record the error code for later if
1757 * the app does an fsync. As long as errors persist we force future rpcs to be
1758 * sync so that the app can get a sync error and break the cycle of queueing
1759 * pages for which writeback will fail. */
/* Track async-write error state (see comment above): on error (elided
 * branch) latch force-sync and record the xid floor; once a request at or
 * past that xid succeeds, clear force-sync again. */
1760 static void osc_process_ar(struct osc_async_rc *ar, struct ptlrpc_request *req,
1767 ar->ar_force_sync = 1;
1768 ar->ar_min_xid = ptlrpc_sample_next_xid();
1773 if (ar->ar_force_sync && req && (ptlrpc_req_xid(req) >= ar->ar_min_xid))
1774 ar->ar_force_sync = 0;
/* Queue @oap on its object's pending read or write list (and the urgent
 * list when flagged), updating the pending-page accounting. */
1777 static void osc_oap_to_pending(struct osc_async_page *oap)
1779 struct loi_oap_pages *lop;
1781 if (oap->oap_cmd & OBD_BRW_WRITE)
1782 lop = &oap->oap_loi->loi_write_lop;
1784 lop = &oap->oap_loi->loi_read_lop;
1786 if (oap->oap_async_flags & ASYNC_URGENT)
1787 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1788 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1789 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1792 /* this must be called holding the loi list lock to give coverage to exit_cache,
1793 * async_flag maintenance, and oap_request */
/* Per-page completion: update error state and LVB timestamps from the
 * returned obdo, drop the request reference, exit the dirty cache, and
 * notify the owner (group I/O via oig_complete_one, or the caller's
 * ap_completion hook).  If the upper layer asks to requeue (elided
 * branch), the oap goes back on pending instead of exiting the cache.
 * Must be called holding the loi list lock (see comment above). */
1794 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1795 struct osc_async_page *oap, int sent, int rc)
1798 oap->oap_async_flags = 0;
1799 oap->oap_interrupted = 0;
1801 if (oap->oap_cmd & OBD_BRW_WRITE) {
                 /* Record/clear force-sync state both client-wide and per-object. */
1802 osc_process_ar(&cli->cl_ar, oap->oap_request, rc);
1803 osc_process_ar(&oap->oap_loi->loi_ar, oap->oap_request, rc);
1806 if (oap->oap_request != NULL) {
1807 ptlrpc_req_finished(oap->oap_request);
1808 oap->oap_request = NULL;
         /* Mirror server-returned attributes into the cached LVB. */
1811 if (rc == 0 && oa != NULL) {
1812 if (oa->o_valid & OBD_MD_FLBLOCKS)
1813 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
1814 if (oa->o_valid & OBD_MD_FLMTIME)
1815 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
1816 if (oa->o_valid & OBD_MD_FLATIME)
1817 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
1818 if (oa->o_valid & OBD_MD_FLCTIME)
1819 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
1823 osc_exit_cache(cli, oap, sent);
1824 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
1825 oap->oap_oig = NULL;
1830 rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
1831 oap->oap_cmd, oa, rc);
1833 /* ll_ap_completion (from llite) drops PG_locked. so, a new
1834 * I/O on the page could start, but OSC calls it under lock
1835 * and thus we can add oap back to pending safely */
1837 /* upper layer wants to leave the page on pending queue */
1838 osc_oap_to_pending(oap);
1840 osc_exit_cache(cli, oap, sent);
/* Reply-interpret callback for RPCs built from cached oaps (the write-back
 * path).  Finalizes the request (redoing on checksum -EAGAIN), adjusts
 * the in-flight counters, completes every oap, wakes cache waiters and
 * kicks osc_check_rpcs() to keep the pipeline full, then frees the obdo
 * and the ppga array. */
1844 static int brw_interpret_oap(struct ptlrpc_request *req, void *data, int rc)
1846 struct osc_async_page *oap, *tmp;
1847 struct osc_brw_async_args *aa = data;
1848 struct client_obd *cli;
1851 rc = osc_brw_fini_request(req, rc);
1852 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1853 if (rc == -EAGAIN) {
1854 rc = osc_brw_redo_request(req, aa);
1862 client_obd_list_lock(&cli->cl_loi_list_lock);
1864 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1865 * is called so we know whether to go to sync BRWs or wait for more
1866 * RPCs to complete */
1867 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1868 cli->cl_w_in_flight--;
1870 cli->cl_r_in_flight--;
1872 /* the caller may re-use the oap after the completion call so
1873 * we need to clean it up a little */
1874 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
1875 list_del_init(&oap->oap_rpc_item);
1876 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
1879 osc_wake_cache_waiters(cli);
1880 osc_check_rpcs(cli);
1882 client_obd_list_unlock(&cli->cl_loi_list_lock);
1884 OBDO_FREE(aa->aa_oa);
1887 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
/* Build a BRW ptlrpc request from a list of ready oaps.  Allocates a pga
 * array and an obdo, fills the obdo and capa via the caller ops, sorts
 * the pages, preps the request, refreshes timestamps post-build (bug
 * 10150, see inline comment), and splices the oap list into the
 * request's async args.  Returns the request or ERR_PTR(-errno); on
 * error the pga array is freed in the (partly elided) out path. */
1891 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
1892 struct list_head *rpc_list,
1893 int page_count, int cmd)
1895 struct ptlrpc_request *req;
1896 struct brw_page **pga = NULL;
1897 struct osc_brw_async_args *aa;
1898 struct obdo *oa = NULL;
1899 struct obd_async_page_ops *ops = NULL;
1900 void *caller_data = NULL;
1901 struct obd_capa *ocapa;
1902 struct osc_async_page *oap;
1906 LASSERT(!list_empty(rpc_list));
1908 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1910 RETURN(ERR_PTR(-ENOMEM));
1914 GOTO(out, req = ERR_PTR(-ENOMEM));
         /* Collect each oap's brw_page into the pga array; the last oap's
          * ops/caller_data are used to fill the obdo below. */
1917 list_for_each_entry(oap, rpc_list, oap_rpc_item) {
1919 ops = oap->oap_caller_ops;
1920 caller_data = oap->oap_caller_data;
1922 pga[i] = &oap->oap_brw_page;
1923 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1924 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1925 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
1929 /* always get the data for the obdo for the rpc */
1930 LASSERT(ops != NULL);
1931 ops->ap_fill_obdo(caller_data, cmd, oa);
1932 ocapa = ops->ap_lookup_capa(caller_data, cmd);
1934 sort_brw_pages(pga, page_count);
1935 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
1939 CERROR("prep_req failed: %d\n", rc);
1940 GOTO(out, req = ERR_PTR(rc));
1943 /* Need to update the timestamps after the request is built in case
1944 * we race with setattr (locally or in queue at OST). If OST gets
1945 * later setattr before earlier BRW (as determined by the request xid),
1946 * the OST will not use BRW timestamps. Sadly, there is no obvious
1947 * way to do this in a single call. bug 10150 */
1948 ops->ap_update_obdo(caller_data, cmd, oa,
1949 OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
1951 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1952 aa = (struct osc_brw_async_args *)&req->rq_async_args;
         /* Move (not copy) the oap list into the request's async args. */
1953 INIT_LIST_HEAD(&aa->aa_oaps);
1954 list_splice(rpc_list, &aa->aa_oaps);
1955 INIT_LIST_HEAD(rpc_list);
1962 OBD_FREE(pga, sizeof(*pga) * page_count);
1967 /* the loi lock is held across this function but it's allowed to release
1968 * and reacquire it during its work */
/* Pull ready pages off a lop's pending list, build one BRW RPC and hand
 * it to ptlrpcd.  Called with the loi list lock held; drops and retakes
 * it around osc_build_req() (see comment above).  Pages are gathered
 * until cl_max_pages_per_rpc, a PTLRPC_MAX_BRW_SIZE boundary, or a
 * fragmentation gap stops the run.  Returns (in elided RETURN lines)
 * how things went: llite's make_ready backing off yields 0, which the
 * caller (osc_check_rpcs) uses for its race counter. */
1969 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
1970 int cmd, struct loi_oap_pages *lop)
1972 struct ptlrpc_request *req;
1973 obd_count page_count = 0;
1974 struct osc_async_page *oap = NULL, *tmp;
1975 struct osc_brw_async_args *aa;
1976 struct obd_async_page_ops *ops;
1977 CFS_LIST_HEAD(rpc_list);
1978 unsigned int ending_offset;
1979 unsigned starting_offset = 0;
1982 /* first we find the pages we're allowed to work with */
1983 list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
1985 ops = oap->oap_caller_ops;
1987 LASSERT(oap->oap_magic == OAP_MAGIC);
1989 /* in llite being 'ready' equates to the page being locked
1990 * until completion unlocks it. commit_write submits a page
1991 * as not ready because its unlock will happen unconditionally
1992 * as the call returns. if we race with commit_write giving
1993 * us that page we dont' want to create a hole in the page
1994 * stream, so we stop and leave the rpc to be fired by
1995 * another dirtier or kupdated interval (the not ready page
1996 * will still be on the dirty list). we could call in
1997 * at the end of ll_file_write to process the queue again. */
1998 if (!(oap->oap_async_flags & ASYNC_READY)) {
1999 int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2001 CDEBUG(D_INODE, "oap %p page %p returned %d "
2002 "instead of ready\n", oap,
2006 /* llite is telling us that the page is still
2007 * in commit_write and that we should try
2008 * and put it in an rpc again later. we
2009 * break out of the loop so we don't create
2010 * a hole in the sequence of pages in the rpc
2015 /* the io isn't needed.. tell the checks
2016 * below to complete the rpc with EINTR */
2017 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2018 oap->oap_count = -EINTR;
2021 oap->oap_async_flags |= ASYNC_READY;
2024 LASSERTF(0, "oap %p page %p returned %d "
2025 "from make_ready\n", oap,
2033 * Page submitted for IO has to be locked. Either by
2034 * ->ap_make_ready() or by higher layers.
2036 * XXX nikita: this assertion should be adjusted when lustre
2037 * starts using PG_writeback for pages being written out.
2039 #if defined(__KERNEL__) && defined(__LINUX__)
2040 LASSERT(PageLocked(oap->oap_page));
2042 /* If there is a gap at the start of this page, it can't merge
2043 * with any previous page, so we'll hand the network a
2044 * "fragmented" page array that it can't transfer in 1 RDMA */
2045 if (page_count != 0 && oap->oap_page_off != 0)
2048 /* take the page out of our book-keeping */
2049 list_del_init(&oap->oap_pending_item);
2050 lop_update_pending(cli, lop, cmd, -1);
2051 list_del_init(&oap->oap_urgent_item);
         /* Record where this RPC starts (offset within the max BRW window)
          * for the lprocfs offset histogram below. */
2053 if (page_count == 0)
2054 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2055 (PTLRPC_MAX_BRW_SIZE - 1);
2057 /* ask the caller for the size of the io as the rpc leaves. */
2058 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2060 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2061 if (oap->oap_count <= 0) {
2062 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2064 osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2068 /* now put the page back in our accounting */
2069 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2070 if (++page_count >= cli->cl_max_pages_per_rpc)
2073 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2074 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2075 * have the same alignment as the initial writes that allocated
2076 * extents on the server. */
2077 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2078 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2079 if (ending_offset == 0)
2082 /* If there is a gap at the end of this page, it can't merge
2083 * with any subsequent pages, so we'll hand the network a
2084 * "fragmented" page array that it can't transfer in 1 RDMA */
2085 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2089 osc_wake_cache_waiters(cli);
2091 if (page_count == 0)
2094 loi_list_maint(cli, loi);
         /* Drop the lock while building the request; pages are already off
          * the pending lists so nothing else will grab them. */
2096 client_obd_list_unlock(&cli->cl_loi_list_lock);
2098 req = osc_build_req(cli, &rpc_list, page_count, cmd);
2100 /* this should happen rarely and is pretty bad, it makes the
2101 * pending list not follow the dirty order */
2102 client_obd_list_lock(&cli->cl_loi_list_lock);
2103 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2104 list_del_init(&oap->oap_rpc_item);
2106 /* queued sync pages can be torn down while the pages
2107 * were between the pending list and the rpc */
2108 if (oap->oap_interrupted) {
2109 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2110 osc_ap_completion(cli, NULL, oap, 0,
2114 osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2116 loi_list_maint(cli, loi);
2117 RETURN(PTR_ERR(req));
2120 aa = (struct osc_brw_async_args *)&req->rq_async_args;
         /* lprocfs stats: page counts, RPCs in flight and starting offsets. */
2122 if (cmd == OBD_BRW_READ) {
2123 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2124 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2125 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2126 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2127 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2129 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2130 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2131 cli->cl_w_in_flight);
2132 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2133 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2134 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2137 client_obd_list_lock(&cli->cl_loi_list_lock);
2139 if (cmd == OBD_BRW_READ)
2140 cli->cl_r_in_flight++;
2142 cli->cl_w_in_flight++;
2144 /* queued sync pages can be torn down while the pages
2145 * were between the pending list and the rpc */
2147 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2148 /* only one oap gets a request reference */
2151 if (oap->oap_interrupted && !req->rq_intr) {
2152 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2154 ptlrpc_mark_interrupted(req);
2158 tmp->oap_request = ptlrpc_request_addref(req);
2160 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2161 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2163 req->rq_interpret_reply = brw_interpret_oap;
2164 ptlrpcd_add_req(req);
/* Debug helper: dump an loi's readiness flag and its pending/urgent
 * read+write page state, followed by a printf-style message. */
2168 #define LOI_DEBUG(LOI, STR, args...) \
2169 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2170 !list_empty(&(LOI)->loi_cli_item), \
2171 (LOI)->loi_write_lop.lop_num_pending, \
2172 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2173 (LOI)->loi_read_lop.lop_num_pending, \
2174 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2177 /* This is called by osc_check_rpcs() to find which objects have pages that
2178 * we could be sending. These lists are maintained by lop_makes_rpc(). */
/* Pick the next object to build RPCs for (see comment above).  Priority:
 * objects already known ready; then, when cache waiters exist, any object
 * with queued writes; then, on an invalid import, any queued object at
 * all so it gets drained.  Returns NULL (elided) when nothing qualifies. */
2179 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2182 /* first return all objects which we already know to have
2183 * pages ready to be stuffed into rpcs */
2184 if (!list_empty(&cli->cl_loi_ready_list))
2185 RETURN(list_entry(cli->cl_loi_ready_list.next,
2186 struct lov_oinfo, loi_cli_item));
2188 /* then if we have cache waiters, return all objects with queued
2189 * writes. This is especially important when many small files
2190 * have filled up the cache and not been fired into rpcs because
2191 * they don't pass the nr_pending/object threshold */
2192 if (!list_empty(&cli->cl_cache_waiters) &&
2193 !list_empty(&cli->cl_loi_write_list))
2194 RETURN(list_entry(cli->cl_loi_write_list.next,
2195 struct lov_oinfo, loi_write_item));
2197 /* then return all queued objects when we have an invalid import
2198 * so that they get flushed */
2199 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2200 if (!list_empty(&cli->cl_loi_write_list))
2201 RETURN(list_entry(cli->cl_loi_write_list.next,
2202 struct lov_oinfo, loi_write_item));
2203 if (!list_empty(&cli->cl_loi_read_list))
2204 RETURN(list_entry(cli->cl_loi_read_list.next,
2205 struct lov_oinfo, loi_read_item));
2210 /* called with the loi list lock held */
/* Pump the RPC pipeline: while an object has work (osc_next_loi) and we
 * are under cl_max_rpcs_in_flight, alternate write/read RPC attempts per
 * object, rotating objects for fairness.  A race counter bounds spinning
 * when make_ready keeps backing off (rc == 0).  Called with the loi list
 * lock held. */
2211 static void osc_check_rpcs(struct client_obd *cli)
2213 struct lov_oinfo *loi;
2214 int rc = 0, race_counter = 0;
2217 while ((loi = osc_next_loi(cli)) != NULL) {
2218 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2220 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2223 /* attempt some read/write balancing by alternating between
2224 * reads and writes in an object. The makes_rpc checks here
2225 * would be redundant if we were getting read/write work items
2226 * instead of objects. we don't want send_oap_rpc to drain a
2227 * partial read pending queue when we're given this object to
2228 * do io on writes while there are cache waiters */
2229 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2230 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2231 &loi->loi_write_lop);
2239 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2240 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2241 &loi->loi_read_lop);
2250 /* attempt some inter-object balancing by issuing rpcs
2251 * for each object in turn */
2252 if (!list_empty(&loi->loi_cli_item))
2253 list_del_init(&loi->loi_cli_item);
2254 if (!list_empty(&loi->loi_write_item))
2255 list_del_init(&loi->loi_write_item);
2256 if (!list_empty(&loi->loi_read_item))
2257 list_del_init(&loi->loi_read_item);
2259 loi_list_maint(cli, loi);
2261 /* send_oap_rpc fails with 0 when make_ready tells it to
2262 * back off. llite's make_ready does this when it tries
2263 * to lock a page queued for write that is already locked.
2264 * we want to try sending rpcs from many objects, but we
2265 * don't want to spin failing with 0. */
2266 if (race_counter == 10)
2272 /* we're trying to queue a page in the osc so we're subject to the
2273 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2274 * If the osc's queued pages are already at that limit, then we want to sleep
2275 * until there is space in the osc's queue for us. We also may be waiting for
2276 * write credits from the OST if there are RPCs in flight that may return some
2277 * before we fall back to sync writes.
2279 * We need this to know our allocation was granted in the presence of signals */
/* Wakeup condition for osc_enter_cache()'s wait: true once the waiter has
 * been removed from the cache-waiter list (space granted) or there are no
 * RPCs left in flight that could free space.  Takes the loi list lock to
 * read the state consistently. */
2280 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2284 client_obd_list_lock(&cli->cl_loi_list_lock);
2285 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2286 client_obd_list_unlock(&cli->cl_loi_list_lock);
2290 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2291 * grant or cache space. */
/* Admit @oap into the dirty cache, consuming grant, or wait for space.
 * Caller holds the loi list lock; it is dropped and retaken across the
 * l_wait_event().  Forces the caller to sync I/O (elided RETURN) when
 * dirty limits are tiny or async writes have latched force-sync.  If no
 * write RPCs are in flight there is nothing to wait for, so the elided
 * fallthrough presumably returns failure as well — confirm in full file. */
2292 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2293 struct osc_async_page *oap)
2295 struct osc_cache_waiter ocw;
2296 struct l_wait_info lwi = { 0 };
2300 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2301 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2302 cli->cl_dirty_max, obd_max_dirty_pages,
2303 cli->cl_lost_grant, cli->cl_avail_grant);
2305 /* force the caller to try sync io. this can jump the list
2306 * of queued writes and create a discontiguous rpc stream */
2307 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2308 loi->loi_ar.ar_force_sync)
2311 /* Hopefully normal case - cache space and write credits available */
2312 if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2313 (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2314 (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2315 /* account for ourselves */
2316 osc_consume_write_grant(cli, &oap->oap_brw_page);
2320 /* Make sure that there are write rpcs in flight to wait for. This
2321 * is a little silly as this object may not have any pending but
2322 * other objects sure might. */
2323 if (cli->cl_w_in_flight) {
2324 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2325 cfs_waitq_init(&ocw.ocw_waitq);
                 /* Kick pending RPCs so completions can free up space, then
                  * sleep until granted (see ocw_granted()). */
2329 loi_list_maint(cli, loi);
2330 osc_check_rpcs(cli);
2331 client_obd_list_unlock(&cli->cl_loi_list_lock);
2333 CDEBUG(D_CACHE, "sleeping for cache space\n");
2334 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2336 client_obd_list_lock(&cli->cl_loi_list_lock);
                 /* Still on the waiter list: woken without a grant (e.g. no
                  * RPCs left in flight), so dequeue ourselves. */
2337 if (!list_empty(&ocw.ocw_entry)) {
2338 list_del(&ocw.ocw_entry);
/* Initialize an osc_async_page (oap) for @page at @offset in the object.
 * When called without storage (elided check) it returns the rounded size
 * the caller must allocate.  Stores the caller's ops/data for later
 * make_ready/fill_obdo/completion callbacks and wires the interruption
 * callback.  The cookie later passed to osc_queue_async_io() is derived
 * from this oap (see oap_from_cookie()). */
2347 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2348 struct lov_oinfo *loi, cfs_page_t *page,
2349 obd_off offset, struct obd_async_page_ops *ops,
2350 void *data, void **res)
2352 struct osc_async_page *oap;
         /* Size query: tell the caller how much to allocate for the oap. */
2356 return size_round(sizeof(*oap));
2359 oap->oap_magic = OAP_MAGIC;
2360 oap->oap_cli = &exp->exp_obd->u.cli;
2363 oap->oap_caller_ops = ops;
2364 oap->oap_caller_data = data;
2366 oap->oap_page = page;
2367 oap->oap_obj_off = offset;
2369 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2370 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2371 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2373 oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2375 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
/* Validate and convert an opaque cookie (handed out by
 * osc_prep_async_page) back into its oap; ERR_PTR(-EINVAL) if the magic
 * doesn't match. */
2379 struct osc_async_page *oap_from_cookie(void *cookie)
2381 struct osc_async_page *oap = cookie;
2382 if (oap->oap_magic != OAP_MAGIC)
2383 return ERR_PTR(-EINVAL);
/* Queue one async page for read/write.  Rejects an invalid import or an
 * oap already queued; for non-NOQUOTA writes checks the file owner/group
 * quota via lquota_chkdq(); writes must also get into the dirty cache
 * (osc_enter_cache) before the oap goes onto the pending lists and
 * osc_check_rpcs() is kicked.  Runs under the loi list lock. */
2387 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2388 struct lov_oinfo *loi, void *cookie,
2389 int cmd, obd_off off, int count,
2390 obd_flag brw_flags, enum async_flags async_flags)
2392 struct client_obd *cli = &exp->exp_obd->u.cli;
2393 struct osc_async_page *oap;
2397 oap = oap_from_cookie(cookie);
2399 RETURN(PTR_ERR(oap));
2401 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
         /* Refuse a page that is already on any queue. */
2404 if (!list_empty(&oap->oap_pending_item) ||
2405 !list_empty(&oap->oap_urgent_item) ||
2406 !list_empty(&oap->oap_rpc_item))
2409 /* check if the file's owner/group is over quota */
2410 #ifdef HAVE_QUOTA_SUPPORT
2411 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2412 struct obd_async_page_ops *ops;
2419 ops = oap->oap_caller_ops;
2420 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2421 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
         /* Default to the first stripe's oinfo when none was given. */
2432 loi = lsm->lsm_oinfo[0];
2434 client_obd_list_lock(&cli->cl_loi_list_lock);
2437 oap->oap_page_off = off;
2438 oap->oap_count = count;
2439 oap->oap_brw_flags = brw_flags;
2440 oap->oap_async_flags = async_flags;
2442 if (cmd & OBD_BRW_WRITE) {
2443 rc = osc_enter_cache(cli, loi, oap);
2445 client_obd_list_unlock(&cli->cl_loi_list_lock);
2450 osc_oap_to_pending(oap);
2451 loi_list_maint(cli, loi);
2453 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2456 osc_check_rpcs(cli);
2457 client_obd_list_unlock(&cli->cl_loi_list_lock);
2462 /* aka (~was & now & flag), but this is more clear :) */
2463 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
/* Raise (never clear) async flags on an already-queued oap.  A newly set
 * ASYNC_READY is simply ORed in; a newly set ASYNC_URGENT moves the page
 * onto the lop urgent list (unless it is already part of an RPC) and
 * updates the loi lists, then osc_check_rpcs() is kicked.  Returns
 * -EINVAL if the page is not on a pending list. */
2465 static int osc_set_async_flags(struct obd_export *exp,
2466 struct lov_stripe_md *lsm,
2467 struct lov_oinfo *loi, void *cookie,
2468 obd_flag async_flags)
2470 struct client_obd *cli = &exp->exp_obd->u.cli;
2471 struct loi_oap_pages *lop;
2472 struct osc_async_page *oap;
2476 oap = oap_from_cookie(cookie);
2478 RETURN(PTR_ERR(oap));
2481 * bug 7311: OST-side locking is only supported for liblustre for now
2482 * (and liblustre never calls obd_set_async_flags(). I hope.), generic
2483 * implementation has to handle case where OST-locked page was picked
2484 * up by, e.g., ->writepage().
2486 LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2487 LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2490 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2494 loi = lsm->lsm_oinfo[0];
/* Pick the write or read lop based on the command the oap was queued
 * with, not on the flags being set. */
2496 if (oap->oap_cmd & OBD_BRW_WRITE) {
2497 lop = &loi->loi_write_lop;
2499 lop = &loi->loi_read_lop;
2502 client_obd_list_lock(&cli->cl_loi_list_lock);
2504 if (list_empty(&oap->oap_pending_item))
2505 GOTO(out, rc = -EINVAL);
/* Nothing to do if every requested flag is already set. */
2507 if ((oap->oap_async_flags & async_flags) == async_flags)
2510 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2511 oap->oap_async_flags |= ASYNC_READY;
2513 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2514 if (list_empty(&oap->oap_rpc_item)) {
2515 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2516 loi_list_maint(cli, loi);
2520 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2521 oap->oap_async_flags);
2523 osc_check_rpcs(cli);
2524 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Queue an async page onto an obd_io_group.  Unlike osc_queue_async_io()
 * the page goes on the lop's group-pending list (it is only moved to the
 * normal pending list by osc_trigger_group_io()); when the caller asked
 * for ASYNC_GROUP_SYNC the oap's completion context is registered with the
 * group via oig_add_one(). */
2528 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2529 struct lov_oinfo *loi,
2530 struct obd_io_group *oig, void *cookie,
2531 int cmd, obd_off off, int count,
2533 obd_flag async_flags)
2535 struct client_obd *cli = &exp->exp_obd->u.cli;
2536 struct osc_async_page *oap;
2537 struct loi_oap_pages *lop;
2541 oap = oap_from_cookie(cookie);
2543 RETURN(PTR_ERR(oap));
2545 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
/* Refuse pages that are already queued or in flight. */
2548 if (!list_empty(&oap->oap_pending_item) ||
2549 !list_empty(&oap->oap_urgent_item) ||
2550 !list_empty(&oap->oap_rpc_item))
2554 loi = lsm->lsm_oinfo[0];
2556 client_obd_list_lock(&cli->cl_loi_list_lock);
2559 oap->oap_page_off = off;
2560 oap->oap_count = count;
2561 oap->oap_brw_flags = brw_flags;
2562 oap->oap_async_flags = async_flags;
2564 if (cmd & OBD_BRW_WRITE)
2565 lop = &loi->loi_write_lop;
2567 lop = &loi->loi_read_lop;
2569 list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2570 if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2572 rc = oig_add_one(oig, &oap->oap_occ);
2575 LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2576 oap, oap->oap_page, rc);
2578 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Move every oap parked on @lop's group-pending list to the normal
 * pending state (via osc_oap_to_pending) and re-balance the loi lists
 * afterwards.  Caller is expected to hold cl_loi_list_lock (see
 * osc_trigger_group_io). */
2583 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2584 struct loi_oap_pages *lop, int cmd)
2586 struct list_head *pos, *tmp;
2587 struct osc_async_page *oap;
/* _safe iteration: osc_oap_to_pending() relinks each entry. */
2589 list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2590 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2591 list_del(&oap->oap_pending_item);
2592 osc_oap_to_pending(oap);
2594 loi_list_maint(cli, loi);
/* Release all pages queued on this object's group lists (both write and
 * read lops) into the normal pending machinery and kick osc_check_rpcs()
 * to start building RPCs.  Runs under cl_loi_list_lock. */
2597 static int osc_trigger_group_io(struct obd_export *exp,
2598 struct lov_stripe_md *lsm,
2599 struct lov_oinfo *loi,
2600 struct obd_io_group *oig)
2602 struct client_obd *cli = &exp->exp_obd->u.cli;
2606 loi = lsm->lsm_oinfo[0];
2608 client_obd_list_lock(&cli->cl_loi_list_lock);
2610 osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2611 osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2613 osc_check_rpcs(cli);
2614 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Remove a queued-but-not-in-flight oap from all OSC bookkeeping: release
 * its cache grant, wake any waiters that grant may satisfy, and unlink it
 * from the urgent and pending lists (fixing the lop pending counters).
 * Returns -EBUSY if the page is already part of an RPC. */
2619 static int osc_teardown_async_page(struct obd_export *exp,
2620 struct lov_stripe_md *lsm,
2621 struct lov_oinfo *loi, void *cookie)
2623 struct client_obd *cli = &exp->exp_obd->u.cli;
2624 struct loi_oap_pages *lop;
2625 struct osc_async_page *oap;
2629 oap = oap_from_cookie(cookie);
2631 RETURN(PTR_ERR(oap));
2634 loi = lsm->lsm_oinfo[0];
2636 if (oap->oap_cmd & OBD_BRW_WRITE) {
2637 lop = &loi->loi_write_lop;
2639 lop = &loi->loi_read_lop;
2642 client_obd_list_lock(&cli->cl_loi_list_lock);
/* Pages in an RPC cannot be torn down from under the I/O engine. */
2644 if (!list_empty(&oap->oap_rpc_item))
2645 GOTO(out, rc = -EBUSY);
2647 osc_exit_cache(cli, oap, 0);
2648 osc_wake_cache_waiters(cli);
2650 if (!list_empty(&oap->oap_urgent_item)) {
2651 list_del_init(&oap->oap_urgent_item);
2652 oap->oap_async_flags &= ~ASYNC_URGENT;
2654 if (!list_empty(&oap->oap_pending_item)) {
2655 list_del_init(&oap->oap_pending_item);
2656 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2658 loi_list_maint(cli, loi);
2660 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2662 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Attach @data (an inode pointer on Linux clients) to the lock named by
 * @lockh as l_ast_data, asserting that any different inode already stored
 * there is in the process of being freed (I_FREEING).  Also propagates
 * the LDLM_FL_NO_LRU bit from @flags into the lock flags.  Logs an error
 * (likely a stale handle after eviction) when the handle no longer
 * resolves to a lock. */
2666 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2669 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2672 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2675 lock_res_and_lock(lock);
2678 /* Liang XXX: Darwin and Winnt checking should be added */
2679 if (lock->l_ast_data && lock->l_ast_data != data) {
2680 struct inode *new_inode = data;
2681 struct inode *old_inode = lock->l_ast_data;
2682 if (!(old_inode->i_state & I_FREEING))
2683 LDLM_ERROR(lock, "inconsistent l_ast_data found");
2684 LASSERTF(old_inode->i_state & I_FREEING,
2685 "Found existing inode %p/%lu/%u state %lu in lock: "
2686 "setting data to %p/%lu/%u\n", old_inode,
2687 old_inode->i_ino, old_inode->i_generation,
2689 new_inode, new_inode->i_ino, new_inode->i_generation);
2693 lock->l_ast_data = data;
2694 lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2695 unlock_res_and_lock(lock);
/* Balance the reference taken by ldlm_handle2lock(). */
2696 LDLM_LOCK_PUT(lock);
/* Apply the @replace iterator (with @data) to every lock on the ldlm
 * resource named by this object's id/group -- used to swap or clear the
 * callback data cached on the object's extent locks. */
2699 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2700 ldlm_iterator_t replace, void *data)
2702 struct ldlm_res_id res_id = { .name = {0} };
2703 struct obd_device *obd = class_exp2obd(exp);
2705 res_id.name[0] = lsm->lsm_object_id;
2706 res_id.name[2] = lsm->lsm_object_gr;
2708 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
/* Common post-processing for an OSC lock enqueue (sync or async).  When
 * the server aborted the lock (ELDLM_LOCK_ABORTED) the real status is
 * pulled out of lock_policy_res1 in the DLM reply; on success the lvb
 * attributes (kms/blocks/mtime) cached on stripe 0 are logged.  Finally
 * the caller's oi_cb_up() update callback is invoked with the result. */
2712 static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
2718 /* The request was created before ldlm_cli_enqueue call. */
2719 if (rc == ELDLM_LOCK_ABORTED) {
2720 struct ldlm_reply *rep;
2722 /* swabbed by ldlm_cli_enqueue() */
2723 LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF);
2724 rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
2726 LASSERT(rep != NULL);
2727 if (rep->lock_policy_res1)
2728 rc = rep->lock_policy_res1;
2732 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2733 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2734 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
2735 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
2736 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
2739 /* Call the update callback. */
2740 rc = oinfo->oi_cb_up(oinfo, rc);
/* rq_interpret_reply handler for async lock enqueues queued by
 * osc_enqueue().  Completes the ldlm side of the enqueue (lvb unpacking
 * included), runs the shared osc_enqueue_fini() post-processing, and --
 * per the "release locks just after they are obtained" policy documented
 * at osc_enqueue() -- drops the lock reference on success. */
2744 static int osc_enqueue_interpret(struct ptlrpc_request *req,
2745 struct osc_enqueue_args *aa, int rc)
2747 int intent = aa->oa_ei->ei_flags & LDLM_FL_HAS_INTENT;
2748 struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
2749 struct ldlm_lock *lock;
2751 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2753 lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
2755 /* Complete obtaining the lock procedure. */
2756 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2758 &aa->oa_ei->ei_flags,
2759 &lsm->lsm_oinfo[0]->loi_lvb,
2760 sizeof(lsm->lsm_oinfo[0]->loi_lvb),
2761 lustre_swab_ost_lvb,
2762 aa->oa_oi->oi_lockh, rc);
2764 /* Complete osc stuff. */
2765 rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
2767 /* Release the lock for async request. */
2768 if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
2769 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
2771 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2772 aa->oa_oi->oi_lockh, req, aa);
2773 LDLM_LOCK_PUT(lock);
2777 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2778 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2779 * other synchronous requests, however keeping some locks and trying to obtain
2780 * others may take a considerable amount of time in a case of ost failure; and
2781 * when other sync requests do not get released lock from a client, the client
2782 * is excluded from the cluster -- such scenarious make the life difficult, so
2783 * release locks just after they are obtained. */
/* Acquire an extent lock on the object described by oinfo.  The policy
 * extent is first widened to page boundaries; then the local namespace is
 * searched for a matching granted lock (including a PW lock when PR was
 * requested, since VFS/page cache serialize locally) before an
 * LDLM_ENQUEUE RPC is prepared and issued.  With einfo->ei_rqset the
 * enqueue is asynchronous and completion is handled by
 * osc_enqueue_interpret(); otherwise osc_enqueue_fini() runs inline. */
2784 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2785 struct obd_enqueue_info *einfo)
2787 struct ldlm_res_id res_id = { .name = {0} };
2788 struct obd_device *obd = exp->exp_obd;
2789 struct ldlm_reply *rep;
2790 struct ptlrpc_request *req = NULL;
2791 int intent = einfo->ei_flags & LDLM_FL_HAS_INTENT;
2795 res_id.name[0] = oinfo->oi_md->lsm_object_id;
2796 res_id.name[2] = oinfo->oi_md->lsm_object_gr;
2798 /* Filesystem lock extents are extended to page boundaries so that
2799 * dealing with the page cache is a little smoother. */
2800 oinfo->oi_policy.l_extent.start -=
2801 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
2802 oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
2804 if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
2807 /* Next, search for already existing extent locks that will cover us */
2808 rc = ldlm_lock_match(obd->obd_namespace,
2809 einfo->ei_flags | LDLM_FL_LVB_READY, &res_id,
2810 einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode,
2813 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
2816 /* I would like to be able to ASSERT here that rss <=
2817 * kms, but I can't, for reasons which are explained in
2821 /* We already have a lock, and it's referenced */
2822 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2824 /* For async requests, decref the lock. */
2825 if (einfo->ei_rqset)
2826 ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
2831 /* If we're trying to read, we also search for an existing PW lock. The
2832 * VFS and page cache already protect us locally, so lots of readers/
2833 * writers can share a single PW lock.
2835 * There are problems with conversion deadlocks, so instead of
2836 * converting a read lock to a write lock, we'll just enqueue a new
2839 * At some point we should cancel the read lock instead of making them
2840 * send us a blocking callback, but there are problems with canceling
2841 * locks out from other users right now, too. */
2843 if (einfo->ei_mode == LCK_PR) {
2844 rc = ldlm_lock_match(obd->obd_namespace,
2845 einfo->ei_flags | LDLM_FL_LVB_READY,
2846 &res_id, einfo->ei_type, &oinfo->oi_policy,
2847 LCK_PW, oinfo->oi_lockh);
2849 /* FIXME: This is not incredibly elegant, but it might
2850 * be more elegant than adding another parameter to
2851 * lock_match. I want a second opinion. */
2852 /* addref the lock only if not async requests. */
2853 if (!einfo->ei_rqset)
2854 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
2855 osc_set_data_with_check(oinfo->oi_lockh,
2858 oinfo->oi_cb_up(oinfo, ELDLM_OK);
/* Drop the PW reference ldlm_lock_match() took; the PR ref
 * added above (sync case) is what the caller keeps. */
2859 ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
2867 [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
2868 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request) };
2870 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
2871 LDLM_ENQUEUE, 2, size, NULL);
/* Reply must hold the DLM reply plus the lvb for stripe 0. */
2875 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
2876 size[DLM_REPLY_REC_OFF] =
2877 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
2878 ptlrpc_req_set_repsize(req, 3, size);
2881 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2882 einfo->ei_flags &= ~LDLM_FL_BLOCK_GRANTED;
2884 rc = ldlm_cli_enqueue(exp, &req, &res_id, einfo->ei_type,
2885 &oinfo->oi_policy, einfo->ei_mode,
2886 &einfo->ei_flags, einfo->ei_cb_bl,
2887 einfo->ei_cb_cp, einfo->ei_cb_gl,
2889 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2890 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
2891 lustre_swab_ost_lvb, oinfo->oi_lockh,
2892 einfo->ei_rqset ? 1 : 0);
2893 if (einfo->ei_rqset) {
2895 struct osc_enqueue_args *aa;
2896 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2897 aa = (struct osc_enqueue_args *)&req->rq_async_args;
2902 req->rq_interpret_reply = osc_enqueue_interpret;
2903 ptlrpc_set_add_req(einfo->ei_rqset, req);
2904 } else if (intent) {
2905 ptlrpc_req_finished(req);
2910 rc = osc_enqueue_fini(req, oinfo, intent, rc);
2912 ptlrpc_req_finished(req);
/* Look for an already-granted extent lock covering @policy without ever
 * enqueueing a new one.  Mirrors osc_enqueue()'s matching logic: the
 * extent is widened to page boundaries and, for PR requests, a local PW
 * lock is accepted as a match (swapping the PW reference for a PR one
 * unless LDLM_FL_TEST_LOCK).  OBD_FAIL_OSC_MATCH allows fault injection
 * in testing. */
2917 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
2918 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2919 int *flags, void *data, struct lustre_handle *lockh)
2921 struct ldlm_res_id res_id = { .name = {0} };
2922 struct obd_device *obd = exp->exp_obd;
2924 int lflags = *flags;
2927 res_id.name[0] = lsm->lsm_object_id;
2928 res_id.name[2] = lsm->lsm_object_gr;
2930 OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
2932 /* Filesystem lock extents are extended to page boundaries so that
2933 * dealing with the page cache is a little smoother */
2934 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2935 policy->l_extent.end |= ~CFS_PAGE_MASK;
2937 /* Next, search for already existing extent locks that will cover us */
2938 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
2939 &res_id, type, policy, mode, lockh);
2941 //if (!(*flags & LDLM_FL_TEST_LOCK))
2942 osc_set_data_with_check(lockh, data, lflags);
2945 /* If we're trying to read, we also search for an existing PW lock. The
2946 * VFS and page cache already protect us locally, so lots of readers/
2947 * writers can share a single PW lock. */
2948 if (mode == LCK_PR) {
2949 rc = ldlm_lock_match(obd->obd_namespace,
2950 lflags | LDLM_FL_LVB_READY, &res_id,
2951 type, policy, LCK_PW, lockh);
2952 if (rc == 1 && !(lflags & LDLM_FL_TEST_LOCK)) {
2953 /* FIXME: This is not incredibly elegant, but it might
2954 * be more elegant than adding another parameter to
2955 * lock_match. I want a second opinion. */
2956 osc_set_data_with_check(lockh, data, lflags);
/* Hold the lock as PR for the caller and drop the PW
 * reference taken by ldlm_lock_match(). */
2957 ldlm_lock_addref(lockh, LCK_PR);
2958 ldlm_lock_decref(lockh, LCK_PW);
/* Release one reference on @lockh.  Group locks are additionally
 * cancelled outright (decref_and_cancel) since they are not kept in
 * the LRU; other modes are just decref'd. */
2964 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2965 __u32 mode, struct lustre_handle *lockh)
2969 if (unlikely(mode == LCK_GROUP))
2970 ldlm_lock_decref_and_cancel(lockh, mode);
2972 ldlm_lock_decref(lockh, mode);
/* Cancel all unused locks, either namespace-wide or restricted to the
 * resource of @lsm.  NOTE(review): resp stays NULL unless assigned in an
 * elided branch -- presumably resp = &res_id when lsm != NULL; confirm
 * against the full source. */
2977 static int osc_cancel_unused(struct obd_export *exp,
2978 struct lov_stripe_md *lsm, int flags,
2981 struct obd_device *obd = class_exp2obd(exp);
2982 struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
2985 res_id.name[0] = lsm->lsm_object_id;
2986 res_id.name[2] = lsm->lsm_object_gr;
2990 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags,
/* Move this object's locks into or out of the LRU (per @join) via
 * ldlm_cli_join_lru().  Same resp/res_id pattern as osc_cancel_unused();
 * the conditional assigning resp is elided in this extract. */
2994 static int osc_join_lru(struct obd_export *exp,
2995 struct lov_stripe_md *lsm, int join)
2997 struct obd_device *obd = class_exp2obd(exp);
2998 struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3001 res_id.name[0] = lsm->lsm_object_id;
3002 res_id.name[2] = lsm->lsm_object_gr;
3006 return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
/* Completion handler for async OST_STATFS: unpack (and byte-swap if
 * needed) the obd_statfs from the reply, copy it into the caller's
 * oi_osfs buffer, then fire the oi_cb_up update callback.  -EPROTO if
 * the reply cannot be unpacked. */
3009 static int osc_statfs_interpret(struct ptlrpc_request *req,
3010 struct osc_async_args *aa, int rc)
3012 struct obd_statfs *msfs;
3018 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3019 lustre_swab_obd_statfs);
3021 CERROR("Can't unpack obd_statfs\n");
3022 GOTO(out, rc = -EPROTO);
3025 memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
3027 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Fire an asynchronous OST_STATFS request on @rqset; the reply is
 * consumed by osc_statfs_interpret().  @max_age is currently unused on
 * the wire (see the comment below about passing it to the target). */
3031 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3032 __u64 max_age, struct ptlrpc_request_set *rqset)
3034 struct ptlrpc_request *req;
3035 struct osc_async_args *aa;
3036 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
3039 /* We could possibly pass max_age in the request (as an absolute
3040 * timestamp or a "seconds.usec ago") so the target can avoid doing
3041 * extra calls into the filesystem if that isn't necessary (e.g.
3042 * during mount that would help a bit). Having relative timestamps
3043 * is not so great if request processing is slow, while absolute
3044 * timestamps are not ideal because they need time synchronization. */
3045 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3046 OST_STATFS, 1, NULL, NULL);
3050 ptlrpc_req_set_repsize(req, 2, size);
3051 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3053 req->rq_interpret_reply = osc_statfs_interpret;
/* Stash the obd_info in the request's async-args scratch space. */
3054 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3055 aa = (struct osc_async_args *)&req->rq_async_args;
3058 ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_STATFS: send the request, wait for the reply, unpack
 * the obd_statfs and copy it to @osfs.  Returns -EPROTO on an
 * ununpackable reply; the request is always freed before return. */
3062 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3065 struct obd_statfs *msfs;
3066 struct ptlrpc_request *req;
3067 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
3070 /* We could possibly pass max_age in the request (as an absolute
3071 * timestamp or a "seconds.usec ago") so the target can avoid doing
3072 * extra calls into the filesystem if that isn't necessary (e.g.
3073 * during mount that would help a bit). Having relative timestamps
3074 * is not so great if request processing is slow, while absolute
3075 * timestamps are not ideal because they need time synchronization. */
3076 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3077 OST_STATFS, 1, NULL, NULL);
3081 ptlrpc_req_set_repsize(req, 2, size);
3082 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3084 rc = ptlrpc_queue_wait(req);
3088 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3089 lustre_swab_obd_statfs);
3091 CERROR("Can't unpack obd_statfs\n");
3092 GOTO(out, rc = -EPROTO);
3095 memcpy(osfs, msfs, sizeof(*osfs));
3099 ptlrpc_req_finished(req);
3103 /* Retrieve object striping information.
3105 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3106 * the maximum number of OST indices which will fit in the user buffer.
3107 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
/* Copies a single-stripe lov_user_md (object id/group, stripe count 1)
 * out to the user buffer @lump.  When the user asked for object entries
 * (lmm_stripe_count > 0) a kernel buffer with one lmm_objects slot is
 * allocated, filled, copied out and freed. */
3109 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3111 struct lov_user_md lum, *lumk;
3112 int rc = 0, lum_size;
3118 if (copy_from_user(&lum, lump, sizeof(lum)))
3121 if (lum.lmm_magic != LOV_USER_MAGIC)
3124 if (lum.lmm_stripe_count > 0) {
3125 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3126 OBD_ALLOC(lumk, lum_size);
3130 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3131 lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
3133 lum_size = sizeof(lum);
3137 lumk->lmm_object_id = lsm->lsm_object_id;
3138 lumk->lmm_object_gr = lsm->lsm_object_gr;
3139 lumk->lmm_stripe_count = 1;
3141 if (copy_to_user(lump, lumk, lum_size))
3145 OBD_FREE(lumk, lum_size);
/* OSC ioctl dispatcher.  Handles LOV_GET_CONFIG (synthesizes a one-target
 * lov_desc + uuid for this lone OSC), LOV_SETSTRIPE/GETSTRIPE, client
 * recovery / import-activation controls and quota-check polling;
 * unrecognized commands return -ENOTTY.  Pins the module (try_module_get)
 * for the duration of the call on 2.6+ kernels. */
3151 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3152 void *karg, void *uarg)
3154 struct obd_device *obd = exp->exp_obd;
3155 struct obd_ioctl_data *data = karg;
3159 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3162 if (!try_module_get(THIS_MODULE)) {
3163 CERROR("Can't get module. Is it alive?");
3168 case OBD_IOC_LOV_GET_CONFIG: {
3170 struct lov_desc *desc;
3171 struct obd_uuid uuid;
3175 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3176 GOTO(out, err = -EINVAL);
3178 data = (struct obd_ioctl_data *)buf;
/* Validate user-supplied buffer sizes before writing into them. */
3180 if (sizeof(*desc) > data->ioc_inllen1) {
3181 obd_ioctl_freedata(buf, len);
3182 GOTO(out, err = -EINVAL);
3185 if (data->ioc_inllen2 < sizeof(uuid)) {
3186 obd_ioctl_freedata(buf, len);
3187 GOTO(out, err = -EINVAL);
/* A bare OSC looks like a LOV with exactly one active target. */
3190 desc = (struct lov_desc *)data->ioc_inlbuf1;
3191 desc->ld_tgt_count = 1;
3192 desc->ld_active_tgt_count = 1;
3193 desc->ld_default_stripe_count = 1;
3194 desc->ld_default_stripe_size = 0;
3195 desc->ld_default_stripe_offset = 0;
3196 desc->ld_pattern = 0;
3197 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3199 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3201 err = copy_to_user((void *)uarg, buf, len);
3204 obd_ioctl_freedata(buf, len);
3207 case LL_IOC_LOV_SETSTRIPE:
3208 err = obd_alloc_memmd(exp, karg);
3212 case LL_IOC_LOV_GETSTRIPE:
3213 err = osc_getstripe(karg, uarg);
3215 case OBD_IOC_CLIENT_RECOVER:
3216 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3221 case IOC_OSC_SET_ACTIVE:
3222 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3225 case OBD_IOC_POLL_QUOTACHECK:
3226 err = lquota_poll_check(quota_interface, exp,
3227 (struct if_quotacheck *)karg);
3230 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3231 cmd, cfs_curproc_comm());
3232 GOTO(out, err = -ENOTTY);
3235 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3238 module_put(THIS_MODULE);
/* obd get_info handler.  "lock_to_stripe" is answered locally (stripe is
 * always 0 for a single OSC); "last_id" is fetched from the OST with a
 * synchronous OST_GET_INFO RPC whose swabbed obd_id reply is copied into
 * *val.  Requires non-NULL vallen/val. */
3243 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3244 void *key, __u32 *vallen, void *val)
3247 if (!vallen || !val)
3250 if (keylen > strlen("lock_to_stripe") &&
3251 strcmp(key, "lock_to_stripe") == 0) {
3252 __u32 *stripe = val;
3253 *vallen = sizeof(*stripe);
3256 } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
3257 struct ptlrpc_request *req;
3259 char *bufs[2] = { NULL, key };
3260 int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };
3262 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3263 OST_GET_INFO, 2, size, bufs);
3267 size[REPLY_REC_OFF] = *vallen;
3268 ptlrpc_req_set_repsize(req, 2, size);
3269 rc = ptlrpc_queue_wait(req);
3273 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3274 lustre_swab_ost_last_id);
3275 if (reply == NULL) {
3276 CERROR("Can't unpack OST last ID\n");
3277 GOTO(out, rc = -EPROTO);
3279 *((obd_id *)val) = *reply;
3281 ptlrpc_req_finished(req);
/* Completion handler for the "mds_conn" set_info RPC: connect the
 * MDS->OST llog initiator for this import's origin context, then mark the
 * import as server-timeout and pingable (this OSC lives on the MDS and
 * must actively ping its OST). */
3287 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3290 struct llog_ctxt *ctxt;
3291 struct obd_import *imp = req->rq_import;
3297 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3300 rc = llog_initiator_connect(ctxt);
3302 CERROR("cannot establish connection for "
3303 "ctxt %p: %d\n", ctxt, rc);
3306 spin_lock(&imp->imp_lock);
3307 imp->imp_server_timeout = 1;
3308 imp->imp_pingable = 1;
3309 spin_unlock(&imp->imp_lock);
3310 CDEBUG(D_HA, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
/* obd set_info_async handler.  Several keys are consumed locally
 * (KEY_NEXT_ID seeds the object creator's next id; "unlinked" clears the
 * creator's NOSPC flag; KEY_INIT_RECOV / "checksum" toggle import/client
 * settings; KEY_FLUSH_CTX flushes sptlrpc contexts).  Everything else is
 * forwarded to the OST as an OST_SET_INFO RPC on @set; "mds_conn"
 * additionally primes the creator's group and arms
 * osc_setinfo_mds_conn_interpret(). */
3315 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3316 void *key, obd_count vallen, void *val,
3317 struct ptlrpc_request_set *set)
3319 struct ptlrpc_request *req;
3320 struct obd_device *obd = exp->exp_obd;
3321 struct obd_import *imp = class_exp2cliimp(exp);
3322 int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
3323 char *bufs[3] = { NULL, key, val };
3326 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3328 if (KEY_IS(KEY_NEXT_ID)) {
3329 if (vallen != sizeof(obd_id))
/* +1: the value received is the last used id, next is one past it. */
3331 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3332 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3333 exp->exp_obd->obd_name,
3334 obd->u.cli.cl_oscc.oscc_next_id);
3339 if (KEY_IS("unlinked")) {
3340 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3341 spin_lock(&oscc->oscc_lock);
3342 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3343 spin_unlock(&oscc->oscc_lock);
3347 if (KEY_IS(KEY_INIT_RECOV)) {
3348 if (vallen != sizeof(int))
3350 spin_lock(&imp->imp_lock);
3351 imp->imp_initial_recov = *(int *)val;
3352 spin_unlock(&imp->imp_lock);
3353 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3354 exp->exp_obd->obd_name,
3355 imp->imp_initial_recov);
3359 if (KEY_IS("checksum")) {
3360 if (vallen != sizeof(int))
3362 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3366 if (KEY_IS(KEY_FLUSH_CTX)) {
3367 sptlrpc_import_flush_my_ctx(imp);
3374 /* We pass all other commands directly to OST. Since nobody calls osc
3375 methods directly and everybody is supposed to go through LOV, we
3376 assume lov checked invalid values for us.
3377 The only recognised values so far are evict_by_nid and mds_conn.
3378 Even if something bad goes through, we'd get a -EINVAL from OST
3381 req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
3386 if (KEY_IS("mds_conn")) {
3387 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3389 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3390 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3391 LASSERT(oscc->oscc_oa.o_gr > 0);
3392 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3395 ptlrpc_req_set_repsize(req, 1, NULL);
3396 ptlrpc_set_add_req(set, req);
3397 ptlrpc_check_set(set);
/* llog ops for the size-replication context: only cancel is needed on
 * the client side (replicator cancels records as they are processed). */
3403 static struct llog_operations osc_size_repl_logops = {
3404 lop_cancel: llog_obd_repl_cancel
3407 static struct llog_operations osc_mds_ost_orig_logops;
/* Set up the two llog contexts this OSC uses: the MDS->OST origin
 * context (lvfs-backed ops, lazily patched with origin setup/cleanup/
 * add/connect under obd_dev_lock the first time through) and the size-
 * replication context.  Errors are logged with full context before
 * returning. */
3408 static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
3409 struct obd_device *tgt, int count,
3410 struct llog_catid *catid, struct obd_uuid *uuid)
3415 spin_lock(&obd->obd_dev_lock);
/* One-time, lock-protected initialization of the shared static ops. */
3416 if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3417 osc_mds_ost_orig_logops = llog_lvfs_ops;
3418 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3419 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3420 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3421 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3423 spin_unlock(&obd->obd_dev_lock);
3425 rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3426 &catid->lci_logid, &osc_mds_ost_orig_logops);
3428 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3432 rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3433 &osc_size_repl_logops);
3435 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3438 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3439 obd->obd_name, tgt->obd_name, count, catid, rc);
3440 CERROR("logid "LPX64":0x%x\n",
3441 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
/* Tear down both llog contexts created by osc_llog_init(), keeping the
 * two cleanup results in separate rc/rc2 so one failure does not mask
 * the other's cleanup attempt. */
3446 static int osc_llog_finish(struct obd_device *obd, int count)
3448 struct llog_ctxt *ctxt;
3449 int rc = 0, rc2 = 0;
3452 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3454 rc = llog_cleanup(ctxt);
3456 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3458 rc2 = llog_cleanup(ctxt);
/* Reconnect hook: when the peer supports grants, report our remaining
 * grant back in ocd_grant (or request 2 * max_pages_per_rpc worth if we
 * have none) and zero cl_lost_grant, all under cl_loi_list_lock. */
3465 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3466 struct obd_uuid *cluuid,
3467 struct obd_connect_data *data)
3469 struct client_obd *cli = &obd->u.cli;
3471 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3474 client_obd_list_lock(&cli->cl_loi_list_lock);
3475 data->ocd_grant = cli->cl_avail_grant ?:
3476 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3477 lost_grant = cli->cl_lost_grant;
3478 cli->cl_lost_grant = 0;
3479 client_obd_list_unlock(&cli->cl_loi_list_lock);
3481 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3482 "cl_lost_grant: %ld\n", data->ocd_grant,
3483 cli->cl_avail_grant, lost_grant);
3484 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3485 " ocd_grant: %d\n", data->ocd_connect_flags,
3486 data->ocd_version, data->ocd_grant);
/* Disconnect hook: on the last connection, flush pending size-repl llog
 * cancels to the target before handing off to the generic client
 * disconnect. */
3492 static int osc_disconnect(struct obd_export *exp)
3494 struct obd_device *obd = class_exp2obd(exp);
3495 struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3498 if (obd->u.cli.cl_conn_count == 1)
3499 /* flush any remaining cancel messages out to the target */
3500 llog_sync(ctxt, exp);
3502 rc = client_disconnect_export(exp);
/* React to import state transitions.  DISCON: mark the object creator
 * recovering (MDS-side OSCs only) and discard grants.  INACTIVE/ACTIVE/
 * OCD: notify the observer (and on ACTIVE clear the creator's NOSPC
 * flag; on OCD re-initialize grants and request-portal from the connect
 * data).  INVALIDATE: fail all queued pages and purge the local lock
 * namespace. */
3506 static int osc_import_event(struct obd_device *obd,
3507 struct obd_import *imp,
3508 enum obd_import_event event)
3510 struct client_obd *cli;
3514 LASSERT(imp->imp_obd == obd);
3517 case IMP_EVENT_DISCON: {
3518 /* Only do this on the MDS OSC's */
3519 if (imp->imp_server_timeout) {
3520 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3522 spin_lock(&oscc->oscc_lock);
3523 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3524 spin_unlock(&oscc->oscc_lock);
/* Grants are meaningless across a disconnect; drop them. */
3527 client_obd_list_lock(&cli->cl_loi_list_lock);
3528 cli->cl_avail_grant = 0;
3529 cli->cl_lost_grant = 0;
3530 client_obd_list_unlock(&cli->cl_loi_list_lock);
3533 case IMP_EVENT_INACTIVE: {
3534 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3537 case IMP_EVENT_INVALIDATE: {
3538 struct ldlm_namespace *ns = obd->obd_namespace;
3542 client_obd_list_lock(&cli->cl_loi_list_lock);
3543 /* all pages go to failing rpcs due to the invalid import */
3544 osc_check_rpcs(cli);
3545 client_obd_list_unlock(&cli->cl_loi_list_lock);
3547 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3551 case IMP_EVENT_ACTIVE: {
3552 /* Only do this on the MDS OSC's */
3553 if (imp->imp_server_timeout) {
3554 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3556 spin_lock(&oscc->oscc_lock);
3557 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3558 spin_unlock(&oscc->oscc_lock);
3560 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3563 case IMP_EVENT_OCD: {
3564 struct obd_connect_data *ocd = &imp->imp_connect_data;
3566 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3567 osc_init_grant(&obd->u.cli, ocd);
3570 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3571 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3573 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3577 CERROR("Unknown import event %d\n", event);
/* Device setup: take a ptlrpcd reference, run the generic client setup,
 * register lprocfs stats, and pre-allocate a request pool sized
 * max_rpcs_in_flight + 2 (see comment below on why +2). */
3583 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3589 rc = ptlrpcd_addref();
3593 rc = client_obd_setup(obd, lcfg);
3597 struct lprocfs_static_vars lvars;
3598 struct client_obd *cli = &obd->u.cli;
3600 lprocfs_init_vars(osc, &lvars);
3601 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3602 lproc_osc_attach_seqstat(obd);
3603 ptlrpc_lprocfs_register_obd(obd);
3607 /* We need to allocate a few requests more, because
3608 brw_interpret_oap tries to create new requests before freeing
3609 previous ones. Ideally we want to have 2x max_rpcs_in_flight
3610 reserved, but I afraid that might be too much wasted RAM
3611 in fact, so 2 is just my guess and still should work. */
3612 cli->cl_import->imp_rq_pool =
3613 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3615 ptlrpc_add_rqs_to_pool);
/* Staged pre-cleanup.  EARLY: deactivate the import and stop pinging
 * (halts mds_lov_synchronize traffic).  EXPORTS: invalidate and destroy
 * an import that was set up but never cleaned via disconnect, freeing
 * its request pool.  SELF_EXP: shut down the llog subsystems. */
3621 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3627 case OBD_CLEANUP_EARLY: {
3628 struct obd_import *imp;
3629 imp = obd->u.cli.cl_import;
3630 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3631 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3632 ptlrpc_deactivate_import(imp);
3633 spin_lock(&imp->imp_lock);
3634 imp->imp_pingable = 0;
3635 spin_unlock(&imp->imp_lock);
3638 case OBD_CLEANUP_EXPORTS: {
3639 /* If we set up but never connected, the
3640 client import will not have been cleaned. */
3641 if (obd->u.cli.cl_import) {
3642 struct obd_import *imp;
3643 imp = obd->u.cli.cl_import;
3644 CDEBUG(D_CONFIG, "%s: client import never connected\n",
3646 ptlrpc_invalidate_import(imp);
3647 ptlrpc_free_rq_pool(imp->imp_rq_pool);
3648 class_destroy_import(imp);
/* Clear the pointer so later stages don't double-free. */
3649 obd->u.cli.cl_import = NULL;
3653 case OBD_CLEANUP_SELF_EXP:
3654 rc = obd_llog_finish(obd, 0);
3656 CERROR("failed to cleanup llogging subsystems\n");
3658 case OBD_CLEANUP_OBD:
/* Final device cleanup: unregister lprocfs entries, mark the object
 * creator as exiting (and no longer recovering), release the quota
 * cache, then run the generic client cleanup. */
3664 int osc_cleanup(struct obd_device *obd)
3666 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3670 ptlrpc_lprocfs_unregister_obd(obd);
3671 lprocfs_obd_cleanup(obd);
3673 spin_lock(&oscc->oscc_lock);
3674 oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3675 oscc->oscc_flags |= OSCC_FLAG_EXITING;
3676 spin_unlock(&oscc->oscc_lock);
3678 /* free memory of osc quota cache */
3679 lquota_cleanup(quota_interface, obd);
3681 rc = client_obd_cleanup(obd);
/*
 * osc_process_config(): obd config-log method; applies a runtime
 * parameter record (lustre_cfg) to this OSC's /proc tunables via
 * class_process_proc_param().
 * NOTE(review): the return statement is elided in this excerpt;
 * presumably rc is returned -- confirm against full source.
 */
3687 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3689 struct lustre_cfg *lcfg = buf;
3690 struct lprocfs_static_vars lvars;
3693 lprocfs_init_vars(osc, &lvars);
3695 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
/*
 * osc_obd_ops: the OSC's obd method table, registered with
 * class_register_type() in osc_init(). Connection management is
 * delegated to the generic client_* helpers; everything else is
 * implemented by the osc_* functions in this file.
 */
3699 struct obd_ops osc_obd_ops = {
3700 .o_owner = THIS_MODULE,
/* Device lifecycle. */
3701 .o_setup = osc_setup,
3702 .o_precleanup = osc_precleanup,
3703 .o_cleanup = osc_cleanup,
/* Connection / import management (mostly generic client code). */
3704 .o_add_conn = client_import_add_conn,
3705 .o_del_conn = client_import_del_conn,
3706 .o_connect = client_connect_import,
3707 .o_reconnect = osc_reconnect,
3708 .o_disconnect = osc_disconnect,
3709 .o_statfs = osc_statfs,
3710 .o_statfs_async = osc_statfs_async,
/* Object metadata pack/unpack and attribute operations. */
3711 .o_packmd = osc_packmd,
3712 .o_unpackmd = osc_unpackmd,
3713 .o_create = osc_create,
3714 .o_destroy = osc_destroy,
3715 .o_getattr = osc_getattr,
3716 .o_getattr_async = osc_getattr_async,
3717 .o_setattr = osc_setattr,
3718 .o_setattr_async = osc_setattr_async,
/* Bulk I/O (BRW) and async page queueing. */
3720 .o_brw_async = osc_brw_async,
3721 .o_prep_async_page = osc_prep_async_page,
3722 .o_queue_async_io = osc_queue_async_io,
3723 .o_set_async_flags = osc_set_async_flags,
3724 .o_queue_group_io = osc_queue_group_io,
3725 .o_trigger_group_io = osc_trigger_group_io,
3726 .o_teardown_async_page = osc_teardown_async_page,
3727 .o_punch = osc_punch,
/* DLM lock handling. */
3729 .o_enqueue = osc_enqueue,
3730 .o_match = osc_match,
3731 .o_change_cbdata = osc_change_cbdata,
3732 .o_cancel = osc_cancel,
3733 .o_cancel_unused = osc_cancel_unused,
3734 .o_join_lru = osc_join_lru,
/* Management, info, logging, and configuration hooks. */
3735 .o_iocontrol = osc_iocontrol,
3736 .o_get_info = osc_get_info,
3737 .o_set_info_async = osc_set_info_async,
3738 .o_import_event = osc_import_event,
3739 .o_llog_init = osc_llog_init,
3740 .o_llog_finish = osc_llog_finish,
3741 .o_process_config = osc_process_config,
/*
 * osc_init(): module init entry point (wired up via cfs_module()
 * below). Optionally binds the lquota module's quota interface into
 * osc_obd_ops, then registers the OSC obd type with the class layer.
 * NOTE(review): lines are elided here; the PORTAL_SYMBOL_PUT at the
 * end is presumably on the class_register_type() failure path --
 * confirm against full source.
 */
3744 int __init osc_init(void)
3746 struct lprocfs_static_vars lvars;
3750 lprocfs_init_vars(osc, &lvars);
/* Quota support is optional: try to load lquota and grab its
 * exported interface; lquota_* helpers tolerate a NULL interface. */
3752 request_module("lquota");
3753 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3754 lquota_init(quota_interface);
3755 init_obd_quota_ops(quota_interface, &osc_obd_ops);
/* Register the "osc" obd type; its method table is osc_obd_ops. */
3757 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3758 LUSTRE_OSC_NAME, NULL);
/* Drop the lquota symbol reference taken above (failure path --
 * TODO confirm, intervening lines elided). */
3760 if (quota_interface)
3761 PORTAL_SYMBOL_PUT(osc_quota_interface);
/*
 * osc_exit(): module exit entry point (wired up via cfs_module()
 * below). Releases quota state and the lquota symbol reference taken
 * in osc_init(), then unregisters the "osc" obd type.
 */
3769 static void /*__exit*/ osc_exit(void)
3771 lquota_exit(quota_interface);
/* Only put the symbol if PORTAL_SYMBOL_GET succeeded in osc_init(). */
3772 if (quota_interface)
3773 PORTAL_SYMBOL_PUT(osc_quota_interface);
3775 class_unregister_type(LUSTRE_OSC_NAME);
/* Standard kernel module metadata; cfs_module() is libcfs's portable
 * wrapper around module_init()/module_exit() registration. */
3778 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3779 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3780 MODULE_LICENSE("GPL");
3782 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);