/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
 * Author Peter Braam <braam@clusterfs.com>
 *
 * This file is part of the Lustre file system, http://www.lustre.org
 * Lustre is a trademark of Cluster File Systems, Inc.
 *
 * You may have signed or agreed to another license before downloading
 * this software. If so, you are bound by the terms and conditions
 * of that agreement, and the following does not apply to you. See the
 * LICENSE file included with this distribution for more information.
 *
 * If you did not agree to a different license, then this copy of Lustre
 * is open source software; you can redistribute it and/or modify it
 * under the terms of version 2 of the GNU General Public License as
 * published by the Free Software Foundation.
 *
 * In either case, Lustre is distributed in the hope that it will be
 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * license text for more details.
 *
 * For testing and management it is treated as an obd_device, although
 * it does not export a full OBD method table (the requests are coming
 * in over the wire, so object target modules do not have a full method
 * table.)
 */

# define EXPORT_SYMTAB

#define DEBUG_SUBSYSTEM S_OSC

#ifdef __KERNEL__
# include <libcfs/libcfs.h>
#else /* __KERNEL__ */
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <libcfs/kp30.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>

#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include "osc_internal.h"
static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;

static void osc_release_ppga(struct brw_page **ppga, obd_count count);
/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
        lmm_size = sizeof(**lmmp);

        OBD_FREE(*lmmp, lmm_size);

        OBD_ALLOC(*lmmp, lmm_size);

        LASSERT(lsm->lsm_object_id);
        (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
        if (lmm_bytes < sizeof(*lmm)) {
                CERROR("lov_mds_md too small: %d, need %d\n",
                       lmm_bytes, (int)sizeof(*lmm));

        /* XXX LOV_MAGIC etc check? */

        if (lmm->lmm_object_id == 0) {
                CERROR("lov_mds_md: zero lmm_object_id\n");

        lsm_size = lov_stripe_md_size(1);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE(*lsmp, lsm_size);

        OBD_ALLOC(*lsmp, lsm_size);
        loi_init((*lsmp)->lsm_oinfo);

        /* XXX zero *lsmp? */
        (*lsmp)->lsm_object_id = le64_to_cpu(lmm->lmm_object_id);
        LASSERT((*lsmp)->lsm_object_id);

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
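
/* Worked example (hypothetical values): packing lsm_object_id 0x1122 via
 * cpu_to_le64() stores the little-endian bytes 22 11 00 00 00 00 00 00 in
 * lmm_object_id on disk; le64_to_cpu() in osc_unpackmd() restores 0x1122
 * regardless of whether the reading host is little- or big-endian. */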
static int osc_getattr_interpret(struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CERROR("can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }

        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        struct osc_async_args *aa;

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_GETATTR, 2, size, NULL);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));

        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_interpret_reply = osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}
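
/* The shape above repeats for every async handler in this file: prepare
 * the request, set the expected reply size, point rq_interpret_reply at a
 * completion callback, stash per-call state in rq_async_args (size-checked
 * with CLASSERT), and queue the request on the caller's set or on ptlrpcd. */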
static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_GETATTR, 2, size, NULL);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);
        if (rc) {
                CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
                GOTO(out, rc);
        }

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SETATTR, 2, size, NULL);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));

out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

static int osc_setattr_interpret(struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        struct osc_async_args *aa;

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SETATTR, 2, size, NULL);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));

        if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
                memcpy(obdo_logcookie(oinfo->oi_oa), oti->oti_logcookies,
                       sizeof(*oti->oti_logcookies));
        }

        memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
        ptlrpc_req_set_repsize(req, 2, size);
        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = (struct osc_async_args *)&req->rq_async_args;
                aa->aa_oi = oinfo;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };

        rc = obd_alloc_memmd(exp, &lsm);
        if (rc < 0)
                RETURN(rc);

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_CREATE, 2, size, NULL);
        if (!req)
                GOTO(out, rc = -ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        memcpy(&body->oa, oa, sizeof(body->oa));

        ptlrpc_req_set_repsize(req, 2, size);
        if (oa->o_valid & OBD_MD_FLINLINE) {
                LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
                        oa->o_flags == OBD_FL_DELORPHAN);
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out_req, rc = -EPROTO);
        }

        memcpy(oa, &body->oa, sizeof(*oa));

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way. */
        lsm->lsm_object_id = oa->o_id;

        oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

        if (oa->o_valid & OBD_MD_FLCOOKIE) {
                if (!oti->oti_logcookies)
                        oti_alloc_cookies(oti, 1);
                memcpy(oti->oti_logcookies, obdo_logcookie(oa),
                       sizeof(oti->oti_onecookie));
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        if (rc)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
static int osc_punch_interpret(struct ptlrpc_request *req,
                               struct osc_async_args *aa, int rc)
{
        struct ost_body *body;

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        struct ost_body *body;
        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_PUNCH, 2, size, NULL);

        /* FIXME bug 249. Also see bug 7198 */
        if (class_exp2cliimp(exp)->imp_connect_data.ocd_connect_flags &
            OBD_CONNECT_REQPORTAL)
                req->rq_request_portal = OST_IO_PORTAL;

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));

        /* overload the size and blocks fields in the oa with start/end */
        body->oa.o_size = oinfo->oi_policy.l_extent.start;
        body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        ptlrpc_req_set_repsize(req, 2, size);

        req->rq_interpret_reply = osc_punch_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}

static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SYNC, 2, size, NULL);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        memcpy(&body->oa, oa, sizeof(*oa));

        /* overload the size and blocks fields in the oa with start/end */
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        memcpy(oa, &body->oa, sizeof(*oa));

out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a failed destroy.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then send the log cancellation
 * cookies to the MDS after committing destroy transactions. */
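
/* Sketch of the sequence described above, numbered for clarity:
 *   1. MDS unlinks the file and records its objects in a recovery llog;
 *   2. destroys are sent (asynchronously, below) to the OSTs;
 *   3. each OST destroys its object and commits the transaction;
 *   4. the OST sends the log cancellation cookie and the MDS cancels
 *      the llog record.
 * A crash between steps 1 and 4 leaves the record in place, so the unlink
 * is replayed from the llog at the next OST<->MDS reconnect. */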
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_DESTROY, 2, size, NULL);

        /* FIXME bug 249. Also see bug 7198 */
        if (class_exp2cliimp(exp)->imp_connect_data.ocd_connect_flags &
            OBD_CONNECT_REQPORTAL)
                req->rq_request_portal = OST_IO_PORTAL;

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
                memcpy(obdo_logcookie(oa), oti->oti_logcookies,
                       sizeof(*oti->oti_logcookies));
        }

        memcpy(&body->oa, oa, sizeof(*oa));
        ptlrpc_req_set_repsize(req, 2, size);

        ptlrpcd_add_req(req);
        RETURN(0);
}

static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty > cli->cl_dirty_max) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
                CERROR("dirty %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
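
/* Worked example (hypothetical tunables): with cl_max_pages_per_rpc = 256,
 * 4096-byte pages (CFS_PAGE_SHIFT = 12) and cl_max_rpcs_in_flight = 8,
 * max_in_flight = (256 << 12) * (8 + 1) = 9 MB.  With cl_dirty_max = 32 MB
 * the client announces o_undirty = max(32 MB, 9 MB) = 32 MB of anticipated
 * future dirty data for the OST to reserve grant against. */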
/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
}

/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;

        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong. Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }
}
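
/* Worked example (hypothetical sizes): CFS_PAGE_SIZE = 4096, OST blocksize
 * = 1024, and a short write of count = 700 at offset 512 within the page.
 * count becomes 700 + 512 = 1212, end = 1212 & 1023 = 188, and rounding up
 * gives 1212 + (1024 - 188) = 2048: the write dirties OST blocks 0 and 1
 * only.  The page consumed a full 4096 bytes of grant, the OST accounts
 * 2048, so the remaining 2048 bytes are reported back via o_dropped. */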
static unsigned long rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
}

/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                    (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant, wait for pending RPCs
                 * that may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }
}

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
        cli->cl_avail_grant += body->oa.o_grant;
        /* waiters are woken in brw_interpret_oap */
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) +
                              (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
                i++;
        }
}
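
/* Worked example (hypothetical sizes): a 3-page read of 4096 bytes per
 * page that returns nob_read = 6000.  Page 0 is skipped as fully read
 * (1904 bytes remain), page 1 is zeroed from byte 1904 to its end by the
 * first loop, and page 2 is zeroed completely by the second loop. */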
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
{
        int *remote_rcs, i;

        /* return error if any niobuf was in error */
        remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
                                        sizeof(*remote_rcs) * niocount, NULL);
        if (remote_rcs == NULL) {
                CERROR("Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }
        if (lustre_msg_swabbed(req->rq_repmsg))
                for (i = 0; i < niocount; i++)
                        __swab32s(&remote_rcs[i]);

        for (i = 0; i < niocount; i++) {
                if (remote_rcs[i] < 0)
                        return remote_rcs[i];

                if (remote_rcs[i] != 0) {
                        CERROR("rc[%d] invalid (%d) req %p\n",
                               i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~OBD_BRW_FROM_GRANT;

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if ((p1->flag & mask) != (p2->flag & mask))
                        CERROR("is it ok to have flags 0x%x and 0x%x in the "
                               "same brw?\n", p1->flag, p2->flag);
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}
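
/* Example: p1 covering bytes [0, 4096) and p2 starting at offset 4096
 * with identical flags merge into one niobuf.  Any flag difference keeps
 * the pages in separate niobufs; the warning above fires only when bits
 * other than OBD_BRW_FROM_GRANT differ. */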
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga)
{
        __u32 cksum = ~0;
        int i = 0;

        LASSERT(pg_count > 0);
        while (nob > 0 && pg_count > 0) {
                char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = crc32_le(cksum, ptr + off, count);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
static int osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp)
{
        struct ptlrpc_request *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body *body;
        struct obd_ioobj *ioobj;
        struct niobuf_remote *niobuf;
        int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        int niocount, i, requested_nob, opc, rc;
        struct ptlrpc_request_pool *pool;
        struct osc_brw_async_args *aa;

        opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
        pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_import->imp_rq_pool : NULL;

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        size[REQ_REC_OFF + 1] = sizeof(*ioobj);
        size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);

        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM);
        req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc,
                                   4, size, NULL, pool);
        if (req == NULL)
                RETURN(-ENOMEM);

        /* FIXME bug 249. Also see bug 7198 */
        if (cli->cl_import->imp_connect_data.ocd_connect_flags &
            OBD_CONNECT_REQPORTAL)
                req->rq_request_portal = OST_IO_PORTAL;

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);
        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
        niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                                niocount * sizeof(*niobuf));

        memcpy(&body->oa, oa, sizeof(*oa));

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;

        LASSERT(page_count > 0);
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                struct brw_page *pg_prev = pga[i - 1];

                LASSERT(pg->count > 0);
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __LINUX__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len = pg->count;
                        niobuf->flags = pg->flag;
                }
        }

        LASSERT((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)));
        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum)) {
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM;
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
                ptlrpc_req_set_repsize(req, 3, size);
        } else {
                if (unlikely(cli->cl_checksum))
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                /* 1 RC for the whole I/O */
                ptlrpc_req_set_repsize(req, 2, size);
        }

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_retries = 5; /* retry for checksum errors; lprocfs? */
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        RETURN(0);

out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
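
/* The request built above carries four buffers, matching size[4]:
 *   [0] ptlrpc_body                 - the RPC envelope
 *   [1] ost_body                    - the obdo, with grant/checksum fields
 *   [2] obd_ioobj                   - object id and niobuf count
 *   [3] niobuf_remote[niocount]     - merged {offset, len, flags} extents
 * The page data itself travels separately via the bulk descriptor. */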
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga)
{
        __u32 new_cksum;
        char *msg;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        new_cksum = osc_checksum_bulk(nob, page_count, pga);

        if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original";

        LCONSOLE_ERROR("BAD WRITE CHECKSUM: %s: from %s inum "LPU64"/"LPU64
                       " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
                       msg, libcfs_nid2str(peer->nid),
                       oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
                       oa->o_valid & OBD_MD_FLFID ? oa->o_generation : (__u64)0,
                       oa->o_id,
                       oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
                       pga[0]->off,
                       pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x, server csum %x, client csum now %x\n",
               client_cksum, server_cksum, new_cksum);

        return 1;
}
/* Note rc enters this function as number of bytes transferred */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        const lnet_process_id_t *peer =
                &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = aa->aa_cli;
        struct ost_body *body;
        __u32 client_cksum = 0;

        if (rc < 0 && rc != -EDQUOT)
                RETURN(rc);

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                RETURN(-EPROTO);
        }

        /* set/clear over quota flag for a uid/gid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
                lquota_setdq(quota_interface, cli, body->oa.o_uid,
                             body->oa.o_gid, body->oa.o_valid,
                             body->oa.o_flags);

        if (rc < 0)
                RETURN(rc);

        if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
                client_cksum = aa->aa_oa->o_cksum; /* save for later */

        osc_update_grant(cli, body);

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                if (rc > 0) {
                        CERROR("Unexpected +ve rc %d\n", rc);
                        RETURN(-EPROTO);
                }
                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

                if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
                             client_cksum &&
                             check_write_checksum(&body->oa, peer, client_cksum,
                                                  body->oa.o_cksum,
                                                  aa->aa_requested_nob,
                                                  aa->aa_page_count,
                                                  aa->aa_ppga)))
                        RETURN(-EAGAIN);

                rc = check_write_rcs(req, aa->aa_requested_nob, aa->aa_nio_count,
                                     aa->aa_page_count, aa->aa_ppga);
                GOTO(out, rc);
        }

        /* The rest of this function executes only for OST_READs */
        if (rc > aa->aa_requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc,
                       aa->aa_requested_nob);
                RETURN(-EPROTO);
        }

        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR("Unexpected rc %d (%d transferred)\n",
                       rc, req->rq_bulk->bd_nob_transferred);
                RETURN(-EPROTO);
        }

        if (rc < aa->aa_requested_nob)
                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

        if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
                static int cksum_counter;
                __u32 server_cksum = body->oa.o_cksum;
                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
                                                 aa->aa_ppga);

                if (server_cksum == ~0 && rc > 0) {
                        CERROR("Protocol error: server %s set the 'checksum' "
                               "bit, but didn't send a checksum. Not fatal, "
                               "but please tell CFS.\n",
                               libcfs_nid2str(peer->nid));
                } else if (server_cksum != client_cksum) {
                        LCONSOLE_ERROR("%s: BAD READ CHECKSUM: from %s inum "
                                       LPU64"/"LPU64" object "LPU64"/"LPU64
                                       " extent ["LPU64"-"LPU64"]\n",
                                       req->rq_import->imp_obd->obd_name,
                                       libcfs_nid2str(peer->nid),
                                       body->oa.o_valid & OBD_MD_FLFID ?
                                               body->oa.o_fid : (__u64)0,
                                       body->oa.o_valid & OBD_MD_FLFID ?
                                               body->oa.o_generation : (__u64)0,
                                       body->oa.o_id,
                                       body->oa.o_valid & OBD_MD_FLGROUP ?
                                               body->oa.o_gr : (__u64)0,
                                       aa->aa_ppga[0]->off,
                                       aa->aa_ppga[aa->aa_page_count-1]->off +
                                       aa->aa_ppga[aa->aa_page_count-1]->count -
                                       1);
                        CERROR("client %x, server %x\n",
                               client_cksum, server_cksum);
                        cksum_counter = 0;
                        aa->aa_oa->o_cksum = client_cksum;
                        rc = -EAGAIN;
                } else {
                        cksum_counter++;
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                        rc = 0;
                }
        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                cksum_missed++;
                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Checksum %u requested from %s but not sent\n",
                               cksum_missed, libcfs_nid2str(peer->nid));
        } else {
                rc = 0;
        }
out:
        if (rc >= 0)
                memcpy(aa->aa_oa, &body->oa, sizeof(*aa->aa_oa));

        RETURN(rc);
}
static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                            struct lov_stripe_md *lsm,
                            obd_count page_count, struct brw_page **pga)
{
        struct ptlrpc_request *request;
        int rc, retries = 5; /* lprocfs? */

restart_bulk:
        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                  page_count, pga, &request);
        if (rc != 0)
                RETURN(rc);

        rc = ptlrpc_queue_wait(request);

        if (rc == -ETIMEDOUT && request->rq_resend) {
                DEBUG_REQ(D_HA, request, "BULK TIMEOUT");
                ptlrpc_req_finished(request);
                goto restart_bulk;
        }

        rc = osc_brw_fini_request(request, rc);

        ptlrpc_req_finished(request);
        if (rc == -EAGAIN) {
                if (retries-- > 0)
                        goto restart_bulk;
                rc = -EIO;
        }
        RETURN(rc);
}
int osc_brw_redo_request(struct ptlrpc_request *request,
                         struct osc_brw_async_args *aa)
{
        struct ptlrpc_request *new_req;
        struct ptlrpc_request_set *set = request->rq_set;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        int rc;

        if (aa->aa_retries-- <= 0) {
                CERROR("too many checksum retries, returning error\n");
                RETURN(-EIO);
        }

        DEBUG_REQ(D_ERROR, request, "redo for checksum error");
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                ptlrpc_mark_interrupted(oap->oap_request);
                        }
                }
        }

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                          OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga, &new_req);
        if (rc)
                RETURN(rc);

        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&aa->aa_oaps);

        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        ptlrpc_set_add_req(set, new_req);
        RETURN(0);
}
static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        int i;

        rc = osc_brw_fini_request(request, rc);
        if (rc == -EAGAIN) {
                rc = osc_brw_redo_request(request, aa);
                if (rc == 0)
                        RETURN(0);
        }

        spin_lock(&aa->aa_cli->cl_loi_list_lock);
        for (i = 0; i < aa->aa_page_count; i++)
                osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
        spin_unlock(&aa->aa_cli->cl_loi_list_lock);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        RETURN(rc);
}
static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                          struct lov_stripe_md *lsm, obd_count page_count,
                          struct brw_page **pga, struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *request;
        struct client_obd *cli = &exp->exp_obd->u.cli;
        int i, rc;

        /* Consume write credits even if doing a sync write -
         * otherwise we may run out of space on OST due to grant. */
        spin_lock(&cli->cl_loi_list_lock);
        for (i = 0; i < page_count; i++) {
                if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
                        osc_consume_write_grant(cli, pga[i]);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                  page_count, pga, &request);
        if (rc == 0) {
                request->rq_interpret_reply = brw_interpret;
                ptlrpc_set_add_req(set, request);
        } else {
                spin_lock(&cli->cl_loi_list_lock);
                for (i = 0; i < page_count; i++)
                        osc_release_write_grant(cli, pga[i], 0);
                spin_unlock(&cli->cl_loi_list_lock);
        }

        RETURN(rc);
}
/*
 * ugh, we want disk allocation on the target to happen in offset order. we'll
 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
 * fine for our small page arrays and doesn't require allocation. it's an
 * insertion sort that swaps elements that are strides apart, shrinking the
 * stride down until it's '1' and the array is sorted.
 */
static void sort_brw_pages(struct brw_page **array, int num)
{
        int stride, i, j;
        struct brw_page *tmp;

        if (num == 1)
                return;
        for (stride = 1; stride < num ; stride = (stride * 3) + 1)
                ;

        do {
                stride /= 3;
                for (i = stride ; i < num ; i++) {
                        tmp = array[i];
                        j = i;
                        while (j >= stride && array[j-stride]->off > tmp->off) {
                                array[j] = array[j - stride];
                                j -= stride;
                        }
                        array[j] = tmp;
                }
        } while (stride > 1);
}
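
/* Example: for num = 10 the first loop grows the stride 1 -> 4 -> 13 and
 * stops, so the do-while sorts with strides 4 and then 1.  The final
 * stride-1 pass is a plain insertion sort that is nearly free because the
 * stride-4 pass has already left the array mostly ordered. */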
static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
{
        int count = 1;
        int offset;
        int i = 0;

        LASSERT(pages > 0);
        offset = pg[i]->off & (~CFS_PAGE_MASK);

        for (;;) {
                pages--;
                if (pages == 0)         /* that's all */
                        return count;

                if (offset + pg[i]->count < CFS_PAGE_SIZE)
                        return count;   /* doesn't end on page boundary */

                i++;
                offset = pg[i]->off & (~CFS_PAGE_MASK);
                if (offset != 0)        /* doesn't start on page boundary */
                        return count;

                count++;
        }
}
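
/* Example: three pages covering [0, 4096), [4096, 6144) and [8192, 12288).
 * Page 0 is full, page 1 ends mid-page (offset 0 + count 2048 < 4096), so
 * the scan returns 2 and page 2 must go in a separate, later brw. */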
static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
{
        struct brw_page **ppga;
        int i;

        OBD_ALLOC(ppga, sizeof(*ppga) * count);
        if (ppga == NULL)
                return NULL;

        for (i = 0; i < count; i++)
                ppga[i] = pga + i;

        return ppga;
}

static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
                   obd_count page_count, struct brw_page *pga,
                   struct obd_trans_info *oti)
{
        struct obdo *saved_oa = NULL;
        struct brw_page **ppga, **orig;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct client_obd *cli = &imp->imp_obd->u.cli;
        int rc, page_count_orig;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp == NULL || imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        /* test_brw with a failed create can trip this, maybe others. */
        LASSERT(cli->cl_max_pages_per_rpc);

        rc = 0;

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                obd_count pages_per_brw;

                if (page_count > cli->cl_max_pages_per_rpc)
                        pages_per_brw = cli->cl_max_pages_per_rpc;
                else
                        pages_per_brw = page_count;

                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                if (saved_oa != NULL) {
                        /* restore previously saved oa */
                        *oinfo->oi_oa = *saved_oa;
                } else if (page_count > pages_per_brw) {
                        /* save a copy of oa (brw will clobber it) */
                        saved_oa = obdo_alloc();
                        if (saved_oa == NULL)
                                GOTO(out, rc = -ENOMEM);
                        *saved_oa = *oinfo->oi_oa;
                }

                rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                      pages_per_brw, ppga);
                if (rc != 0)
                        break;

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }

out:
        osc_release_ppga(orig, page_count_orig);

        if (saved_oa != NULL)
                obdo_free(saved_oa);

        RETURN(rc);
}
static int osc_brw_async(int cmd, struct obd_export *exp,
                         struct obd_info *oinfo, obd_count page_count,
                         struct brw_page *pga, struct obd_trans_info *oti,
                         struct ptlrpc_request_set *set)
{
        struct brw_page **ppga, **orig;
        int page_count_orig;
        int rc = 0;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */
                struct obd_import *imp = class_exp2cliimp(exp);

                if (imp == NULL || imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                struct brw_page **copy;
                obd_count pages_per_brw;

                pages_per_brw = min_t(obd_count, page_count,
                                      class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc);

                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                /* use ppga only if single RPC is going to fly */
                if (pages_per_brw != page_count_orig || ppga != orig) {
                        OBD_ALLOC(copy, pages_per_brw * sizeof(*copy));
                        if (copy == NULL)
                                GOTO(out, rc = -ENOMEM);
                        memcpy(copy, ppga, pages_per_brw * sizeof(*copy));
                } else
                        copy = ppga;

                rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                    pages_per_brw, copy, set);

                if (rc != 0) {
                        if (copy != ppga)
                                OBD_FREE(copy, pages_per_brw * sizeof(*copy));
                        break;
                }

                if (copy == orig) {
                        /* we passed it to async_internal() which is
                         * now responsible for releasing memory */
                        orig = NULL;
                }

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }

out:
        if (orig)
                osc_release_ppga(orig, page_count_orig);
        RETURN(rc);
}
static void osc_check_rpcs(struct client_obd *cli);

/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting. Writeback completes or truncate happens before
 * writing starts. Must be called with the loi lock held. */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
                           int sent)
{
        osc_release_write_grant(cli, &oap->oap_brw_page, sent);
}

/* This maintains the lists of pending pages to read/write for a given object
 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
 * to quickly find objects that are ready to send an RPC. */
static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
                         int cmd)
{
        int optimal;

        if (lop->lop_num_pending == 0)
                RETURN(0);

        /* if we have an invalid import we want to drain the queued pages
         * by forcing them through rpcs that immediately fail and complete
         * the pages. recovery relies on this to empty the queued pages
         * before canceling the locks and evicting down the llite pages */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(1);

        /* stream rpcs in queue order as long as there is an urgent page
         * queued. this is our cheap solution for good batching in the case
         * where writepage marks some random page in the middle of the file
         * as urgent because of, say, memory pressure */
        if (!list_empty(&lop->lop_urgent)) {
                CDEBUG(D_CACHE, "urgent request forcing RPC\n");
                RETURN(1);
        }

        /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
        optimal = cli->cl_max_pages_per_rpc;
        if (cmd & OBD_BRW_WRITE) {
                /* trigger a write rpc stream as long as there are dirtiers
                 * waiting for space. as they're waiting, they're not going to
                 * create more pages to coalesce with what's waiting.. */
                if (!list_empty(&cli->cl_cache_waiters)) {
                        CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
                        RETURN(1);
                }

                /* +16 to avoid triggering rpcs that would want to include pages
                 * that are being queued but which can't be made ready until
                 * the queuer finishes with the page. this is a wart for
                 * llite::commit_write() */
                optimal += 16;
        }

        if (lop->lop_num_pending >= optimal)
                RETURN(1);

        RETURN(0);
}
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        if (list_empty(item) && should_be_on)
                list_add_tail(item, list);
        else if (!list_empty(item) && !should_be_on)
                list_del_init(item);
}
/* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
 * can find pages to build into rpcs quickly */
static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
{
        on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
                lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
                lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));

        on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
                loi->loi_write_lop.lop_num_pending);

        on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
                loi->loi_read_lop.lop_num_pending);
}

static void lop_update_pending(struct client_obd *cli,
                               struct loi_oap_pages *lop, int cmd, int delta)
{
        lop->lop_num_pending += delta;
        if (cmd & OBD_BRW_WRITE)
                cli->cl_pending_w_pages += delta;
        else
                cli->cl_pending_r_pages += delta;
}
/* this is called when a sync waiter receives an interruption. Its job is to
 * get the caller woken as soon as possible. If its page hasn't been put in an
 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
 * desiring interruption which will forcefully complete the rpc once the rpc
 * completes. */
static void osc_occ_interrupted(struct oig_callback_context *occ)
{
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;

        /* XXX member_of() */
        oap = list_entry(occ, struct osc_async_page, oap_occ);

        client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);

        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. only one oap gets a request reference */
        if (oap->oap_request != NULL) {
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);
                GOTO(unlock, 0);
        }

        /* we don't get interruption callbacks until osc_trigger_group_io()
         * has been called and put the sync oaps in the pending/urgent lists.*/
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                list_del_init(&oap->oap_urgent_item);

                loi = oap->oap_loi;
                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);

                oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
                oap->oap_oig = NULL;
        }

unlock:
        client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
}
/* this is trying to propagate async writeback errors back up to the
 * application. As an async write fails we record the error code for later if
 * the app does an fsync. As long as errors persist we force future rpcs to be
 * sync so that the app can get a sync error and break the cycle of queueing
 * pages for which writeback will fail. */
static void osc_process_ar(struct osc_async_rc *ar, struct ptlrpc_request *req,
                           int rc)
{
        if (rc) {
                if (!ar->ar_rc)
                        ar->ar_rc = rc;

                ar->ar_force_sync = 1;
                ar->ar_min_xid = ptlrpc_sample_next_xid();
                return;
        }

        if (ar->ar_force_sync && req && (ptlrpc_req_xid(req) >= ar->ar_min_xid))
                ar->ar_force_sync = 0;
}
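
/* The xid acts as the recovery cut-off here: a failure samples the next
 * xid to be assigned, and only a successful write whose request xid is at
 * or beyond that mark proves the failing window has drained, at which
 * point ar_force_sync is cleared. */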
static void osc_oap_to_pending(struct osc_async_page *oap)
{
        struct loi_oap_pages *lop;

        if (oap->oap_cmd & OBD_BRW_WRITE)
                lop = &oap->oap_loi->loi_write_lop;
        else
                lop = &oap->oap_loi->loi_read_lop;

        if (oap->oap_async_flags & ASYNC_URGENT)
                list_add(&oap->oap_urgent_item, &lop->lop_urgent);
        list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
        lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
}
/* this must be called holding the loi list lock to give coverage to exit_cache,
 * async_flag maintenance, and oap_request */
static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
{
        oap->oap_async_flags = 0;
        oap->oap_interrupted = 0;

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                osc_process_ar(&cli->cl_ar, oap->oap_request, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, oap->oap_request, rc);
        }

        if (oap->oap_request != NULL) {
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        if (rc == 0 && oa != NULL) {
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
        }

        if (oap->oap_oig) {
                osc_exit_cache(cli, oap, sent);
                oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
                oap->oap_oig = NULL;
                EXIT;
                return;
        }

        rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
                                                oap->oap_cmd, oa, rc);

        /* ll_ap_completion (from llite) drops PG_locked. so, a new
         * I/O on the page could start, but OSC calls it under lock
         * and thus we can add oap back to pending safely */
        if (rc)
                /* upper layer wants to leave the page on pending queue */
                osc_oap_to_pending(oap);
        else
                osc_exit_cache(cli, oap, sent);
}
static int brw_interpret_oap(struct ptlrpc_request *request, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct osc_async_page *oap, *tmp;
        struct client_obd *cli;

        rc = osc_brw_fini_request(request, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
        if (rc == -EAGAIN) {
                rc = osc_brw_redo_request(request, aa);
                if (rc == 0)
                        RETURN(0);
        }

        cli = aa->aa_cli;

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        /* the caller may re-use the oap after the completion call so
         * we need to clean it up a little */
        list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
                list_del_init(&oap->oap_rpc_item);
                osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
        }

        osc_wake_cache_waiters(cli);
        osc_check_rpcs(cli);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        obdo_free(aa->aa_oa);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        RETURN(rc);
}
static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
{
        struct ptlrpc_request *req;
        struct brw_page **pga = NULL;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct osc_async_page *oap;
        int i, rc;

        LASSERT(!list_empty(rpc_list));

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        oa = obdo_alloc();
        if (oa == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        i = 0;
        list_for_each_entry(oap, rpc_list, oap_rpc_item) {
                if (ops == NULL) {
                        ops = oap->oap_caller_ops;
                        caller_data = oap->oap_caller_data;
                }
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap,
                       pga[i]->flag);
                i++;
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        ops->ap_fill_obdo(caller_data, cmd, oa);

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, pga, &req);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST). If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps. Sadly, there is no obvious
         * way to do this in a single call. bug 10150 */
        ops->ap_update_obdo(caller_data, cmd, oa,
                            OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(rpc_list);

out:
        if (IS_ERR(req)) {
                if (oa)
                        obdo_free(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
        }
        RETURN(req);
}
/* the loi lock is held across this function but it's allowed to release
 * and reacquire it during its work */
static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                            int cmd, struct loi_oap_pages *lop)
{
        struct ptlrpc_request *req;
        obd_count page_count = 0;
        struct osc_async_page *oap = NULL, *tmp;
        struct osc_brw_async_args *aa;
        struct obd_async_page_ops *ops;
        CFS_LIST_HEAD(rpc_list);
        unsigned int ending_offset;
        unsigned starting_offset = 0;

        /* first we find the pages we're allowed to work with */
        list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item) {
                ops = oap->oap_caller_ops;

                LASSERT(oap->oap_magic == OAP_MAGIC);

                /* in llite being 'ready' equates to the page being locked
                 * until completion unlocks it. commit_write submits a page
                 * as not ready because its unlock will happen unconditionally
                 * as the call returns. if we race with commit_write giving
                 * us that page we don't want to create a hole in the page
                 * stream, so we stop and leave the rpc to be fired by
                 * another dirtier or kupdated interval (the not ready page
                 * will still be on the dirty list). we could call in
                 * at the end of ll_file_write to process the queue again. */
                if (!(oap->oap_async_flags & ASYNC_READY)) {
                        int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
                        if (rc < 0)
                                CDEBUG(D_INODE, "oap %p page %p returned %d "
                                       "instead of ready\n", oap,
                                       oap->oap_page, rc);
                        switch (rc) {
                        case -EAGAIN:
                                /* llite is telling us that the page is still
                                 * in commit_write and that we should try
                                 * and put it in an rpc again later. we
                                 * break out of the loop so we don't create
                                 * a hole in the sequence of pages in the rpc
                                 * stream. */
                                oap = NULL;
                                break;
                        case -EINTR:
                                /* the io isn't needed.. tell the checks
                                 * below to complete the rpc with EINTR */
                                oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                                oap->oap_count = -EINTR;
                                break;
                        case 0:
                                oap->oap_async_flags |= ASYNC_READY;
                                break;
                        default:
                                LASSERTF(0, "oap %p page %p returned %d "
                                            "from make_ready\n", oap,
                                            oap->oap_page, rc);
                        }
                }
                if (oap == NULL)
                        break;

                /*
                 * Page submitted for IO has to be locked. Either by
                 * ->ap_make_ready() or by higher layers.
                 *
                 * XXX nikita: this assertion should be adjusted when lustre
                 * starts using PG_writeback for pages being written out.
                 */
#if defined(__KERNEL__) && defined(__LINUX__)
                LASSERT(PageLocked(oap->oap_page));
#endif

                /* If there is a gap at the start of this page, it can't merge
                 * with any previous page, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (page_count != 0 && oap->oap_page_off != 0)
                        break;

                /* take the page out of our book-keeping */
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, cmd, -1);
                list_del_init(&oap->oap_urgent_item);

                if (page_count == 0)
                        starting_offset = (oap->oap_obj_off + oap->oap_page_off) &
                                          (PTLRPC_MAX_BRW_SIZE - 1);

                /* ask the caller for the size of the io as the rpc leaves. */
                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
                        oap->oap_count =
                                ops->ap_refresh_count(oap->oap_caller_data, cmd);
                if (oap->oap_count <= 0) {
                        CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
                               oap->oap_count);
                        osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
                        continue;
                }

                /* now put the page back in our accounting */
                list_add_tail(&oap->oap_rpc_item, &rpc_list);
                if (++page_count >= cli->cl_max_pages_per_rpc)
                        break;

                /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
                 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
                 * have the same alignment as the initial writes that allocated
                 * extents on the server. */
                ending_offset = (oap->oap_obj_off + oap->oap_page_off +
                                 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
                if (ending_offset == 0)
                        break;

                /* If there is a gap at the end of this page, it can't merge
                 * with any subsequent pages, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
                        break;
        }

        osc_wake_cache_waiters(cli);

        if (page_count == 0)
                RETURN(0);

        loi_list_maint(cli, loi);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        req = osc_build_req(cli, &rpc_list, page_count, cmd);
        if (IS_ERR(req)) {
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);

                        /* queued sync pages can be torn down while the pages
                         * were between the pending list and the rpc */
                        if (oap->oap_interrupted) {
                                CDEBUG(D_INODE, "oap %p interrupted\n", oap);
                                osc_ap_completion(cli, NULL, oap, 0,
                                                  oap->oap_count);
                                continue;
                        }
                        osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
                }
                loi_list_maint(cli, loi);
                RETURN(PTR_ERR(req));
        }

        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        if (cmd == OBD_BRW_READ) {
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
                ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
        } else {
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                 cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
                ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        if (cmd == OBD_BRW_READ)
                cli->cl_r_in_flight++;
        else
                cli->cl_w_in_flight++;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        tmp = NULL;
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                /* only one oap gets a request reference */
                if (tmp == NULL)
                        tmp = oap;
                if (oap->oap_interrupted && !req->rq_intr) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                               oap, req);
                        ptlrpc_mark_interrupted(req);
                }
        }
        if (tmp != NULL)
                tmp->oap_request = ptlrpc_request_addref(req);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
                  page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);

        req->rq_interpret_reply = brw_interpret_oap;
        ptlrpcd_add_req(req);
#define LOI_DEBUG(LOI, STR, args...)                                    \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,          \
               !list_empty(&(LOI)->loi_cli_item),                       \
               (LOI)->loi_write_lop.lop_num_pending,                    \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),           \
               (LOI)->loi_read_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),            \
               args)
/* This is called by osc_check_rpcs() to find which objects have pages that
 * we could be sending. These lists are maintained by lop_makes_rpc(). */
struct lov_oinfo *osc_next_loi(struct client_obd *cli)
{
        /* first return all objects which we already know to have
         * pages ready to be stuffed into rpcs */
        if (!list_empty(&cli->cl_loi_ready_list))
                RETURN(list_entry(cli->cl_loi_ready_list.next,
                                  struct lov_oinfo, loi_cli_item));

        /* then if we have cache waiters, return all objects with queued
         * writes. This is especially important when many small files
         * have filled up the cache and not been fired into rpcs because
         * they don't pass the nr_pending/object threshold */
        if (!list_empty(&cli->cl_cache_waiters) &&
            !list_empty(&cli->cl_loi_write_list))
                RETURN(list_entry(cli->cl_loi_write_list.next,
                                  struct lov_oinfo, loi_write_item));

        /* then return all queued objects when we have an invalid import
         * so that they get flushed */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
                if (!list_empty(&cli->cl_loi_write_list))
                        RETURN(list_entry(cli->cl_loi_write_list.next,
                                          struct lov_oinfo, loi_write_item));
                if (!list_empty(&cli->cl_loi_read_list))
                        RETURN(list_entry(cli->cl_loi_read_list.next,
                                          struct lov_oinfo, loi_read_item));
        }
        RETURN(NULL);
}
2138 /* called with the loi list lock held */
2139 static void osc_check_rpcs(struct client_obd *cli)
2141 struct lov_oinfo *loi;
2142 int rc = 0, race_counter = 0;
2145 while ((loi = osc_next_loi(cli)) != NULL) {
2146 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2148 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2151 /* attempt some read/write balancing by alternating between
2152 * reads and writes in an object. The lop_makes_rpc() checks here
2153 * would be redundant if we were getting read/write work items
2154 * instead of objects. We don't want osc_send_oap_rpc() to drain a
2155 * partial read pending queue when we're given this object to
2156 * do write io on while there are cache waiters */
2157 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2158 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2159 &loi->loi_write_lop);
2167 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2168 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2169 &loi->loi_read_lop);
2178 /* attempt some inter-object balancing by issuing rpcs
2179 * for each object in turn */
2180 if (!list_empty(&loi->loi_cli_item))
2181 list_del_init(&loi->loi_cli_item);
2182 if (!list_empty(&loi->loi_write_item))
2183 list_del_init(&loi->loi_write_item);
2184 if (!list_empty(&loi->loi_read_item))
2185 list_del_init(&loi->loi_read_item);
2187 loi_list_maint(cli, loi);
2189 /* send_oap_rpc returns 0 when make_ready tells it to
2190 * back off. llite's make_ready does this when it tries
2191 * to lock a page queued for write that is already locked.
2192 * We want to try sending rpcs from many objects, but we
2193 * don't want to spin failing with 0. */
2194 if (race_counter == 10)
2200 /* we're trying to queue a page in the osc so we're subject to the
2201 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2202 * If the osc's queued pages are already at that limit, then we want to sleep
2203 * until there is space in the osc's queue for us. We also may be waiting for
2204 * write credits from the OST if there are RPCs in flight that may return some
2205 * before we fall back to sync writes.
2207 * We need this to know our allocation was granted in the presence of signals */
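/* Wake-up predicate for the l_wait_event() in osc_enter_cache(): the wait
 * is over once osc_wake_cache_waiters() has taken us off the waiters list,
 * or once there are no more rpcs in flight that could grant us credit. */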
2208 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2212 client_obd_list_lock(&cli->cl_loi_list_lock);
2213 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2214 client_obd_list_unlock(&cli->cl_loi_list_lock);
2218 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2219 * grant or cache space. */
2220 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2221 struct osc_async_page *oap)
2223 struct osc_cache_waiter ocw;
2224 struct l_wait_info lwi = { 0 };
2227 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2228 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2229 cli->cl_dirty_max, obd_max_dirty_pages,
2230 cli->cl_lost_grant, cli->cl_avail_grant);
2232 /* force the caller to try sync io. this can jump the list
2233 * of queued writes and create a discontiguous rpc stream */
2234 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2235 loi->loi_ar.ar_force_sync)
2238 /* Hopefully normal case - cache space and write credits available */
2239 if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2240 (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2241 (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2242 /* account for ourselves */
2243 osc_consume_write_grant(cli, &oap->oap_brw_page);
2247 /* Make sure that there are write rpcs in flight to wait for. This
2248 * is a little silly as this object may not have any pending writes,
2249 * but other objects sure might. */
2250 if (cli->cl_w_in_flight) {
2251 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2252 cfs_waitq_init(&ocw.ocw_waitq);
2256 loi_list_maint(cli, loi);
2257 osc_check_rpcs(cli);
2258 client_obd_list_unlock(&cli->cl_loi_list_lock);
2260 CDEBUG(D_CACHE, "sleeping for cache space\n");
2261 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2263 client_obd_list_lock(&cli->cl_loi_list_lock);
2264 if (!list_empty(&ocw.ocw_entry)) {
2265 list_del(&ocw.ocw_entry);
2274 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2275 struct lov_oinfo *loi, cfs_page_t *page,
2276 obd_off offset, struct obd_async_page_ops *ops,
2277 void *data, void **res)
2279 struct osc_async_page *oap;
2283 return size_round(sizeof(*oap));
2286 oap->oap_magic = OAP_MAGIC;
2287 oap->oap_cli = &exp->exp_obd->u.cli;
2290 oap->oap_caller_ops = ops;
2291 oap->oap_caller_data = data;
2293 oap->oap_page = page;
2294 oap->oap_obj_off = offset;
2296 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2297 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2298 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2300 oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2302 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
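/* A caller's page lifecycle, as a rough sketch (the variable names below
 * are illustrative, not part of this file; loi may be NULL to mean the
 * object's first stripe):
 *
 *      void *cookie;
 *      rc = osc_prep_async_page(exp, lsm, NULL, page, offset,
 *                               &my_async_page_ops, my_data, &cookie);
 *      rc = osc_queue_async_io(exp, lsm, NULL, cookie, OBD_BRW_WRITE,
 *                              0, CFS_PAGE_SIZE, 0, ASYNC_READY);
 *      ...
 *      rc = osc_teardown_async_page(exp, lsm, NULL, cookie);
 *
 * oap_from_cookie() below recovers the oap from the opaque cookie that
 * osc_prep_async_page() handed back. */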
2306 struct osc_async_page *oap_from_cookie(void *cookie)
2308 struct osc_async_page *oap = cookie;
2309 if (oap->oap_magic != OAP_MAGIC)
2310 return ERR_PTR(-EINVAL);
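/* Queue a prepared page for async io. A write must first get cache space
 * and grant through osc_enter_cache() (which may sleep); the page is then
 * moved to the pending list and osc_check_rpcs() is kicked. */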
2314 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2315 struct lov_oinfo *loi, void *cookie,
2316 int cmd, obd_off off, int count,
2317 obd_flag brw_flags, enum async_flags async_flags)
2319 struct client_obd *cli = &exp->exp_obd->u.cli;
2320 struct osc_async_page *oap;
2324 oap = oap_from_cookie(cookie);
2326 RETURN(PTR_ERR(oap));
2328 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2331 if (!list_empty(&oap->oap_pending_item) ||
2332 !list_empty(&oap->oap_urgent_item) ||
2333 !list_empty(&oap->oap_rpc_item))
2336 /* check if the file's owner/group is over quota */
2337 #ifdef HAVE_QUOTA_SUPPORT
2338 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2339 struct obd_async_page_ops *ops;
2346 ops = oap->oap_caller_ops;
2347 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2348 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2359 loi = &lsm->lsm_oinfo[0];
2361 client_obd_list_lock(&cli->cl_loi_list_lock);
2364 oap->oap_page_off = off;
2365 oap->oap_count = count;
2366 oap->oap_brw_flags = brw_flags;
2367 oap->oap_async_flags = async_flags;
2369 if (cmd & OBD_BRW_WRITE) {
2370 rc = osc_enter_cache(cli, loi, oap);
2372 client_obd_list_unlock(&cli->cl_loi_list_lock);
2377 osc_oap_to_pending(oap);
2378 loi_list_maint(cli, loi);
2380 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2383 osc_check_rpcs(cli);
2384 client_obd_list_unlock(&cli->cl_loi_list_lock);
2389 /* aka (~was & now & flag), but this is more clear :) */
2390 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
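/* Raise async flags on an already-queued page. A page that newly becomes
 * ASYNC_URGENT is moved onto its lop's urgent list so the next rpc will
 * pick it up. */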
2392 static int osc_set_async_flags(struct obd_export *exp,
2393 struct lov_stripe_md *lsm,
2394 struct lov_oinfo *loi, void *cookie,
2395 obd_flag async_flags)
2397 struct client_obd *cli = &exp->exp_obd->u.cli;
2398 struct loi_oap_pages *lop;
2399 struct osc_async_page *oap;
2403 oap = oap_from_cookie(cookie);
2405 RETURN(PTR_ERR(oap));
2408 * bug 7311: OST-side locking is only supported for liblustre for now
2409 * (and liblustre never calls obd_set_async_flags(). I hope.); a generic
2410 * implementation would have to handle the case where an OST-locked page
2411 * was picked up by, e.g., ->writepage().
2413 LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2414 LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to tread here */
2417 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2421 loi = &lsm->lsm_oinfo[0];
2423 if (oap->oap_cmd & OBD_BRW_WRITE) {
2424 lop = &loi->loi_write_lop;
2426 lop = &loi->loi_read_lop;
2429 client_obd_list_lock(&cli->cl_loi_list_lock);
2431 if (list_empty(&oap->oap_pending_item))
2432 GOTO(out, rc = -EINVAL);
2434 if ((oap->oap_async_flags & async_flags) == async_flags)
2437 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2438 oap->oap_async_flags |= ASYNC_READY;
2440 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2441 if (list_empty(&oap->oap_rpc_item)) {
2442 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2443 loi_list_maint(cli, loi);
2447 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2448 oap->oap_async_flags);
2450 osc_check_rpcs(cli);
2451 client_obd_list_unlock(&cli->cl_loi_list_lock);
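/* Queue a page as part of an obd_io_group. The page sits on the group's
 * pending list until osc_trigger_group_io() releases the whole group onto
 * the regular pending lists. */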
2455 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2456 struct lov_oinfo *loi,
2457 struct obd_io_group *oig, void *cookie,
2458 int cmd, obd_off off, int count,
2460 obd_flag async_flags)
2462 struct client_obd *cli = &exp->exp_obd->u.cli;
2463 struct osc_async_page *oap;
2464 struct loi_oap_pages *lop;
2468 oap = oap_from_cookie(cookie);
2470 RETURN(PTR_ERR(oap));
2472 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2475 if (!list_empty(&oap->oap_pending_item) ||
2476 !list_empty(&oap->oap_urgent_item) ||
2477 !list_empty(&oap->oap_rpc_item))
2481 loi = &lsm->lsm_oinfo[0];
2483 client_obd_list_lock(&cli->cl_loi_list_lock);
2486 oap->oap_page_off = off;
2487 oap->oap_count = count;
2488 oap->oap_brw_flags = brw_flags;
2489 oap->oap_async_flags = async_flags;
2491 if (cmd & OBD_BRW_WRITE)
2492 lop = &loi->loi_write_lop;
2494 lop = &loi->loi_read_lop;
2496 list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2497 if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2499 rc = oig_add_one(oig, &oap->oap_occ);
2502 LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2503 oap, oap->oap_page, rc);
2505 client_obd_list_unlock(&cli->cl_loi_list_lock);
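/* Move every page on a group's pending list to the regular pending list;
 * osc_trigger_group_io() below calls this once per direction. */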
2510 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2511 struct loi_oap_pages *lop, int cmd)
2513 struct list_head *pos, *tmp;
2514 struct osc_async_page *oap;
2516 list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2517 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2518 list_del(&oap->oap_pending_item);
2519 osc_oap_to_pending(oap);
2521 loi_list_maint(cli, loi);
2524 static int osc_trigger_group_io(struct obd_export *exp,
2525 struct lov_stripe_md *lsm,
2526 struct lov_oinfo *loi,
2527 struct obd_io_group *oig)
2529 struct client_obd *cli = &exp->exp_obd->u.cli;
2533 loi = &lsm->lsm_oinfo[0];
2535 client_obd_list_lock(&cli->cl_loi_list_lock);
2537 osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2538 osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2540 osc_check_rpcs(cli);
2541 client_obd_list_unlock(&cli->cl_loi_list_lock);
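/* Group io usage, as a rough sketch. This assumes the generic obd_io_group
 * helpers (oig_init(), oig_wait(), oig_release()) from obdclass; variable
 * names are illustrative:
 *
 *      struct obd_io_group *oig;
 *      rc = oig_init(&oig);
 *      rc = osc_queue_group_io(exp, lsm, NULL, oig, cookie, OBD_BRW_READ,
 *                              0, CFS_PAGE_SIZE, 0, ASYNC_GROUP_SYNC);
 *      rc = osc_trigger_group_io(exp, lsm, NULL, oig);
 *      rc = oig_wait(oig);
 *      oig_release(oig);
 */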
2546 static int osc_teardown_async_page(struct obd_export *exp,
2547 struct lov_stripe_md *lsm,
2548 struct lov_oinfo *loi, void *cookie)
2550 struct client_obd *cli = &exp->exp_obd->u.cli;
2551 struct loi_oap_pages *lop;
2552 struct osc_async_page *oap;
2556 oap = oap_from_cookie(cookie);
2558 RETURN(PTR_ERR(oap));
2561 loi = &lsm->lsm_oinfo[0];
2563 if (oap->oap_cmd & OBD_BRW_WRITE) {
2564 lop = &loi->loi_write_lop;
2566 lop = &loi->loi_read_lop;
2569 client_obd_list_lock(&cli->cl_loi_list_lock);
2571 if (!list_empty(&oap->oap_rpc_item))
2572 GOTO(out, rc = -EBUSY);
2574 osc_exit_cache(cli, oap, 0);
2575 osc_wake_cache_waiters(cli);
2577 if (!list_empty(&oap->oap_urgent_item)) {
2578 list_del_init(&oap->oap_urgent_item);
2579 oap->oap_async_flags &= ~ASYNC_URGENT;
2581 if (!list_empty(&oap->oap_pending_item)) {
2582 list_del_init(&oap->oap_pending_item);
2583 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2585 loi_list_maint(cli, loi);
2587 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2589 client_obd_list_unlock(&cli->cl_loi_list_lock);
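/* Attach ast data (the client's inode) to a lock, complaining loudly if
 * the lock already carries data for a different inode that is not being
 * freed. */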
2593 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2596 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2599 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2602 lock_res_and_lock(lock);
2605 /* Liang XXX: Darwin and Winnt checking should be added */
2606 if (lock->l_ast_data && lock->l_ast_data != data) {
2607 struct inode *new_inode = data;
2608 struct inode *old_inode = lock->l_ast_data;
2609 if (!(old_inode->i_state & I_FREEING))
2610 LDLM_ERROR(lock, "inconsistent l_ast_data found");
2611 LASSERTF(old_inode->i_state & I_FREEING,
2612 "Found existing inode %p/%lu/%u state %lu in lock: "
2613 "setting data to %p/%lu/%u\n", old_inode,
2614 old_inode->i_ino, old_inode->i_generation,
2616 new_inode, new_inode->i_ino, new_inode->i_generation);
2620 lock->l_ast_data = data;
2621 lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2622 unlock_res_and_lock(lock);
2623 LDLM_LOCK_PUT(lock);
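/* Walk all locks on this object's resource so that @replace can swap in
 * new callback data, e.g. when an inode is being reused. */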
2626 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2627 ldlm_iterator_t replace, void *data)
2629 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2630 struct obd_device *obd = class_exp2obd(exp);
2632 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
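/* Shared tail of the sync and async enqueue paths: propagate the server's
 * policy result for aborted intent enqueues, then fire the caller's
 * update callback with the final status. */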
2636 static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
2642 /* The request was created before ldlm_cli_enqueue call. */
2643 if (rc == ELDLM_LOCK_ABORTED) {
2644 struct ldlm_reply *rep;
2646 /* swabbed by ldlm_cli_enqueue() */
2647 LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF);
2648 rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
2650 LASSERT(rep != NULL);
2651 if (rep->lock_policy_res1)
2652 rc = rep->lock_policy_res1;
2656 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2657 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2658 oinfo->oi_md->lsm_oinfo->loi_lvb.lvb_size,
2659 oinfo->oi_md->lsm_oinfo->loi_lvb.lvb_blocks,
2660 oinfo->oi_md->lsm_oinfo->loi_lvb.lvb_mtime);
2663 /* Call the update callback. */
2664 rc = oinfo->oi_cb_up(oinfo, rc);
2668 static int osc_enqueue_interpret(struct ptlrpc_request *req,
2669 struct osc_enqueue_args *aa, int rc)
2671 int intent = aa->oa_ei->ei_flags & LDLM_FL_HAS_INTENT;
2672 struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
2673 struct ldlm_lock *lock;
2675 /* ldlm_cli_enqueue is holding a reference on the lock, so it must still be valid here. */
2677 lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
2679 /* Complete the lock-obtaining procedure. */
2680 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2682 &aa->oa_ei->ei_flags,
2683 &lsm->lsm_oinfo->loi_lvb,
2684 sizeof(lsm->lsm_oinfo->loi_lvb),
2685 lustre_swab_ost_lvb,
2686 aa->oa_oi->oi_lockh, rc);
2688 /* Complete osc stuff. */
2689 rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
2691 /* Release the lock for async request. */
2692 if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
2693 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
2695 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2696 aa->oa_oi->oi_lockh, req, aa);
2697 LDLM_LOCK_PUT(lock);
2701 /* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
2702 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2703 * other synchronous requests, but holding some locks while trying to obtain
2704 * others may take a considerable amount of time in the case of ost failure;
2705 * and when sync requests waiting on a lock do not get it released by a client,
2706 * that client is excluded from the cluster -- such scenarios make life
2707 * difficult, so release locks just after they are obtained. */
2708 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2709 struct obd_enqueue_info *einfo)
2711 struct ldlm_res_id res_id = { .name = {oinfo->oi_md->lsm_object_id} };
2712 struct obd_device *obd = exp->exp_obd;
2713 struct ldlm_reply *rep;
2714 struct ptlrpc_request *req = NULL;
2715 int intent = einfo->ei_flags & LDLM_FL_HAS_INTENT;
2719 /* Filesystem lock extents are extended to page boundaries so that
2720 * dealing with the page cache is a little smoother. */
2721 oinfo->oi_policy.l_extent.start -=
2722 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
2723 oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
2725 if (oinfo->oi_md->lsm_oinfo->loi_kms_valid == 0)
2728 /* Next, search for already existing extent locks that will cover us */
2729 rc = ldlm_lock_match(obd->obd_namespace, einfo->ei_flags | LDLM_FL_LVB_READY, &res_id,
2730 einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode,
2733 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
2736 /* I would like to be able to ASSERT here that rss <=
2737 * kms, but I can't, for reasons which are explained in
2741 /* We already have a lock, and it's referenced */
2742 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2744 /* For async requests, decref the lock. */
2745 if (einfo->ei_rqset)
2746 ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
2751 /* If we're trying to read, we also search for an existing PW lock. The
2752 * VFS and page cache already protect us locally, so lots of readers/
2753 * writers can share a single PW lock.
2755 * There are problems with conversion deadlocks, so instead of
2756 * converting a read lock to a write lock, we'll just enqueue a new one.
2759 * At some point we should cancel the read lock instead of making them
2760 * send us a blocking callback, but there are problems with canceling
2761 * locks out from other users right now, too. */
2763 if (einfo->ei_mode == LCK_PR) {
2764 rc = ldlm_lock_match(obd->obd_namespace, einfo->ei_flags | LDLM_FL_LVB_READY,
2765 &res_id, einfo->ei_type, &oinfo->oi_policy,
2766 LCK_PW, oinfo->oi_lockh);
2768 /* FIXME: This is not incredibly elegant, but it might
2769 * be more elegant than adding another parameter to
2770 * lock_match. I want a second opinion. */
2771 /* addref the lock only if not async requests. */
2772 if (!einfo->ei_rqset)
2773 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
2774 osc_set_data_with_check(oinfo->oi_lockh,
2777 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2778 ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
2786 [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
2787 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request) };
2789 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
2790 LDLM_ENQUEUE, 2, size, NULL);
2794 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
2795 size[DLM_REPLY_REC_OFF] =
2796 sizeof(oinfo->oi_md->lsm_oinfo->loi_lvb);
2797 ptlrpc_req_set_repsize(req, 3, size);
2800 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2801 einfo->ei_flags &= ~LDLM_FL_BLOCK_GRANTED;
2803 rc = ldlm_cli_enqueue(exp, &req, res_id, einfo->ei_type,
2804 &oinfo->oi_policy, einfo->ei_mode,
2805 &einfo->ei_flags, einfo->ei_cb_bl,
2806 einfo->ei_cb_cp, einfo->ei_cb_gl,
2808 &oinfo->oi_md->lsm_oinfo->loi_lvb,
2809 sizeof(oinfo->oi_md->lsm_oinfo->loi_lvb),
2810 lustre_swab_ost_lvb, oinfo->oi_lockh,
2811 einfo->ei_rqset ? 1 : 0);
2812 if (einfo->ei_rqset) {
2814 struct osc_enqueue_args *aa;
2815 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2816 aa = (struct osc_enqueue_args *)&req->rq_async_args;
2821 req->rq_interpret_reply = osc_enqueue_interpret;
2822 ptlrpc_set_add_req(einfo->ei_rqset, req);
2823 } else if (intent) {
2824 ptlrpc_req_finished(req);
2829 rc = osc_enqueue_fini(req, oinfo, intent, rc);
2831 ptlrpc_req_finished(req);
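/* The match half of osc_enqueue(): only look for an already-granted lock
 * locally, never sending an rpc. Extents are widened to page boundaries
 * and a PR request may match an existing PW lock. */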
2836 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
2837 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2838 int *flags, void *data, struct lustre_handle *lockh)
2840 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2841 struct obd_device *obd = exp->exp_obd;
2843 int lflags = *flags;
2846 OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
2848 /* Filesystem lock extents are extended to page boundaries so that
2849 * dealing with the page cache is a little smoother */
2850 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2851 policy->l_extent.end |= ~CFS_PAGE_MASK;
2853 /* Next, search for already existing extent locks that will cover us */
2854 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY, &res_id, type,
2855 policy, mode, lockh);
2857 //if (!(*flags & LDLM_FL_TEST_LOCK))
2858 osc_set_data_with_check(lockh, data, lflags);
2861 /* If we're trying to read, we also search for an existing PW lock. The
2862 * VFS and page cache already protect us locally, so lots of readers/
2863 * writers can share a single PW lock. */
2864 if (mode == LCK_PR) {
2865 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
2867 policy, LCK_PW, lockh);
2868 if (rc == 1 && !(lflags & LDLM_FL_TEST_LOCK)) {
2869 /* FIXME: This is not incredibly elegant, but it might
2870 * be more elegant than adding another parameter to
2871 * lock_match. I want a second opinion. */
2872 osc_set_data_with_check(lockh, data, lflags);
2873 ldlm_lock_addref(lockh, LCK_PR);
2874 ldlm_lock_decref(lockh, LCK_PW);
2880 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2881 __u32 mode, struct lustre_handle *lockh)
2885 if (unlikely(mode == LCK_GROUP))
2886 ldlm_lock_decref_and_cancel(lockh, mode);
2888 ldlm_lock_decref(lockh, mode);
2893 static int osc_cancel_unused(struct obd_export *exp,
2894 struct lov_stripe_md *lsm, int flags, void *opaque)
2896 struct obd_device *obd = class_exp2obd(exp);
2897 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2899 return ldlm_cli_cancel_unused(obd->obd_namespace, &res_id, flags,
2903 static int osc_join_lru(struct obd_export *exp,
2904 struct lov_stripe_md *lsm, int join)
2906 struct obd_device *obd = class_exp2obd(exp);
2907 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2909 return ldlm_cli_join_lru(obd->obd_namespace, &res_id, join);
2912 static int osc_statfs_interpret(struct ptlrpc_request *req,
2913 struct osc_async_args *aa, int rc)
2915 struct obd_statfs *msfs;
2921 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
2922 lustre_swab_obd_statfs);
2924 CERROR("Can't unpack obd_statfs\n");
2925 GOTO(out, rc = -EPROTO);
2928 memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
2930 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2934 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
2935 __u64 max_age, struct ptlrpc_request_set *rqset)
2937 struct ptlrpc_request *req;
2938 struct osc_async_args *aa;
2939 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
2942 /* We could possibly pass max_age in the request (as an absolute
2943 * timestamp or a "seconds.usec ago") so the target can avoid doing
2944 * extra calls into the filesystem if that isn't necessary (e.g.
2945 * during mount that would help a bit). Having relative timestamps
2946 * is not so great if request processing is slow, while absolute
2947 * timestamps are not ideal because they need time synchronization. */
2948 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
2949 OST_STATFS, 1, NULL, NULL);
2953 ptlrpc_req_set_repsize(req, 2, size);
2954 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
2956 req->rq_interpret_reply = osc_statfs_interpret;
2957 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2958 aa = (struct osc_async_args *)&req->rq_async_args;
2961 ptlrpc_set_add_req(rqset, req);
2965 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
2968 struct obd_statfs *msfs;
2969 struct ptlrpc_request *req;
2970 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
2973 /* We could possibly pass max_age in the request (as an absolute
2974 * timestamp or a "seconds.usec ago") so the target can avoid doing
2975 * extra calls into the filesystem if that isn't necessary (e.g.
2976 * during mount that would help a bit). Having relative timestamps
2977 * is not so great if request processing is slow, while absolute
2978 * timestamps are not ideal because they need time synchronization. */
2979 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
2980 OST_STATFS, 1, NULL, NULL);
2984 ptlrpc_req_set_repsize(req, 2, size);
2985 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
2987 rc = ptlrpc_queue_wait(req);
2991 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
2992 lustre_swab_obd_statfs);
2994 CERROR("Can't unpack obd_statfs\n");
2995 GOTO(out, rc = -EPROTO);
2998 memcpy(osfs, msfs, sizeof(*osfs));
3002 ptlrpc_req_finished(req);
3006 /* Retrieve object striping information.
3008 * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
3009 * the maximum number of OST indices which will fit in the user buffer.
3010 * lmm_magic must be LOV_USER_MAGIC (we only use 1 slot here).
3012 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3014 struct lov_user_md lum, *lumk;
3015 int rc = 0, lum_size;
3021 if (copy_from_user(&lum, lump, sizeof(lum)))
3024 if (lum.lmm_magic != LOV_USER_MAGIC)
3027 if (lum.lmm_stripe_count > 0) {
3028 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3029 OBD_ALLOC(lumk, lum_size);
3033 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3035 lum_size = sizeof(lum);
3039 lumk->lmm_object_id = lsm->lsm_object_id;
3040 lumk->lmm_stripe_count = 1;
3042 if (copy_to_user(lump, lumk, lum_size))
3046 OBD_FREE(lumk, lum_size);
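/* Ioctl entry point for a bare OSC: emulate a single-stripe LOV for the
 * GET_CONFIG and get/set-stripe calls, handle recovery and quotacheck
 * controls, and return -ENOTTY for anything unrecognised. */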
3052 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3053 void *karg, void *uarg)
3055 struct obd_device *obd = exp->exp_obd;
3056 struct obd_ioctl_data *data = karg;
3060 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3063 if (!try_module_get(THIS_MODULE)) {
3064 CERROR("Can't get module. Is it alive?");
3069 case OBD_IOC_LOV_GET_CONFIG: {
3071 struct lov_desc *desc;
3072 struct obd_uuid uuid;
3076 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3077 GOTO(out, err = -EINVAL);
3079 data = (struct obd_ioctl_data *)buf;
3081 if (sizeof(*desc) > data->ioc_inllen1) {
3082 obd_ioctl_freedata(buf, len);
3083 GOTO(out, err = -EINVAL);
3086 if (data->ioc_inllen2 < sizeof(uuid)) {
3087 obd_ioctl_freedata(buf, len);
3088 GOTO(out, err = -EINVAL);
3091 desc = (struct lov_desc *)data->ioc_inlbuf1;
3092 desc->ld_tgt_count = 1;
3093 desc->ld_active_tgt_count = 1;
3094 desc->ld_default_stripe_count = 1;
3095 desc->ld_default_stripe_size = 0;
3096 desc->ld_default_stripe_offset = 0;
3097 desc->ld_pattern = 0;
3098 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3100 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3102 err = copy_to_user((void *)uarg, buf, len);
3105 obd_ioctl_freedata(buf, len);
3108 case LL_IOC_LOV_SETSTRIPE:
3109 err = obd_alloc_memmd(exp, karg);
3113 case LL_IOC_LOV_GETSTRIPE:
3114 err = osc_getstripe(karg, uarg);
3116 case OBD_IOC_CLIENT_RECOVER:
3117 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3122 case IOC_OSC_SET_ACTIVE:
3123 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3126 case OBD_IOC_POLL_QUOTACHECK:
3127 err = lquota_poll_check(quota_interface, exp,
3128 (struct if_quotacheck *)karg);
3131 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3132 cmd, cfs_curproc_comm());
3133 GOTO(out, err = -ENOTTY);
3136 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3139 module_put(THIS_MODULE);
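/* obd_get_info handler: "lock_to_stripe" is answered locally (a single
 * OSC is always stripe 0), while "last_id" is fetched from the OST with
 * an OST_GET_INFO rpc. */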
3144 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3145 void *key, __u32 *vallen, void *val)
3148 if (!vallen || !val)
3151 if (keylen > strlen("lock_to_stripe") &&
3152 strcmp(key, "lock_to_stripe") == 0) {
3153 __u32 *stripe = val;
3154 *vallen = sizeof(*stripe);
3157 } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
3158 struct ptlrpc_request *req;
3160 char *bufs[2] = { NULL, key };
3161 int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };
3163 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3164 OST_GET_INFO, 2, size, bufs);
3168 size[REPLY_REC_OFF] = *vallen;
3169 ptlrpc_req_set_repsize(req, 2, size);
3170 rc = ptlrpc_queue_wait(req);
3174 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3175 lustre_swab_ost_last_id);
3176 if (reply == NULL) {
3177 CERROR("Can't unpack OST last ID\n");
3178 GOTO(out, rc = -EPROTO);
3180 *((obd_id *)val) = *reply;
3182 ptlrpc_req_finished(req);
3188 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3191 struct llog_ctxt *ctxt;
3192 struct obd_import *imp = req->rq_import;
3198 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3201 rc = llog_initiator_connect(ctxt);
3203 CERROR("cannot establish connection for "
3204 "ctxt %p: %d\n", ctxt, rc);
3207 imp->imp_server_timeout = 1;
3208 CDEBUG(D_HA, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3209 imp->imp_pingable = 1;
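/* obd_set_info handler: the next_id, "unlinked", initial-recovery and
 * "checksum" keys are consumed locally; any other key is forwarded to the
 * OST in an OST_SET_INFO rpc, with "mds_conn" additionally setting up the
 * llog connection in the interpret callback above. */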
3214 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3215 void *key, obd_count vallen, void *val,
3216 struct ptlrpc_request_set *set)
3218 struct ptlrpc_request *req;
3219 struct obd_device *obd = exp->exp_obd;
3220 struct obd_import *imp = class_exp2cliimp(exp);
3221 int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
3222 char *bufs[3] = { NULL, key, val };
3225 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3227 if (KEY_IS(KEY_NEXT_ID)) {
3228 if (vallen != sizeof(obd_id))
3230 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3231 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3232 exp->exp_obd->obd_name,
3233 obd->u.cli.cl_oscc.oscc_next_id);
3238 if (KEY_IS("unlinked")) {
3239 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3240 spin_lock(&oscc->oscc_lock);
3241 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3242 spin_unlock(&oscc->oscc_lock);
3246 if (KEY_IS(KEY_INIT_RECOV)) {
3247 if (vallen != sizeof(int))
3249 imp->imp_initial_recov = *(int *)val;
3250 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3251 exp->exp_obd->obd_name,
3252 imp->imp_initial_recov);
3256 if (KEY_IS("checksum")) {
3257 if (vallen != sizeof(int))
3259 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3266 /* We pass all other commands directly to OST. Since nobody calls osc
3267 methods directly and everybody is supposed to go through LOV, we
3268 assume lov checked invalid values for us.
3269 The only recognised values so far are evict_by_nid and mds_conn.
3270 Even if something bad goes through, we'd get a -EINVAL from OST anyway. */
3273 req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
3278 if (KEY_IS("mds_conn"))
3279 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3281 ptlrpc_req_set_repsize(req, 1, NULL);
3282 ptlrpc_set_add_req(set, req);
3283 ptlrpc_check_set(set);
3289 static struct llog_operations osc_size_repl_logops = {
3290 lop_cancel: llog_obd_repl_cancel
3293 static struct llog_operations osc_mds_ost_orig_logops;
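/* Set up the two llog contexts used when this OSC lives on the MDS: the
 * originator context for MDS->OST records and the size-replication
 * context. The lvfs-based origin ops are patched in exactly once, under
 * obd_dev_lock. */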
3294 static int osc_llog_init(struct obd_device *obd, struct obd_device *tgt,
3295 int count, struct llog_catid *catid,
3296 struct obd_uuid *uuid)
3301 spin_lock(&obd->obd_dev_lock);
3302 if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3303 osc_mds_ost_orig_logops = llog_lvfs_ops;
3304 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3305 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3306 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3307 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3309 spin_unlock(&obd->obd_dev_lock);
3311 rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3312 &catid->lci_logid, &osc_mds_ost_orig_logops);
3314 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3318 rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3319 &osc_size_repl_logops);
3321 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3324 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3325 obd->obd_name, tgt->obd_name, count, catid, rc);
3326 CERROR("logid "LPX64":0x%x\n",
3327 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3332 static int osc_llog_finish(struct obd_device *obd, int count)
3334 struct llog_ctxt *ctxt;
3335 int rc = 0, rc2 = 0;
3338 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3340 rc = llog_cleanup(ctxt);
3342 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3344 rc2 = llog_cleanup(ctxt);
3351 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3352 struct obd_uuid *cluuid,
3353 struct obd_connect_data *data)
3355 struct client_obd *cli = &obd->u.cli;
3357 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3360 client_obd_list_lock(&cli->cl_loi_list_lock);
3361 data->ocd_grant = cli->cl_avail_grant ?:
3362 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3363 lost_grant = cli->cl_lost_grant;
3364 cli->cl_lost_grant = 0;
3365 client_obd_list_unlock(&cli->cl_loi_list_lock);
3367 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3368 "cl_lost_grant: %ld\n", data->ocd_grant,
3369 cli->cl_avail_grant, lost_grant);
3370 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3371 " ocd_grant: %d\n", data->ocd_connect_flags,
3372 data->ocd_version, data->ocd_grant);
3378 static int osc_disconnect(struct obd_export *exp)
3380 struct obd_device *obd = class_exp2obd(exp);
3381 struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3384 if (obd->u.cli.cl_conn_count == 1)
3385 /* flush any remaining cancel messages out to the target */
3386 llog_sync(ctxt, exp);
3388 rc = client_disconnect_export(exp);
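/* React to import state changes: pause object creation on the MDS's OSCs
 * across disconnect/recovery, drop grants and fail queued pages when the
 * import is invalidated, and pass each event up to the obd observer
 * (typically the LOV). */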
3392 static int osc_import_event(struct obd_device *obd,
3393 struct obd_import *imp,
3394 enum obd_import_event event)
3396 struct client_obd *cli;
3400 LASSERT(imp->imp_obd == obd);
3403 case IMP_EVENT_DISCON: {
3404 /* Only do this on the MDS OSCs */
3405 if (imp->imp_server_timeout) {
3406 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3408 spin_lock(&oscc->oscc_lock);
3409 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3410 spin_unlock(&oscc->oscc_lock);
3415 case IMP_EVENT_INACTIVE: {
3416 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3419 case IMP_EVENT_INVALIDATE: {
3420 struct ldlm_namespace *ns = obd->obd_namespace;
3424 client_obd_list_lock(&cli->cl_loi_list_lock);
3425 cli->cl_avail_grant = 0;
3426 cli->cl_lost_grant = 0;
3427 /* all pages go to failing rpcs due to the invalid import */
3428 osc_check_rpcs(cli);
3429 client_obd_list_unlock(&cli->cl_loi_list_lock);
3431 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3435 case IMP_EVENT_ACTIVE: {
3436 /* Only do this on the MDS OSCs */
3437 if (imp->imp_server_timeout) {
3438 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3440 spin_lock(&oscc->oscc_lock);
3441 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3442 spin_unlock(&oscc->oscc_lock);
3444 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3447 case IMP_EVENT_OCD: {
3448 struct obd_connect_data *ocd = &imp->imp_connect_data;
3450 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3451 osc_init_grant(&obd->u.cli, ocd);
3454 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3455 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3457 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3461 CERROR("Unknown import event %d\n", event);
3467 int osc_setup(struct obd_device *obd, obd_count len, void *buf)
3473 rc = ptlrpcd_addref();
3477 rc = client_obd_setup(obd, len, buf);
3481 struct lprocfs_static_vars lvars;
3482 struct client_obd *cli = &obd->u.cli;
3484 lprocfs_init_vars(osc, &lvars);
3485 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3486 lproc_osc_attach_seqstat(obd);
3487 ptlrpc_lprocfs_register_obd(obd);
3491 /* We need to allocate a few more requests, because
3492 brw_interpret_oap tries to create new requests before freeing
3493 previous ones. Ideally we want 2x max_rpcs_in_flight
3494 reserved, but I'm afraid that might waste too much RAM,
3495 so 2 is just my guess and should still work. */
3496 cli->cl_import->imp_rq_pool =
3497 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3499 ptlrpc_add_rqs_to_pool);
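/* Stage-wise teardown: deactivate the import early so recovery stops,
 * reap a never-connected import when the exports are cleaned, then shut
 * down the llog subsystems. */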
3505 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3511 case OBD_CLEANUP_EARLY: {
3512 struct obd_import *imp;
3513 imp = obd->u.cli.cl_import;
3514 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3515 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3516 ptlrpc_deactivate_import(imp);
3519 case OBD_CLEANUP_EXPORTS: {
3520 /* If we set up but never connected, the
3521 client import will not have been cleaned. */
3522 if (obd->u.cli.cl_import) {
3523 struct obd_import *imp;
3524 imp = obd->u.cli.cl_import;
3525 CDEBUG(D_CONFIG, "%s: client import never connected\n",
3527 ptlrpc_invalidate_import(imp);
3528 ptlrpc_free_rq_pool(imp->imp_rq_pool);
3529 class_destroy_import(imp);
3530 obd->u.cli.cl_import = NULL;
3534 case OBD_CLEANUP_SELF_EXP:
3535 rc = obd_llog_finish(obd, 0);
3537 CERROR("failed to cleanup llogging subsystems\n");
3539 case OBD_CLEANUP_OBD:
3545 int osc_cleanup(struct obd_device *obd)
3547 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3551 ptlrpc_lprocfs_unregister_obd(obd);
3552 lprocfs_obd_cleanup(obd);
3554 spin_lock(&oscc->oscc_lock);
3555 oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3556 oscc->oscc_flags |= OSCC_FLAG_EXITING;
3557 spin_unlock(&oscc->oscc_lock);
3559 /* free memory of osc quota cache */
3560 lquota_cleanup(quota_interface, obd);
3562 rc = client_obd_cleanup(obd);
3568 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3570 struct lustre_cfg *lcfg = buf;
3571 struct lprocfs_static_vars lvars;
3574 lprocfs_init_vars(osc, &lvars);
3576 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
3580 struct obd_ops osc_obd_ops = {
3581 .o_owner = THIS_MODULE,
3582 .o_setup = osc_setup,
3583 .o_precleanup = osc_precleanup,
3584 .o_cleanup = osc_cleanup,
3585 .o_add_conn = client_import_add_conn,
3586 .o_del_conn = client_import_del_conn,
3587 .o_connect = client_connect_import,
3588 .o_reconnect = osc_reconnect,
3589 .o_disconnect = osc_disconnect,
3590 .o_statfs = osc_statfs,
3591 .o_statfs_async = osc_statfs_async,
3592 .o_packmd = osc_packmd,
3593 .o_unpackmd = osc_unpackmd,
3594 .o_create = osc_create,
3595 .o_destroy = osc_destroy,
3596 .o_getattr = osc_getattr,
3597 .o_getattr_async = osc_getattr_async,
3598 .o_setattr = osc_setattr,
3599 .o_setattr_async = osc_setattr_async,
3601 .o_brw_async = osc_brw_async,
3602 .o_prep_async_page = osc_prep_async_page,
3603 .o_queue_async_io = osc_queue_async_io,
3604 .o_set_async_flags = osc_set_async_flags,
3605 .o_queue_group_io = osc_queue_group_io,
3606 .o_trigger_group_io = osc_trigger_group_io,
3607 .o_teardown_async_page = osc_teardown_async_page,
3608 .o_punch = osc_punch,
3610 .o_enqueue = osc_enqueue,
3611 .o_match = osc_match,
3612 .o_change_cbdata = osc_change_cbdata,
3613 .o_cancel = osc_cancel,
3614 .o_cancel_unused = osc_cancel_unused,
3615 .o_join_lru = osc_join_lru,
3616 .o_iocontrol = osc_iocontrol,
3617 .o_get_info = osc_get_info,
3618 .o_set_info_async = osc_set_info_async,
3619 .o_import_event = osc_import_event,
3620 .o_llog_init = osc_llog_init,
3621 .o_llog_finish = osc_llog_finish,
3622 .o_process_config = osc_process_config,
3625 int __init osc_init(void)
3627 struct lprocfs_static_vars lvars;
3631 lprocfs_init_vars(osc, &lvars);
3633 request_module("lquota");
3634 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3635 lquota_init(quota_interface);
3636 init_obd_quota_ops(quota_interface, &osc_obd_ops);
3638 rc = class_register_type(&osc_obd_ops, lvars.module_vars,
3641 if (quota_interface)
3642 PORTAL_SYMBOL_PUT(osc_quota_interface);
3650 static void /*__exit*/ osc_exit(void)
3652 lquota_exit(quota_interface);
3653 if (quota_interface)
3654 PORTAL_SYMBOL_PUT(osc_quota_interface);
3656 class_unregister_type(LUSTRE_OSC_NAME);
3659 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3660 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3661 MODULE_LICENSE("GPL");
3663 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);