1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5 * Author Peter Braam <braam@clusterfs.com>
7 * This file is part of the Lustre file system, http://www.lustre.org
8 * Lustre is a trademark of Cluster File Systems, Inc.
10 * You may have signed or agreed to another license before downloading
11 * this software. If so, you are bound by the terms and conditions
12 * of that agreement, and the following does not apply to you. See the
13 * LICENSE file included with this distribution for more information.
15 * If you did not agree to a different license, then this copy of Lustre
16 * is open source software; you can redistribute it and/or modify it
17 * under the terms of version 2 of the GNU General Public License as
18 * published by the Free Software Foundation.
20 * In either case, Lustre is distributed in the hope that it will be
21 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23 * license text for more details.
25 * For testing and management it is treated as an obd_device,
26 * although it does not export a full OBD method table (the
27 * requests are coming in over the wire, so object target modules
28 * do not have a full method table.)
33 # define EXPORT_SYMTAB
35 #define DEBUG_SUBSYSTEM S_OSC
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
43 # include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include "osc_internal.h"
61 static quota_interface_t *quota_interface = NULL;
62 extern quota_interface_t osc_quota_interface;
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
70 atomic_t osc_resend_time;
72 /* Pack OSC object metadata for disk storage (LE byte order). */
73 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
74 struct lov_stripe_md *lsm)
79 lmm_size = sizeof(**lmmp);
84 OBD_FREE(*lmmp, lmm_size);
90 OBD_ALLOC(*lmmp, lmm_size);
96 LASSERT(lsm->lsm_object_id);
97 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
103 /* Unpack OSC object metadata from disk storage (LE byte order). */
104 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
105 struct lov_mds_md *lmm, int lmm_bytes)
111 if (lmm_bytes < sizeof (*lmm)) {
112 CERROR("lov_mds_md too small: %d, need %d\n",
113 lmm_bytes, (int)sizeof(*lmm));
116 /* XXX LOV_MAGIC etc check? */
118 if (lmm->lmm_object_id == 0) {
119 CERROR("lov_mds_md: zero lmm_object_id\n");
124 lsm_size = lov_stripe_md_size(1);
128 if (*lsmp != NULL && lmm == NULL) {
129 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
130 OBD_FREE(*lsmp, lsm_size);
136 OBD_ALLOC(*lsmp, lsm_size);
139 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
140 if ((*lsmp)->lsm_oinfo[0] == NULL) {
141 OBD_FREE(*lsmp, lsm_size);
144 loi_init((*lsmp)->lsm_oinfo[0]);
148 /* XXX zero *lsmp? */
149 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
150 LASSERT((*lsmp)->lsm_object_id);
153 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
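/* Interpret callback for the async OST_GETATTR request built below: unpack
 * the ost_body from the reply, copy the returned attributes into the
 * caller's obdo and hand the result to the oi_cb_up completion callback. */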
158 static int osc_getattr_interpret(struct ptlrpc_request *req,
159 struct osc_async_args *aa, int rc)
161 struct ost_body *body;
167 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
168 lustre_swab_ost_body);
170 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
171 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
173 /* This should really be sent by the OST */
174 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
175 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
177 CERROR("can't unpack ost_body\n");
179 aa->aa_oi->oi_oa->o_valid = 0;
182 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
186 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
187 struct ptlrpc_request_set *set)
189 struct ptlrpc_request *req;
190 struct ost_body *body;
191 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
192 struct osc_async_args *aa;
195 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
196 OST_GETATTR, 2, size,NULL);
200 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
201 memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
203 ptlrpc_req_set_repsize(req, 2, size);
204 req->rq_interpret_reply = osc_getattr_interpret;
206 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
207 aa = (struct osc_async_args *)&req->rq_async_args;
210 ptlrpc_set_add_req(set, req);
214 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
216 struct ptlrpc_request *req;
217 struct ost_body *body;
218 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
221 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
222 OST_GETATTR, 2, size, NULL);
226 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
227 memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
229 ptlrpc_req_set_repsize(req, 2, size);
231 rc = ptlrpc_queue_wait(req);
233 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
237 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
238 lustre_swab_ost_body);
240 CERROR ("can't unpack ost_body\n");
241 GOTO (out, rc = -EPROTO);
244 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
245 memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));
247 /* This should really be sent by the OST */
248 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
249 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
253 ptlrpc_req_finished(req);
257 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
258 struct obd_trans_info *oti)
260 struct ptlrpc_request *req;
261 struct ost_body *body;
262 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
265 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
266 OST_SETATTR, 2, size, NULL);
270 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
271 memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
273 ptlrpc_req_set_repsize(req, 2, size);
275 rc = ptlrpc_queue_wait(req);
279 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
280 lustre_swab_ost_body);
282 GOTO(out, rc = -EPROTO);
284 memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));
288 ptlrpc_req_finished(req);
292 static int osc_setattr_interpret(struct ptlrpc_request *req,
293 struct osc_async_args *aa, int rc)
295 struct ost_body *body;
301 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
302 lustre_swab_ost_body);
304 CERROR("can't unpack ost_body\n");
305 GOTO(out, rc = -EPROTO);
308 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
310 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
314 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
315 struct obd_trans_info *oti,
316 struct ptlrpc_request_set *rqset)
318 struct ptlrpc_request *req;
319 struct ost_body *body;
320 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
321 struct osc_async_args *aa;
324 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
325 OST_SETATTR, 2, size, NULL);
329 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
331 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
333 memcpy(obdo_logcookie(oinfo->oi_oa), oti->oti_logcookies,
334 sizeof(*oti->oti_logcookies));
337 memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
338 ptlrpc_req_set_repsize(req, 2, size);
339 /* do mds to ost setattr asynchronously */
341 /* Do not wait for response. */
342 ptlrpcd_add_req(req);
344 req->rq_interpret_reply = osc_setattr_interpret;
346 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
347 aa = (struct osc_async_args *)&req->rq_async_args;
350 ptlrpc_set_add_req(rqset, req);
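/* Create the object on the OST synchronously.  This path is also used for
 * MDS->OST orphan cleanup: an OBD_FL_DELORPHAN request is sent with
 * rq_no_resend/rq_no_delay set (see below) so it cannot stall recovery. */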
356 int osc_real_create(struct obd_export *exp, struct obdo *oa,
357 struct lov_stripe_md **ea, struct obd_trans_info *oti)
359 struct ptlrpc_request *req;
360 struct ost_body *body;
361 struct lov_stripe_md *lsm;
362 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
370 rc = obd_alloc_memmd(exp, &lsm);
375 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
376 OST_CREATE, 2, size, NULL);
378 GOTO(out, rc = -ENOMEM);
380 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
381 memcpy(&body->oa, oa, sizeof(body->oa));
383 ptlrpc_req_set_repsize(req, 2, size);
384 if (oa->o_valid & OBD_MD_FLINLINE) {
385 LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
386 oa->o_flags == OBD_FL_DELORPHAN);
388 "delorphan from OST integration");
389 /* Don't resend the delorphan req */
390 req->rq_no_resend = req->rq_no_delay = 1;
393 rc = ptlrpc_queue_wait(req);
397 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
398 lustre_swab_ost_body);
400 CERROR ("can't unpack ost_body\n");
401 GOTO (out_req, rc = -EPROTO);
404 memcpy(oa, &body->oa, sizeof(*oa));
406 /* This should really be sent by the OST */
407 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
408 oa->o_valid |= OBD_MD_FLBLKSZ;
410 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
411 * have valid lsm_oinfo data structs, so don't go touching that.
412 * This needs to be fixed in a big way.
414 lsm->lsm_object_id = oa->o_id;
418 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
420 if (oa->o_valid & OBD_MD_FLCOOKIE) {
421 if (!oti->oti_logcookies)
422 oti_alloc_cookies(oti, 1);
423 memcpy(oti->oti_logcookies, obdo_logcookie(oa),
424 sizeof(oti->oti_onecookie));
428 CDEBUG(D_HA, "transno: "LPD64"\n",
429 lustre_msg_get_transno(req->rq_repmsg));
431 ptlrpc_req_finished(req);
434 obd_free_memmd(exp, &lsm);
438 static int osc_punch_interpret(struct ptlrpc_request *req,
439 struct osc_async_args *aa, int rc)
441 struct ost_body *body;
447 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
448 lustre_swab_ost_body);
450 CERROR ("can't unpack ost_body\n");
451 GOTO(out, rc = -EPROTO);
454 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
456 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
460 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
461 struct obd_trans_info *oti,
462 struct ptlrpc_request_set *rqset)
464 struct ptlrpc_request *req;
465 struct osc_async_args *aa;
466 struct ost_body *body;
467 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
475 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
476 OST_PUNCH, 2, size, NULL);
480 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
482 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
483 memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
485 /* overload the size and blocks fields in the oa with start/end */
486 body->oa.o_size = oinfo->oi_policy.l_extent.start;
487 body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
488 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
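/* For illustration: a truncate of the object to size S is typically issued
 * with l_extent.start == S and l_extent.end == OBD_OBJECT_EOF, so the OST
 * punches everything from S to the end of the object. */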
490 ptlrpc_req_set_repsize(req, 2, size);
492 req->rq_interpret_reply = osc_punch_interpret;
493 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
494 aa = (struct osc_async_args *)&req->rq_async_args;
496 ptlrpc_set_add_req(rqset, req);
501 static int osc_sync(struct obd_export *exp, struct obdo *oa,
502 struct lov_stripe_md *md, obd_size start, obd_size end)
504 struct ptlrpc_request *req;
505 struct ost_body *body;
506 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
514 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
515 OST_SYNC, 2, size, NULL);
519 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
520 memcpy(&body->oa, oa, sizeof(*oa));
522 /* overload the size and blocks fields in the oa with start/end */
523 body->oa.o_size = start;
524 body->oa.o_blocks = end;
525 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
527 ptlrpc_req_set_repsize(req, 2, size);
529 rc = ptlrpc_queue_wait(req);
533 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
534 lustre_swab_ost_body);
536 CERROR ("can't unpack ost_body\n");
537 GOTO (out, rc = -EPROTO);
540 memcpy(oa, &body->oa, sizeof(*oa));
544 ptlrpc_req_finished(req);
548 /* Find and locally cancel the locks matched by @mode in the resource found
549  * by @objid. Found locks are added to the @cancels list. Returns the number
550  * of locks added to the @cancels list. */
551 static int osc_resource_get_unused(struct obd_export *exp, __u64 objid,
552 struct list_head *cancels, ldlm_mode_t mode,
555 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
556 struct ldlm_res_id res_id = { .name = { objid } };
557 struct ldlm_resource *res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
564 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
565 lock_flags, 0, NULL);
566 ldlm_resource_putref(res);
570 /* Destroy requests can be async always on the client, and we don't even really
571 * care about the return code since the client cannot do anything at all about
573 * When the MDS is unlinking a filename, it saves the file objects into a
574 * recovery llog, and these object records are cancelled when the OST reports
575 * they were destroyed and sync'd to disk (i.e. transaction committed).
576 * If the client dies, or the OST is down when the object should be destroyed,
577 * the records are not cancelled, and when the OST reconnects to the MDS next,
578 * it will retrieve the llog unlink logs and then send the log cancellation
579 * cookies to the MDS after committing destroy transactions. */
580 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
581 struct lov_stripe_md *ea, struct obd_trans_info *oti,
582 struct obd_export *md_export)
584 CFS_LIST_HEAD(cancels);
585 struct ptlrpc_request *req;
586 struct ost_body *body;
587 int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
588 int count, bufcount = 2;
596 count = osc_resource_get_unused(exp, oa->o_id, &cancels, LCK_PW,
597 LDLM_FL_DISCARD_DATA);
598 if (exp_connect_cancelset(exp) && count) {
600 size[REQ_REC_OFF + 1] = ldlm_request_bufsize(count,OST_DESTROY);
602 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
603 OST_DESTROY, bufcount, size, NULL);
604 if (exp_connect_cancelset(exp) && req)
605 ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 1);
607 ldlm_lock_list_put(&cancels, l_bl_ast, count);
612 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
614 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
616 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
617 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
618 sizeof(*oti->oti_logcookies));
621 memcpy(&body->oa, oa, sizeof(*oa));
622 ptlrpc_req_set_repsize(req, 2, size);
624 ptlrpcd_add_req(req);
628 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
631 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
633 LASSERT(!(oa->o_valid & bits));
636 client_obd_list_lock(&cli->cl_loi_list_lock);
637 oa->o_dirty = cli->cl_dirty;
638 if (cli->cl_dirty > cli->cl_dirty_max) {
639 CERROR("dirty %lu > dirty_max %lu\n",
640 cli->cl_dirty, cli->cl_dirty_max);
642 } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
643 CERROR("dirty %d > system dirty_max %d\n",
644 atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
646 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
647 CERROR("dirty %lu - dirty_max %lu too big???\n",
648 cli->cl_dirty, cli->cl_dirty_max);
651 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
652 (cli->cl_max_rpcs_in_flight + 1);
653 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
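/* For illustration, with hypothetical tunables of 256 4KB pages per RPC and
 * 8 RPCs in flight, max_in_flight = 1MB * 9 = 9MB, so o_undirty advertises
 * at least that much additional dirty headroom to the OST. */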
655 oa->o_grant = cli->cl_avail_grant;
656 oa->o_dropped = cli->cl_lost_grant;
657 cli->cl_lost_grant = 0;
658 client_obd_list_unlock(&cli->cl_loi_list_lock);
659 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
660 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
663 /* caller must hold loi_list_lock */
664 static void osc_consume_write_grant(struct client_obd *cli,struct brw_page *pga)
666 atomic_inc(&obd_dirty_pages);
667 cli->cl_dirty += CFS_PAGE_SIZE;
668 cli->cl_avail_grant -= CFS_PAGE_SIZE;
669 pga->flag |= OBD_BRW_FROM_GRANT;
670 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
671 CFS_PAGE_SIZE, pga, pga->pg);
672 LASSERT(cli->cl_avail_grant >= 0);
675 /* the companion to osc_consume_write_grant, called when a brw has completed.
676 * must be called with the loi lock held. */
677 static void osc_release_write_grant(struct client_obd *cli,
678 struct brw_page *pga, int sent)
680 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
683 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
688 pga->flag &= ~OBD_BRW_FROM_GRANT;
689 atomic_dec(&obd_dirty_pages);
690 cli->cl_dirty -= CFS_PAGE_SIZE;
692 cli->cl_lost_grant += CFS_PAGE_SIZE;
693 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
694 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
695 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
696 /* For short writes we shouldn't count parts of pages that
697 * span a whole block on the OST side, or our accounting goes
698 * wrong. Should match the code in filter_grant_check. */
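/* Worked example, assuming 4KB pages and a (hypothetical) 1KB OST block
 * size: a 100-byte write at offset 50 within its page gives count = 150,
 * end = 150, so count is rounded up to the 1KB block and cl_lost_grant
 * grows by 4096 - 1024 = 3072 bytes. */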
699 int offset = pga->off & ~CFS_PAGE_MASK;
700 int count = pga->count + (offset & (blocksize - 1));
701 int end = (offset + pga->count) & (blocksize - 1);
703 count += blocksize - end;
705 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
706 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
707 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
708 cli->cl_avail_grant, cli->cl_dirty);
714 static unsigned long rpcs_in_flight(struct client_obd *cli)
716 return cli->cl_r_in_flight + cli->cl_w_in_flight;
719 /* caller must hold loi_list_lock */
720 void osc_wake_cache_waiters(struct client_obd *cli)
722 struct list_head *l, *tmp;
723 struct osc_cache_waiter *ocw;
726 list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
727 /* if we can't dirty more, we must wait until some is written */
728 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
729 ((atomic_read(&obd_dirty_pages)+1)>(obd_max_dirty_pages))) {
730 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
731 "osc max %ld, sys max %d\n", cli->cl_dirty,
732 cli->cl_dirty_max, obd_max_dirty_pages);
736 /* if the cache is still dirty but there is no grant, wait for pending
737  * RPCs that may yet return us some grant before doing sync writes */
738 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
739 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
740 cli->cl_w_in_flight);
744 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
745 list_del_init(&ocw->ocw_entry);
746 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
747 /* no more RPCs in flight to return grant, do sync IO */
748 ocw->ocw_rc = -EDQUOT;
749 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
751 osc_consume_write_grant(cli,
752 &ocw->ocw_oap->oap_brw_page);
755 cfs_waitq_signal(&ocw->ocw_waitq);
761 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
763 client_obd_list_lock(&cli->cl_loi_list_lock);
764 cli->cl_avail_grant = ocd->ocd_grant;
765 client_obd_list_unlock(&cli->cl_loi_list_lock);
767 CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
768 cli->cl_avail_grant, cli->cl_lost_grant);
769 LASSERT(cli->cl_avail_grant >= 0);
772 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
774 client_obd_list_lock(&cli->cl_loi_list_lock);
775 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
776 if (body->oa.o_valid & OBD_MD_FLGRANT)
777 cli->cl_avail_grant += body->oa.o_grant;
778 /* waiters are woken in brw_interpret_oap */
779 client_obd_list_unlock(&cli->cl_loi_list_lock);
782 /* We assume that the reason this OSC got a short read is because it read
783 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
784 * via the LOV, and it _knows_ it's reading inside the file, it's just that
785 * this stripe never got written at or beyond this stripe offset yet. */
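/* Worked example: a 3-page (12288-byte) read that returns 6000 bytes leaves
 * page 0 untouched, zeroes page 1 from byte 1904 onward, and zeroes the
 * remaining page completely. */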
786 static void handle_short_read(int nob_read, obd_count page_count,
787 struct brw_page **pga)
792 /* skip bytes read OK */
793 while (nob_read > 0) {
794 LASSERT (page_count > 0);
796 if (pga[i]->count > nob_read) {
797 /* EOF inside this page */
798 ptr = cfs_kmap(pga[i]->pg) +
799 (pga[i]->off & ~CFS_PAGE_MASK);
800 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
801 cfs_kunmap(pga[i]->pg);
807 nob_read -= pga[i]->count;
812 /* zero remaining pages */
813 while (page_count-- > 0) {
814 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
815 memset(ptr, 0, pga[i]->count);
816 cfs_kunmap(pga[i]->pg);
821 static int check_write_rcs(struct ptlrpc_request *req,
822 int requested_nob, int niocount,
823 obd_count page_count, struct brw_page **pga)
827 /* return error if any niobuf was in error */
828 remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
829 sizeof(*remote_rcs) * niocount, NULL);
830 if (remote_rcs == NULL) {
831 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
834 if (lustre_msg_swabbed(req->rq_repmsg))
835 for (i = 0; i < niocount; i++)
836 __swab32s(&remote_rcs[i]);
838 for (i = 0; i < niocount; i++) {
839 if (remote_rcs[i] < 0)
840 return(remote_rcs[i]);
842 if (remote_rcs[i] != 0) {
843 CERROR("rc[%d] invalid (%d) req %p\n",
844 i, remote_rcs[i], req);
849 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
850 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
851 req->rq_bulk->bd_nob_transferred, requested_nob);
858 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
860 if (p1->flag != p2->flag) {
861 unsigned mask = ~OBD_BRW_FROM_GRANT;
863 /* warn if we try to combine flags that we don't know to be
865 if ((p1->flag & mask) != (p2->flag & mask))
866 CERROR("is it ok to have flags 0x%x and 0x%x in the "
867 "same brw?\n", p1->flag, p2->flag);
871 return (p1->off + p1->count == p2->off);
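/* (So two pages merge into a single niobuf only when their flags are
 * compatible and the first ends at exactly the object offset where the
 * second begins.) */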
874 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
875 struct brw_page **pga, int opc)
880 LASSERT (pg_count > 0);
881 while (nob > 0 && pg_count > 0) {
882 char *ptr = cfs_kmap(pga[i]->pg);
883 int off = pga[i]->off & ~CFS_PAGE_MASK;
884 int count = pga[i]->count > nob ? nob : pga[i]->count;
886 /* corrupt the data before we compute the checksum, to
887 * simulate an OST->client data error */
888 if (i == 0 && opc == OST_READ &&
889 OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
890 memcpy(ptr + off, "bad1", min(4, nob));
891 cksum = crc32_le(cksum, ptr + off, count);
892 cfs_kunmap(pga[i]->pg);
893 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
896 nob -= pga[i]->count;
900 /* For sending we only compute the wrong checksum instead
901 * of corrupting the data so it is still correct on a redo */
902 if (opc == OST_WRITE && OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
908 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
909 struct lov_stripe_md *lsm, obd_count page_count,
910 struct brw_page **pga,
911 struct ptlrpc_request **reqp)
913 struct ptlrpc_request *req;
914 struct ptlrpc_bulk_desc *desc;
915 struct ost_body *body;
916 struct obd_ioobj *ioobj;
917 struct niobuf_remote *niobuf;
918 int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
919 int niocount, i, requested_nob, opc, rc;
920 struct ptlrpc_request_pool *pool;
921 struct osc_brw_async_args *aa;
922 struct brw_page *pg_prev;
925 OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
926 OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */
928 opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
929 pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_import->imp_rq_pool :NULL;
931 for (niocount = i = 1; i < page_count; i++) {
932 if (!can_merge_pages(pga[i - 1], pga[i]))
936 size[REQ_REC_OFF + 1] = sizeof(*ioobj);
937 size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
939 req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 4, size,
944 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
946 if (opc == OST_WRITE)
947 desc = ptlrpc_prep_bulk_imp (req, page_count,
948 BULK_GET_SOURCE, OST_BULK_PORTAL);
950 desc = ptlrpc_prep_bulk_imp (req, page_count,
951 BULK_PUT_SINK, OST_BULK_PORTAL);
953 GOTO(out, rc = -ENOMEM);
954 /* NB request now owns desc and will free it when it gets freed */
956 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
957 ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
958 niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
959 niocount * sizeof(*niobuf));
961 memcpy(&body->oa, oa, sizeof(*oa));
963 obdo_to_ioobj(oa, ioobj);
964 ioobj->ioo_bufcnt = niocount;
966 LASSERT (page_count > 0);
968 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
969 struct brw_page *pg = pga[i];
971 LASSERT(pg->count > 0);
972 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
973 "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
976 LASSERTF(i == 0 || pg->off > pg_prev->off,
977 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
978 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
980 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
981 pg_prev->pg, page_private(pg_prev->pg),
982 pg_prev->pg->index, pg_prev->off);
984 LASSERTF(i == 0 || pg->off > pg_prev->off,
985 "i %d p_c %u\n", i, page_count);
987 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
988 (pg->flag & OBD_BRW_SRVLOCK));
990 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
992 requested_nob += pg->count;
994 if (i > 0 && can_merge_pages(pg_prev, pg)) {
996 niobuf->len += pg->count;
998 niobuf->offset = pg->off;
999 niobuf->len = pg->count;
1000 niobuf->flags = pg->flag;
1005 LASSERTF((void *)(niobuf - niocount) ==
1006 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1007 niocount * sizeof(*niobuf)),
1008 "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
1009 REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
1010 (void *)(niobuf - niocount));
1012 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1014 /* size[REQ_REC_OFF] still sizeof (*body) */
1015 if (opc == OST_WRITE) {
1016 if (unlikely(cli->cl_checksum)) {
1017 body->oa.o_valid |= OBD_MD_FLCKSUM;
1018 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1021 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1023 /* save this in 'oa', too, for later checking */
1024 oa->o_valid |= OBD_MD_FLCKSUM;
1026 /* clear out the checksum flag, in case this is a
1027 * resend but cl_checksum is no longer set. b=11238 */
1028 oa->o_valid &= ~OBD_MD_FLCKSUM;
1030 oa->o_cksum = body->oa.o_cksum;
1031 /* 1 RC per niobuf */
1032 size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
1033 ptlrpc_req_set_repsize(req, 3, size);
1035 if (unlikely(cli->cl_checksum))
1036 body->oa.o_valid |= OBD_MD_FLCKSUM;
1037 /* 1 RC for the whole I/O */
1038 ptlrpc_req_set_repsize(req, 2, size);
1041 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1042 aa = (struct osc_brw_async_args *)&req->rq_async_args;
1044 aa->aa_requested_nob = requested_nob;
1045 aa->aa_nio_count = niocount;
1046 aa->aa_page_count = page_count;
1050 INIT_LIST_HEAD(&aa->aa_oaps);
1056 ptlrpc_req_finished (req);
1060 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1061 __u32 client_cksum, __u32 server_cksum, int nob,
1062 obd_count page_count, struct brw_page **pga)
1067 if (server_cksum == client_cksum) {
1068 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1072 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE);
1074 if (new_cksum == server_cksum)
1075 msg = "changed on the client after we checksummed it - "
1076 "likely false positive due to mmap IO (bug 11742)";
1077 else if (new_cksum == client_cksum)
1078 msg = "changed in transit before arrival at OST";
1080 msg = "changed in transit AND doesn't match the original - "
1081 "likely false positive due to mmap IO (bug 11742)";
1083 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1084 LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1085 "["LPU64"-"LPU64"]\n",
1086 msg, libcfs_nid2str(peer->nid),
1087 oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1088 oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1091 oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1093 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1094 CERROR("original client csum %x, server csum %x, client csum now %x\n",
1095 client_cksum, server_cksum, new_cksum);
1100 /* Note rc enters this function as number of bytes transferred */
1101 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1103 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1104 const lnet_process_id_t *peer =
1105 &req->rq_import->imp_connection->c_peer;
1106 struct client_obd *cli = aa->aa_cli;
1107 struct ost_body *body;
1108 __u32 client_cksum = 0;
1111 if (rc < 0 && rc != -EDQUOT)
1114 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1115 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1116 lustre_swab_ost_body);
1118 CERROR ("Can't unpack body\n");
1122 /* set/clear over quota flag for a uid/gid */
1123 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1124 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1125 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1126 body->oa.o_gid, body->oa.o_valid,
1132 if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
1133 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1135 osc_update_grant(cli, body);
1137 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1139 CERROR ("Unexpected +ve rc %d\n", rc);
1142 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1144 if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
1146 check_write_checksum(&body->oa, peer, client_cksum,
1148 aa->aa_requested_nob,
1153 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1154 aa->aa_page_count, aa->aa_ppga);
1158 /* The rest of this function executes only for OST_READs */
1159 if (rc > aa->aa_requested_nob) {
1160 CERROR("Unexpected rc %d (%d requested)\n", rc,
1161 aa->aa_requested_nob);
1165 if (rc != req->rq_bulk->bd_nob_transferred) {
1166 CERROR ("Unexpected rc %d (%d transferred)\n",
1167 rc, req->rq_bulk->bd_nob_transferred);
1171 if (rc < aa->aa_requested_nob)
1172 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1174 if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
1175 static int cksum_counter;
1176 __u32 server_cksum = body->oa.o_cksum;
1180 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1181 aa->aa_ppga, OST_READ);
1183 if (peer->nid == req->rq_bulk->bd_sender) {
1187 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1190 if (server_cksum == ~0 && rc > 0) {
1191 CERROR("Protocol error: server %s set the 'checksum' "
1192 "bit, but didn't send a checksum. Not fatal, "
1193 "but please tell CFS.\n",
1194 libcfs_nid2str(peer->nid));
1195 } else if (server_cksum != client_cksum) {
1196 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1197 "%s%s%s inum "LPU64"/"LPU64" object "
1198 LPU64"/"LPU64" extent "
1199 "["LPU64"-"LPU64"]\n",
1200 req->rq_import->imp_obd->obd_name,
1201 libcfs_nid2str(peer->nid),
1203 body->oa.o_valid & OBD_MD_FLFID ?
1204 body->oa.o_fid : (__u64)0,
1205 body->oa.o_valid & OBD_MD_FLFID ?
1206 body->oa.o_generation :(__u64)0,
1208 body->oa.o_valid & OBD_MD_FLGROUP ?
1209 body->oa.o_gr : (__u64)0,
1210 aa->aa_ppga[0]->off,
1211 aa->aa_ppga[aa->aa_page_count-1]->off +
1212 aa->aa_ppga[aa->aa_page_count-1]->count -
1214 CERROR("client %x, server %x\n",
1215 client_cksum, server_cksum);
1217 aa->aa_oa->o_cksum = client_cksum;
1221 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1224 } else if (unlikely(client_cksum)) {
1225 static int cksum_missed;
1228 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1229 CERROR("Checksum %u requested from %s but not sent\n",
1230 cksum_missed, libcfs_nid2str(peer->nid));
1236 memcpy(aa->aa_oa, &body->oa, sizeof(*aa->aa_oa));
1241 static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
1242 struct lov_stripe_md *lsm,
1243 obd_count page_count, struct brw_page **pga)
1245 struct ptlrpc_request *request;
1249 struct l_wait_info lwi;
1252 init_waitqueue_head(&waitq);
1255 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1256 page_count, pga, &request);
1260 rc = ptlrpc_queue_wait(request);
1262 if (rc == -ETIMEDOUT && request->rq_resend) {
1263 DEBUG_REQ(D_HA, request, "BULK TIMEOUT");
1264 ptlrpc_req_finished(request);
1268 rc = osc_brw_fini_request(request, rc);
1270 ptlrpc_req_finished(request);
1271 if (osc_recoverable_error(rc)) {
1273 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1274 CERROR("too many resend retries, returning error\n");
1278 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1279 l_wait_event(waitq, 0, &lwi);
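/* Rebuild and resubmit a bulk request after a recoverable error: prepare a
 * brand-new request over the same pages and obdo, move the oaps across to
 * the new request, and add it back to the original request set. */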
1286 int osc_brw_redo_request(struct ptlrpc_request *request,
1287 struct osc_brw_async_args *aa)
1289 struct ptlrpc_request *new_req;
1290 struct ptlrpc_request_set *set = request->rq_set;
1291 struct osc_brw_async_args *new_aa;
1292 struct osc_async_page *oap;
1296 if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1297 CERROR("too many resend retries, returning error\n");
1301 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1303 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1304 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1305 aa->aa_cli, aa->aa_oa,
1306 NULL /* lsm unused by osc currently */,
1307 aa->aa_page_count, aa->aa_ppga, &new_req);
1311 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1313 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1314 if (oap->oap_request != NULL) {
1315 LASSERTF(request == oap->oap_request,
1316 "request %p != oap_request %p\n",
1317 request, oap->oap_request);
1318 if (oap->oap_interrupted) {
1319 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1320 ptlrpc_req_finished(new_req);
1325 /* New request takes over pga and oaps from old request.
1326 * Note that copying a list_head doesn't work, need to move it... */
1328 new_req->rq_interpret_reply = request->rq_interpret_reply;
1329 new_req->rq_async_args = request->rq_async_args;
1330 new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;
1332 new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
1334 INIT_LIST_HEAD(&new_aa->aa_oaps);
1335 list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1336 INIT_LIST_HEAD(&aa->aa_oaps);
1338 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1339 if (oap->oap_request) {
1340 ptlrpc_req_finished(oap->oap_request);
1341 oap->oap_request = ptlrpc_request_addref(new_req);
1345 /* Using ptlrpc_set_add_req() here is safe because interpret functions
1346  * run in check_set context.  The only path by which another thread can
1347  * reach this request and see -EINTR is protected by cl_loi_list_lock. */
1349 ptlrpc_set_add_req(set, new_req);
1351 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1353 DEBUG_REQ(D_INFO, new_req, "new request");
1357 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
1359 struct osc_brw_async_args *aa = data;
1363 rc = osc_brw_fini_request(request, rc);
1364 CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
1365 if (osc_recoverable_error(rc)) {
1366 rc = osc_brw_redo_request(request, aa);
1370 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1371 if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
1372 aa->aa_cli->cl_w_in_flight--;
1374 aa->aa_cli->cl_r_in_flight--;
1376 for (i = 0; i < aa->aa_page_count; i++)
1377 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1378 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1379 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
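/* Issue a read or write RPC through a request set.  For writes the grant is
 * consumed up front (even for an otherwise synchronous write) and released
 * again if building the request fails. */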
1384 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1385 struct lov_stripe_md *lsm, obd_count page_count,
1386 struct brw_page **pga, struct ptlrpc_request_set *set)
1388 struct ptlrpc_request *request;
1389 struct client_obd *cli = &exp->exp_obd->u.cli;
1391 struct osc_brw_async_args *aa;
1394 /* Consume write credits even if doing a sync write -
1395 * otherwise we may run out of space on OST due to grant. */
1396 if (cmd == OBD_BRW_WRITE) {
1397 client_obd_list_lock(&cli->cl_loi_list_lock);
1398 for (i = 0; i < page_count; i++) {
1399 if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1400 osc_consume_write_grant(cli, pga[i]);
1402 client_obd_list_unlock(&cli->cl_loi_list_lock);
1405 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1406 page_count, pga, &request);
1408 aa = (struct osc_brw_async_args *)&request->rq_async_args;
1409 if (cmd == OBD_BRW_READ) {
1410 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1411 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1412 ptlrpc_lprocfs_brw(request, OST_READ, aa->aa_requested_nob);
1414 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1415 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1416 cli->cl_w_in_flight);
1417 ptlrpc_lprocfs_brw(request, OST_WRITE, aa->aa_requested_nob);
1421 request->rq_interpret_reply = brw_interpret;
1422 ptlrpc_set_add_req(set, request);
1423 client_obd_list_lock(&cli->cl_loi_list_lock);
1424 if (cmd == OBD_BRW_READ)
1425 cli->cl_r_in_flight++;
1427 cli->cl_w_in_flight++;
1428 client_obd_list_unlock(&cli->cl_loi_list_lock);
1429 } else if (cmd == OBD_BRW_WRITE) {
1430 client_obd_list_lock(&cli->cl_loi_list_lock);
1431 for (i = 0; i < page_count; i++)
1432 osc_release_write_grant(cli, pga[i], 0);
1433 client_obd_list_unlock(&cli->cl_loi_list_lock);
1440 * ugh, we want disk allocation on the target to happen in offset order. we'll
1441 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1442 * fine for our small page arrays and doesn't require allocation. it's an
1443 * insertion sort that swaps elements that are strides apart, shrinking the
1444 * stride down until it's 1 and the array is sorted.
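 * (The increments generated below, stride = 3*stride + 1, form the
 * 1, 4, 13, 40, ... sequence; the outer loop then shrinks the stride back
 * toward 1.)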
1446 static void sort_brw_pages(struct brw_page **array, int num)
1449 struct brw_page *tmp;
1453 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1458 for (i = stride ; i < num ; i++) {
1461 while (j >= stride && array[j-stride]->off > tmp->off) {
1462 array[j] = array[j - stride];
1467 } while (stride > 1);
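/* Return the length of the initial run of (sorted) pages that can be sent as
 * a single unfragmented bulk: after the first page every page must start on
 * a page boundary, and every page but the last must end on one. */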
1470 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1476 LASSERT (pages > 0);
1477 offset = pg[i]->off & (~CFS_PAGE_MASK);
1481 if (pages == 0) /* that's all */
1484 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1485 return count; /* doesn't end on page boundary */
1488 offset = pg[i]->off & (~CFS_PAGE_MASK);
1489 if (offset != 0) /* doesn't start on page boundary */
1496 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1498 struct brw_page **ppga;
1501 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1505 for (i = 0; i < count; i++)
1510 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1512 LASSERT(ppga != NULL);
1513 OBD_FREE(ppga, sizeof(*ppga) * count);
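/* Synchronous brw entry point: build a sorted ppga array and send it as a
 * series of RPCs, each of at most cl_max_pages_per_rpc unfragmented pages,
 * restoring the caller's obdo between RPCs since each brw clobbers it. */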
1516 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1517 obd_count page_count, struct brw_page *pga,
1518 struct obd_trans_info *oti)
1520 struct obdo *saved_oa = NULL;
1521 struct brw_page **ppga, **orig;
1522 struct obd_import *imp = class_exp2cliimp(exp);
1523 struct client_obd *cli = &imp->imp_obd->u.cli;
1524 int rc, page_count_orig;
1527 if (cmd & OBD_BRW_CHECK) {
1528 /* The caller just wants to know if there's a chance that this
1529 * I/O can succeed */
1531 if (imp == NULL || imp->imp_invalid)
1536 /* test_brw with a failed create can trip this, maybe others. */
1537 LASSERT(cli->cl_max_pages_per_rpc);
1541 orig = ppga = osc_build_ppga(pga, page_count);
1544 page_count_orig = page_count;
1546 sort_brw_pages(ppga, page_count);
1547 while (page_count) {
1548 obd_count pages_per_brw;
1550 if (page_count > cli->cl_max_pages_per_rpc)
1551 pages_per_brw = cli->cl_max_pages_per_rpc;
1553 pages_per_brw = page_count;
1555 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1557 if (saved_oa != NULL) {
1558 /* restore previously saved oa */
1559 *oinfo->oi_oa = *saved_oa;
1560 } else if (page_count > pages_per_brw) {
1561 /* save a copy of oa (brw will clobber it) */
1562 OBDO_ALLOC(saved_oa);
1563 if (saved_oa == NULL)
1564 GOTO(out, rc = -ENOMEM);
1565 *saved_oa = *oinfo->oi_oa;
1568 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1569 pages_per_brw, ppga);
1574 page_count -= pages_per_brw;
1575 ppga += pages_per_brw;
1579 osc_release_ppga(orig, page_count_orig);
1581 if (saved_oa != NULL)
1582 OBDO_FREE(saved_oa);
1587 static int osc_brw_async(int cmd, struct obd_export *exp,
1588 struct obd_info *oinfo, obd_count page_count,
1589 struct brw_page *pga, struct obd_trans_info *oti,
1590 struct ptlrpc_request_set *set)
1592 struct brw_page **ppga, **orig;
1593 int page_count_orig;
1597 if (cmd & OBD_BRW_CHECK) {
1598 /* The caller just wants to know if there's a chance that this
1599 * I/O can succeed */
1600 struct obd_import *imp = class_exp2cliimp(exp);
1602 if (imp == NULL || imp->imp_invalid)
1607 orig = ppga = osc_build_ppga(pga, page_count);
1610 page_count_orig = page_count;
1612 sort_brw_pages(ppga, page_count);
1613 while (page_count) {
1614 struct brw_page **copy;
1615 obd_count pages_per_brw;
1617 pages_per_brw = min_t(obd_count, page_count,
1618 class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc);
1620 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1622 /* use ppga only if single RPC is going to fly */
1623 if (pages_per_brw != page_count_orig || ppga != orig) {
1624 OBD_ALLOC(copy, pages_per_brw * sizeof(*copy));
1626 GOTO(out, rc = -ENOMEM);
1627 memcpy(copy, ppga, pages_per_brw * sizeof(*copy));
1631 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1632 pages_per_brw, copy, set);
1636 OBD_FREE(copy, pages_per_brw * sizeof(*copy));
1641 /* we passed it to async_internal() which is
1642 * now responsible for releasing memory */
1646 page_count -= pages_per_brw;
1647 ppga += pages_per_brw;
1651 osc_release_ppga(orig, page_count_orig);
1655 static void osc_check_rpcs(struct client_obd *cli);
1657 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1658 * the dirty accounting. Writeback completes or truncate happens before
1659 * writing starts. Must be called with the loi lock held. */
1660 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1663 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1666 /* This maintains the lists of pending pages to read/write for a given object
1667 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1668 * to quickly find objects that are ready to send an RPC. */
1669 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1675 if (lop->lop_num_pending == 0)
1678 /* if we have an invalid import we want to drain the queued pages
1679 * by forcing them through rpcs that immediately fail and complete
1680 * the pages. recovery relies on this to empty the queued pages
1681 * before canceling the locks and evicting down the llite pages */
1682 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1685 /* stream rpcs in queue order as long as there is an urgent page
1686 * queued. this is our cheap solution for good batching in the case
1687 * where writepage marks some random page in the middle of the file
1688 * as urgent because of, say, memory pressure */
1689 if (!list_empty(&lop->lop_urgent)) {
1690 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1694 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1695 optimal = cli->cl_max_pages_per_rpc;
1696 if (cmd & OBD_BRW_WRITE) {
1697 /* trigger a write rpc stream as long as there are dirtiers
1698 * waiting for space. as they're waiting, they're not going to
1699 * create more pages to coalesce with what's waiting.. */
1700 if (!list_empty(&cli->cl_cache_waiters)) {
1701 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1705 /* +16 to avoid triggering rpcs that would want to include pages
1706 * that are being queued but which can't be made ready until
1707 * the queuer finishes with the page. this is a wart for
1708 * llite::commit_write() */
1711 if (lop->lop_num_pending >= optimal)
1717 static void on_list(struct list_head *item, struct list_head *list,
1720 if (list_empty(item) && should_be_on)
1721 list_add_tail(item, list);
1722 else if (!list_empty(item) && !should_be_on)
1723 list_del_init(item);
1726 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1727 * can find pages to build into rpcs quickly */
1728 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1730 on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1731 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1732 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1734 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1735 loi->loi_write_lop.lop_num_pending);
1737 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1738 loi->loi_read_lop.lop_num_pending);
1741 static void lop_update_pending(struct client_obd *cli,
1742 struct loi_oap_pages *lop, int cmd, int delta)
1744 lop->lop_num_pending += delta;
1745 if (cmd & OBD_BRW_WRITE)
1746 cli->cl_pending_w_pages += delta;
1748 cli->cl_pending_r_pages += delta;
1751 /* this is called when a sync waiter receives an interruption. Its job is to
1752 * get the caller woken as soon as possible. If its page hasn't been put in an
1753 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
1754 * desiring interruption which will forcefully complete the rpc once the rpc
1756 static void osc_occ_interrupted(struct oig_callback_context *occ)
1758 struct osc_async_page *oap;
1759 struct loi_oap_pages *lop;
1760 struct lov_oinfo *loi;
1763 /* XXX member_of() */
1764 oap = list_entry(occ, struct osc_async_page, oap_occ);
1766 client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1768 oap->oap_interrupted = 1;
1770 /* ok, it's been put in an rpc. only one oap gets a request reference */
1771 if (oap->oap_request != NULL) {
1772 ptlrpc_mark_interrupted(oap->oap_request);
1773 ptlrpcd_wake(oap->oap_request);
1777 /* we don't get interruption callbacks until osc_trigger_group_io()
1778 * has been called and put the sync oaps in the pending/urgent lists.*/
1779 if (!list_empty(&oap->oap_pending_item)) {
1780 list_del_init(&oap->oap_pending_item);
1781 list_del_init(&oap->oap_urgent_item);
1784 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1785 &loi->loi_write_lop : &loi->loi_read_lop;
1786 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1787 loi_list_maint(oap->oap_cli, oap->oap_loi);
1789 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
1790 oap->oap_oig = NULL;
1794 client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
1797 /* this is trying to propagate async writeback errors back up to the
1798 * application. As an async write fails we record the error code for later if
1799 * the app does an fsync. As long as errors persist we force future rpcs to be
1800 * sync so that the app can get a sync error and break the cycle of queueing
1801 * pages for which writeback will fail. */
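/* Note that ar_min_xid is sampled when the error is recorded, so force_sync
 * is only cleared by the success of a request issued after the failure
 * (xid >= ar_min_xid); older in-flight requests completing successfully do
 * not clear it. */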
1802 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1809 ar->ar_force_sync = 1;
1810 ar->ar_min_xid = ptlrpc_sample_next_xid();
1815 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1816 ar->ar_force_sync = 0;
1819 static void osc_oap_to_pending(struct osc_async_page *oap)
1821 struct loi_oap_pages *lop;
1823 if (oap->oap_cmd & OBD_BRW_WRITE)
1824 lop = &oap->oap_loi->loi_write_lop;
1826 lop = &oap->oap_loi->loi_read_lop;
1828 if (oap->oap_async_flags & ASYNC_URGENT)
1829 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1830 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1831 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1834 /* this must be called holding the loi list lock to give coverage to exit_cache,
1835 * async_flag maintenance, and oap_request */
1836 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1837 struct osc_async_page *oap, int sent, int rc)
1842 if (oap->oap_request != NULL) {
1843 xid = ptlrpc_req_xid(oap->oap_request);
1844 ptlrpc_req_finished(oap->oap_request);
1845 oap->oap_request = NULL;
1848 oap->oap_async_flags = 0;
1849 oap->oap_interrupted = 0;
1851 if (oap->oap_cmd & OBD_BRW_WRITE) {
1852 osc_process_ar(&cli->cl_ar, xid, rc);
1853 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
1856 if (rc == 0 && oa != NULL) {
1857 if (oa->o_valid & OBD_MD_FLBLOCKS)
1858 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
1859 if (oa->o_valid & OBD_MD_FLMTIME)
1860 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
1861 if (oa->o_valid & OBD_MD_FLATIME)
1862 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
1863 if (oa->o_valid & OBD_MD_FLCTIME)
1864 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
1868 osc_exit_cache(cli, oap, sent);
1869 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
1870 oap->oap_oig = NULL;
1875 rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
1876 oap->oap_cmd, oa, rc);
1878 /* ll_ap_completion (from llite) drops PG_locked. so, a new
1879 * I/O on the page could start, but OSC calls it under lock
1880 * and thus we can add oap back to pending safely */
1882 /* upper layer wants to leave the page on pending queue */
1883 osc_oap_to_pending(oap);
1885 osc_exit_cache(cli, oap, sent);
1889 static int brw_interpret_oap(struct ptlrpc_request *request, void *data, int rc)
1891 struct osc_brw_async_args *aa = data;
1892 struct osc_async_page *oap, *tmp;
1893 struct client_obd *cli;
1896 rc = osc_brw_fini_request(request, rc);
1897 CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
1899 if (osc_recoverable_error(rc)) {
1900 rc = osc_brw_redo_request(request, aa);
1906 client_obd_list_lock(&cli->cl_loi_list_lock);
1907 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1908 * is called so we know whether to go to sync BRWs or wait for more
1909 * RPCs to complete */
1910 if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
1911 cli->cl_w_in_flight--;
1913 cli->cl_r_in_flight--;
1915 /* the caller may re-use the oap after the completion call so
1916 * we need to clean it up a little */
1917 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
1918 list_del_init(&oap->oap_rpc_item);
1919 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
1922 osc_wake_cache_waiters(cli);
1923 osc_check_rpcs(cli);
1924 client_obd_list_unlock(&cli->cl_loi_list_lock);
1926 OBDO_FREE(aa->aa_oa);
1928 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1932 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
1933 struct list_head *rpc_list,
1934 int page_count, int cmd)
1936 struct ptlrpc_request *req;
1937 struct brw_page **pga = NULL;
1938 struct osc_brw_async_args *aa;
1939 struct obdo *oa = NULL;
1940 struct obd_async_page_ops *ops = NULL;
1941 void *caller_data = NULL;
1942 struct osc_async_page *oap;
1946 LASSERT(!list_empty(rpc_list));
1948 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1950 RETURN(ERR_PTR(-ENOMEM));
1954 GOTO(out, req = ERR_PTR(-ENOMEM));
1957 list_for_each_entry(oap, rpc_list, oap_rpc_item) {
1959 ops = oap->oap_caller_ops;
1960 caller_data = oap->oap_caller_data;
1962 pga[i] = &oap->oap_brw_page;
1963 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1964 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1965 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
1969 /* always get the data for the obdo for the rpc */
1970 LASSERT(ops != NULL);
1971 ops->ap_fill_obdo(caller_data, cmd, oa);
1973 sort_brw_pages(pga, page_count);
1974 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, pga, &req);
1976 CERROR("prep_req failed: %d\n", rc);
1977 GOTO(out, req = ERR_PTR(rc));
1980 /* Need to update the timestamps after the request is built in case
1981 * we race with setattr (locally or in queue at OST). If OST gets
1982 * later setattr before earlier BRW (as determined by the request xid),
1983 * the OST will not use BRW timestamps. Sadly, there is no obvious
1984 * way to do this in a single call. bug 10150 */
1985 ops->ap_update_obdo(caller_data, cmd, oa,
1986 OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
1988 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1989 aa = (struct osc_brw_async_args *)&req->rq_async_args;
1990 INIT_LIST_HEAD(&aa->aa_oaps);
1991 list_splice(rpc_list, &aa->aa_oaps);
1992 INIT_LIST_HEAD(rpc_list);
1999 OBD_FREE(pga, sizeof(*pga) * page_count);
2004 /* the loi lock is held across this function but it's allowed to release
2005 * and reacquire it during its work */
2006 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2007 int cmd, struct loi_oap_pages *lop)
2009 struct ptlrpc_request *req;
2010 obd_count page_count = 0;
2011 struct osc_async_page *oap = NULL, *tmp;
2012 struct osc_brw_async_args *aa;
2013 struct obd_async_page_ops *ops;
2014 CFS_LIST_HEAD(rpc_list);
2015 unsigned int ending_offset;
2016 unsigned starting_offset = 0;
2020 /* first we find the pages we're allowed to work with */
2021 list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item){
2022 ops = oap->oap_caller_ops;
2024 LASSERT(oap->oap_magic == OAP_MAGIC);
2026 if (page_count != 0 &&
2027 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2028 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2029 " oap %p, page %p, srvlock %u\n",
2030 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2033 /* in llite being 'ready' equates to the page being locked
2034 * until completion unlocks it. commit_write submits a page
2035 * as not ready because its unlock will happen unconditionally
2036 * as the call returns. if we race with commit_write giving
2037 * us that page we don't want to create a hole in the page
2038 * stream, so we stop and leave the rpc to be fired by
2039 * another dirtier or kupdated interval (the not ready page
2040 * will still be on the dirty list). we could call in
2041 * at the end of ll_file_write to process the queue again. */
2042 if (!(oap->oap_async_flags & ASYNC_READY)) {
2043 int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2045 CDEBUG(D_INODE, "oap %p page %p returned %d "
2046 "instead of ready\n", oap,
2050 /* llite is telling us that the page is still
2051 * in commit_write and that we should try
2052 * and put it in an rpc again later. we
2053 * break out of the loop so we don't create
2054 * a hole in the sequence of pages in the rpc
2059 /* the io isn't needed.. tell the checks
2060 * below to complete the rpc with EINTR */
2061 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2062 oap->oap_count = -EINTR;
2065 oap->oap_async_flags |= ASYNC_READY;
2068 LASSERTF(0, "oap %p page %p returned %d "
2069 "from make_ready\n", oap,
2077 * Page submitted for IO has to be locked. Either by
2078 * ->ap_make_ready() or by higher layers.
2080 * XXX nikita: this assertion should be adjusted when lustre
2081 * starts using PG_writeback for pages being written out.
2083 #if defined(__KERNEL__) && defined(__LINUX__)
2084 LASSERT(PageLocked(oap->oap_page));
2086 /* If there is a gap at the start of this page, it can't merge
2087 * with any previous page, so we'll hand the network a
2088 * "fragmented" page array that it can't transfer in 1 RDMA */
2089 if (page_count != 0 && oap->oap_page_off != 0)
2092 /* take the page out of our book-keeping */
2093 list_del_init(&oap->oap_pending_item);
2094 lop_update_pending(cli, lop, cmd, -1);
2095 list_del_init(&oap->oap_urgent_item);
2097 if (page_count == 0)
2098 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2099 (PTLRPC_MAX_BRW_SIZE - 1);
2101 /* ask the caller for the size of the io as the rpc leaves. */
2102 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2104 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2105 if (oap->oap_count <= 0) {
2106 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2108 osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2112 /* now put the page back in our accounting */
2113 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2114 if (page_count == 0)
2115 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2116 if (++page_count >= cli->cl_max_pages_per_rpc)
2119 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2120 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2121 * have the same alignment as the initial writes that allocated
2122 * extents on the server. */
2123 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2124 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2125 if (ending_offset == 0)
2128 /* If there is a gap at the end of this page, it can't merge
2129 * with any subsequent pages, so we'll hand the network a
2130 * "fragmented" page array that it can't transfer in 1 RDMA */
2131 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2135 osc_wake_cache_waiters(cli);
2137 if (page_count == 0)
2140 loi_list_maint(cli, loi);
2142 client_obd_list_unlock(&cli->cl_loi_list_lock);
2144 req = osc_build_req(cli, &rpc_list, page_count, cmd);
2146                 /* this should happen rarely and is pretty bad; it makes the
2147 * pending list not follow the dirty order */
2148 client_obd_list_lock(&cli->cl_loi_list_lock);
2149 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2150 list_del_init(&oap->oap_rpc_item);
2152 /* queued sync pages can be torn down while the pages
2153 * were between the pending list and the rpc */
2154 if (oap->oap_interrupted) {
2155 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2156 osc_ap_completion(cli, NULL, oap, 0,
2160 osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2162 loi_list_maint(cli, loi);
2163 RETURN(PTR_ERR(req));
2166 aa = (struct osc_brw_async_args *)&req->rq_async_args;
2167 if (cmd == OBD_BRW_READ) {
2168 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2169 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2170 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2171 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2172 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2174 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2175 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2176 cli->cl_w_in_flight);
2177 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2178 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2179 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2182 client_obd_list_lock(&cli->cl_loi_list_lock);
2184 if (cmd == OBD_BRW_READ)
2185 cli->cl_r_in_flight++;
2187 cli->cl_w_in_flight++;
2189 /* queued sync pages can be torn down while the pages
2190 * were between the pending list and the rpc */
2192 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2193 /* only one oap gets a request reference */
2196 if (oap->oap_interrupted && !req->rq_intr) {
2197 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2199 ptlrpc_mark_interrupted(req);
2203 tmp->oap_request = ptlrpc_request_addref(req);
2205 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2206 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2208 req->rq_interpret_reply = brw_interpret_oap;
2209 ptlrpcd_add_req(req);
2213 #define LOI_DEBUG(LOI, STR, args...) \
2214 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2215 !list_empty(&(LOI)->loi_cli_item), \
2216 (LOI)->loi_write_lop.lop_num_pending, \
2217 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2218 (LOI)->loi_read_lop.lop_num_pending, \
2219 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2222 /* This is called by osc_check_rpcs() to find which objects have pages that
2223 * we could be sending. These lists are maintained by lop_makes_rpc(). */
2224 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2227 /* first return all objects which we already know to have
2228 * pages ready to be stuffed into rpcs */
2229 if (!list_empty(&cli->cl_loi_ready_list))
2230 RETURN(list_entry(cli->cl_loi_ready_list.next,
2231 struct lov_oinfo, loi_cli_item));
2233 /* then if we have cache waiters, return all objects with queued
2234 * writes. This is especially important when many small files
2235 * have filled up the cache and not been fired into rpcs because
2236          * they don't pass the nr_pending/object threshold */
2237 if (!list_empty(&cli->cl_cache_waiters) &&
2238 !list_empty(&cli->cl_loi_write_list))
2239 RETURN(list_entry(cli->cl_loi_write_list.next,
2240 struct lov_oinfo, loi_write_item));
2242 /* then return all queued objects when we have an invalid import
2243 * so that they get flushed */
2244 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2245 if (!list_empty(&cli->cl_loi_write_list))
2246 RETURN(list_entry(cli->cl_loi_write_list.next,
2247 struct lov_oinfo, loi_write_item));
2248 if (!list_empty(&cli->cl_loi_read_list))
2249 RETURN(list_entry(cli->cl_loi_read_list.next,
2250 struct lov_oinfo, loi_read_item));
2255 /* called with the loi list lock held */
2256 static void osc_check_rpcs(struct client_obd *cli)
2258 struct lov_oinfo *loi;
2259 int rc = 0, race_counter = 0;
2262 while ((loi = osc_next_loi(cli)) != NULL) {
2263 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2265 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2268 /* attempt some read/write balancing by alternating between
2269 * reads and writes in an object. The makes_rpc checks here
2270 * would be redundant if we were getting read/write work items
2271 * instead of objects. we don't want send_oap_rpc to drain a
2272 * partial read pending queue when we're given this object to
2273 * do io on writes while there are cache waiters */
2274 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2275 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2276 &loi->loi_write_lop);
2284 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2285 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2286 &loi->loi_read_lop);
2295                 /* attempt some inter-object balancing by issuing rpcs
2296 * for each object in turn */
2297 if (!list_empty(&loi->loi_cli_item))
2298 list_del_init(&loi->loi_cli_item);
2299 if (!list_empty(&loi->loi_write_item))
2300 list_del_init(&loi->loi_write_item);
2301 if (!list_empty(&loi->loi_read_item))
2302 list_del_init(&loi->loi_read_item);
2304 loi_list_maint(cli, loi);
2306 /* send_oap_rpc fails with 0 when make_ready tells it to
2307 * back off. llite's make_ready does this when it tries
2308 * to lock a page queued for write that is already locked.
2309 * we want to try sending rpcs from many objects, but we
2310 * don't want to spin failing with 0. */
2311 if (race_counter == 10)
2317 /* we're trying to queue a page in the osc so we're subject to the
2318 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2319 * If the osc's queued pages are already at that limit, then we want to sleep
2320 * until there is space in the osc's queue for us. We also may be waiting for
2321 * write credits from the OST if there are RPCs in flight that may return some
2322 * before we fall back to sync writes.
2324  * We need this to know whether our allocation was granted in the presence of signals */
2325 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2329 client_obd_list_lock(&cli->cl_loi_list_lock);
2330 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2331 client_obd_list_unlock(&cli->cl_loi_list_lock);
2335 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2336 * grant or cache space. */
2337 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2338 struct osc_async_page *oap)
2340 struct osc_cache_waiter ocw;
2341 struct l_wait_info lwi = { 0 };
2344 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2345 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2346 cli->cl_dirty_max, obd_max_dirty_pages,
2347 cli->cl_lost_grant, cli->cl_avail_grant);
2349 /* force the caller to try sync io. this can jump the list
2350 * of queued writes and create a discontiguous rpc stream */
2351 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2352 loi->loi_ar.ar_force_sync)
2355 /* Hopefully normal case - cache space and write credits available */
2356 if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2357 (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2358 (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2359 /* account for ourselves */
2360 osc_consume_write_grant(cli, &oap->oap_brw_page);
2364 /* Make sure that there are write rpcs in flight to wait for. This
2365 * is a little silly as this object may not have any pending but
2366 * other objects sure might. */
2367 if (cli->cl_w_in_flight) {
2368 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2369 cfs_waitq_init(&ocw.ocw_waitq);
2373 loi_list_maint(cli, loi);
2374 osc_check_rpcs(cli);
2375 client_obd_list_unlock(&cli->cl_loi_list_lock);
2377 CDEBUG(D_CACHE, "sleeping for cache space\n");
2378 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2380 client_obd_list_lock(&cli->cl_loi_list_lock);
2381 if (!list_empty(&ocw.ocw_entry)) {
2382 list_del(&ocw.ocw_entry);
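/* Prepare the per-page async IO bookkeeping.  With page == NULL this only
 * reports how much space (size_round(sizeof(*oap))) the caller must reserve
 * for the cookie; otherwise it initializes the oap (magic, client, caller
 * ops/data, page and object offset) and hands it back as an opaque cookie
 * for the later queue/set_flags/teardown calls. */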
2391 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2392 struct lov_oinfo *loi, cfs_page_t *page,
2393 obd_off offset, struct obd_async_page_ops *ops,
2394 void *data, void **res)
2396 struct osc_async_page *oap;
2400 return size_round(sizeof(*oap));
2403 oap->oap_magic = OAP_MAGIC;
2404 oap->oap_cli = &exp->exp_obd->u.cli;
2407 oap->oap_caller_ops = ops;
2408 oap->oap_caller_data = data;
2410 oap->oap_page = page;
2411 oap->oap_obj_off = offset;
2413 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2414 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2415 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2417 oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2419 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
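/* The opaque cookie handed to the caller above is the oap itself; validate
 * the magic before trusting it. */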
2423 struct osc_async_page *oap_from_cookie(void *cookie)
2425 struct osc_async_page *oap = cookie;
2426 if (oap->oap_magic != OAP_MAGIC)
2427 return ERR_PTR(-EINVAL);
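/* Queue one prepared page for asynchronous IO: check quota for writes,
 * reserve cache/grant space through osc_enter_cache(), move the oap onto the
 * object's pending list and kick osc_check_rpcs() to (possibly) build an RPC. */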
2431 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2432 struct lov_oinfo *loi, void *cookie,
2433 int cmd, obd_off off, int count,
2434 obd_flag brw_flags, enum async_flags async_flags)
2436 struct client_obd *cli = &exp->exp_obd->u.cli;
2437 struct osc_async_page *oap;
2441 oap = oap_from_cookie(cookie);
2443 RETURN(PTR_ERR(oap));
2445 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2448 if (!list_empty(&oap->oap_pending_item) ||
2449 !list_empty(&oap->oap_urgent_item) ||
2450 !list_empty(&oap->oap_rpc_item))
2453 /* check if the file's owner/group is over quota */
2454 #ifdef HAVE_QUOTA_SUPPORT
2455 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2456 struct obd_async_page_ops *ops;
2463 ops = oap->oap_caller_ops;
2464 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2465 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2476 loi = lsm->lsm_oinfo[0];
2478 client_obd_list_lock(&cli->cl_loi_list_lock);
2481 oap->oap_page_off = off;
2482 oap->oap_count = count;
2483 oap->oap_brw_flags = brw_flags;
2484 oap->oap_async_flags = async_flags;
2486 if (cmd & OBD_BRW_WRITE) {
2487 rc = osc_enter_cache(cli, loi, oap);
2489 client_obd_list_unlock(&cli->cl_loi_list_lock);
2494 osc_oap_to_pending(oap);
2495 loi_list_maint(cli, loi);
2497 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2500 osc_check_rpcs(cli);
2501 client_obd_list_unlock(&cli->cl_loi_list_lock);
2506 /* aka (~was & now & flag), but this is more clear :) */
2507 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
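/* e.g. SETTING(oap->oap_async_flags, async_flags, ASYNC_READY) is true only
 * when this call is turning ASYNC_READY on for the first time. */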
2509 static int osc_set_async_flags(struct obd_export *exp,
2510 struct lov_stripe_md *lsm,
2511 struct lov_oinfo *loi, void *cookie,
2512 obd_flag async_flags)
2514 struct client_obd *cli = &exp->exp_obd->u.cli;
2515 struct loi_oap_pages *lop;
2516 struct osc_async_page *oap;
2520 oap = oap_from_cookie(cookie);
2522 RETURN(PTR_ERR(oap));
2525 * bug 7311: OST-side locking is only supported for liblustre for now
2526          * (and liblustre never calls obd_set_async_flags(), I hope); a generic
2527          * implementation has to handle the case where an OST-locked page was
2528          * picked up by, e.g., ->writepage().
2530 LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2531 LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2534 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2538 loi = lsm->lsm_oinfo[0];
2540 if (oap->oap_cmd & OBD_BRW_WRITE) {
2541 lop = &loi->loi_write_lop;
2543 lop = &loi->loi_read_lop;
2546 client_obd_list_lock(&cli->cl_loi_list_lock);
2548 if (list_empty(&oap->oap_pending_item))
2549 GOTO(out, rc = -EINVAL);
2551 if ((oap->oap_async_flags & async_flags) == async_flags)
2554 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2555 oap->oap_async_flags |= ASYNC_READY;
2557 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2558 if (list_empty(&oap->oap_rpc_item)) {
2559 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2560 loi_list_maint(cli, loi);
2564 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2565 oap->oap_async_flags);
2567 osc_check_rpcs(cli);
2568 client_obd_list_unlock(&cli->cl_loi_list_lock);
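/* Queue a page as part of an obd_io_group: the oap sits on the group's
 * lop_pending_group list until osc_trigger_group_io() moves it onto the
 * regular pending list. */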
2572 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2573 struct lov_oinfo *loi,
2574 struct obd_io_group *oig, void *cookie,
2575 int cmd, obd_off off, int count,
2577 obd_flag async_flags)
2579 struct client_obd *cli = &exp->exp_obd->u.cli;
2580 struct osc_async_page *oap;
2581 struct loi_oap_pages *lop;
2585 oap = oap_from_cookie(cookie);
2587 RETURN(PTR_ERR(oap));
2589 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2592 if (!list_empty(&oap->oap_pending_item) ||
2593 !list_empty(&oap->oap_urgent_item) ||
2594 !list_empty(&oap->oap_rpc_item))
2598 loi = lsm->lsm_oinfo[0];
2600 client_obd_list_lock(&cli->cl_loi_list_lock);
2603 oap->oap_page_off = off;
2604 oap->oap_count = count;
2605 oap->oap_brw_flags = brw_flags;
2606 oap->oap_async_flags = async_flags;
2608 if (cmd & OBD_BRW_WRITE)
2609 lop = &loi->loi_write_lop;
2611 lop = &loi->loi_read_lop;
2613 list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2614 if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2616 rc = oig_add_one(oig, &oap->oap_occ);
2619 LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2620 oap, oap->oap_page, rc);
2622 client_obd_list_unlock(&cli->cl_loi_list_lock);
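/* Move every oap parked on a group's pending list onto the regular pending
 * list so the next osc_check_rpcs() can put them into RPCs. */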
2627 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2628 struct loi_oap_pages *lop, int cmd)
2630 struct list_head *pos, *tmp;
2631 struct osc_async_page *oap;
2633 list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2634 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2635 list_del(&oap->oap_pending_item);
2636 osc_oap_to_pending(oap);
2638 loi_list_maint(cli, loi);
2641 static int osc_trigger_group_io(struct obd_export *exp,
2642 struct lov_stripe_md *lsm,
2643 struct lov_oinfo *loi,
2644 struct obd_io_group *oig)
2646 struct client_obd *cli = &exp->exp_obd->u.cli;
2650 loi = lsm->lsm_oinfo[0];
2652 client_obd_list_lock(&cli->cl_loi_list_lock);
2654 osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2655 osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2657 osc_check_rpcs(cli);
2658 client_obd_list_unlock(&cli->cl_loi_list_lock);
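/* Undo the queueing of a page that has not yet made it into an RPC: return
 * its grant, wake anybody waiting for cache space, and take it off the urgent
 * and pending lists.  Fails with -EBUSY if the page is already part of an
 * RPC in flight. */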
2663 static int osc_teardown_async_page(struct obd_export *exp,
2664 struct lov_stripe_md *lsm,
2665 struct lov_oinfo *loi, void *cookie)
2667 struct client_obd *cli = &exp->exp_obd->u.cli;
2668 struct loi_oap_pages *lop;
2669 struct osc_async_page *oap;
2673 oap = oap_from_cookie(cookie);
2675 RETURN(PTR_ERR(oap));
2678 loi = lsm->lsm_oinfo[0];
2680 if (oap->oap_cmd & OBD_BRW_WRITE) {
2681 lop = &loi->loi_write_lop;
2683 lop = &loi->loi_read_lop;
2686 client_obd_list_lock(&cli->cl_loi_list_lock);
2688 if (!list_empty(&oap->oap_rpc_item))
2689 GOTO(out, rc = -EBUSY);
2691 osc_exit_cache(cli, oap, 0);
2692 osc_wake_cache_waiters(cli);
2694 if (!list_empty(&oap->oap_urgent_item)) {
2695 list_del_init(&oap->oap_urgent_item);
2696 oap->oap_async_flags &= ~ASYNC_URGENT;
2698 if (!list_empty(&oap->oap_pending_item)) {
2699 list_del_init(&oap->oap_pending_item);
2700 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2702 loi_list_maint(cli, loi);
2704 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2706 client_obd_list_unlock(&cli->cl_loi_list_lock);
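/* Attach the caller's inode to a lock's l_ast_data, complaining loudly if the
 * lock already points at a different inode that is not being freed. */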
2710 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2713 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2716 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2719 lock_res_and_lock(lock);
2720 #if defined (__KERNEL__) && defined (__LINUX__)
2721 /* Liang XXX: Darwin and Winnt checking should be added */
2722 if (lock->l_ast_data && lock->l_ast_data != data) {
2723 struct inode *new_inode = data;
2724 struct inode *old_inode = lock->l_ast_data;
2725 if (!(old_inode->i_state & I_FREEING))
2726 LDLM_ERROR(lock, "inconsistent l_ast_data found");
2727 LASSERTF(old_inode->i_state & I_FREEING,
2728 "Found existing inode %p/%lu/%u state %lu in lock: "
2729 "setting data to %p/%lu/%u\n", old_inode,
2730 old_inode->i_ino, old_inode->i_generation,
2732 new_inode, new_inode->i_ino, new_inode->i_generation);
2735 lock->l_ast_data = data;
2736 lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2737 unlock_res_and_lock(lock);
2738 LDLM_LOCK_PUT(lock);
2741 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2742 ldlm_iterator_t replace, void *data)
2744 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2745 struct obd_device *obd = class_exp2obd(exp);
2747 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
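/* Common completion for sync and async enqueues: dig the real result out of
 * an aborted intent reply and pass it to the caller's oi_cb_up callback. */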
2751 static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
2757 /* The request was created before ldlm_cli_enqueue call. */
2758 if (rc == ELDLM_LOCK_ABORTED) {
2759 struct ldlm_reply *rep;
2761 /* swabbed by ldlm_cli_enqueue() */
2762 LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
2763 rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
2765 LASSERT(rep != NULL);
2766 if (rep->lock_policy_res1)
2767 rc = rep->lock_policy_res1;
2771 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2772 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2773 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
2774 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
2775 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
2778 /* Call the update callback. */
2779 rc = oinfo->oi_cb_up(oinfo, rc);
2783 static int osc_enqueue_interpret(struct ptlrpc_request *req,
2784 struct osc_enqueue_args *aa, int rc)
2786 int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
2787 struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
2788 struct ldlm_lock *lock;
2790 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2792 lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
2794 /* Complete obtaining the lock procedure. */
2795 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2797 &aa->oa_oi->oi_flags,
2798 &lsm->lsm_oinfo[0]->loi_lvb,
2799 sizeof(lsm->lsm_oinfo[0]->loi_lvb),
2800 lustre_swab_ost_lvb,
2801 aa->oa_oi->oi_lockh, rc);
2803 /* Complete osc stuff. */
2804 rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
2806 /* Release the lock for async request. */
2807 if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
2808 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
2810 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2811 aa->oa_oi->oi_lockh, req, aa);
2812 LDLM_LOCK_PUT(lock);
2816 /* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
2817  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2818  * other synchronous requests, but holding some locks while trying to obtain
2819  * others may take a considerable amount of time in the case of OST failure; and
2820  * when a client fails to release a lock that other sync requests are waiting
2821  * for, the client is excluded from the cluster -- such scenarios make life
2822  * difficult, so release locks just after they are obtained. */
2823 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2824 struct ldlm_enqueue_info *einfo,
2825 struct ptlrpc_request_set *rqset)
2827 struct ldlm_res_id res_id = { .name = {oinfo->oi_md->lsm_object_id} };
2828 struct obd_device *obd = exp->exp_obd;
2829 struct ldlm_reply *rep;
2830 struct ptlrpc_request *req = NULL;
2831 int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
2836 /* Filesystem lock extents are extended to page boundaries so that
2837 * dealing with the page cache is a little smoother. */
2838 oinfo->oi_policy.l_extent.start -=
2839 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
2840 oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
2842 if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
2845 /* Next, search for already existing extent locks that will cover us */
2846 /* If we're trying to read, we also search for an existing PW lock. The
2847 * VFS and page cache already protect us locally, so lots of readers/
2848 * writers can share a single PW lock.
2850 * There are problems with conversion deadlocks, so instead of
2851 * converting a read lock to a write lock, we'll just enqueue a new
2854 * At some point we should cancel the read lock instead of making them
2855 * send us a blocking callback, but there are problems with canceling
2856 * locks out from other users right now, too. */
2857 mode = einfo->ei_mode;
2858 if (einfo->ei_mode == LCK_PR)
2860 mode = ldlm_lock_match(obd->obd_namespace,
2861 oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
2862 einfo->ei_type, &oinfo->oi_policy, mode,
2865 /* addref the lock only if not async requests and PW lock is
2866 * matched whereas we asked for PR. */
2867 if (!rqset && einfo->ei_mode != mode)
2868 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
2869 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
2872 /* I would like to be able to ASSERT here that rss <=
2873 * kms, but I can't, for reasons which are explained in
2877 /* We already have a lock, and it's referenced */
2878 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2880 /* For async requests, decref the lock. */
2881 if (einfo->ei_mode != mode)
2882 ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
2884 ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
2892 [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
2893 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
2894 [DLM_LOCKREQ_OFF + 1] = 0 };
2896 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
2900 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
2901 size[DLM_REPLY_REC_OFF] =
2902 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
2903 ptlrpc_req_set_repsize(req, 3, size);
2906 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2907 oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
2909 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id,
2910 &oinfo->oi_policy, &oinfo->oi_flags,
2911 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2912 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
2913 lustre_swab_ost_lvb, oinfo->oi_lockh,
2917 struct osc_enqueue_args *aa;
2918 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2919 aa = (struct osc_enqueue_args *)&req->rq_async_args;
2924 req->rq_interpret_reply = osc_enqueue_interpret;
2925 ptlrpc_set_add_req(rqset, req);
2926 } else if (intent) {
2927 ptlrpc_req_finished(req);
2932 rc = osc_enqueue_fini(req, oinfo, intent, rc);
2934 ptlrpc_req_finished(req);
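/* Like osc_enqueue(), but only matches locks that are already granted
 * locally; it never sends an enqueue RPC to the OST. */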
2939 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
2940 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2941 int *flags, void *data, struct lustre_handle *lockh)
2943 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2944 struct obd_device *obd = exp->exp_obd;
2945 int lflags = *flags;
2949 OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
2951 /* Filesystem lock extents are extended to page boundaries so that
2952 * dealing with the page cache is a little smoother */
2953 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2954 policy->l_extent.end |= ~CFS_PAGE_MASK;
2956 /* Next, search for already existing extent locks that will cover us */
2957 /* If we're trying to read, we also search for an existing PW lock. The
2958 * VFS and page cache already protect us locally, so lots of readers/
2959 * writers can share a single PW lock. */
2963 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
2964 &res_id, type, policy, rc, lockh);
2966 osc_set_data_with_check(lockh, data, lflags);
2967 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2968 ldlm_lock_addref(lockh, LCK_PR);
2969 ldlm_lock_decref(lockh, LCK_PW);
2977 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2978 __u32 mode, struct lustre_handle *lockh)
2982 if (unlikely(mode == LCK_GROUP))
2983 ldlm_lock_decref_and_cancel(lockh, mode);
2985 ldlm_lock_decref(lockh, mode);
2990 static int osc_cancel_unused(struct obd_export *exp,
2991 struct lov_stripe_md *lsm, int flags, void *opaque)
2993 struct obd_device *obd = class_exp2obd(exp);
2994 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2996 return ldlm_cli_cancel_unused(obd->obd_namespace, &res_id, flags,
3000 static int osc_join_lru(struct obd_export *exp,
3001 struct lov_stripe_md *lsm, int join)
3003 struct obd_device *obd = class_exp2obd(exp);
3004 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
3006 return ldlm_cli_join_lru(obd->obd_namespace, &res_id, join);
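/* Reply handler for an asynchronous OST_STATFS: unpack the obd_statfs from
 * the reply, copy it into the caller's buffer and invoke oi_cb_up. */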
3009 static int osc_statfs_interpret(struct ptlrpc_request *req,
3010 struct osc_async_args *aa, int rc)
3012 struct obd_statfs *msfs;
3018 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3019 lustre_swab_obd_statfs);
3021 CERROR("Can't unpack obd_statfs\n");
3022 GOTO(out, rc = -EPROTO);
3025 memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
3027 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3031 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3032 __u64 max_age, struct ptlrpc_request_set *rqset)
3034 struct ptlrpc_request *req;
3035 struct osc_async_args *aa;
3036 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
3039 /* We could possibly pass max_age in the request (as an absolute
3040 * timestamp or a "seconds.usec ago") so the target can avoid doing
3041 * extra calls into the filesystem if that isn't necessary (e.g.
3042 * during mount that would help a bit). Having relative timestamps
3043 * is not so great if request processing is slow, while absolute
3044 * timestamps are not ideal because they need time synchronization. */
3045 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3046 OST_STATFS, 1, NULL, NULL);
3050 ptlrpc_req_set_repsize(req, 2, size);
3051 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3053 req->rq_interpret_reply = osc_statfs_interpret;
3054 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3055 aa = (struct osc_async_args *)&req->rq_async_args;
3058 ptlrpc_set_add_req(rqset, req);
3062 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3065 struct obd_statfs *msfs;
3066 struct ptlrpc_request *req;
3067 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
3070 /* We could possibly pass max_age in the request (as an absolute
3071 * timestamp or a "seconds.usec ago") so the target can avoid doing
3072 * extra calls into the filesystem if that isn't necessary (e.g.
3073 * during mount that would help a bit). Having relative timestamps
3074 * is not so great if request processing is slow, while absolute
3075 * timestamps are not ideal because they need time synchronization. */
3076 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3077 OST_STATFS, 1, NULL, NULL);
3081 ptlrpc_req_set_repsize(req, 2, size);
3082 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3084 rc = ptlrpc_queue_wait(req);
3088 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3089 lustre_swab_obd_statfs);
3091 CERROR("Can't unpack obd_statfs\n");
3092 GOTO(out, rc = -EPROTO);
3095 memcpy(osfs, msfs, sizeof(*osfs));
3099 ptlrpc_req_finished(req);
3103 /* Retrieve object striping information.
3105  * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
3106  * the maximum number of OST indices which will fit in the user buffer.
3107  * lmm_magic must be LOV_USER_MAGIC (we only use 1 slot here).
3109 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3111 struct lov_user_md lum, *lumk;
3112 int rc = 0, lum_size;
3118 if (copy_from_user(&lum, lump, sizeof(lum)))
3121 if (lum.lmm_magic != LOV_USER_MAGIC)
3124 if (lum.lmm_stripe_count > 0) {
3125 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3126 OBD_ALLOC(lumk, lum_size);
3130 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3132 lum_size = sizeof(lum);
3136 lumk->lmm_object_id = lsm->lsm_object_id;
3137 lumk->lmm_stripe_count = 1;
3139 if (copy_to_user(lump, lumk, lum_size))
3143 OBD_FREE(lumk, lum_size);
3149 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3150 void *karg, void *uarg)
3152 struct obd_device *obd = exp->exp_obd;
3153 struct obd_ioctl_data *data = karg;
3157 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3160 if (!try_module_get(THIS_MODULE)) {
3161 CERROR("Can't get module. Is it alive?");
3166 case OBD_IOC_LOV_GET_CONFIG: {
3168 struct lov_desc *desc;
3169 struct obd_uuid uuid;
3173 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3174 GOTO(out, err = -EINVAL);
3176 data = (struct obd_ioctl_data *)buf;
3178 if (sizeof(*desc) > data->ioc_inllen1) {
3179 obd_ioctl_freedata(buf, len);
3180 GOTO(out, err = -EINVAL);
3183 if (data->ioc_inllen2 < sizeof(uuid)) {
3184 obd_ioctl_freedata(buf, len);
3185 GOTO(out, err = -EINVAL);
3188 desc = (struct lov_desc *)data->ioc_inlbuf1;
3189 desc->ld_tgt_count = 1;
3190 desc->ld_active_tgt_count = 1;
3191 desc->ld_default_stripe_count = 1;
3192 desc->ld_default_stripe_size = 0;
3193 desc->ld_default_stripe_offset = 0;
3194 desc->ld_pattern = 0;
3195 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3197 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3199 err = copy_to_user((void *)uarg, buf, len);
3202 obd_ioctl_freedata(buf, len);
3205 case LL_IOC_LOV_SETSTRIPE:
3206 err = obd_alloc_memmd(exp, karg);
3210 case LL_IOC_LOV_GETSTRIPE:
3211 err = osc_getstripe(karg, uarg);
3213 case OBD_IOC_CLIENT_RECOVER:
3214 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3219 case IOC_OSC_SET_ACTIVE:
3220 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3223 case OBD_IOC_POLL_QUOTACHECK:
3224 err = lquota_poll_check(quota_interface, exp,
3225 (struct if_quotacheck *)karg);
3227 case OBD_IOC_DESTROY: {
3230 if (!capable (CAP_SYS_ADMIN))
3231 GOTO (out, err = -EPERM);
3232 oa = &data->ioc_obdo1;
3233 oa->o_valid |= OBD_MD_FLGROUP;
3235 err = osc_destroy(exp, oa, NULL, NULL, NULL);
3239 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3240 cmd, cfs_curproc_comm());
3241 GOTO(out, err = -ENOTTY);
3244 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3247 module_put(THIS_MODULE);
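/* obd_get_info handler: "lock_to_stripe" is answered locally, while
 * "last_id" is fetched from the OST with an OST_GET_INFO request. */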
3252 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3253 void *key, __u32 *vallen, void *val)
3256 if (!vallen || !val)
3259 if (KEY_IS("lock_to_stripe")) {
3260 __u32 *stripe = val;
3261 *vallen = sizeof(*stripe);
3264 } else if (KEY_IS("last_id")) {
3265 struct ptlrpc_request *req;
3267 char *bufs[2] = { NULL, key };
3268 int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };
3270 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3271 OST_GET_INFO, 2, size, bufs);
3275 size[REPLY_REC_OFF] = *vallen;
3276 ptlrpc_req_set_repsize(req, 2, size);
3277 rc = ptlrpc_queue_wait(req);
3281 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3282 lustre_swab_ost_last_id);
3283 if (reply == NULL) {
3284 CERROR("Can't unpack OST last ID\n");
3285 GOTO(out, rc = -EPROTO);
3287 *((obd_id *)val) = *reply;
3289 ptlrpc_req_finished(req);
3295 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3298 struct llog_ctxt *ctxt;
3299 struct obd_import *imp = req->rq_import;
3305 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3308 rc = llog_initiator_connect(ctxt);
3310 CERROR("cannot establish connection for "
3311 "ctxt %p: %d\n", ctxt, rc);
3314 llog_ctxt_put(ctxt);
3315 spin_lock(&imp->imp_lock);
3316 imp->imp_server_timeout = 1;
3317 imp->imp_pingable = 1;
3318 spin_unlock(&imp->imp_lock);
3319 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
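/* obd_set_info_async handler: a few keys (KEY_NEXT_ID, "unlinked",
 * KEY_INIT_RECOV, "checksum") are consumed locally; anything else is
 * forwarded to the OST as an OST_SET_INFO request. */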
3324 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3325 void *key, obd_count vallen, void *val,
3326 struct ptlrpc_request_set *set)
3328 struct ptlrpc_request *req;
3329 struct obd_device *obd = exp->exp_obd;
3330 struct obd_import *imp = class_exp2cliimp(exp);
3331 int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
3332 char *bufs[3] = { NULL, key, val };
3335 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3337 if (KEY_IS(KEY_NEXT_ID)) {
3338 if (vallen != sizeof(obd_id))
3340 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3341 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3342 exp->exp_obd->obd_name,
3343 obd->u.cli.cl_oscc.oscc_next_id);
3348 if (KEY_IS("unlinked")) {
3349 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3350 spin_lock(&oscc->oscc_lock);
3351 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3352 spin_unlock(&oscc->oscc_lock);
3356 if (KEY_IS(KEY_INIT_RECOV)) {
3357 if (vallen != sizeof(int))
3359 spin_lock(&imp->imp_lock);
3360 imp->imp_initial_recov = *(int *)val;
3361 spin_unlock(&imp->imp_lock);
3362 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3363 exp->exp_obd->obd_name,
3364 imp->imp_initial_recov);
3368 if (KEY_IS("checksum")) {
3369 if (vallen != sizeof(int))
3371 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3378         /* We pass all other commands directly to the OST. Since nobody calls osc
3379            methods directly and everybody is supposed to go through LOV, we
3380            assume LOV checked invalid values for us.
3381            The only recognised values so far are evict_by_nid and mds_conn.
3382            Even if something bad goes through, we'd get a -EINVAL from the OST
3385 req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
3390 if (KEY_IS(KEY_MDS_CONN))
3391 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3393 ptlrpc_req_set_repsize(req, 1, NULL);
3394 ptlrpc_set_add_req(set, req);
3395 ptlrpc_check_set(set);
3401 static struct llog_operations osc_size_repl_logops = {
3402 lop_cancel: llog_obd_repl_cancel
3405 static struct llog_operations osc_mds_ost_orig_logops;
3406 static int osc_llog_init(struct obd_device *obd, struct obd_device *tgt,
3407 int count, struct llog_catid *catid,
3408 struct obd_uuid *uuid)
3413 spin_lock(&obd->obd_dev_lock);
3414 if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3415 osc_mds_ost_orig_logops = llog_lvfs_ops;
3416 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3417 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3418 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3419 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3421 spin_unlock(&obd->obd_dev_lock);
3423 rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3424 &catid->lci_logid, &osc_mds_ost_orig_logops);
3426 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3430 rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3431 &osc_size_repl_logops);
3433 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3436 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3437 obd->obd_name, tgt->obd_name, count, catid, rc);
3438 CERROR("logid "LPX64":0x%x\n",
3439 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3444 static int osc_llog_finish(struct obd_device *obd, int count)
3446 struct llog_ctxt *ctxt;
3447 int rc = 0, rc2 = 0;
3450 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3452 rc = llog_cleanup(ctxt);
3454 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3456 rc2 = llog_cleanup(ctxt);
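/* On reconnect, ask the server for the grant we still believe we hold, or
 * (if none) for enough to cover two full-sized RPCs, and forget any grant
 * that was lost while we were disconnected. */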
3463 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3464 struct obd_uuid *cluuid,
3465 struct obd_connect_data *data)
3467 struct client_obd *cli = &obd->u.cli;
3469 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3472 client_obd_list_lock(&cli->cl_loi_list_lock);
3473 data->ocd_grant = cli->cl_avail_grant ?:
3474 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3475 lost_grant = cli->cl_lost_grant;
3476 cli->cl_lost_grant = 0;
3477 client_obd_list_unlock(&cli->cl_loi_list_lock);
3479 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3480 "cl_lost_grant: %ld\n", data->ocd_grant,
3481 cli->cl_avail_grant, lost_grant);
3482 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3483 " ocd_grant: %d\n", data->ocd_connect_flags,
3484 data->ocd_version, data->ocd_grant);
3490 static int osc_disconnect(struct obd_export *exp)
3492 struct obd_device *obd = class_exp2obd(exp);
3493 struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3496 if (obd->u.cli.cl_conn_count == 1)
3497 /* flush any remaining cancel messages out to the target */
3498 llog_sync(ctxt, exp);
3500 llog_ctxt_put(ctxt);
3502 rc = client_disconnect_export(exp);
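/* React to import state changes: drop grant on disconnect, flush cached
 * pages and clean the lock namespace on invalidation, and clear the
 * no-space flag / re-read the connect data when the import comes back. */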
3506 static int osc_import_event(struct obd_device *obd,
3507 struct obd_import *imp,
3508 enum obd_import_event event)
3510 struct client_obd *cli;
3514 LASSERT(imp->imp_obd == obd);
3517 case IMP_EVENT_DISCON: {
3518                 /* Only do this on the MDS OSCs */
3519 if (imp->imp_server_timeout) {
3520 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3522 spin_lock(&oscc->oscc_lock);
3523 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3524 spin_unlock(&oscc->oscc_lock);
3527 client_obd_list_lock(&cli->cl_loi_list_lock);
3528 cli->cl_avail_grant = 0;
3529 cli->cl_lost_grant = 0;
3530 client_obd_list_unlock(&cli->cl_loi_list_lock);
3531 ptlrpc_import_setasync(imp, -1);
3535 case IMP_EVENT_INACTIVE: {
3536 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3539 case IMP_EVENT_INVALIDATE: {
3540 struct ldlm_namespace *ns = obd->obd_namespace;
3544 client_obd_list_lock(&cli->cl_loi_list_lock);
3545 /* all pages go to failing rpcs due to the invalid import */
3546 osc_check_rpcs(cli);
3547 client_obd_list_unlock(&cli->cl_loi_list_lock);
3549 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3553 case IMP_EVENT_ACTIVE: {
3554                 /* Only do this on the MDS OSCs */
3555 if (imp->imp_server_timeout) {
3556 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3558 spin_lock(&oscc->oscc_lock);
3559 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3560 spin_unlock(&oscc->oscc_lock);
3562 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3565 case IMP_EVENT_OCD: {
3566 struct obd_connect_data *ocd = &imp->imp_connect_data;
3568 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3569 osc_init_grant(&obd->u.cli, ocd);
3572 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3573 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3575 ptlrpc_import_setasync(imp, 1);
3576 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3580 CERROR("Unknown import event %d\n", event);
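/* Standard client setup plus OSC extras: /proc counters and a small
 * pre-allocated request pool so brw_interpret_oap can always build a
 * replacement RPC (see the comment below). */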
3586 int osc_setup(struct obd_device *obd, obd_count len, void *buf)
3592 rc = ptlrpcd_addref();
3596 rc = client_obd_setup(obd, len, buf);
3600 struct lprocfs_static_vars lvars;
3601 struct client_obd *cli = &obd->u.cli;
3603 lprocfs_init_vars(osc, &lvars);
3604 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3605 lproc_osc_attach_seqstat(obd);
3606 ptlrpc_lprocfs_register_obd(obd);
3610                 /* We need to allocate a few more requests, because
3611                    brw_interpret_oap tries to create new requests before freeing
3612                    previous ones. Ideally we would reserve 2x max_rpcs_in_flight,
3613                    but that is probably too much wasted RAM in practice,
3614                    so 2 extra is just a guess and should still work. */
3615 cli->cl_import->imp_rq_pool =
3616 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3618 ptlrpc_add_rqs_to_pool);
3624 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3630 case OBD_CLEANUP_EARLY: {
3631 struct obd_import *imp;
3632 imp = obd->u.cli.cl_import;
3633 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3634 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3635 ptlrpc_deactivate_import(imp);
3638 case OBD_CLEANUP_EXPORTS: {
3639 /* If we set up but never connected, the
3640 client import will not have been cleaned. */
3641 if (obd->u.cli.cl_import) {
3642 struct obd_import *imp;
3643 imp = obd->u.cli.cl_import;
3644 CDEBUG(D_CONFIG, "%s: client import never connected\n",
3646 ptlrpc_invalidate_import(imp);
3647 ptlrpc_free_rq_pool(imp->imp_rq_pool);
3648 class_destroy_import(imp);
3649 obd->u.cli.cl_import = NULL;
3653 case OBD_CLEANUP_SELF_EXP:
3654 rc = obd_llog_finish(obd, 0);
3656 CERROR("failed to cleanup llogging subsystems\n");
3658 case OBD_CLEANUP_OBD:
3664 int osc_cleanup(struct obd_device *obd)
3666 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3670 ptlrpc_lprocfs_unregister_obd(obd);
3671 lprocfs_obd_cleanup(obd);
3673 spin_lock(&oscc->oscc_lock);
3674 oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3675 oscc->oscc_flags |= OSCC_FLAG_EXITING;
3676 spin_unlock(&oscc->oscc_lock);
3678 /* free memory of osc quota cache */
3679 lquota_cleanup(quota_interface, obd);
3681 rc = client_obd_cleanup(obd);
3687 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3689 struct lustre_cfg *lcfg = buf;
3690 struct lprocfs_static_vars lvars;
3693 lprocfs_init_vars(osc, &lvars);
3695 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
3699 struct obd_ops osc_obd_ops = {
3700 .o_owner = THIS_MODULE,
3701 .o_setup = osc_setup,
3702 .o_precleanup = osc_precleanup,
3703 .o_cleanup = osc_cleanup,
3704 .o_add_conn = client_import_add_conn,
3705 .o_del_conn = client_import_del_conn,
3706 .o_connect = client_connect_import,
3707 .o_reconnect = osc_reconnect,
3708 .o_disconnect = osc_disconnect,
3709 .o_statfs = osc_statfs,
3710 .o_statfs_async = osc_statfs_async,
3711 .o_packmd = osc_packmd,
3712 .o_unpackmd = osc_unpackmd,
3713 .o_precreate = osc_precreate,
3714 .o_create = osc_create,
3715 .o_destroy = osc_destroy,
3716 .o_getattr = osc_getattr,
3717 .o_getattr_async = osc_getattr_async,
3718 .o_setattr = osc_setattr,
3719 .o_setattr_async = osc_setattr_async,
3721 .o_brw_async = osc_brw_async,
3722 .o_prep_async_page = osc_prep_async_page,
3723 .o_queue_async_io = osc_queue_async_io,
3724 .o_set_async_flags = osc_set_async_flags,
3725 .o_queue_group_io = osc_queue_group_io,
3726 .o_trigger_group_io = osc_trigger_group_io,
3727 .o_teardown_async_page = osc_teardown_async_page,
3728 .o_punch = osc_punch,
3730 .o_enqueue = osc_enqueue,
3731 .o_match = osc_match,
3732 .o_change_cbdata = osc_change_cbdata,
3733 .o_cancel = osc_cancel,
3734 .o_cancel_unused = osc_cancel_unused,
3735 .o_join_lru = osc_join_lru,
3736 .o_iocontrol = osc_iocontrol,
3737 .o_get_info = osc_get_info,
3738 .o_set_info_async = osc_set_info_async,
3739 .o_import_event = osc_import_event,
3740 .o_llog_init = osc_llog_init,
3741 .o_llog_finish = osc_llog_finish,
3742 .o_process_config = osc_process_config,
3744 int __init osc_init(void)
3746 struct lprocfs_static_vars lvars;
3750 lprocfs_init_vars(osc, &lvars);
3752 request_module("lquota");
3753 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3754 lquota_init(quota_interface);
3755 init_obd_quota_ops(quota_interface, &osc_obd_ops);
3757 rc = class_register_type(&osc_obd_ops, lvars.module_vars,
3760 if (quota_interface)
3761 PORTAL_SYMBOL_PUT(osc_quota_interface);
3769 static void /*__exit*/ osc_exit(void)
3771 lquota_exit(quota_interface);
3772 if (quota_interface)
3773 PORTAL_SYMBOL_PUT(osc_quota_interface);
3775 class_unregister_type(LUSTRE_OSC_NAME);
3778 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3779 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3780 MODULE_LICENSE("GPL");
3782 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);