1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5 * Author Peter Braam <braam@clusterfs.com>
7 * This file is part of the Lustre file system, http://www.lustre.org
8 * Lustre is a trademark of Cluster File Systems, Inc.
10 * You may have signed or agreed to another license before downloading
11 * this software. If so, you are bound by the terms and conditions
12 * of that agreement, and the following does not apply to you. See the
13 * LICENSE file included with this distribution for more information.
15 * If you did not agree to a different license, then this copy of Lustre
16 * is open source software; you can redistribute it and/or modify it
17 * under the terms of version 2 of the GNU General Public License as
18 * published by the Free Software Foundation.
20 * In either case, Lustre is distributed in the hope that it will be
21 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23 * license text for more details.
25 * For testing and management it is treated as an obd_device,
26 * although it does not export a full OBD method table (the
27 * requests are coming in over the wire, so object target modules
28 * do not have a full method table.)
33 # define EXPORT_SYMTAB
35 #define DEBUG_SUBSYSTEM S_OSC
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
43 # include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include <lustre_cache.h>
60 #include "osc_internal.h"
62 static quota_interface_t *quota_interface = NULL;
63 extern quota_interface_t osc_quota_interface;
65 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
66 int osc_cleanup(struct obd_device *obd);
72 atomic_t osc_resend_time;
74 /* Pack OSC object metadata for disk storage (LE byte order). */
75 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
76 struct lov_stripe_md *lsm)
81 lmm_size = sizeof(**lmmp);
86 OBD_FREE(*lmmp, lmm_size);
92 OBD_ALLOC(*lmmp, lmm_size);
98 LASSERT(lsm->lsm_object_id);
99 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
105 /* Unpack OSC object metadata from disk storage (LE byte order). */
106 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
107 struct lov_mds_md *lmm, int lmm_bytes)
113 if (lmm_bytes < sizeof (*lmm)) {
114 CERROR("lov_mds_md too small: %d, need %d\n",
115 lmm_bytes, (int)sizeof(*lmm));
118 /* XXX LOV_MAGIC etc check? */
120 if (lmm->lmm_object_id == 0) {
121 CERROR("lov_mds_md: zero lmm_object_id\n");
126 lsm_size = lov_stripe_md_size(1);
130 if (*lsmp != NULL && lmm == NULL) {
131 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
132 OBD_FREE(*lsmp, lsm_size);
138 OBD_ALLOC(*lsmp, lsm_size);
141 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
142 if ((*lsmp)->lsm_oinfo[0] == NULL) {
143 OBD_FREE(*lsmp, lsm_size);
146 loi_init((*lsmp)->lsm_oinfo[0]);
150 /* XXX zero *lsmp? */
151 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
152 LASSERT((*lsmp)->lsm_object_id);
155 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
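/* Reply callback for an asynchronous OST_GETATTR: unpack the ost_body from the
 * reply, copy the returned attributes into the caller's obd_info and then run
 * its oi_cb_up completion callback with the final return code. */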
160 static int osc_getattr_interpret(struct ptlrpc_request *req,
161 struct osc_async_args *aa, int rc)
163 struct ost_body *body;
169 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
170 lustre_swab_ost_body);
172 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
173 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
175 /* This should really be sent by the OST */
176 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
177 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
179 CERROR("can't unpack ost_body\n");
181 aa->aa_oi->oi_oa->o_valid = 0;
184 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
188 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
189 struct ptlrpc_request_set *set)
191 struct ptlrpc_request *req;
192 struct ost_body *body;
193 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
194 struct osc_async_args *aa;
197 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
198 OST_GETATTR, 2, size,NULL);
202 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
203 memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
205 ptlrpc_req_set_repsize(req, 2, size);
206 req->rq_interpret_reply = osc_getattr_interpret;
208 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
209 aa = (struct osc_async_args *)&req->rq_async_args;
212 ptlrpc_set_add_req(set, req);
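/* Synchronous OST_GETATTR: pack the caller's obdo into the request, wait for
 * the reply and copy the returned attributes back into oinfo->oi_oa. */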
216 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
218 struct ptlrpc_request *req;
219 struct ost_body *body;
220 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
223 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
224 OST_GETATTR, 2, size, NULL);
228 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
229 memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
231 ptlrpc_req_set_repsize(req, 2, size);
233 rc = ptlrpc_queue_wait(req);
235 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
239 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
240 lustre_swab_ost_body);
242 CERROR ("can't unpack ost_body\n");
243 GOTO (out, rc = -EPROTO);
246 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
247 memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));
249 /* This should really be sent by the OST */
250 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
251 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
255 ptlrpc_req_finished(req);
259 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
260 struct obd_trans_info *oti)
262 struct ptlrpc_request *req;
263 struct ost_body *body;
264 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
267 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
268 OST_SETATTR, 2, size, NULL);
272 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
273 memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
275 ptlrpc_req_set_repsize(req, 2, size);
277 rc = ptlrpc_queue_wait(req);
281 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
282 lustre_swab_ost_body);
284 GOTO(out, rc = -EPROTO);
286 memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));
290 ptlrpc_req_finished(req);
294 static int osc_setattr_interpret(struct ptlrpc_request *req,
295 struct osc_async_args *aa, int rc)
297 struct ost_body *body;
303 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
304 lustre_swab_ost_body);
306 CERROR("can't unpack ost_body\n");
307 GOTO(out, rc = -EPROTO);
310 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
312 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
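/* Asynchronous OST_SETATTR. Any unlink llog cookie passed in via @oti is
 * packed into the obdo first; the request is then either handed straight to
 * ptlrpcd (no reply processing) or added to @rqset with
 * osc_setattr_interpret() as its completion handler. */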
316 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
317 struct obd_trans_info *oti,
318 struct ptlrpc_request_set *rqset)
320 struct ptlrpc_request *req;
321 struct ost_body *body;
322 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
323 struct osc_async_args *aa;
326 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
327 OST_SETATTR, 2, size, NULL);
331 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
333 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
335 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
338 memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
339 ptlrpc_req_set_repsize(req, 2, size);
340 /* do mds to ost setattr asynchronously */
342 /* Do not wait for response. */
343 ptlrpcd_add_req(req);
345 req->rq_interpret_reply = osc_setattr_interpret;
347 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
348 aa = (struct osc_async_args *)&req->rq_async_args;
351 ptlrpc_set_add_req(rqset, req);
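/* Create an object on the OST: allocate stripe metadata via obd_alloc_memmd(),
 * send OST_CREATE synchronously and copy the object id and attributes returned
 * by the OST back into @oa; the reply transno and any unlink llog cookie are
 * saved in @oti. */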
357 int osc_real_create(struct obd_export *exp, struct obdo *oa,
358 struct lov_stripe_md **ea, struct obd_trans_info *oti)
360 struct ptlrpc_request *req;
361 struct ost_body *body;
362 struct lov_stripe_md *lsm;
363 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
371 rc = obd_alloc_memmd(exp, &lsm);
376 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
377 OST_CREATE, 2, size, NULL);
379 GOTO(out, rc = -ENOMEM);
381 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
382 memcpy(&body->oa, oa, sizeof(body->oa));
384 ptlrpc_req_set_repsize(req, 2, size);
385 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
386 oa->o_flags == OBD_FL_DELORPHAN) {
388 "delorphan from OST integration");
389 /* Don't resend the delorphan req */
390 req->rq_no_resend = req->rq_no_delay = 1;
393 rc = ptlrpc_queue_wait(req);
397 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
398 lustre_swab_ost_body);
400 CERROR ("can't unpack ost_body\n");
401 GOTO (out_req, rc = -EPROTO);
404 memcpy(oa, &body->oa, sizeof(*oa));
406 /* This should really be sent by the OST */
407 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
408 oa->o_valid |= OBD_MD_FLBLKSZ;
410 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
411 * have valid lsm_oinfo data structs, so don't go touching that.
412 * This needs to be fixed in a big way.
414 lsm->lsm_object_id = oa->o_id;
418 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
420 if (oa->o_valid & OBD_MD_FLCOOKIE) {
421 if (!oti->oti_logcookies)
422 oti_alloc_cookies(oti, 1);
423 *oti->oti_logcookies = oa->o_lcookie;
427 CDEBUG(D_HA, "transno: "LPD64"\n",
428 lustre_msg_get_transno(req->rq_repmsg));
430 ptlrpc_req_finished(req);
433 obd_free_memmd(exp, &lsm);
437 static int osc_punch_interpret(struct ptlrpc_request *req,
438 struct osc_async_args *aa, int rc)
440 struct ost_body *body;
446 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
447 lustre_swab_ost_body);
449 CERROR ("can't unpack ost_body\n");
450 GOTO(out, rc = -EPROTO);
453 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
455 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
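/* Truncate (punch) a range of an object asynchronously. The extent start/end
 * are overloaded onto the o_size/o_blocks fields of the obdo (see below) and
 * the request is added to @rqset with osc_punch_interpret() as its handler. */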
459 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
460 struct obd_trans_info *oti,
461 struct ptlrpc_request_set *rqset)
463 struct ptlrpc_request *req;
464 struct osc_async_args *aa;
465 struct ost_body *body;
466 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
474 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
475 OST_PUNCH, 2, size, NULL);
479 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
480 ptlrpc_at_set_req_timeout(req);
482 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
483 memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
485 /* overload the size and blocks fields in the oa with start/end */
486 body->oa.o_size = oinfo->oi_policy.l_extent.start;
487 body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
488 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
490 ptlrpc_req_set_repsize(req, 2, size);
492 req->rq_interpret_reply = osc_punch_interpret;
493 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
494 aa = (struct osc_async_args *)&req->rq_async_args;
496 ptlrpc_set_add_req(rqset, req);
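/* Synchronous OST_SYNC of the byte range [start, end]; as with punch, the
 * range is carried in the o_size/o_blocks fields of the obdo. */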
501 static int osc_sync(struct obd_export *exp, struct obdo *oa,
502 struct lov_stripe_md *md, obd_size start, obd_size end)
504 struct ptlrpc_request *req;
505 struct ost_body *body;
506 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
514 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
515 OST_SYNC, 2, size, NULL);
519 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
520 memcpy(&body->oa, oa, sizeof(*oa));
522 /* overload the size and blocks fields in the oa with start/end */
523 body->oa.o_size = start;
524 body->oa.o_blocks = end;
525 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
527 ptlrpc_req_set_repsize(req, 2, size);
529 rc = ptlrpc_queue_wait(req);
533 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
534 lustre_swab_ost_body);
536 CERROR ("can't unpack ost_body\n");
537 GOTO (out, rc = -EPROTO);
540 memcpy(oa, &body->oa, sizeof(*oa));
544 ptlrpc_req_finished(req);
548 /* Find and cancel local locks matched by @mode in the resource found by
549 * @objid. Found locks are added to the @cancels list. Returns the number of
550 * locks added to the @cancels list. */
551 static int osc_resource_get_unused(struct obd_export *exp, __u64 objid,
552 struct list_head *cancels, ldlm_mode_t mode,
555 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
556 struct ldlm_res_id res_id = { .name = { objid } };
557 struct ldlm_resource *res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
564 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
565 lock_flags, 0, NULL);
566 ldlm_resource_putref(res);
570 static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
573 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
575 atomic_dec(&cli->cl_destroy_in_flight);
576 cfs_waitq_signal(&cli->cl_destroy_waitq);
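/* Non-blocking check whether another destroy RPC may be started without
 * exceeding cl_max_rpcs_in_flight. On failure the in-flight counter is rolled
 * back and the waitqueue is re-signalled to avoid losing a wakeup that raced
 * with the decrement. */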
580 static int osc_can_send_destroy(struct client_obd *cli)
582 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
583 cli->cl_max_rpcs_in_flight) {
584 /* The destroy request can be sent */
587 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
588 cli->cl_max_rpcs_in_flight) {
590 * The counter has been modified between the two atomic
593 cfs_waitq_signal(&cli->cl_destroy_waitq);
598 /* Destroy requests can always be async on the client, and we don't even really
599 * care about the return code since the client cannot do anything about a failed destroy.
601 * When the MDS is unlinking a filename, it saves the file objects into a
602 * recovery llog, and these object records are cancelled when the OST reports
603 * they were destroyed and sync'd to disk (i.e. transaction committed).
604 * If the client dies, or the OST is down when the object should be destroyed,
605 * the records are not cancelled, and when the OST reconnects to the MDS next,
606 * it will retrieve the llog unlink logs and then send the log cancellation
607 * cookies to the MDS after committing destroy transactions. */
608 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
609 struct lov_stripe_md *ea, struct obd_trans_info *oti,
610 struct obd_export *md_export)
612 CFS_LIST_HEAD(cancels);
613 struct ptlrpc_request *req;
614 struct ost_body *body;
615 int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body),
616 sizeof(struct ldlm_request) };
617 int count, bufcount = 2;
618 struct client_obd *cli = &exp->exp_obd->u.cli;
626 LASSERT(oa->o_id != 0);
628 count = osc_resource_get_unused(exp, oa->o_id, &cancels, LCK_PW,
629 LDLM_FL_DISCARD_DATA);
630 if (exp_connect_cancelset(exp))
632 req = ldlm_prep_elc_req(exp, LUSTRE_OST_VERSION, OST_DESTROY, bufcount,
633 size, REQ_REC_OFF + 1, 0, &cancels, count);
637 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
638 req->rq_interpret_reply = osc_destroy_interpret;
639 ptlrpc_at_set_req_timeout(req);
641 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
643 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
644 oa->o_lcookie = *oti->oti_logcookies;
647 memcpy(&body->oa, oa, sizeof(*oa));
648 ptlrpc_req_set_repsize(req, 2, size);
650 if (!osc_can_send_destroy(cli)) {
651 struct l_wait_info lwi = { 0 };
654 * Wait until the number of on-going destroy RPCs drops
655 * below cl_max_rpcs_in_flight.
657 l_wait_event_exclusive(cli->cl_destroy_waitq,
658 osc_can_send_destroy(cli), &lwi);
661 /* Do not wait for response */
662 ptlrpcd_add_req(req);
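/* Fill in the dirty/undirty/grant accounting fields of @oa, under
 * cl_loi_list_lock, so that every BRW tells the OST how much dirty cache and
 * grant this client currently holds. */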
666 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
669 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
671 LASSERT(!(oa->o_valid & bits));
674 client_obd_list_lock(&cli->cl_loi_list_lock);
675 oa->o_dirty = cli->cl_dirty;
676 if (cli->cl_dirty > cli->cl_dirty_max) {
677 CERROR("dirty %lu > dirty_max %lu\n",
678 cli->cl_dirty, cli->cl_dirty_max);
680 } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
681 CERROR("dirty %d > system dirty_max %d\n",
682 atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
684 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
685 CERROR("dirty %lu - dirty_max %lu too big???\n",
686 cli->cl_dirty, cli->cl_dirty_max);
689 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
690 (cli->cl_max_rpcs_in_flight + 1);
691 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
693 oa->o_grant = cli->cl_avail_grant;
694 oa->o_dropped = cli->cl_lost_grant;
695 cli->cl_lost_grant = 0;
696 client_obd_list_unlock(&cli->cl_loi_list_lock);
697 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
698 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
701 /* caller must hold loi_list_lock */
702 static void osc_consume_write_grant(struct client_obd *cli,struct brw_page *pga)
704 atomic_inc(&obd_dirty_pages);
705 cli->cl_dirty += CFS_PAGE_SIZE;
706 cli->cl_avail_grant -= CFS_PAGE_SIZE;
707 pga->flag |= OBD_BRW_FROM_GRANT;
708 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
709 CFS_PAGE_SIZE, pga, pga->pg);
710 LASSERTF(cli->cl_avail_grant >= 0, "invalid avail grant is %ld \n",
711 cli->cl_avail_grant);
714 /* the companion to osc_consume_write_grant, called when a brw has completed.
715 * must be called with the loi lock held. */
716 static void osc_release_write_grant(struct client_obd *cli,
717 struct brw_page *pga, int sent)
719 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
722 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
727 pga->flag &= ~OBD_BRW_FROM_GRANT;
728 atomic_dec(&obd_dirty_pages);
729 cli->cl_dirty -= CFS_PAGE_SIZE;
731 cli->cl_lost_grant += CFS_PAGE_SIZE;
732 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
733 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
734 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
735 /* For short writes we shouldn't count parts of pages that
736 * span a whole block on the OST side, or our accounting goes
737 * wrong. Should match the code in filter_grant_check. */
738 int offset = pga->off & ~CFS_PAGE_MASK;
739 int count = pga->count + (offset & (blocksize - 1));
740 int end = (offset + pga->count) & (blocksize - 1);
742 count += blocksize - end;
744 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
745 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
746 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
747 cli->cl_avail_grant, cli->cl_dirty);
753 static unsigned long rpcs_in_flight(struct client_obd *cli)
755 return cli->cl_r_in_flight + cli->cl_w_in_flight;
758 /* caller must hold loi_list_lock */
759 void osc_wake_cache_waiters(struct client_obd *cli)
761 struct list_head *l, *tmp;
762 struct osc_cache_waiter *ocw;
765 list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
766 /* if we can't dirty more, we must wait until some is written */
767 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
768 ((atomic_read(&obd_dirty_pages)+1)>(obd_max_dirty_pages))) {
769 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
770 "osc max %ld, sys max %d\n", cli->cl_dirty,
771 cli->cl_dirty_max, obd_max_dirty_pages);
775 /* if still dirty cache but no grant wait for pending RPCs that
776 * may yet return us some grant before doing sync writes */
777 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
778 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
779 cli->cl_w_in_flight);
783 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
784 list_del_init(&ocw->ocw_entry);
785 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
786 /* no more RPCs in flight to return grant, do sync IO */
787 ocw->ocw_rc = -EDQUOT;
788 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
790 osc_consume_write_grant(cli,
791 &ocw->ocw_oap->oap_brw_page);
794 cfs_waitq_signal(&ocw->ocw_waitq);
800 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
802 client_obd_list_lock(&cli->cl_loi_list_lock);
803 cli->cl_avail_grant = ocd->ocd_grant;
804 client_obd_list_unlock(&cli->cl_loi_list_lock);
806 CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
807 cli->cl_avail_grant, cli->cl_lost_grant);
808 LASSERT(cli->cl_avail_grant >= 0);
811 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
813 client_obd_list_lock(&cli->cl_loi_list_lock);
814 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
815 if (body->oa.o_valid & OBD_MD_FLGRANT)
816 cli->cl_avail_grant += body->oa.o_grant;
817 /* waiters are woken in brw_interpret_oap */
818 client_obd_list_unlock(&cli->cl_loi_list_lock);
821 /* We assume that the reason this OSC got a short read is that it read
822 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
823 * via the LOV, and it _knows_ it's reading inside the file, it's just that
824 * this stripe never got written at or beyond this stripe offset yet. */
825 static void handle_short_read(int nob_read, obd_count page_count,
826 struct brw_page **pga)
831 /* skip bytes read OK */
832 while (nob_read > 0) {
833 LASSERT (page_count > 0);
835 if (pga[i]->count > nob_read) {
836 /* EOF inside this page */
837 ptr = cfs_kmap(pga[i]->pg) +
838 (pga[i]->off & ~CFS_PAGE_MASK);
839 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
840 cfs_kunmap(pga[i]->pg);
846 nob_read -= pga[i]->count;
851 /* zero remaining pages */
852 while (page_count-- > 0) {
853 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
854 memset(ptr, 0, pga[i]->count);
855 cfs_kunmap(pga[i]->pg);
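/* Check the per-niobuf return codes in a BRW_WRITE reply and verify that the
 * bulk transferred exactly the number of bytes we asked for. */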
860 static int check_write_rcs(struct ptlrpc_request *req,
861 int requested_nob, int niocount,
862 obd_count page_count, struct brw_page **pga)
866 /* return error if any niobuf was in error */
867 remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
868 sizeof(*remote_rcs) * niocount, NULL);
869 if (remote_rcs == NULL) {
870 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
873 if (lustre_rep_need_swab(req))
874 for (i = 0; i < niocount; i++)
875 __swab32s(&remote_rcs[i]);
877 for (i = 0; i < niocount; i++) {
878 if (remote_rcs[i] < 0)
879 return(remote_rcs[i]);
881 if (remote_rcs[i] != 0) {
882 CERROR("rc[%d] invalid (%d) req %p\n",
883 i, remote_rcs[i], req);
888 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
889 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
890 requested_nob, req->rq_bulk->bd_nob_transferred);
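/* Two brw_pages can be merged into one remote niobuf only if their flags match
 * and they are contiguous in the file. */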
897 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
899 if (p1->flag != p2->flag) {
900 unsigned mask = ~OBD_BRW_FROM_GRANT;
902 /* warn if we try to combine flags that we don't know to be
904 if ((p1->flag & mask) != (p2->flag & mask))
905 CERROR("is it ok to have flags 0x%x and 0x%x in the "
906 "same brw?\n", p1->flag, p2->flag);
910 return (p1->off + p1->count == p2->off);
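/* Checksum the first @nob bytes of the bulk described by @pga, kmapping each
 * page in turn. The OBD_FAIL hooks deliberately corrupt the data (for reads)
 * or the checksum (for writes) to exercise the error paths in testing. */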
913 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
914 struct brw_page **pga, int opc,
915 cksum_type_t cksum_type)
920 LASSERT (pg_count > 0);
921 cksum = init_checksum(cksum_type);
922 while (nob > 0 && pg_count > 0) {
923 unsigned char *ptr = cfs_kmap(pga[i]->pg);
924 int off = pga[i]->off & ~CFS_PAGE_MASK;
925 int count = pga[i]->count > nob ? nob : pga[i]->count;
927 /* corrupt the data before we compute the checksum, to
928 * simulate an OST->client data error */
929 if (i == 0 && opc == OST_READ &&
930 OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
931 memcpy(ptr + off, "bad1", min(4, nob));
932 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
933 cfs_kunmap(pga[i]->pg);
934 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
937 nob -= pga[i]->count;
941 /* When sending, we only compute a wrong checksum instead
942 * of corrupting the data, so it is still correct on a resend */
943 if (opc == OST_WRITE && OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
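/* Build a BRW (OST_READ/OST_WRITE) request: count how many niobufs are needed
 * after merging contiguous pages, prepare the bulk descriptor, fill in the
 * ost_body/obd_ioobj/niobuf_remote buffers, attach grant and (optionally)
 * checksum information, and stash the osc_brw_async_args for the reply
 * handler. */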
949 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
950 struct lov_stripe_md *lsm, obd_count page_count,
951 struct brw_page **pga,
952 struct ptlrpc_request **reqp)
954 struct ptlrpc_request *req;
955 struct ptlrpc_bulk_desc *desc;
956 struct ost_body *body;
957 struct obd_ioobj *ioobj;
958 struct niobuf_remote *niobuf;
959 int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
960 int niocount, i, requested_nob, opc, rc;
961 struct ptlrpc_request_pool *pool;
962 struct osc_brw_async_args *aa;
963 struct brw_page *pg_prev;
966 OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
967 OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */
969 opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
970 pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_import->imp_rq_pool :NULL;
972 for (niocount = i = 1; i < page_count; i++) {
973 if (!can_merge_pages(pga[i - 1], pga[i]))
977 size[REQ_REC_OFF + 1] = sizeof(*ioobj);
978 size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
980 req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 4, size,
985 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
986 ptlrpc_at_set_req_timeout(req);
988 if (opc == OST_WRITE)
989 desc = ptlrpc_prep_bulk_imp (req, page_count,
990 BULK_GET_SOURCE, OST_BULK_PORTAL);
992 desc = ptlrpc_prep_bulk_imp (req, page_count,
993 BULK_PUT_SINK, OST_BULK_PORTAL);
995 GOTO(out, rc = -ENOMEM);
996 /* NB request now owns desc and will free it when it gets freed */
998 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
999 ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
1000 niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1001 niocount * sizeof(*niobuf));
1003 memcpy(&body->oa, oa, sizeof(*oa));
1005 obdo_to_ioobj(oa, ioobj);
1006 ioobj->ioo_bufcnt = niocount;
1008 LASSERT (page_count > 0);
1010 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1011 struct brw_page *pg = pga[i];
1013 LASSERT(pg->count > 0);
1014 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1015 "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1016 pg->off, pg->count);
1018 LASSERTF(i == 0 || pg->off > pg_prev->off,
1019 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1020 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1022 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1023 pg_prev->pg, page_private(pg_prev->pg),
1024 pg_prev->pg->index, pg_prev->off);
1026 LASSERTF(i == 0 || pg->off > pg_prev->off,
1027 "i %d p_c %u\n", i, page_count);
1029 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1030 (pg->flag & OBD_BRW_SRVLOCK));
1032 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1034 requested_nob += pg->count;
1036 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1038 niobuf->len += pg->count;
1040 niobuf->offset = pg->off;
1041 niobuf->len = pg->count;
1042 niobuf->flags = pg->flag;
1047 LASSERTF((void *)(niobuf - niocount) ==
1048 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1049 niocount * sizeof(*niobuf)),
1050 "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
1051 REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
1052 (void *)(niobuf - niocount));
1054 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1056 /* size[REQ_REC_OFF] still sizeof (*body) */
1057 if (opc == OST_WRITE) {
1058 if (cli->cl_checksum) {
1059 /* store cl_cksum_type in a local variable since
1060 * it can be changed via lprocfs */
1061 cksum_type_t cksum_type = cli->cl_cksum_type;
1063 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1064 oa->o_flags = body->oa.o_flags = 0;
1065 body->oa.o_flags |= cksum_type_pack(cksum_type);
1066 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1067 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1071 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1073 /* save this in 'oa', too, for later checking */
1074 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1075 oa->o_flags |= cksum_type_pack(cksum_type);
1077 /* clear out the checksum flag, in case this is a
1078 * resend but cl_checksum is no longer set. b=11238 */
1079 oa->o_valid &= ~OBD_MD_FLCKSUM;
1081 oa->o_cksum = body->oa.o_cksum;
1082 /* 1 RC per niobuf */
1083 size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
1084 ptlrpc_req_set_repsize(req, 3, size);
1086 if (cli->cl_checksum) {
1087 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1088 body->oa.o_flags = 0;
1089 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1090 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1092 /* 1 RC for the whole I/O */
1093 ptlrpc_req_set_repsize(req, 2, size);
1096 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1097 aa = (struct osc_brw_async_args *)&req->rq_async_args;
1099 aa->aa_requested_nob = requested_nob;
1100 aa->aa_nio_count = niocount;
1101 aa->aa_page_count = page_count;
1105 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1111 ptlrpc_req_finished (req);
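/* Called when a BRW_WRITE reply carries a checksum that does not match the one
 * we sent: recompute the checksum over the pages to work out whether the data
 * changed on the client, changed in transit, or the server simply used a
 * different checksum type, and log the conclusion. */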
1115 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1116 __u32 client_cksum, __u32 server_cksum, int nob,
1117 obd_count page_count, struct brw_page **pga,
1118 cksum_type_t client_cksum_type)
1122 cksum_type_t cksum_type;
1124 if (server_cksum == client_cksum) {
1125 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1129 if (oa->o_valid & OBD_MD_FLFLAGS)
1130 cksum_type = cksum_type_unpack(oa->o_flags);
1132 cksum_type = OBD_CKSUM_CRC32;
1134 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1137 if (cksum_type != client_cksum_type)
1138 msg = "the server did not use the checksum type specified in "
1139 "the original request - likely a protocol problem";
1140 else if (new_cksum == server_cksum)
1141 msg = "changed on the client after we checksummed it - "
1142 "likely false positive due to mmap IO (bug 11742)";
1143 else if (new_cksum == client_cksum)
1144 msg = "changed in transit before arrival at OST";
1146 msg = "changed in transit AND doesn't match the original - "
1147 "likely false positive due to mmap IO (bug 11742)";
1149 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1150 LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1151 "["LPU64"-"LPU64"]\n",
1152 msg, libcfs_nid2str(peer->nid),
1153 oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1154 oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1157 oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1159 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1160 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1161 "client csum now %x\n", client_cksum, client_cksum_type,
1162 server_cksum, cksum_type, new_cksum);
1167 /* Note rc enters this function as number of bytes transferred */
1168 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1170 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1171 const lnet_process_id_t *peer =
1172 &req->rq_import->imp_connection->c_peer;
1173 struct client_obd *cli = aa->aa_cli;
1174 struct ost_body *body;
1175 __u32 client_cksum = 0;
1178 if (rc < 0 && rc != -EDQUOT)
1181 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1182 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1183 lustre_swab_ost_body);
1185 CERROR ("Can't unpack body\n");
1189 /* set/clear over quota flag for a uid/gid */
1190 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1191 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1192 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1193 body->oa.o_gid, body->oa.o_valid,
1199 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1200 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1202 osc_update_grant(cli, body);
1204 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1206 CERROR ("Unexpected +ve rc %d\n", rc);
1209 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1211 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1212 check_write_checksum(&body->oa, peer, client_cksum,
1213 body->oa.o_cksum, aa->aa_requested_nob,
1214 aa->aa_page_count, aa->aa_ppga,
1215 cksum_type_unpack(aa->aa_oa->o_flags)))
1218 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1219 aa->aa_page_count, aa->aa_ppga);
1223 /* The rest of this function executes only for OST_READs */
1224 if (rc > aa->aa_requested_nob) {
1225 CERROR("Unexpected rc %d (%d requested)\n", rc,
1226 aa->aa_requested_nob);
1230 if (rc != req->rq_bulk->bd_nob_transferred) {
1231 CERROR ("Unexpected rc %d (%d transferred)\n",
1232 rc, req->rq_bulk->bd_nob_transferred);
1236 if (rc < aa->aa_requested_nob)
1237 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1239 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1240 static int cksum_counter;
1241 __u32 server_cksum = body->oa.o_cksum;
1244 cksum_type_t cksum_type;
1246 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1247 cksum_type = cksum_type_unpack(body->oa.o_flags);
1249 cksum_type = OBD_CKSUM_CRC32;
1250 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1251 aa->aa_ppga, OST_READ,
1254 if (peer->nid == req->rq_bulk->bd_sender) {
1258 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1261 if (server_cksum == ~0 && rc > 0) {
1262 CERROR("Protocol error: server %s set the 'checksum' "
1263 "bit, but didn't send a checksum. Not fatal, "
1264 "but please tell CFS.\n",
1265 libcfs_nid2str(peer->nid));
1266 } else if (server_cksum != client_cksum) {
1267 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1268 "%s%s%s inum "LPU64"/"LPU64" object "
1269 LPU64"/"LPU64" extent "
1270 "["LPU64"-"LPU64"]\n",
1271 req->rq_import->imp_obd->obd_name,
1272 libcfs_nid2str(peer->nid),
1274 body->oa.o_valid & OBD_MD_FLFID ?
1275 body->oa.o_fid : (__u64)0,
1276 body->oa.o_valid & OBD_MD_FLFID ?
1277 body->oa.o_generation :(__u64)0,
1279 body->oa.o_valid & OBD_MD_FLGROUP ?
1280 body->oa.o_gr : (__u64)0,
1281 aa->aa_ppga[0]->off,
1282 aa->aa_ppga[aa->aa_page_count-1]->off +
1283 aa->aa_ppga[aa->aa_page_count-1]->count -
1285 CERROR("client %x, server %x, cksum_type %x\n",
1286 client_cksum, server_cksum, cksum_type);
1288 aa->aa_oa->o_cksum = client_cksum;
1292 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1295 } else if (unlikely(client_cksum)) {
1296 static int cksum_missed;
1299 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1300 CERROR("Checksum %u requested from %s but not sent\n",
1301 cksum_missed, libcfs_nid2str(peer->nid));
1307 memcpy(aa->aa_oa, &body->oa, sizeof(*aa->aa_oa));
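/* Synchronous BRW: prepare the request, queue it and wait, and keep retrying
 * recoverable errors (with an increasing delay) until osc_should_resend()
 * says to give up. */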
1312 static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
1313 struct lov_stripe_md *lsm,
1314 obd_count page_count, struct brw_page **pga)
1316 struct ptlrpc_request *request;
1320 struct l_wait_info lwi;
1323 init_waitqueue_head(&waitq);
1326 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1327 page_count, pga, &request);
1331 rc = ptlrpc_queue_wait(request);
1333 if (rc == -ETIMEDOUT && request->rq_resend) {
1334 DEBUG_REQ(D_HA, request, "BULK TIMEOUT");
1335 ptlrpc_req_finished(request);
1339 rc = osc_brw_fini_request(request, rc);
1341 ptlrpc_req_finished(request);
1342 if (osc_recoverable_error(rc)) {
1344 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1345 CERROR("too many resend retries, returning error\n");
1349 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1350 l_wait_event(waitq, 0, &lwi);
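/* Rebuild and resubmit a BRW that failed with a recoverable error. The new
 * request takes over the page array, oaps and async args of the old one and is
 * added to the same request set. */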
1357 int osc_brw_redo_request(struct ptlrpc_request *request,
1358 struct osc_brw_async_args *aa)
1360 struct ptlrpc_request *new_req;
1361 struct ptlrpc_request_set *set = request->rq_set;
1362 struct osc_brw_async_args *new_aa;
1363 struct osc_async_page *oap;
1367 if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1368 CERROR("too many resend retries, returning error\n");
1372 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1374 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1375 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1376 aa->aa_cli, aa->aa_oa,
1377 NULL /* lsm unused by osc currently */,
1378 aa->aa_page_count, aa->aa_ppga, &new_req);
1382 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1384 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1385 if (oap->oap_request != NULL) {
1386 LASSERTF(request == oap->oap_request,
1387 "request %p != oap_request %p\n",
1388 request, oap->oap_request);
1389 if (oap->oap_interrupted) {
1390 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1391 ptlrpc_req_finished(new_req);
1396 /* New request takes over pga and oaps from old request.
1397 * Note that copying a list_head doesn't work, need to move it... */
1399 new_req->rq_interpret_reply = request->rq_interpret_reply;
1400 new_req->rq_async_args = request->rq_async_args;
1401 new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;
1403 new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
1405 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1406 list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1407 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1409 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1410 if (oap->oap_request) {
1411 ptlrpc_req_finished(oap->oap_request);
1412 oap->oap_request = ptlrpc_request_addref(new_req);
1416 /* Using ptlrpc_set_add_req() here is safe because the interpret functions
1417 * run in check_set context. The only path by which another thread can reach
1418 * this request is the -EINTR case, and that path is protected by
1419 * cl_loi_list_lock */
1420 ptlrpc_set_add_req(set, new_req);
1422 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1424 DEBUG_REQ(D_INFO, new_req, "new request");
1428 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
1430 struct osc_brw_async_args *aa = data;
1434 rc = osc_brw_fini_request(request, rc);
1435 CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
1436 if (osc_recoverable_error(rc)) {
1437 rc = osc_brw_redo_request(request, aa);
1441 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1442 if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
1443 aa->aa_cli->cl_w_in_flight--;
1445 aa->aa_cli->cl_r_in_flight--;
1447 for (i = 0; i < aa->aa_page_count; i++)
1448 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1449 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1450 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
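/* Queue a BRW on @set. For writes, grant is consumed up front (and released
 * again if request preparation fails); on success the read/write histograms
 * and in-flight counters are updated and brw_interpret() will finish the
 * request. */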
1455 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1456 struct lov_stripe_md *lsm, obd_count page_count,
1457 struct brw_page **pga, struct ptlrpc_request_set *set)
1459 struct ptlrpc_request *request;
1460 struct client_obd *cli = &exp->exp_obd->u.cli;
1462 struct osc_brw_async_args *aa;
1465 /* Consume write credits even if doing a sync write -
1466 * otherwise we may run out of space on OST due to grant. */
1467 if (cmd == OBD_BRW_WRITE) {
1468 client_obd_list_lock(&cli->cl_loi_list_lock);
1469 for (i = 0; i < page_count; i++) {
1470 if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1471 osc_consume_write_grant(cli, pga[i]);
1473 client_obd_list_unlock(&cli->cl_loi_list_lock);
1476 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1477 page_count, pga, &request);
1479 aa = (struct osc_brw_async_args *)&request->rq_async_args;
1480 if (cmd == OBD_BRW_READ) {
1481 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1482 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1483 ptlrpc_lprocfs_brw(request, OST_READ, aa->aa_requested_nob);
1485 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1486 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1487 cli->cl_w_in_flight);
1488 ptlrpc_lprocfs_brw(request, OST_WRITE, aa->aa_requested_nob);
1492 request->rq_interpret_reply = brw_interpret;
1493 ptlrpc_set_add_req(set, request);
1494 client_obd_list_lock(&cli->cl_loi_list_lock);
1495 if (cmd == OBD_BRW_READ)
1496 cli->cl_r_in_flight++;
1498 cli->cl_w_in_flight++;
1499 client_obd_list_unlock(&cli->cl_loi_list_lock);
1500 } else if (cmd == OBD_BRW_WRITE) {
1501 client_obd_list_lock(&cli->cl_loi_list_lock);
1502 for (i = 0; i < page_count; i++)
1503 osc_release_write_grant(cli, pga[i], 0);
1504 client_obd_list_unlock(&cli->cl_loi_list_lock);
1511 * ugh, we want disk allocation on the target to happen in offset order. we'll
1512 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1513 * fine for our small page arrays and doesn't require allocation. it's an
1514 * insertion sort that swaps elements that are strides apart, shrinking the
1515 * stride down until it's '1' and the array is sorted.
1517 static void sort_brw_pages(struct brw_page **array, int num)
1520 struct brw_page *tmp;
1524 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1529 for (i = stride ; i < num ; i++) {
1532 while (j >= stride && array[j-stride]->off > tmp->off) {
1533 array[j] = array[j - stride];
1538 } while (stride > 1);
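/* Return how many of the leading pages in @pg form a single page-aligned,
 * gap-free run that the bulk layer can transfer without fragmentation. */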
1541 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1547 LASSERT (pages > 0);
1548 offset = pg[i]->off & (~CFS_PAGE_MASK);
1552 if (pages == 0) /* that's all */
1555 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1556 return count; /* doesn't end on page boundary */
1559 offset = pg[i]->off & (~CFS_PAGE_MASK);
1560 if (offset != 0) /* doesn't start on page boundary */
1567 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1569 struct brw_page **ppga;
1572 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1576 for (i = 0; i < count; i++)
1581 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1583 LASSERT(ppga != NULL);
1584 OBD_FREE(ppga, sizeof(*ppga) * count);
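/* Top-level synchronous brw entry point: build and sort a page-pointer array,
 * split it into chunks of at most cl_max_pages_per_rpc unfragmented pages and
 * issue osc_brw_internal() for each chunk, restoring the caller's obdo between
 * chunks since each BRW clobbers it. */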
1587 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1588 obd_count page_count, struct brw_page *pga,
1589 struct obd_trans_info *oti)
1591 struct obdo *saved_oa = NULL;
1592 struct brw_page **ppga, **orig;
1593 struct obd_import *imp = class_exp2cliimp(exp);
1594 struct client_obd *cli = &imp->imp_obd->u.cli;
1595 int rc, page_count_orig;
1598 if (cmd & OBD_BRW_CHECK) {
1599 /* The caller just wants to know if there's a chance that this
1600 * I/O can succeed */
1602 if (imp == NULL || imp->imp_invalid)
1607 /* test_brw with a failed create can trip this, maybe others. */
1608 LASSERT(cli->cl_max_pages_per_rpc);
1612 orig = ppga = osc_build_ppga(pga, page_count);
1615 page_count_orig = page_count;
1617 sort_brw_pages(ppga, page_count);
1618 while (page_count) {
1619 obd_count pages_per_brw;
1621 if (page_count > cli->cl_max_pages_per_rpc)
1622 pages_per_brw = cli->cl_max_pages_per_rpc;
1624 pages_per_brw = page_count;
1626 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1628 if (saved_oa != NULL) {
1629 /* restore previously saved oa */
1630 *oinfo->oi_oa = *saved_oa;
1631 } else if (page_count > pages_per_brw) {
1632 /* save a copy of oa (brw will clobber it) */
1633 OBDO_ALLOC(saved_oa);
1634 if (saved_oa == NULL)
1635 GOTO(out, rc = -ENOMEM);
1636 *saved_oa = *oinfo->oi_oa;
1639 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1640 pages_per_brw, ppga);
1645 page_count -= pages_per_brw;
1646 ppga += pages_per_brw;
1650 osc_release_ppga(orig, page_count_orig);
1652 if (saved_oa != NULL)
1653 OBDO_FREE(saved_oa);
1658 static int osc_brw_async(int cmd, struct obd_export *exp,
1659 struct obd_info *oinfo, obd_count page_count,
1660 struct brw_page *pga, struct obd_trans_info *oti,
1661 struct ptlrpc_request_set *set)
1663 struct brw_page **ppga, **orig;
1664 int page_count_orig;
1668 if (cmd & OBD_BRW_CHECK) {
1669 /* The caller just wants to know if there's a chance that this
1670 * I/O can succeed */
1671 struct obd_import *imp = class_exp2cliimp(exp);
1673 if (imp == NULL || imp->imp_invalid)
1678 orig = ppga = osc_build_ppga(pga, page_count);
1681 page_count_orig = page_count;
1683 sort_brw_pages(ppga, page_count);
1684 while (page_count) {
1685 struct brw_page **copy;
1686 obd_count pages_per_brw;
1688 pages_per_brw = min_t(obd_count, page_count,
1689 class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc);
1691 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1693 /* use ppga only if a single RPC is going to fly */
1694 if (pages_per_brw != page_count_orig || ppga != orig) {
1695 OBD_ALLOC(copy, pages_per_brw * sizeof(*copy));
1697 GOTO(out, rc = -ENOMEM);
1698 memcpy(copy, ppga, pages_per_brw * sizeof(*copy));
1702 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1703 pages_per_brw, copy, set);
1707 OBD_FREE(copy, pages_per_brw * sizeof(*copy));
1712 /* we passed it to async_internal() which is
1713 * now responsible for releasing memory */
1717 page_count -= pages_per_brw;
1718 ppga += pages_per_brw;
1722 osc_release_ppga(orig, page_count_orig);
1726 static void osc_check_rpcs(struct client_obd *cli);
1728 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1729 * the dirty accounting. Writeback completes or truncate happens before
1730 * writing starts. Must be called with the loi lock held. */
1731 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1734 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1737 /* This maintains the lists of pending pages to read/write for a given object
1738 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1739 * to quickly find objects that are ready to send an RPC. */
1740 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1746 if (lop->lop_num_pending == 0)
1749 /* if we have an invalid import we want to drain the queued pages
1750 * by forcing them through rpcs that immediately fail and complete
1751 * the pages. recovery relies on this to empty the queued pages
1752 * before canceling the locks and evicting down the llite pages */
1753 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
756 /* stream rpcs in queue order as long as there is an urgent page
1757 * queued. this is our cheap solution for good batching in the case
1758 * where writepage marks some random page in the middle of the file
1759 * as urgent because of, say, memory pressure */
1760 if (!list_empty(&lop->lop_urgent)) {
1761 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1765 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1766 optimal = cli->cl_max_pages_per_rpc;
1767 if (cmd & OBD_BRW_WRITE) {
1768 /* trigger a write rpc stream as long as there are dirtiers
1769 * waiting for space. as they're waiting, they're not going to
770 * create more pages to coalesce with what's waiting. */
1771 if (!list_empty(&cli->cl_cache_waiters)) {
1772 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1776 /* +16 to avoid triggering rpcs that would want to include pages
1777 * that are being queued but which can't be made ready until
1778 * the queuer finishes with the page. this is a wart for
1779 * llite::commit_write() */
1782 if (lop->lop_num_pending >= optimal)
1788 static void on_list(struct list_head *item, struct list_head *list,
1791 if (list_empty(item) && should_be_on)
1792 list_add_tail(item, list);
1793 else if (!list_empty(item) && !should_be_on)
1794 list_del_init(item);
1797 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1798 * can find pages to build into rpcs quickly */
1799 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1801 on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1802 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1803 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1805 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1806 loi->loi_write_lop.lop_num_pending);
1808 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1809 loi->loi_read_lop.lop_num_pending);
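/* Adjust the pending-page counters on both the lop and the client as pages are
 * queued (positive delta) or removed (negative delta). */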
1812 static void lop_update_pending(struct client_obd *cli,
1813 struct loi_oap_pages *lop, int cmd, int delta)
1815 lop->lop_num_pending += delta;
1816 if (cmd & OBD_BRW_WRITE)
1817 cli->cl_pending_w_pages += delta;
1819 cli->cl_pending_r_pages += delta;
1822 /* this is called when a sync waiter receives an interruption. Its job is to
1823 * get the caller woken as soon as possible. If its page hasn't been put in an
1824 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
1825 * desiring interruption which will forcefully complete the rpc once the rpc
1827 static void osc_occ_interrupted(struct oig_callback_context *occ)
1829 struct osc_async_page *oap;
1830 struct loi_oap_pages *lop;
1831 struct lov_oinfo *loi;
1834 /* XXX member_of() */
1835 oap = list_entry(occ, struct osc_async_page, oap_occ);
1837 client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1839 oap->oap_interrupted = 1;
1841 /* ok, it's been put in an rpc. only one oap gets a request reference */
1842 if (oap->oap_request != NULL) {
1843 ptlrpc_mark_interrupted(oap->oap_request);
1844 ptlrpcd_wake(oap->oap_request);
1848 /* we don't get interruption callbacks until osc_trigger_group_io()
1849 * has been called and put the sync oaps in the pending/urgent lists.*/
1850 if (!list_empty(&oap->oap_pending_item)) {
1851 list_del_init(&oap->oap_pending_item);
1852 list_del_init(&oap->oap_urgent_item);
1855 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1856 &loi->loi_write_lop : &loi->loi_read_lop;
1857 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1858 loi_list_maint(oap->oap_cli, oap->oap_loi);
1860 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
1861 oap->oap_oig = NULL;
1865 client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
1868 /* this is trying to propagate async writeback errors back up to the
1869 * application. As an async write fails we record the error code for later if
1870 * the app does an fsync. As long as errors persist we force future rpcs to be
1871 * sync so that the app can get a sync error and break the cycle of queueing
1872 * pages for which writeback will fail. */
1873 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1880 ar->ar_force_sync = 1;
1881 ar->ar_min_xid = ptlrpc_sample_next_xid();
1886 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1887 ar->ar_force_sync = 0;
1890 static void osc_oap_to_pending(struct osc_async_page *oap)
1892 struct loi_oap_pages *lop;
1894 if (oap->oap_cmd & OBD_BRW_WRITE)
1895 lop = &oap->oap_loi->loi_write_lop;
1897 lop = &oap->oap_loi->loi_read_lop;
1899 if (oap->oap_async_flags & ASYNC_URGENT)
1900 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1901 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1902 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1905 /* this must be called holding the loi list lock to give coverage to exit_cache,
1906 * async_flag maintenance, and oap_request */
1907 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1908 struct osc_async_page *oap, int sent, int rc)
1913 if (oap->oap_request != NULL) {
1914 xid = ptlrpc_req_xid(oap->oap_request);
1915 ptlrpc_req_finished(oap->oap_request);
1916 oap->oap_request = NULL;
1919 oap->oap_async_flags = 0;
1920 oap->oap_interrupted = 0;
1922 if (oap->oap_cmd & OBD_BRW_WRITE) {
1923 osc_process_ar(&cli->cl_ar, xid, rc);
1924 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
1927 if (rc == 0 && oa != NULL) {
1928 if (oa->o_valid & OBD_MD_FLBLOCKS)
1929 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
1930 if (oa->o_valid & OBD_MD_FLMTIME)
1931 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
1932 if (oa->o_valid & OBD_MD_FLATIME)
1933 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
1934 if (oa->o_valid & OBD_MD_FLCTIME)
1935 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
1939 osc_exit_cache(cli, oap, sent);
1940 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
1941 oap->oap_oig = NULL;
1946 rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
1947 oap->oap_cmd, oa, rc);
1949 /* ll_ap_completion (from llite) drops PG_locked. so, a new
1950 * I/O on the page could start, but OSC calls it under lock
1951 * and thus we can add oap back to pending safely */
1953 /* upper layer wants to leave the page on pending queue */
1954 osc_oap_to_pending(oap);
1956 osc_exit_cache(cli, oap, sent);
1960 static int brw_interpret_oap(struct ptlrpc_request *request, void *data, int rc)
1962 struct osc_brw_async_args *aa = data;
1963 struct osc_async_page *oap, *tmp;
1964 struct client_obd *cli;
1967 rc = osc_brw_fini_request(request, rc);
1968 CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
1970 if (osc_recoverable_error(rc)) {
1971 rc = osc_brw_redo_request(request, aa);
1977 client_obd_list_lock(&cli->cl_loi_list_lock);
1978 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1979 * is called so we know whether to go to sync BRWs or wait for more
1980 * RPCs to complete */
1981 if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
1982 cli->cl_w_in_flight--;
1984 cli->cl_r_in_flight--;
1986 /* the caller may re-use the oap after the completion call so
1987 * we need to clean it up a little */
1988 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
1989 list_del_init(&oap->oap_rpc_item);
1990 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
1993 osc_wake_cache_waiters(cli);
1994 osc_check_rpcs(cli);
1995 client_obd_list_unlock(&cli->cl_loi_list_lock);
1997 OBDO_FREE(aa->aa_oa);
1999 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
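/* Turn a list of oaps into a single BRW request: build and sort the pga array,
 * let the caller fill and update the obdo through its ap_fill_obdo() and
 * ap_update_obdo() methods, then splice the oaps onto the request's async args
 * for completion handling. */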
2003 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
2004 struct list_head *rpc_list,
2005 int page_count, int cmd)
2007 struct ptlrpc_request *req;
2008 struct brw_page **pga = NULL;
2009 struct osc_brw_async_args *aa;
2010 struct obdo *oa = NULL;
2011 struct obd_async_page_ops *ops = NULL;
2012 void *caller_data = NULL;
2013 struct osc_async_page *oap;
2014 struct ldlm_lock *lock = NULL;
2018 LASSERT(!list_empty(rpc_list));
2020 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2022 RETURN(ERR_PTR(-ENOMEM));
2026 GOTO(out, req = ERR_PTR(-ENOMEM));
2029 list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2031 ops = oap->oap_caller_ops;
2032 caller_data = oap->oap_caller_data;
2033 lock = oap->oap_ldlm_lock;
2035 pga[i] = &oap->oap_brw_page;
2036 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2037 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2038 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2042 /* always get the data for the obdo for the rpc */
2043 LASSERT(ops != NULL);
2044 ops->ap_fill_obdo(caller_data, cmd, oa);
2046 oa->o_handle = lock->l_remote_handle;
2047 oa->o_valid |= OBD_MD_FLHANDLE;
2050 sort_brw_pages(pga, page_count);
2051 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, pga, &req);
2053 CERROR("prep_req failed: %d\n", rc);
2054 GOTO(out, req = ERR_PTR(rc));
2057 /* Need to update the timestamps after the request is built in case
2058 * we race with setattr (locally or in queue at OST). If OST gets
2059 * later setattr before earlier BRW (as determined by the request xid),
2060 * the OST will not use BRW timestamps. Sadly, there is no obvious
2061 * way to do this in a single call. bug 10150 */
2062 ops->ap_update_obdo(caller_data, cmd, oa,
2063 OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
2065 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2066 aa = (struct osc_brw_async_args *)&req->rq_async_args;
2067 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2068 list_splice(rpc_list, &aa->aa_oaps);
2069 CFS_INIT_LIST_HEAD(rpc_list);
2076 OBD_FREE(pga, sizeof(*pga) * page_count);
2081 /* the loi lock is held across this function but it's allowed to release
2082 * and reacquire it during its work */
2083 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2084 int cmd, struct loi_oap_pages *lop)
2086 struct ptlrpc_request *req;
2087 obd_count page_count = 0;
2088 struct osc_async_page *oap = NULL, *tmp;
2089 struct osc_brw_async_args *aa;
2090 struct obd_async_page_ops *ops;
2091 CFS_LIST_HEAD(rpc_list);
2092 unsigned int ending_offset;
2093 unsigned starting_offset = 0;
2097 /* first we find the pages we're allowed to work with */
2098 list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item){
2099 ops = oap->oap_caller_ops;
2101 LASSERT(oap->oap_magic == OAP_MAGIC);
2103 if (page_count != 0 &&
2104 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2105 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2106 " oap %p, page %p, srvlock %u\n",
2107 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2110 /* in llite being 'ready' equates to the page being locked
2111 * until completion unlocks it. commit_write submits a page
2112 * as not ready because its unlock will happen unconditionally
2113 * as the call returns. if we race with commit_write giving
2114 * us that page we don't want to create a hole in the page
2115 * stream, so we stop and leave the rpc to be fired by
2116 * another dirtier or kupdated interval (the not ready page
2117 * will still be on the dirty list). we could call in
2118 * at the end of ll_file_write to process the queue again. */
2119 if (!(oap->oap_async_flags & ASYNC_READY)) {
2120 int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2122 CDEBUG(D_INODE, "oap %p page %p returned %d "
2123 "instead of ready\n", oap,
2127 /* llite is telling us that the page is still
2128 * in commit_write and that we should try
2129 * and put it in an rpc again later. we
2130 * break out of the loop so we don't create
2131 * a hole in the sequence of pages in the rpc
2136 /* the io isn't needed.. tell the checks
2137 * below to complete the rpc with EINTR */
2138 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2139 oap->oap_count = -EINTR;
2142 oap->oap_async_flags |= ASYNC_READY;
2145 LASSERTF(0, "oap %p page %p returned %d "
2146 "from make_ready\n", oap,
2154 * Page submitted for IO has to be locked. Either by
2155 * ->ap_make_ready() or by higher layers.
2157 * XXX nikita: this assertion should be adjusted when lustre
2158 * starts using PG_writeback for pages being written out.
2160 #if defined(__KERNEL__) && defined(__linux__)
2161 LASSERT(PageLocked(oap->oap_page));
2163 /* If there is a gap at the start of this page, it can't merge
2164 * with any previous page, so we'll hand the network a
2165 * "fragmented" page array that it can't transfer in 1 RDMA */
2166 if (page_count != 0 && oap->oap_page_off != 0)
2169 /* take the page out of our book-keeping */
2170 list_del_init(&oap->oap_pending_item);
2171 lop_update_pending(cli, lop, cmd, -1);
2172 list_del_init(&oap->oap_urgent_item);
2174 if (page_count == 0)
2175 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2176 (PTLRPC_MAX_BRW_SIZE - 1);
2178 /* ask the caller for the size of the io as the rpc leaves. */
2179 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2181 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2182 if (oap->oap_count <= 0) {
2183 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2185 osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2189 /* now put the page back in our accounting */
2190 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2191 if (page_count == 0)
2192 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2193 if (++page_count >= cli->cl_max_pages_per_rpc)
2196 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2197 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2198 * have the same alignment as the initial writes that allocated
2199 * extents on the server. */
2200 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2201 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
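/* For illustration, assuming CFS_PAGE_SIZE == 4096 and PTLRPC_MAX_BRW_SIZE ==
 * 1MB (so the mask is 0xFFFFF): a page at file offset 0x12345000 gives
 *     starting_offset = 0x12345000 & 0xFFFFF = 0x45000
 * and, if the oap covers the whole page,
 *     ending_offset = (0x12345000 + 0x1000) & 0xFFFFF = 0x46000.
 * Only when ending_offset wraps to 0 does the RPC end exactly on a
 * PTLRPC_MAX_BRW_SIZE boundary, which is what the check below detects. */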
2202 if (ending_offset == 0)
2205 /* If there is a gap at the end of this page, it can't merge
2206 * with any subsequent pages, so we'll hand the network a
2207 * "fragmented" page array that it can't transfer in 1 RDMA */
2208 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2212 osc_wake_cache_waiters(cli);
2214 if (page_count == 0)
2217 loi_list_maint(cli, loi);
2219 client_obd_list_unlock(&cli->cl_loi_list_lock);
2221 req = osc_build_req(cli, &rpc_list, page_count, cmd);
2223 /* this should happen rarely and is pretty bad; it makes the
2224 * pending list not follow the dirty order */
2225 client_obd_list_lock(&cli->cl_loi_list_lock);
2226 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2227 list_del_init(&oap->oap_rpc_item);
2229 /* queued sync pages can be torn down while the pages
2230 * were between the pending list and the rpc */
2231 if (oap->oap_interrupted) {
2232 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2233 osc_ap_completion(cli, NULL, oap, 0,
2237 osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2239 loi_list_maint(cli, loi);
2240 RETURN(PTR_ERR(req));
2243 aa = (struct osc_brw_async_args *)&req->rq_async_args;
2244 if (cmd == OBD_BRW_READ) {
2245 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2246 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2247 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2248 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2249 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2251 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2252 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2253 cli->cl_w_in_flight);
2254 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2255 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2256 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2259 client_obd_list_lock(&cli->cl_loi_list_lock);
2261 if (cmd == OBD_BRW_READ)
2262 cli->cl_r_in_flight++;
2264 cli->cl_w_in_flight++;
2266 /* queued sync pages can be torn down while the pages
2267 * were between the pending list and the rpc */
2269 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2270 /* only one oap gets a request reference */
2273 if (oap->oap_interrupted && !req->rq_intr) {
2274 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2276 ptlrpc_mark_interrupted(req);
2280 tmp->oap_request = ptlrpc_request_addref(req);
2282 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2283 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2285 req->rq_interpret_reply = brw_interpret_oap;
2286 ptlrpcd_add_req(req);
2290 #define LOI_DEBUG(LOI, STR, args...) \
2291 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2292 !list_empty(&(LOI)->loi_cli_item), \
2293 (LOI)->loi_write_lop.lop_num_pending, \
2294 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2295 (LOI)->loi_read_lop.lop_num_pending, \
2296 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2299 /* This is called by osc_check_rpcs() to find which objects have pages that
2300 * we could be sending. These lists are maintained by lop_makes_rpc(). */
2301 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2304 /* first return all objects which we already know to have
2305 * pages ready to be stuffed into rpcs */
2306 if (!list_empty(&cli->cl_loi_ready_list))
2307 RETURN(list_entry(cli->cl_loi_ready_list.next,
2308 struct lov_oinfo, loi_cli_item));
2310 /* then if we have cache waiters, return all objects with queued
2311 * writes. This is especially important when many small files
2312 * have filled up the cache and not been fired into rpcs because
2313 * they don't pass the nr_pending/object threshold */
2314 if (!list_empty(&cli->cl_cache_waiters) &&
2315 !list_empty(&cli->cl_loi_write_list))
2316 RETURN(list_entry(cli->cl_loi_write_list.next,
2317 struct lov_oinfo, loi_write_item));
2319 /* then return all queued objects when we have an invalid import
2320 * so that they get flushed */
2321 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2322 if (!list_empty(&cli->cl_loi_write_list))
2323 RETURN(list_entry(cli->cl_loi_write_list.next,
2324 struct lov_oinfo, loi_write_item));
2325 if (!list_empty(&cli->cl_loi_read_list))
2326 RETURN(list_entry(cli->cl_loi_read_list.next,
2327 struct lov_oinfo, loi_read_item));
2332 /* called with the loi list lock held */
2333 static void osc_check_rpcs(struct client_obd *cli)
2335 struct lov_oinfo *loi;
2336 int rc = 0, race_counter = 0;
2339 while ((loi = osc_next_loi(cli)) != NULL) {
2340 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2342 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2345 /* attempt some read/write balancing by alternating between
2346 * reads and writes in an object. The lop_makes_rpc() checks here
2347 * would be redundant if we were getting read/write work items
2348 * instead of objects. we don't want send_oap_rpc to drain a
2349 * partially-filled read pending queue when we're handed this
2350 * object to do write io while there are cache waiters */
2351 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2352 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2353 &loi->loi_write_lop);
2361 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2362 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2363 &loi->loi_read_lop);
2372 /* attempt some inter-object balancing by issuing rpcs
2373 * for each object in turn */
2374 if (!list_empty(&loi->loi_cli_item))
2375 list_del_init(&loi->loi_cli_item);
2376 if (!list_empty(&loi->loi_write_item))
2377 list_del_init(&loi->loi_write_item);
2378 if (!list_empty(&loi->loi_read_item))
2379 list_del_init(&loi->loi_read_item);
2381 loi_list_maint(cli, loi);
2383 /* send_oap_rpc fails with 0 when make_ready tells it to
2384 * back off. llite's make_ready does this when it tries
2385 * to lock a page queued for write that is already locked.
2386 * we want to try sending rpcs from many objects, but we
2387 * don't want to spin failing with 0. */
2388 if (race_counter == 10)
2394 /* we're trying to queue a page in the osc so we're subject to the
2395 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2396 * If the osc's queued pages are already at that limit, then we want to sleep
2397 * until there is space in the osc's queue for us. We also may be waiting for
2398 * write credits from the OST if there are RPCs in flight that may return some
2399 * before we fall back to sync writes.
2401 * We need this to know that our allocation was granted in the presence of signals */
2402 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2406 client_obd_list_lock(&cli->cl_loi_list_lock);
2407 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2408 client_obd_list_unlock(&cli->cl_loi_list_lock);
2412 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2413 * grant or cache space. */
2414 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2415 struct osc_async_page *oap)
2417 struct osc_cache_waiter ocw;
2418 struct l_wait_info lwi = { 0 };
2421 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2422 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2423 cli->cl_dirty_max, obd_max_dirty_pages,
2424 cli->cl_lost_grant, cli->cl_avail_grant);
2426 /* force the caller to try sync io. this can jump the list
2427 * of queued writes and create a discontiguous rpc stream */
2428 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2429 loi->loi_ar.ar_force_sync)
2432 /* Hopefully normal case - cache space and write credits available */
2433 if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2434 (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2435 (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2436 /* account for ourselves */
2437 osc_consume_write_grant(cli, &oap->oap_brw_page);
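/* For illustration, assuming CFS_PAGE_SIZE == 4096: with cl_dirty_max of
 * 32MB and cl_avail_grant of 8192, a page is admitted here only while
 * cl_dirty + 4096 <= 32MB, obd_dirty_pages stays below obd_max_dirty_pages,
 * and at least one page worth of grant (4096 bytes) remains; otherwise we
 * fall through below to wait on in-flight writes or force sync i/o. */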
2441 /* Make sure that there are write rpcs in flight to wait for. This
2442 * is a little silly as this object may not have any pending but
2443 * other objects sure might. */
2444 if (cli->cl_w_in_flight) {
2445 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2446 cfs_waitq_init(&ocw.ocw_waitq);
2450 loi_list_maint(cli, loi);
2451 osc_check_rpcs(cli);
2452 client_obd_list_unlock(&cli->cl_loi_list_lock);
2454 CDEBUG(D_CACHE, "sleeping for cache space\n");
2455 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2457 client_obd_list_lock(&cli->cl_loi_list_lock);
2458 if (!list_empty(&ocw.ocw_entry)) {
2459 list_del(&ocw.ocw_entry);
2468 static int osc_reget_short_lock(struct obd_export *exp,
2469 struct lov_stripe_md *lsm,
2471 loff_t start, loff_t end,
2474 struct osc_async_page *oap = *res;
2479 spin_lock(&oap->oap_lock);
2480 rc = ldlm_lock_fast_match(oap->oap_ldlm_lock, rw,
2481 start, end, cookie);
2482 spin_unlock(&oap->oap_lock);
2487 static int osc_release_short_lock(struct obd_export *exp,
2488 struct lov_stripe_md *lsm, loff_t end,
2489 void *cookie, int rw)
2492 ldlm_lock_fast_release(cookie, rw);
2493 /* no error could have happened at this layer */
2497 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2498 struct lov_oinfo *loi, cfs_page_t *page,
2499 obd_off offset, struct obd_async_page_ops *ops,
2500 void *data, void **res, int nocache,
2501 struct lustre_handle *lockh)
2503 struct osc_async_page *oap;
2504 struct ldlm_res_id oid = {{0}};
2510 return size_round(sizeof(*oap));
2513 oap->oap_magic = OAP_MAGIC;
2514 oap->oap_cli = &exp->exp_obd->u.cli;
2517 oap->oap_caller_ops = ops;
2518 oap->oap_caller_data = data;
2520 oap->oap_page = page;
2521 oap->oap_obj_off = offset;
2523 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2524 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2525 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2526 CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2528 oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2530 spin_lock_init(&oap->oap_lock);
2532 /* If the page was marked as not cacheable, don't add it to any locks */
2534 oid.name[0] = loi->loi_id;
2535 /* This is the only place where we can call cache_add_extent
2536 without oap_lock, because this page is locked now, and
2537 the lock we are adding it to is referenced, so it cannot
2538 lose any pages either. */
2539 rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh);
2544 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2548 struct osc_async_page *oap_from_cookie(void *cookie)
2550 struct osc_async_page *oap = cookie;
2551 if (oap->oap_magic != OAP_MAGIC)
2552 return ERR_PTR(-EINVAL);
2556 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2557 struct lov_oinfo *loi, void *cookie,
2558 int cmd, obd_off off, int count,
2559 obd_flag brw_flags, enum async_flags async_flags)
2561 struct client_obd *cli = &exp->exp_obd->u.cli;
2562 struct osc_async_page *oap;
2566 oap = oap_from_cookie(cookie);
2568 RETURN(PTR_ERR(oap));
2570 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2573 if (!list_empty(&oap->oap_pending_item) ||
2574 !list_empty(&oap->oap_urgent_item) ||
2575 !list_empty(&oap->oap_rpc_item))
2578 /* check if the file's owner/group is over quota */
2579 #ifdef HAVE_QUOTA_SUPPORT
2580 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2581 struct obd_async_page_ops *ops;
2588 ops = oap->oap_caller_ops;
2589 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2590 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2601 loi = lsm->lsm_oinfo[0];
2603 client_obd_list_lock(&cli->cl_loi_list_lock);
2606 oap->oap_page_off = off;
2607 oap->oap_count = count;
2608 oap->oap_brw_flags = brw_flags;
2609 oap->oap_async_flags = async_flags;
2611 if (cmd & OBD_BRW_WRITE) {
2612 rc = osc_enter_cache(cli, loi, oap);
2614 client_obd_list_unlock(&cli->cl_loi_list_lock);
2619 osc_oap_to_pending(oap);
2620 loi_list_maint(cli, loi);
2622 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2625 osc_check_rpcs(cli);
2626 client_obd_list_unlock(&cli->cl_loi_list_lock);
2631 /* aka (~was & now & flag), but this is more clear :) */
2632 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
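/* For illustration: SETTING(0, ASYNC_READY, ASYNC_READY) is true (the flag is
 * being newly set), while SETTING(ASYNC_READY, ASYNC_READY, ASYNC_READY) is
 * false because the flag was already set in 'was'. The checks below therefore
 * only react to flags changing from clear to set. */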
2634 static int osc_set_async_flags(struct obd_export *exp,
2635 struct lov_stripe_md *lsm,
2636 struct lov_oinfo *loi, void *cookie,
2637 obd_flag async_flags)
2639 struct client_obd *cli = &exp->exp_obd->u.cli;
2640 struct loi_oap_pages *lop;
2641 struct osc_async_page *oap;
2645 oap = oap_from_cookie(cookie);
2647 RETURN(PTR_ERR(oap));
2650 * bug 7311: OST-side locking is only supported for liblustre for now
2651 * (and liblustre never calls obd_set_async_flags(). I hope.); a generic
2652 * implementation has to handle the case where an OST-locked page was
2653 * picked up by, e.g., ->writepage().
2655 LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2656 LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2659 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2663 loi = lsm->lsm_oinfo[0];
2665 if (oap->oap_cmd & OBD_BRW_WRITE) {
2666 lop = &loi->loi_write_lop;
2668 lop = &loi->loi_read_lop;
2671 client_obd_list_lock(&cli->cl_loi_list_lock);
2673 if (list_empty(&oap->oap_pending_item))
2674 GOTO(out, rc = -EINVAL);
2676 if ((oap->oap_async_flags & async_flags) == async_flags)
2679 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2680 oap->oap_async_flags |= ASYNC_READY;
2682 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2683 if (list_empty(&oap->oap_rpc_item)) {
2684 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2685 loi_list_maint(cli, loi);
2689 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2690 oap->oap_async_flags);
2692 osc_check_rpcs(cli);
2693 client_obd_list_unlock(&cli->cl_loi_list_lock);
2697 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2698 struct lov_oinfo *loi,
2699 struct obd_io_group *oig, void *cookie,
2700 int cmd, obd_off off, int count,
2702 obd_flag async_flags)
2704 struct client_obd *cli = &exp->exp_obd->u.cli;
2705 struct osc_async_page *oap;
2706 struct loi_oap_pages *lop;
2710 oap = oap_from_cookie(cookie);
2712 RETURN(PTR_ERR(oap));
2714 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2717 if (!list_empty(&oap->oap_pending_item) ||
2718 !list_empty(&oap->oap_urgent_item) ||
2719 !list_empty(&oap->oap_rpc_item))
2723 loi = lsm->lsm_oinfo[0];
2725 client_obd_list_lock(&cli->cl_loi_list_lock);
2728 oap->oap_page_off = off;
2729 oap->oap_count = count;
2730 oap->oap_brw_flags = brw_flags;
2731 oap->oap_async_flags = async_flags;
2733 if (cmd & OBD_BRW_WRITE)
2734 lop = &loi->loi_write_lop;
2736 lop = &loi->loi_read_lop;
2738 list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2739 if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2741 rc = oig_add_one(oig, &oap->oap_occ);
2744 LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2745 oap, oap->oap_page, rc);
2747 client_obd_list_unlock(&cli->cl_loi_list_lock);
2752 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2753 struct loi_oap_pages *lop, int cmd)
2755 struct list_head *pos, *tmp;
2756 struct osc_async_page *oap;
2758 list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2759 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2760 list_del(&oap->oap_pending_item);
2761 osc_oap_to_pending(oap);
2763 loi_list_maint(cli, loi);
2766 static int osc_trigger_group_io(struct obd_export *exp,
2767 struct lov_stripe_md *lsm,
2768 struct lov_oinfo *loi,
2769 struct obd_io_group *oig)
2771 struct client_obd *cli = &exp->exp_obd->u.cli;
2775 loi = lsm->lsm_oinfo[0];
2777 client_obd_list_lock(&cli->cl_loi_list_lock);
2779 osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2780 osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2782 osc_check_rpcs(cli);
2783 client_obd_list_unlock(&cli->cl_loi_list_lock);
2788 static int osc_teardown_async_page(struct obd_export *exp,
2789 struct lov_stripe_md *lsm,
2790 struct lov_oinfo *loi, void *cookie)
2792 struct client_obd *cli = &exp->exp_obd->u.cli;
2793 struct loi_oap_pages *lop;
2794 struct osc_async_page *oap;
2798 oap = oap_from_cookie(cookie);
2800 RETURN(PTR_ERR(oap));
2803 loi = lsm->lsm_oinfo[0];
2805 if (oap->oap_cmd & OBD_BRW_WRITE) {
2806 lop = &loi->loi_write_lop;
2808 lop = &loi->loi_read_lop;
2811 client_obd_list_lock(&cli->cl_loi_list_lock);
2813 if (!list_empty(&oap->oap_rpc_item))
2814 GOTO(out, rc = -EBUSY);
2816 osc_exit_cache(cli, oap, 0);
2817 osc_wake_cache_waiters(cli);
2819 if (!list_empty(&oap->oap_urgent_item)) {
2820 list_del_init(&oap->oap_urgent_item);
2821 oap->oap_async_flags &= ~ASYNC_URGENT;
2823 if (!list_empty(&oap->oap_pending_item)) {
2824 list_del_init(&oap->oap_pending_item);
2825 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2827 loi_list_maint(cli, loi);
2828 cache_remove_extent(cli->cl_cache, oap);
2830 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2832 client_obd_list_unlock(&cli->cl_loi_list_lock);
2836 int osc_extent_blocking_cb(struct ldlm_lock *lock,
2837 struct ldlm_lock_desc *new, void *data,
2840 struct lustre_handle lockh = { 0 };
2844 if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
2845 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
2850 case LDLM_CB_BLOCKING:
2851 ldlm_lock2handle(lock, &lockh);
2852 rc = ldlm_cli_cancel(&lockh);
2854 CERROR("ldlm_cli_cancel failed: %d\n", rc);
2856 case LDLM_CB_CANCELING: {
2858 ldlm_lock2handle(lock, &lockh);
2859 /* This lock wasn't granted, don't try to do anything */
2860 if (lock->l_req_mode != lock->l_granted_mode)
2863 cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache,
2866 if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb)
2867 lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb(
2868 lock, new, data,flag);
2877 EXPORT_SYMBOL(osc_extent_blocking_cb);
2879 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2882 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2885 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2888 lock_res_and_lock(lock);
2889 #if defined (__KERNEL__) && defined (__linux__)
2890 /* Liang XXX: Darwin and Winnt checking should be added */
2891 if (lock->l_ast_data && lock->l_ast_data != data) {
2892 struct inode *new_inode = data;
2893 struct inode *old_inode = lock->l_ast_data;
2894 if (!(old_inode->i_state & I_FREEING))
2895 LDLM_ERROR(lock, "inconsistent l_ast_data found");
2896 LASSERTF(old_inode->i_state & I_FREEING,
2897 "Found existing inode %p/%lu/%u state %lu in lock: "
2898 "setting data to %p/%lu/%u\n", old_inode,
2899 old_inode->i_ino, old_inode->i_generation,
2901 new_inode, new_inode->i_ino, new_inode->i_generation);
2904 lock->l_ast_data = data;
2905 lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2906 unlock_res_and_lock(lock);
2907 LDLM_LOCK_PUT(lock);
2910 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2911 ldlm_iterator_t replace, void *data)
2913 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2914 struct obd_device *obd = class_exp2obd(exp);
2916 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2920 static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
2921 struct obd_info *oinfo, int intent, int rc)
2926 /* The request was created before the ldlm_cli_enqueue() call. */
2927 if (rc == ELDLM_LOCK_ABORTED) {
2928 struct ldlm_reply *rep;
2930 /* swabbed by ldlm_cli_enqueue() */
2931 LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
2932 rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
2934 LASSERT(rep != NULL);
2935 if (rep->lock_policy_res1)
2936 rc = rep->lock_policy_res1;
2940 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2941 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2942 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
2943 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
2944 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
2948 cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh);
2950 /* Call the update callback. */
2951 rc = oinfo->oi_cb_up(oinfo, rc);
2955 static int osc_enqueue_interpret(struct ptlrpc_request *req,
2956 struct osc_enqueue_args *aa, int rc)
2958 int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
2959 struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
2960 struct ldlm_lock *lock;
2962 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2964 lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
2966 /* Complete obtaining the lock procedure. */
2967 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2969 &aa->oa_oi->oi_flags,
2970 &lsm->lsm_oinfo[0]->loi_lvb,
2971 sizeof(lsm->lsm_oinfo[0]->loi_lvb),
2972 lustre_swab_ost_lvb,
2973 aa->oa_oi->oi_lockh, rc);
2975 /* Complete osc stuff. */
2976 rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc);
2978 /* Release the lock for async request. */
2979 if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
2980 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
2982 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2983 aa->oa_oi->oi_lockh, req, aa);
2984 LDLM_LOCK_PUT(lock);
2988 /* When enqueuing asynchronously, locks are not ordered, so we can obtain a
2989 * lock from the 2nd OSC before a lock from the 1st one. This does not deadlock
2990 * with other synchronous requests, but keeping some locks while trying to
2991 * obtain others may take a considerable amount of time in case of OST failure;
2992 * and when other sync requests do not get a released lock from a client, that
2993 * client is excluded from the cluster -- such scenarios make life difficult,
2994 * so release locks just after they are obtained. */
2995 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2996 struct ldlm_enqueue_info *einfo,
2997 struct ptlrpc_request_set *rqset)
2999 struct ldlm_res_id res_id = { .name = {oinfo->oi_md->lsm_object_id} };
3000 struct obd_device *obd = exp->exp_obd;
3001 struct ldlm_reply *rep;
3002 struct ptlrpc_request *req = NULL;
3003 int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
3008 /* Filesystem lock extents are extended to page boundaries so that
3009 * dealing with the page cache is a little smoother. */
3010 oinfo->oi_policy.l_extent.start -=
3011 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
3012 oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
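/* For illustration, assuming CFS_PAGE_SIZE == 4096 (so ~CFS_PAGE_MASK ==
 * 0xFFF): a byte extent [5000, 6000] is widened to the covering page
 * extent [4096, 8191]:
 *     start: 5000 - (5000 & 0xFFF) = 4096
 *     end:   6000 | 0xFFF          = 8191 */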
3014 if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
3017 /* Next, search for already existing extent locks that will cover us */
3018 /* If we're trying to read, we also search for an existing PW lock. The
3019 * VFS and page cache already protect us locally, so lots of readers/
3020 * writers can share a single PW lock.
3022 * There are problems with conversion deadlocks, so instead of
3023 * converting a read lock to a write lock, we'll just enqueue a new
3026 * At some point we should cancel the read lock instead of making them
3027 * send us a blocking callback, but there are problems with canceling
3028 * locks out from other users right now, too. */
3029 mode = einfo->ei_mode;
3030 if (einfo->ei_mode == LCK_PR)
3032 mode = ldlm_lock_match(obd->obd_namespace,
3033 oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
3034 einfo->ei_type, &oinfo->oi_policy, mode,
3037 /* addref the lock only if this is not an async request and a PW
3038 * lock was matched whereas we asked for PR. */
3039 if (!rqset && einfo->ei_mode != mode)
3040 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
3041 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
3044 /* I would like to be able to ASSERT here that rss <=
3045 * kms, but I can't, for reasons which are explained in
3049 /* We already have a lock, and it's referenced */
3050 oinfo->oi_cb_up(oinfo, ELDLM_OK);
3052 /* For async requests, decref the lock. */
3053 if (einfo->ei_mode != mode)
3054 ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
3056 ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
3064 [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
3065 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
3066 [DLM_LOCKREQ_OFF + 1] = 0 };
3068 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
3072 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
3073 size[DLM_REPLY_REC_OFF] =
3074 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
3075 ptlrpc_req_set_repsize(req, 3, size);
3078 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3079 oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
3081 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id,
3082 &oinfo->oi_policy, &oinfo->oi_flags,
3083 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3084 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
3085 lustre_swab_ost_lvb, oinfo->oi_lockh,
3089 struct osc_enqueue_args *aa;
3090 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3091 aa = (struct osc_enqueue_args *)&req->rq_async_args;
3096 req->rq_interpret_reply = osc_enqueue_interpret;
3097 ptlrpc_set_add_req(rqset, req);
3098 } else if (intent) {
3099 ptlrpc_req_finished(req);
3104 rc = osc_enqueue_fini(obd, req, oinfo, intent, rc);
3106 ptlrpc_req_finished(req);
3111 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3112 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3113 int *flags, void *data, struct lustre_handle *lockh)
3115 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
3116 struct obd_device *obd = exp->exp_obd;
3117 int lflags = *flags;
3121 OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
3123 /* Filesystem lock extents are extended to page boundaries so that
3124 * dealing with the page cache is a little smoother */
3125 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3126 policy->l_extent.end |= ~CFS_PAGE_MASK;
3128 /* Next, search for already existing extent locks that will cover us */
3129 /* If we're trying to read, we also search for an existing PW lock. The
3130 * VFS and page cache already protect us locally, so lots of readers/
3131 * writers can share a single PW lock. */
3135 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3136 &res_id, type, policy, rc, lockh);
3138 osc_set_data_with_check(lockh, data, lflags);
3139 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3140 ldlm_lock_addref(lockh, LCK_PR);
3141 ldlm_lock_decref(lockh, LCK_PW);
3149 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3150 __u32 mode, struct lustre_handle *lockh)
3154 if (unlikely(mode == LCK_GROUP))
3155 ldlm_lock_decref_and_cancel(lockh, mode);
3157 ldlm_lock_decref(lockh, mode);
3162 static int osc_cancel_unused(struct obd_export *exp,
3163 struct lov_stripe_md *lsm, int flags, void *opaque)
3165 struct obd_device *obd = class_exp2obd(exp);
3166 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
3168 return ldlm_cli_cancel_unused(obd->obd_namespace, &res_id, flags,
3172 static int osc_join_lru(struct obd_export *exp,
3173 struct lov_stripe_md *lsm, int join)
3175 struct obd_device *obd = class_exp2obd(exp);
3176 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
3178 return ldlm_cli_join_lru(obd->obd_namespace, &res_id, join);
3181 static int osc_statfs_interpret(struct ptlrpc_request *req,
3182 struct osc_async_args *aa, int rc)
3184 struct obd_statfs *msfs;
3190 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3191 lustre_swab_obd_statfs);
3193 CERROR("Can't unpack obd_statfs\n");
3194 GOTO(out, rc = -EPROTO);
3197 memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
3199 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3203 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3204 __u64 max_age, struct ptlrpc_request_set *rqset)
3206 struct ptlrpc_request *req;
3207 struct osc_async_args *aa;
3208 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
3211 /* We could possibly pass max_age in the request (as an absolute
3212 * timestamp or a "seconds.usec ago") so the target can avoid doing
3213 * extra calls into the filesystem if that isn't necessary (e.g.
3214 * during mount that would help a bit). Having relative timestamps
3215 * is not so great if request processing is slow, while absolute
3216 * timestamps are not ideal because they need time synchronization. */
3217 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3218 OST_STATFS, 1, NULL, NULL);
3222 ptlrpc_req_set_repsize(req, 2, size);
3223 req->rq_request_portal = OST_CREATE_PORTAL;
3224 ptlrpc_at_set_req_timeout(req);
3225 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3226 /* procfs requests should not block waiting for the OST, to avoid a deadlock */
3227 req->rq_no_resend = 1;
3228 req->rq_no_delay = 1;
3231 req->rq_interpret_reply = osc_statfs_interpret;
3232 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3233 aa = (struct osc_async_args *)&req->rq_async_args;
3236 ptlrpc_set_add_req(rqset, req);
3240 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3241 __u64 max_age, __u32 flags)
3243 struct obd_statfs *msfs;
3244 struct ptlrpc_request *req;
3245 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
3248 /* We could possibly pass max_age in the request (as an absolute
3249 * timestamp or a "seconds.usec ago") so the target can avoid doing
3250 * extra calls into the filesystem if that isn't necessary (e.g.
3251 * during mount that would help a bit). Having relative timestamps
3252 * is not so great if request processing is slow, while absolute
3253 * timestamps are not ideal because they need time synchronization. */
3254 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3255 OST_STATFS, 1, NULL, NULL);
3259 ptlrpc_req_set_repsize(req, 2, size);
3260 req->rq_request_portal = OST_CREATE_PORTAL;
3261 ptlrpc_at_set_req_timeout(req);
3263 if (flags & OBD_STATFS_NODELAY) {
3264 /* procfs requests should not block waiting for the OST, to avoid a deadlock */
3265 req->rq_no_resend = 1;
3266 req->rq_no_delay = 1;
3269 rc = ptlrpc_queue_wait(req);
3273 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3274 lustre_swab_obd_statfs);
3276 CERROR("Can't unpack obd_statfs\n");
3277 GOTO(out, rc = -EPROTO);
3280 memcpy(osfs, msfs, sizeof(*osfs));
3284 ptlrpc_req_finished(req);
3288 /* Retrieve object striping information.
3290 * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
3291 * the maximum number of OST indices which will fit in the user buffer.
3292 * lmm_magic must be LOV_USER_MAGIC (we only use 1 slot here).
3294 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3296 struct lov_user_md lum, *lumk;
3297 int rc = 0, lum_size;
3303 if (copy_from_user(&lum, lump, sizeof(lum)))
3306 if (lum.lmm_magic != LOV_USER_MAGIC)
3309 if (lum.lmm_stripe_count > 0) {
3310 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3311 OBD_ALLOC(lumk, lum_size);
3315 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3317 lum_size = sizeof(lum);
3321 lumk->lmm_object_id = lsm->lsm_object_id;
3322 lumk->lmm_stripe_count = 1;
3324 if (copy_to_user(lump, lumk, lum_size))
3328 OBD_FREE(lumk, lum_size);
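/* For illustration only (a hypothetical user-space sketch of the contract
 * described above; the buffer layout is assumed, not taken from this file):
 *
 *     struct {
 *             struct lov_user_md       lum;
 *             struct lov_user_ost_data objects[1];
 *     } buf = { .lum = { .lmm_magic = LOV_USER_MAGIC,
 *                        .lmm_stripe_count = 1 } };
 *     ioctl(fd, LL_IOC_LOV_GETSTRIPE, &buf);
 *
 * osc_getstripe() then fills in lmm_object_id, sets lmm_stripe_count to 1,
 * and stores the object id in objects[0].l_object_id. */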
3334 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3335 void *karg, void *uarg)
3337 struct obd_device *obd = exp->exp_obd;
3338 struct obd_ioctl_data *data = karg;
3342 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3345 if (!try_module_get(THIS_MODULE)) {
3346 CERROR("Can't get module. Is it alive?");
3351 case OBD_IOC_LOV_GET_CONFIG: {
3353 struct lov_desc *desc;
3354 struct obd_uuid uuid;
3358 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3359 GOTO(out, err = -EINVAL);
3361 data = (struct obd_ioctl_data *)buf;
3363 if (sizeof(*desc) > data->ioc_inllen1) {
3364 obd_ioctl_freedata(buf, len);
3365 GOTO(out, err = -EINVAL);
3368 if (data->ioc_inllen2 < sizeof(uuid)) {
3369 obd_ioctl_freedata(buf, len);
3370 GOTO(out, err = -EINVAL);
3373 desc = (struct lov_desc *)data->ioc_inlbuf1;
3374 desc->ld_tgt_count = 1;
3375 desc->ld_active_tgt_count = 1;
3376 desc->ld_default_stripe_count = 1;
3377 desc->ld_default_stripe_size = 0;
3378 desc->ld_default_stripe_offset = 0;
3379 desc->ld_pattern = 0;
3380 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3382 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3384 err = copy_to_user((void *)uarg, buf, len);
3387 obd_ioctl_freedata(buf, len);
3390 case LL_IOC_LOV_SETSTRIPE:
3391 err = obd_alloc_memmd(exp, karg);
3395 case LL_IOC_LOV_GETSTRIPE:
3396 err = osc_getstripe(karg, uarg);
3398 case OBD_IOC_CLIENT_RECOVER:
3399 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3404 case IOC_OSC_SET_ACTIVE:
3405 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3408 case OBD_IOC_POLL_QUOTACHECK:
3409 err = lquota_poll_check(quota_interface, exp,
3410 (struct if_quotacheck *)karg);
3412 case OBD_IOC_DESTROY: {
3415 if (!capable (CAP_SYS_ADMIN))
3416 GOTO (out, err = -EPERM);
3417 oa = &data->ioc_obdo1;
3420 GOTO(out, err = -EINVAL);
3422 oa->o_valid |= OBD_MD_FLGROUP;
3424 err = osc_destroy(exp, oa, NULL, NULL, NULL);
3428 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3429 cmd, cfs_curproc_comm());
3430 GOTO(out, err = -ENOTTY);
3433 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3436 module_put(THIS_MODULE);
3441 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3442 void *key, __u32 *vallen, void *val)
3445 if (!vallen || !val)
3448 if (KEY_IS("lock_to_stripe")) {
3449 __u32 *stripe = val;
3450 *vallen = sizeof(*stripe);
3453 } else if (KEY_IS("last_id")) {
3454 struct ptlrpc_request *req;
3456 char *bufs[2] = { NULL, key };
3457 int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };
3459 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3460 OST_GET_INFO, 2, size, bufs);
3464 size[REPLY_REC_OFF] = *vallen;
3465 ptlrpc_req_set_repsize(req, 2, size);
3466 rc = ptlrpc_queue_wait(req);
3470 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3471 lustre_swab_ost_last_id);
3472 if (reply == NULL) {
3473 CERROR("Can't unpack OST last ID\n");
3474 GOTO(out, rc = -EPROTO);
3476 *((obd_id *)val) = *reply;
3478 ptlrpc_req_finished(req);
3484 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3487 struct llog_ctxt *ctxt;
3488 struct obd_import *imp = req->rq_import;
3494 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3497 rc = llog_initiator_connect(ctxt);
3499 CERROR("cannot establish connection for "
3500 "ctxt %p: %d\n", ctxt, rc);
3503 llog_ctxt_put(ctxt);
3504 spin_lock(&imp->imp_lock);
3505 imp->imp_server_timeout = 1;
3506 imp->imp_pingable = 1;
3507 spin_unlock(&imp->imp_lock);
3508 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3513 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3514 void *key, obd_count vallen, void *val,
3515 struct ptlrpc_request_set *set)
3517 struct ptlrpc_request *req;
3518 struct obd_device *obd = exp->exp_obd;
3519 struct obd_import *imp = class_exp2cliimp(exp);
3520 int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
3521 char *bufs[3] = { NULL, key, val };
3524 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3526 if (KEY_IS(KEY_NEXT_ID)) {
3527 if (vallen != sizeof(obd_id))
3529 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3530 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3531 exp->exp_obd->obd_name,
3532 obd->u.cli.cl_oscc.oscc_next_id);
3537 if (KEY_IS("unlinked")) {
3538 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3539 spin_lock(&oscc->oscc_lock);
3540 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3541 spin_unlock(&oscc->oscc_lock);
3545 if (KEY_IS(KEY_INIT_RECOV)) {
3546 if (vallen != sizeof(int))
3548 spin_lock(&imp->imp_lock);
3549 imp->imp_initial_recov = *(int *)val;
3550 spin_unlock(&imp->imp_lock);
3551 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3552 exp->exp_obd->obd_name,
3553 imp->imp_initial_recov);
3557 if (KEY_IS("checksum")) {
3558 if (vallen != sizeof(int))
3560 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3567 /* We pass all other commands directly to OST. Since nobody calls osc
3568 methods directly and everybody is supposed to go through LOV, we
3569 assume lov checked invalid values for us.
3570 The only recognised values so far are evict_by_nid and mds_conn.
3571 Even if something bad goes through, we'd get a -EINVAL from OST
3574 req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
3579 if (KEY_IS(KEY_MDS_CONN))
3580 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3582 ptlrpc_req_set_repsize(req, 1, NULL);
3583 ptlrpc_set_add_req(set, req);
3584 ptlrpc_check_set(set);
3590 static struct llog_operations osc_size_repl_logops = {
3591 lop_cancel: llog_obd_repl_cancel
3594 static struct llog_operations osc_mds_ost_orig_logops;
3595 static int osc_llog_init(struct obd_device *obd, struct obd_device *tgt,
3596 int count, struct llog_catid *catid,
3597 struct obd_uuid *uuid)
3602 spin_lock(&obd->obd_dev_lock);
3603 if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3604 osc_mds_ost_orig_logops = llog_lvfs_ops;
3605 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3606 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3607 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3608 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3610 spin_unlock(&obd->obd_dev_lock);
3612 rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3613 &catid->lci_logid, &osc_mds_ost_orig_logops);
3615 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3619 rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3620 &osc_size_repl_logops);
3622 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3625 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3626 obd->obd_name, tgt->obd_name, count, catid, rc);
3627 CERROR("logid "LPX64":0x%x\n",
3628 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3633 static int osc_llog_finish(struct obd_device *obd, int count)
3635 struct llog_ctxt *ctxt;
3636 int rc = 0, rc2 = 0;
3639 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3641 rc = llog_cleanup(ctxt);
3643 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3645 rc2 = llog_cleanup(ctxt);
3652 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3653 struct obd_uuid *cluuid,
3654 struct obd_connect_data *data)
3656 struct client_obd *cli = &obd->u.cli;
3658 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3661 client_obd_list_lock(&cli->cl_loi_list_lock);
3662 data->ocd_grant = cli->cl_avail_grant ?:
3663 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
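/* For illustration: if no grant survived (cl_avail_grant == 0), then with a
 * typical cl_max_pages_per_rpc of 256 and CFS_PAGE_SHIFT of 12 this asks the
 * OST for 2 * 256 << 12 = 2MB of fresh grant on reconnect. */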
3664 lost_grant = cli->cl_lost_grant;
3665 cli->cl_lost_grant = 0;
3666 client_obd_list_unlock(&cli->cl_loi_list_lock);
3668 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3669 "cl_lost_grant: %ld\n", data->ocd_grant,
3670 cli->cl_avail_grant, lost_grant);
3671 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3672 " ocd_grant: %d\n", data->ocd_connect_flags,
3673 data->ocd_version, data->ocd_grant);
3679 static int osc_disconnect(struct obd_export *exp)
3681 struct obd_device *obd = class_exp2obd(exp);
3682 struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3685 if (obd->u.cli.cl_conn_count == 1)
3686 /* flush any remaining cancel messages out to the target */
3687 llog_sync(ctxt, exp);
3689 llog_ctxt_put(ctxt);
3691 rc = client_disconnect_export(exp);
3695 static int osc_import_event(struct obd_device *obd,
3696 struct obd_import *imp,
3697 enum obd_import_event event)
3699 struct client_obd *cli;
3703 LASSERT(imp->imp_obd == obd);
3706 case IMP_EVENT_DISCON: {
3707 /* Only do this on the MDS OSCs */
3708 if (imp->imp_server_timeout) {
3709 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3711 spin_lock(&oscc->oscc_lock);
3712 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3713 spin_unlock(&oscc->oscc_lock);
3716 client_obd_list_lock(&cli->cl_loi_list_lock);
3717 cli->cl_avail_grant = 0;
3718 cli->cl_lost_grant = 0;
3719 client_obd_list_unlock(&cli->cl_loi_list_lock);
3720 ptlrpc_import_setasync(imp, -1);
3724 case IMP_EVENT_INACTIVE: {
3725 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3728 case IMP_EVENT_INVALIDATE: {
3729 struct ldlm_namespace *ns = obd->obd_namespace;
3733 client_obd_list_lock(&cli->cl_loi_list_lock);
3734 /* all pages go to failing rpcs due to the invalid import */
3735 osc_check_rpcs(cli);
3736 client_obd_list_unlock(&cli->cl_loi_list_lock);
3738 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3742 case IMP_EVENT_ACTIVE: {
3743 /* Only do this on the MDS OSCs */
3744 if (imp->imp_server_timeout) {
3745 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3747 spin_lock(&oscc->oscc_lock);
3748 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3749 spin_unlock(&oscc->oscc_lock);
3751 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3754 case IMP_EVENT_OCD: {
3755 struct obd_connect_data *ocd = &imp->imp_connect_data;
3757 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3758 osc_init_grant(&obd->u.cli, ocd);
3761 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3762 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3764 ptlrpc_import_setasync(imp, 1);
3765 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3769 CERROR("Unknown import event %d\n", event);
3775 int osc_setup(struct obd_device *obd, obd_count len, void *buf)
3781 rc = ptlrpcd_addref();
3785 rc = client_obd_setup(obd, len, buf);
3789 struct lprocfs_static_vars lvars = { 0 };
3790 struct client_obd *cli = &obd->u.cli;
3792 lprocfs_osc_init_vars(&lvars);
3793 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3794 lproc_osc_attach_seqstat(obd);
3795 ptlrpc_lprocfs_register_obd(obd);
3799 /* We need to allocate a few more requests, because
3800 brw_interpret_oap tries to create new requests before freeing
3801 previous ones. Ideally we would want 2x max_rpcs_in_flight
3802 reserved, but I'm afraid that might waste too much RAM in
3803 practice, so 2 extra is just my guess and should still work. */
3804 cli->cl_import->imp_rq_pool =
3805 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3807 ptlrpc_add_rqs_to_pool);
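/* For illustration: with cl_max_rpcs_in_flight at its common default of 8,
 * the pool is primed with 10 preallocated requests; the two extra cover
 * brw_interpret_oap building a replacement request before the old one is
 * freed. */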
3808 cli->cl_cache = cache_create(obd);
3809 if (!cli->cl_cache) {
3818 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3824 case OBD_CLEANUP_EARLY: {
3825 struct obd_import *imp;
3826 imp = obd->u.cli.cl_import;
3827 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3828 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3829 ptlrpc_deactivate_import(imp);
3832 case OBD_CLEANUP_EXPORTS: {
3833 /* If we set up but never connected, the
3834 client import will not have been cleaned. */
3835 if (obd->u.cli.cl_import) {
3836 struct obd_import *imp;
3837 imp = obd->u.cli.cl_import;
3838 CDEBUG(D_CONFIG, "%s: client import never connected\n",
3840 ptlrpc_invalidate_import(imp);
3841 ptlrpc_free_rq_pool(imp->imp_rq_pool);
3842 class_destroy_import(imp);
3843 obd->u.cli.cl_import = NULL;
3847 case OBD_CLEANUP_SELF_EXP:
3848 rc = obd_llog_finish(obd, 0);
3850 CERROR("failed to cleanup llogging subsystems\n");
3852 case OBD_CLEANUP_OBD:
3858 int osc_cleanup(struct obd_device *obd)
3860 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3864 ptlrpc_lprocfs_unregister_obd(obd);
3865 lprocfs_obd_cleanup(obd);
3867 spin_lock(&oscc->oscc_lock);
3868 oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3869 oscc->oscc_flags |= OSCC_FLAG_EXITING;
3870 spin_unlock(&oscc->oscc_lock);
3872 /* free memory of osc quota cache */
3873 lquota_cleanup(quota_interface, obd);
3875 cache_destroy(obd->u.cli.cl_cache);
3876 rc = client_obd_cleanup(obd);
3882 static int osc_register_page_removal_cb(struct obd_export *exp,
3883 obd_page_removal_cb_t func,
3884 obd_pin_extent_cb pin_cb)
3886 return cache_add_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func,
3890 static int osc_unregister_page_removal_cb(struct obd_export *exp,
3891 obd_page_removal_cb_t func)
3893 return cache_del_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func);
3896 static int osc_register_lock_cancel_cb(struct obd_export *exp,
3897 obd_lock_cancel_cb cb)
3899 LASSERT(exp->exp_obd->u.cli.cl_ext_lock_cancel_cb == NULL);
3901 exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = cb;
3905 static int osc_unregister_lock_cancel_cb(struct obd_export *exp,
3906 obd_lock_cancel_cb cb)
3908 if (exp->exp_obd->u.cli.cl_ext_lock_cancel_cb != cb) {
3909 CERROR("Unregistering cancel cb %p, while only %p was "
3911 exp->exp_obd->u.cli.cl_ext_lock_cancel_cb);
3915 exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = NULL;
3919 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3921 struct lustre_cfg *lcfg = buf;
3922 struct lprocfs_static_vars lvars = { 0 };
3925 lprocfs_osc_init_vars(&lvars);
3927 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
3931 struct obd_ops osc_obd_ops = {
3932 .o_owner = THIS_MODULE,
3933 .o_setup = osc_setup,
3934 .o_precleanup = osc_precleanup,
3935 .o_cleanup = osc_cleanup,
3936 .o_add_conn = client_import_add_conn,
3937 .o_del_conn = client_import_del_conn,
3938 .o_connect = client_connect_import,
3939 .o_reconnect = osc_reconnect,
3940 .o_disconnect = osc_disconnect,
3941 .o_statfs = osc_statfs,
3942 .o_statfs_async = osc_statfs_async,
3943 .o_packmd = osc_packmd,
3944 .o_unpackmd = osc_unpackmd,
3945 .o_precreate = osc_precreate,
3946 .o_create = osc_create,
3947 .o_destroy = osc_destroy,
3948 .o_getattr = osc_getattr,
3949 .o_getattr_async = osc_getattr_async,
3950 .o_setattr = osc_setattr,
3951 .o_setattr_async = osc_setattr_async,
3953 .o_brw_async = osc_brw_async,
3954 .o_prep_async_page = osc_prep_async_page,
3955 .o_reget_short_lock = osc_reget_short_lock,
3956 .o_release_short_lock = osc_release_short_lock,
3957 .o_queue_async_io = osc_queue_async_io,
3958 .o_set_async_flags = osc_set_async_flags,
3959 .o_queue_group_io = osc_queue_group_io,
3960 .o_trigger_group_io = osc_trigger_group_io,
3961 .o_teardown_async_page = osc_teardown_async_page,
3962 .o_punch = osc_punch,
3964 .o_enqueue = osc_enqueue,
3965 .o_match = osc_match,
3966 .o_change_cbdata = osc_change_cbdata,
3967 .o_cancel = osc_cancel,
3968 .o_cancel_unused = osc_cancel_unused,
3969 .o_join_lru = osc_join_lru,
3970 .o_iocontrol = osc_iocontrol,
3971 .o_get_info = osc_get_info,
3972 .o_set_info_async = osc_set_info_async,
3973 .o_import_event = osc_import_event,
3974 .o_llog_init = osc_llog_init,
3975 .o_llog_finish = osc_llog_finish,
3976 .o_process_config = osc_process_config,
3977 .o_register_page_removal_cb = osc_register_page_removal_cb,
3978 .o_unregister_page_removal_cb = osc_unregister_page_removal_cb,
3979 .o_register_lock_cancel_cb = osc_register_lock_cancel_cb,
3980 .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb,
3982 int __init osc_init(void)
3984 struct lprocfs_static_vars lvars = { 0 };
3988 lprocfs_osc_init_vars(&lvars);
3990 request_module("lquota");
3991 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3992 lquota_init(quota_interface);
3993 init_obd_quota_ops(quota_interface, &osc_obd_ops);
3995 rc = class_register_type(&osc_obd_ops, lvars.module_vars,
3998 if (quota_interface)
3999 PORTAL_SYMBOL_PUT(osc_quota_interface);
4007 static void /*__exit*/ osc_exit(void)
4009 lquota_exit(quota_interface);
4010 if (quota_interface)
4011 PORTAL_SYMBOL_PUT(osc_quota_interface);
4013 class_unregister_type(LUSTRE_OSC_NAME);
4016 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
4017 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4018 MODULE_LICENSE("GPL");
4020 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);