1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5 * Author Peter Braam <braam@clusterfs.com>
7 * This file is part of the Lustre file system, http://www.lustre.org
8 * Lustre is a trademark of Cluster File Systems, Inc.
10 * You may have signed or agreed to another license before downloading
11 * this software. If so, you are bound by the terms and conditions
12 * of that agreement, and the following does not apply to you. See the
13 * LICENSE file included with this distribution for more information.
15 * If you did not agree to a different license, then this copy of Lustre
16 * is open source software; you can redistribute it and/or modify it
17 * under the terms of version 2 of the GNU General Public License as
18 * published by the Free Software Foundation.
20 * In either case, Lustre is distributed in the hope that it will be
21 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23 * license text for more details.
25 * For testing and management it is treated as an obd_device,
26 * although it does not export a full OBD method table (the
27 * requests are coming in over the wire, so object target modules
28 * do not have a full method table.)
33 # define EXPORT_SYMTAB
35 #define DEBUG_SUBSYSTEM S_OSC
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
43 #include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include "osc_internal.h"
/* Quota interface handle used by this file; it starts out NULL and is the
 * handle passed to lquota_setdq() when a write reply carries quota flags. */
61 static quota_interface_t *quota_interface = NULL;
/* Quota interface object whose definition lives outside this file. */
62 extern quota_interface_t osc_quota_interface;
/* Forward declaration; takes an array of brw_page pointers and its length. */
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
66 /* Pack OSC object metadata for disk storage (LE byte order). */
/* osc_packmd: fill in the on-disk lov_mds_md for a single OSC object.
 *
 * @exp:  export (not referenced by the statements shown here)
 * @lmmp: in/out pointer to the lov_mds_md buffer
 * @lsm:  in-memory stripe metadata supplying the object id and group
 *
 * The buffer size is always sizeof(**lmmp).  The statements below either free
 * *lmmp or allocate it with that size (the conditions choosing between the two
 * are not shown here - TODO confirm), then store the object id and group from
 * @lsm converted to little-endian. */
67 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
68 struct lov_stripe_md *lsm)
73 lmm_size = sizeof(**lmmp);
78 OBD_FREE(*lmmp, lmm_size);
84 OBD_ALLOC(*lmmp, lmm_size);
/* Both the object id and the object group must be non-zero before packing. */
90 LASSERT(lsm->lsm_object_id);
91 LASSERT(lsm->lsm_object_gr);
92 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
93 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
99 /* Unpack OSC object metadata from disk storage (LE byte order). */
/* osc_unpackmd: build an in-memory lov_stripe_md from an on-disk lov_mds_md.
 *
 * @exp:       export (not referenced by the statements shown here)
 * @lsmp:      in/out pointer to the in-memory stripe metadata
 * @lmm:       on-disk metadata in little-endian byte order
 * @lmm_bytes: size of the @lmm buffer in bytes
 *
 * The stripe metadata is sized for exactly one stripe
 * (lov_stripe_md_size(1)) and carries one lov_oinfo in lsm_oinfo[0].
 * When *lsmp is set and @lmm is NULL the existing lsm and its lov_oinfo are
 * freed.  Otherwise the lsm and its lov_oinfo are allocated, the object id
 * and group are converted from little-endian, and lsm_maxbytes is set to
 * LUSTRE_STRIPE_MAXBYTES. */
100 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
101 struct lov_mds_md *lmm, int lmm_bytes)
/* Reject a buffer too small to hold a whole lov_mds_md.
 * NOTE(review): lmm_bytes is an int compared against a sizeof() result, so the
 * comparison is presumably performed unsigned and a negative lmm_bytes would
 * not be caught - confirm. */
107 if (lmm_bytes < sizeof (*lmm)) {
108 CERROR("lov_mds_md too small: %d, need %d\n",
109 lmm_bytes, (int)sizeof(*lmm));
112 /* XXX LOV_MAGIC etc check? */
/* An object id of zero is treated as invalid. */
114 if (lmm->lmm_object_id == 0) {
115 CERROR("lov_mds_md: zero lmm_object_id\n");
120 lsm_size = lov_stripe_md_size(1);
/* Existing lsm but no disk metadata: release the lsm and its single oinfo. */
124 if (*lsmp != NULL && lmm == NULL) {
125 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
126 OBD_FREE(*lsmp, lsm_size);
132 OBD_ALLOC(*lsmp, lsm_size);
135 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
/* Undo the lsm allocation when the oinfo allocation fails. */
136 if ((*lsmp)->lsm_oinfo[0] == NULL) {
137 OBD_FREE(*lsmp, lsm_size);
140 loi_init((*lsmp)->lsm_oinfo[0]);
144 /* XXX zero *lsmp? */
145 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
146 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
147 LASSERT((*lsmp)->lsm_object_id);
148 LASSERT((*lsmp)->lsm_object_gr);
151 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/* osc_pack_capa: place a capability into a request buffer.
 *
 * @req:    request whose message buffer receives the capability
 * @offset: index of the request buffer reserved for the capability
 * @body:   ost_body of the same request; OBD_MD_FLOSSCAPA is set in its
 *          oa.o_valid
 * @capa:   opaque pointer, treated as a struct obd_capa
 *
 * The buffer at @offset is looked up with the size of a struct lustre_capa.
 * How the capability bytes are copied, and how a NULL @capa is handled, is
 * not shown in the statements below - TODO confirm. */
156 static inline void osc_pack_capa(struct ptlrpc_request *req, int offset,
157 struct ost_body *body, void *capa)
159 struct obd_capa *oc = (struct obd_capa *)capa;
160 struct lustre_capa *c;
165 c = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*c));
168 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
169 DEBUG_CAPA(D_SEC, c, "pack");
/* osc_pack_req_body: fill the ost_body of a request from an obd_info.
 *
 * @req:    request being built
 * @offset: index of the request buffer that holds the ost_body
 * @oinfo:  source of the obdo (oi_oa) and the capability (oi_capa)
 *
 * Copies *oinfo->oi_oa into the body by structure assignment, then packs the
 * capability into the buffer that follows the body (@offset + 1). */
172 static inline void osc_pack_req_body(struct ptlrpc_request *req, int offset,
173 struct obd_info *oinfo)
175 struct ost_body *body;
177 body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body));
178 body->oa = *oinfo->oi_oa;
179 osc_pack_capa(req, offset + 1, body, oinfo->oi_capa);
/* osc_getattr_interpret: reply handler installed by osc_getattr_async().
 *
 * @req: the completed request
 * @aa:  async arguments stored in the request; aa_oi is the caller's obd_info
 * @rc:  completion status of the request
 *
 * Unpacks (and byte-swaps if needed) the ost_body from the reply and copies
 * its obdo into aa->aa_oi->oi_oa, forcing o_blksize to PTLRPC_MAX_BRW_SIZE.
 * On the unpack-failure path an error is logged and o_valid is cleared.
 * Finally the caller's oi_cb_up callback is invoked with the resulting rc. */
182 static int osc_getattr_interpret(struct ptlrpc_request *req,
183 struct osc_async_args *aa, int rc)
185 struct ost_body *body;
191 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
192 lustre_swab_ost_body);
194 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
195 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
197 /* This should really be sent by the OST */
198 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
199 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
201 CERROR("can't unpack ost_body\n");
/* Nothing in the obdo can be trusted after a failed unpack. */
203 aa->aa_oi->oi_oa->o_valid = 0;
206 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* osc_getattr_async: queue an OST_GETATTR request on a request set.
 *
 * @exp:   export identifying the target import
 * @oinfo: carries the obdo to send and the optional capability
 * @set:   request set the new request is added to
 *
 * The request has three buffers: ptlrpc_body, ost_body and a capability
 * buffer whose size is zero when oinfo->oi_capa is NULL.  The reply is sized
 * for two buffers (ptlrpc_body + ost_body).  The reply is processed by
 * osc_getattr_interpret(), with its arguments stored in req->rq_async_args. */
210 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
211 struct ptlrpc_request_set *set)
213 struct ptlrpc_request *req;
214 struct ost_body *body;
215 int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
216 struct osc_async_args *aa;
219 size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
220 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
221 OST_GETATTR, 3, size,NULL);
225 osc_pack_req_body(req, REQ_REC_OFF, oinfo);
227 ptlrpc_req_set_repsize(req, 2, size);
228 req->rq_interpret_reply = osc_getattr_interpret;
/* The async args live inside the request, so they must fit in rq_async_args. */
230 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
231 aa = (struct osc_async_args *)&req->rq_async_args;
234 ptlrpc_set_add_req(set, req);
/* osc_getattr: synchronous OST_GETATTR.
 *
 * @exp:   export identifying the target import
 * @oinfo: carries the obdo to send (and to update from the reply) and the
 *         optional capability
 *
 * Builds the same three-buffer request as osc_getattr_async(), waits for the
 * reply with ptlrpc_queue_wait(), and copies the returned obdo into
 * *oinfo->oi_oa, forcing o_blksize to PTLRPC_MAX_BRW_SIZE.  An unpackable
 * reply body yields rc = -EPROTO.  The request is released with
 * ptlrpc_req_finished(). */
238 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
240 struct ptlrpc_request *req;
241 struct ost_body *body;
242 int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
/* The capability buffer is empty when no capability was supplied. */
245 size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
246 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
247 OST_GETATTR, 3, size, NULL);
251 osc_pack_req_body(req, REQ_REC_OFF, oinfo);
253 ptlrpc_req_set_repsize(req, 2, size);
255 rc = ptlrpc_queue_wait(req);
257 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
261 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
262 lustre_swab_ost_body);
264 CERROR ("can't unpack ost_body\n");
265 GOTO (out, rc = -EPROTO);
268 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
269 *oinfo->oi_oa = body->oa;
271 /* This should really be sent by the OST */
272 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
273 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
277 ptlrpc_req_finished(req);
/* osc_setattr: synchronous OST_SETATTR.
 *
 * @exp:   export identifying the target import
 * @oinfo: carries the obdo to send (and to update from the reply) and the
 *         optional capability
 * @oti:   transaction info (not referenced by the statements shown here)
 *
 * If OBD_MD_FLGROUP is set in the obdo, its group must be greater than zero.
 * The request has three buffers (ptlrpc_body, ost_body, capability); the
 * reply has two.  After ptlrpc_queue_wait() the returned obdo is copied into
 * *oinfo->oi_oa; an unpackable reply body yields rc = -EPROTO. */
281 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
282 struct obd_trans_info *oti)
284 struct ptlrpc_request *req;
285 struct ost_body *body;
286 int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
289 LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
290 oinfo->oi_oa->o_gr > 0);
291 size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
292 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
293 OST_SETATTR, 3, size, NULL);
297 osc_pack_req_body(req, REQ_REC_OFF, oinfo);
299 ptlrpc_req_set_repsize(req, 2, size);
301 rc = ptlrpc_queue_wait(req);
305 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
306 lustre_swab_ost_body);
308 GOTO(out, rc = -EPROTO);
310 *oinfo->oi_oa = body->oa;
314 ptlrpc_req_finished(req);
/* osc_setattr_interpret: reply handler installed by osc_setattr_async().
 *
 * @req: the completed request
 * @aa:  async arguments stored in the request; aa_oi is the caller's obd_info
 * @rc:  completion status of the request
 *
 * Unpacks the ost_body from the reply (rc = -EPROTO if that fails), copies
 * the returned obdo into *aa->aa_oi->oi_oa, and reports the result through
 * the caller's oi_cb_up callback. */
318 static int osc_setattr_interpret(struct ptlrpc_request *req,
319 struct osc_async_args *aa, int rc)
321 struct ost_body *body;
327 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
328 lustre_swab_ost_body);
330 CERROR("can't unpack ost_body\n");
331 GOTO(out, rc = -EPROTO);
334 *aa->aa_oi->oi_oa = body->oa;
336 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* osc_setattr_async: send an OST_SETATTR without blocking the caller.
 *
 * @exp:   export identifying the target import
 * @oinfo: carries the obdo to send and the optional capability
 * @oti:   transaction info; supplies the llog cookie when the obdo has
 *         OBD_MD_FLCOOKIE set
 * @rqset: request set used on the path that installs a reply handler
 *
 * Two dispatch paths are visible below: handing the request to ptlrpcd
 * without waiting for a response, and adding it to @rqset with
 * osc_setattr_interpret() as the reply handler.  The condition that selects
 * between them is not shown here - TODO confirm.
 *
 * NOTE(review): oti->oti_logcookies is dereferenced when OBD_MD_FLCOOKIE is
 * set; confirm @oti is validated before that point. */
340 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
341 struct obd_trans_info *oti,
342 struct ptlrpc_request_set *rqset)
344 struct ptlrpc_request *req;
345 int size[3] = { sizeof(struct ptlrpc_body), sizeof(struct ost_body) };
346 struct osc_async_args *aa;
349 size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
350 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
351 OST_SETATTR, 3, size, NULL);
355 osc_pack_req_body(req, REQ_REC_OFF, oinfo);
/* Record the llog cookie from @oti in the caller's obdo. */
356 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
358 *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;
361 ptlrpc_req_set_repsize(req, 2, size);
362 /* do mds to ost setattr asynchronously */
364 /* Do not wait for response. */
365 ptlrpcd_add_req(req);
367 req->rq_interpret_reply = osc_setattr_interpret;
369 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
370 aa = (struct osc_async_args *)&req->rq_async_args;
373 ptlrpc_set_add_req(rqset, req);
/* osc_real_create: send a synchronous OST_CREATE and record the new object.
 *
 * @exp: export identifying the target import
 * @oa:  obdo describing the object; updated from the reply
 * @ea:  in/out stripe metadata pointer
 * @oti: transaction info; receives the reply transno and, when the obdo has
 *       OBD_MD_FLCOOKIE set, the llog cookie
 *
 * Stripe metadata is obtained with obd_alloc_memmd() and released with
 * obd_free_memmd() on the cleanup path.  The request and the reply each have
 * two buffers (ptlrpc_body + ost_body).  When the obdo has OBD_MD_FLINLINE
 * set it must be an OBD_FL_DELORPHAN request, and that request is marked
 * no-resend / no-delay.  After the reply is unpacked (rc = -EPROTO on
 * failure) o_blksize is forced to PTLRPC_MAX_BRW_SIZE and the object id and
 * group are stored in the lsm.
 *
 * NOTE(review): the result of oti_alloc_cookies() is not checked by the
 * statements shown before *oti->oti_logcookies is written - confirm. */
379 int osc_real_create(struct obd_export *exp, struct obdo *oa,
380 struct lov_stripe_md **ea, struct obd_trans_info *oti)
382 struct ptlrpc_request *req;
383 struct ost_body *body;
384 struct lov_stripe_md *lsm;
385 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
393 rc = obd_alloc_memmd(exp, &lsm);
398 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
399 OST_CREATE, 2, size, NULL);
401 GOTO(out, rc = -ENOMEM);
403 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
406 ptlrpc_req_set_repsize(req, 2, size);
407 if (oa->o_valid & OBD_MD_FLINLINE) {
408 LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
409 oa->o_flags == OBD_FL_DELORPHAN);
411 "delorphan from OST integration");
412 /* Don't resend the delorphan req */
413 req->rq_no_resend = req->rq_no_delay = 1;
416 rc = ptlrpc_queue_wait(req);
420 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
421 lustre_swab_ost_body);
423 CERROR ("can't unpack ost_body\n");
424 GOTO (out_req, rc = -EPROTO);
429 /* This should really be sent by the OST */
430 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
431 oa->o_valid |= OBD_MD_FLBLKSZ;
433 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
434 * have valid lsm_oinfo data structs, so don't go touching that.
435 * This needs to be fixed in a big way.
437 lsm->lsm_object_id = oa->o_id;
438 lsm->lsm_object_gr = oa->o_gr;
442 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
444 if (oa->o_valid & OBD_MD_FLCOOKIE) {
445 if (!oti->oti_logcookies)
446 oti_alloc_cookies(oti, 1);
447 *oti->oti_logcookies = *obdo_logcookie(oa);
451 CDEBUG(D_HA, "transno: "LPD64"\n",
452 lustre_msg_get_transno(req->rq_repmsg));
455 ptlrpc_req_finished(req);
458 obd_free_memmd(exp, &lsm);
/* osc_punch_interpret: reply handler installed by osc_punch().
 *
 * @req: the completed request
 * @aa:  async arguments stored in the request; aa_oi is the caller's obd_info
 * @rc:  completion status of the request
 *
 * Unpacks the ost_body from the reply (rc = -EPROTO if that fails), copies
 * the returned obdo into *aa->aa_oi->oi_oa, and reports the result through
 * the caller's oi_cb_up callback. */
462 static int osc_punch_interpret(struct ptlrpc_request *req,
463 struct osc_async_args *aa, int rc)
465 struct ost_body *body;
471 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
472 lustre_swab_ost_body);
474 CERROR ("can't unpack ost_body\n");
475 GOTO(out, rc = -EPROTO);
478 *aa->aa_oi->oi_oa = body->oa;
480 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* osc_punch: queue an OST_PUNCH request on a request set.
 *
 * @exp:   export identifying the target import
 * @oinfo: carries the obdo, the optional capability, and the extent to punch
 *         in oi_policy.l_extent
 * @oti:   transaction info (not referenced by the statements shown here)
 * @rqset: request set the new request is added to
 *
 * The request has three buffers (ptlrpc_body, ost_body, capability) and is
 * sent to OST_IO_PORTAL.  The extent start/end travel in the obdo's o_size
 * and o_blocks fields.  The reply (two buffers) is processed by
 * osc_punch_interpret(). */
484 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
485 struct obd_trans_info *oti,
486 struct ptlrpc_request_set *rqset)
488 struct ptlrpc_request *req;
489 struct osc_async_args *aa;
490 struct ost_body *body;
491 int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
499 size[REQ_REC_OFF + 1] = oinfo->oi_capa? sizeof(struct lustre_capa) : 0;
500 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
501 OST_PUNCH, 3, size, NULL);
505 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
507 osc_pack_req_body(req, REQ_REC_OFF, oinfo);
508 /* overload the size and blocks fields in the oa with start/end */
509 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
510 body->oa.o_size = oinfo->oi_policy.l_extent.start;
511 body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
512 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
514 ptlrpc_req_set_repsize(req, 2, size);
516 req->rq_interpret_reply = osc_punch_interpret;
517 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
518 aa = (struct osc_async_args *)&req->rq_async_args;
520 ptlrpc_set_add_req(rqset, req);
/* osc_sync: synchronous OST_SYNC over a byte range.
 *
 * @exp:   export identifying the target import
 * @oa:    obdo describing the object
 * @md:    stripe metadata (not referenced by the statements shown here)
 * @start: start of the range, sent in the obdo's o_size field
 * @end:   end of the range, sent in the obdo's o_blocks field
 *
 * A capability named `capa` is also used below; the capability buffer of the
 * request is empty when it is NULL.  The request has three buffers
 * (ptlrpc_body, ost_body, capability) and the reply two.  An unpackable
 * reply body yields rc = -EPROTO.  The request is released with
 * ptlrpc_req_finished(). */
525 static int osc_sync(struct obd_export *exp, struct obdo *oa,
526 struct lov_stripe_md *md, obd_size start, obd_size end,
529 struct ptlrpc_request *req;
530 struct ost_body *body;
531 int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
539 size[REQ_REC_OFF + 1] = capa ? sizeof(struct lustre_capa) : 0;
541 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
542 OST_SYNC, 3, size, NULL);
546 /* overload the size and blocks fields in the oa with start/end */
547 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
549 body->oa.o_size = start;
550 body->oa.o_blocks = end;
551 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
553 osc_pack_capa(req, REQ_REC_OFF + 1, body, capa);
555 ptlrpc_req_set_repsize(req, 2, size);
557 rc = ptlrpc_queue_wait(req);
561 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
562 lustre_swab_ost_body);
564 CERROR ("can't unpack ost_body\n");
565 GOTO (out, rc = -EPROTO);
572 ptlrpc_req_finished(req);
576 /* Locally find and cancel the locks matching @mode on the resource named by
577 * @oa (object id and group). Cancelled locks are added to the @cancels list.
578 * Returns the number of locks added to the @cancels list. */
/* osc_resource_get_unused: collect cancellable locks on one object's resource.
 *
 * @exp:     export whose obd namespace is searched
 * @oa:      obdo; its o_id and o_gr form the resource name
 * @cancels: list that receives the locks selected for cancellation
 * @mode:    lock mode passed through to ldlm_cancel_resource_local()
 *
 * A `lock_flags` value is also passed through to
 * ldlm_cancel_resource_local().  The resource is looked up with
 * ldlm_resource_get() (create argument 0) and its reference is dropped again
 * after the locks have been gathered. */
579 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
580 struct list_head *cancels, ldlm_mode_t mode,
583 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
/* Resource name layout: { object id, 0, object group, 0 }. */
584 struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
585 struct ldlm_resource *res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
592 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
593 lock_flags, 0, NULL);
594 ldlm_resource_putref(res);
598 /* Destroy requests can be async always on the client, and we don't even really
599 * care about the return code since the client cannot do anything at all about it.
601 * When the MDS is unlinking a filename, it saves the file objects into a
602 * recovery llog, and these object records are cancelled when the OST reports
603 * they were destroyed and sync'd to disk (i.e. transaction committed).
604 * If the client dies, or the OST is down when the object should be destroyed,
605 * the records are not cancelled, and when the OST reconnects to the MDS next,
606 * it will retrieve the llog unlink logs and then send the log cancellation
607 * cookies to the MDS after committing destroy transactions. */
/* osc_destroy: send an OST_DESTROY through ptlrpcd without waiting.
 *
 * @exp:       export identifying the target import
 * @oa:        obdo of the object to destroy
 * @ea:        stripe metadata (not referenced by the statements shown here)
 * @oti:       transaction info; supplies the llog cookie when non-NULL and
 *             the obdo has OBD_MD_FLCOOKIE set
 * @md_export: not referenced by the statements shown here
 *
 * Unused LCK_PW locks on the object are first gathered with
 * LDLM_FL_DISCARD_DATA.  When the export supports cancel sets and locks were
 * found, a third request buffer is sized with ldlm_request_bufsize() and the
 * cancels are packed into it with ldlm_cli_cancel_list(); otherwise the
 * gathered locks are dropped with ldlm_lock_list_put().  The request goes to
 * OST_IO_PORTAL. */
608 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
609 struct lov_stripe_md *ea, struct obd_trans_info *oti,
610 struct obd_export *md_export)
612 CFS_LIST_HEAD(cancels);
613 struct ptlrpc_request *req;
614 struct ost_body *body;
615 int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
616 int count, bufcount = 2;
624 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
625 LDLM_FL_DISCARD_DATA);
626 if (exp_connect_cancelset(exp) && count) {
628 size[REQ_REC_OFF + 1] = ldlm_request_bufsize(count,OST_DESTROY);
630 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
631 OST_DESTROY, bufcount, size, NULL);
632 if (exp_connect_cancelset(exp) && req)
633 ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 1, 0);
635 ldlm_lock_list_put(&cancels, l_bl_ast, count);
640 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
642 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
/* Copy the llog cookie from @oti into the caller's obdo. */
643 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
644 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
645 sizeof(*oti->oti_logcookies));
648 ptlrpc_req_set_repsize(req, 2, size);
650 ptlrpcd_add_req(req);
/* osc_announce_cached: report this client's cache and grant state in an obdo.
 *
 * @cli: client obd whose dirty/grant counters are read
 * @oa:  obdo being sent; OBD_MD_FLBLOCKS and OBD_MD_FLGRANT must not already
 *       be set in o_valid
 *
 * Under cl_loi_list_lock, o_dirty is set from cl_dirty.  Three inconsistent
 * states are logged with CERROR: cl_dirty above cl_dirty_max, the system-wide
 * dirty page count above obd_max_dirty_pages, and a dirty_max - dirty
 * difference above 0x7fffffff.  Otherwise o_undirty is the larger of
 * cl_dirty_max and the bytes needed for (cl_max_rpcs_in_flight + 1) full
 * RPCs.  o_grant and o_dropped are taken from cl_avail_grant and
 * cl_lost_grant, and cl_lost_grant is reset to zero once it has been
 * reported. */
654 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
657 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
659 LASSERT(!(oa->o_valid & bits));
662 client_obd_list_lock(&cli->cl_loi_list_lock);
663 oa->o_dirty = cli->cl_dirty;
664 if (cli->cl_dirty > cli->cl_dirty_max) {
665 CERROR("dirty %lu > dirty_max %lu\n",
666 cli->cl_dirty, cli->cl_dirty_max);
668 } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
669 CERROR("dirty %d > system dirty_max %d\n",
670 atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
672 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
673 CERROR("dirty %lu - dirty_max %lu too big???\n",
674 cli->cl_dirty, cli->cl_dirty_max);
/* Bytes covered by one more than the maximum number of full-sized RPCs. */
677 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
678 (cli->cl_max_rpcs_in_flight + 1);
679 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
681 oa->o_grant = cli->cl_avail_grant;
682 oa->o_dropped = cli->cl_lost_grant;
683 cli->cl_lost_grant = 0;
684 client_obd_list_unlock(&cli->cl_loi_list_lock);
685 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
686 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
689 /* caller must hold loi_list_lock */
/* osc_consume_write_grant: charge one page of write grant to a brw_page.
 *
 * @cli: client obd whose dirty and grant counters are adjusted
 * @pga: page being dirtied; gets OBD_BRW_FROM_GRANT set in its flags
 *
 * Increments the system-wide dirty page count, adds CFS_PAGE_SIZE to
 * cl_dirty and subtracts it from cl_avail_grant.  The available grant must
 * not go negative; that is asserted after the subtraction. */
690 static void osc_consume_write_grant(struct client_obd *cli,
691 struct brw_page *pga)
693 atomic_inc(&obd_dirty_pages);
694 cli->cl_dirty += CFS_PAGE_SIZE;
695 cli->cl_avail_grant -= CFS_PAGE_SIZE;
696 pga->flag |= OBD_BRW_FROM_GRANT;
697 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
698 CFS_PAGE_SIZE, pga, pga->pg);
699 LASSERT(cli->cl_avail_grant >= 0);
702 /* the companion to osc_consume_write_grant, called when a brw has completed.
703 * must be called with the loi lock held. */
/* osc_release_write_grant: undo osc_consume_write_grant() for one page.
 *
 * @cli:  client obd whose dirty and grant counters are adjusted
 * @pga:  page whose OBD_BRW_FROM_GRANT flag is tested and cleared
 * @sent: not referenced by the statements shown here - presumably it selects
 *        the first "lost grant" branch; TODO confirm
 *
 * The OST block size is read from the import's cached obd_osfs, falling back
 * to 4096 when it is zero.  For a page that was charged to grant, the flag
 * is cleared and the dirty counters are decremented.  Grant is then added to
 * cl_lost_grant: either a whole page, or, for a short write when the page
 * size differs from the OST block size, the part of the page outside the
 * blocks the write touches. */
704 static void osc_release_write_grant(struct client_obd *cli,
705 struct brw_page *pga, int sent)
707 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
710 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
715 pga->flag &= ~OBD_BRW_FROM_GRANT;
716 atomic_dec(&obd_dirty_pages);
717 cli->cl_dirty -= CFS_PAGE_SIZE;
719 cli->cl_lost_grant += CFS_PAGE_SIZE;
720 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
721 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
722 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
723 /* For short writes we shouldn't count parts of pages that
724 * span a whole block on the OST side, or our accounting goes
725 * wrong. Should match the code in filter_grant_check. */
/* offset: position of the write within its page;
 * count:  write length extended back to the start of its first block;
 * end:    position of the write's end within its last block. */
726 int offset = pga->off & ~CFS_PAGE_MASK;
727 int count = pga->count + (offset & (blocksize - 1));
728 int end = (offset + pga->count) & (blocksize - 1);
730 count += blocksize - end;
732 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
733 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
734 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
735 cli->cl_avail_grant, cli->cl_dirty);
/* rpcs_in_flight: total of the client's read (cl_r_in_flight) and write
 * (cl_w_in_flight) in-flight counters. */
741 static unsigned long rpcs_in_flight(struct client_obd *cli)
743 return cli->cl_r_in_flight + cli->cl_w_in_flight;
746 /* caller must hold loi_list_lock */
/* osc_wake_cache_waiters: wake threads waiting for cache space or grant.
 *
 * @cli: client obd whose cl_cache_waiters list is walked
 *
 * For each waiter, in list order:
 *  - if dirtying one more page would exceed cl_dirty_max or
 *    obd_max_dirty_pages, a "no dirty room" message is logged;
 *  - if writes are still in flight and less than a page of grant is
 *    available, a message is logged (those writes may still return grant);
 *  - otherwise the waiter is unlinked from the list.  With less than a page
 *    of grant left its ocw_rc is set to -EDQUOT so that it does sync IO;
 *    the osc_consume_write_grant() call charges a page of grant to the
 *    waiter's page.  The waiter's wait queue is then signalled.
 * What happens after the two logged cases is not shown here - TODO confirm. */
747 void osc_wake_cache_waiters(struct client_obd *cli)
749 struct list_head *l, *tmp;
750 struct osc_cache_waiter *ocw;
753 list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
754 /* if we can't dirty more, we must wait until some is written */
755 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
756 (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
757 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
758 "osc max %ld, sys max %d\n", cli->cl_dirty,
759 cli->cl_dirty_max, obd_max_dirty_pages);
763 /* if still dirty cache but no grant wait for pending RPCs that
764 * may yet return us some grant before doing sync writes */
765 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
766 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
767 cli->cl_w_in_flight);
771 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
772 list_del_init(&ocw->ocw_entry);
773 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
774 /* no more RPCs in flight to return grant, do sync IO */
775 ocw->ocw_rc = -EDQUOT;
776 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
778 osc_consume_write_grant(cli,
779 &ocw->ocw_oap->oap_brw_page);
782 cfs_waitq_signal(&ocw->ocw_waitq);
/* osc_init_grant: set the client's available grant from connect data.
 *
 * @cli: client obd to initialise
 * @ocd: connect data; its ocd_grant becomes cl_avail_grant
 *
 * The assignment is made under cl_loi_list_lock; the debug message and the
 * non-negative assertion run after the lock has been dropped. */
788 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
790 client_obd_list_lock(&cli->cl_loi_list_lock);
791 cli->cl_avail_grant = ocd->ocd_grant;
792 client_obd_list_unlock(&cli->cl_loi_list_lock);
794 CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
795 cli->cl_avail_grant, cli->cl_lost_grant);
796 LASSERT(cli->cl_avail_grant >= 0);
/* osc_update_grant: add the grant carried in a reply to the client's pool.
 *
 * @cli:  client obd whose cl_avail_grant is increased
 * @body: reply body; body->oa.o_grant is the extra grant
 *
 * The update is made under cl_loi_list_lock.  No waiters are woken here. */
799 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
801 client_obd_list_lock(&cli->cl_loi_list_lock);
802 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
803 cli->cl_avail_grant += body->oa.o_grant;
804 /* waiters are woken in brw_interpret_oap */
805 client_obd_list_unlock(&cli->cl_loi_list_lock);
808 /* We assume that the reason this OSC got a short read is because it read
809 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
810 * via the LOV, and it _knows_ it's reading inside the file, it's just that
811 * this stripe never got written at or beyond this stripe offset yet. */
/* handle_short_read: zero-fill the part of a read that was not returned.
 *
 * @nob_read:   number of bytes actually read
 * @page_count: number of entries in @pga
 * @pga:        pages of the read, in request order
 *
 * The first loop walks over the pages that were read in full.  If the data
 * ends inside a page, the tail of that page's range is zeroed through a
 * temporary kernel mapping.  The second loop zeroes the whole range
 * (pga[i]->count bytes at the page offset) of every remaining page. */
812 static void handle_short_read(int nob_read, obd_count page_count,
813 struct brw_page **pga)
818 /* skip bytes read OK */
819 while (nob_read > 0) {
820 LASSERT (page_count > 0);
822 if (pga[i]->count > nob_read) {
823 /* EOF inside this page */
824 ptr = cfs_kmap(pga[i]->pg) +
825 (pga[i]->off & ~CFS_PAGE_MASK);
826 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
827 cfs_kunmap(pga[i]->pg);
833 nob_read -= pga[i]->count;
838 /* zero remaining pages */
839 while (page_count-- > 0) {
840 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
841 memset(ptr, 0, pga[i]->count);
842 cfs_kunmap(pga[i]->pg);
/* check_write_rcs: validate the per-niobuf return codes of a write reply.
 *
 * @req:           completed write request
 * @requested_nob: number of bytes the client asked to write
 * @niocount:      number of niobufs, i.e. of expected return codes
 * @page_count:    not referenced by the statements shown here
 * @pga:           not referenced by the statements shown here
 *
 * The return-code vector is fetched from the reply without a swab callback
 * and is byte-swapped by hand when the reply message was swabbed.  A
 * negative entry is returned to the caller as is; a non-zero positive entry
 * is logged as invalid.  Finally the number of bulk bytes transferred is
 * compared with @requested_nob.
 *
 * NOTE(review): the buffer index is written as REQ_REC_OFF + 1 although the
 * buffer comes from the reply; other reply lookups in this file use
 * REPLY_REC_OFF - confirm the two constants are interchangeable here.
 * NOTE(review): in the final CERROR the arguments look swapped relative to
 * the format string: the first %d is labelled "transferred" but receives
 * requested_nob, and the "requested" %d receives bd_nob_transferred. */
847 static int check_write_rcs(struct ptlrpc_request *req,
848 int requested_nob, int niocount,
849 obd_count page_count, struct brw_page **pga)
853 /* return error if any niobuf was in error */
854 remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
855 sizeof(*remote_rcs) * niocount, NULL);
856 if (remote_rcs == NULL) {
857 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
860 if (lustre_msg_swabbed(req->rq_repmsg))
861 for (i = 0; i < niocount; i++)
862 __swab32s(&remote_rcs[i]);
864 for (i = 0; i < niocount; i++) {
865 if (remote_rcs[i] < 0)
866 return(remote_rcs[i]);
868 if (remote_rcs[i] != 0) {
869 CERROR("rc[%d] invalid (%d) req %p\n",
870 i, remote_rcs[i], req);
875 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
876 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
877 requested_nob, req->rq_bulk->bd_nob_transferred);
/* can_merge_pages: decide whether two brw_pages can share one niobuf.
 *
 * @p1: the earlier page
 * @p2: the page that follows it
 *
 * When the flags of the two pages differ, they are compared again with
 * OBD_BRW_FROM_GRANT masked out and a remaining difference is logged.  The
 * final expression shown is true when @p2 starts exactly where @p1 ends
 * (p1->off + p1->count == p2->off). */
884 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
886 if (p1->flag != p2->flag) {
887 unsigned mask = ~OBD_BRW_FROM_GRANT;
889 /* warn if we try to combine flags that we don't know to be
891 if ((p1->flag & mask) != (p2->flag & mask))
892 CERROR("is it ok to have flags 0x%x and 0x%x in the "
893 "same brw?\n", p1->flag, p2->flag);
897 return (p1->off + p1->count == p2->off);
/* osc_checksum_bulk: CRC32 over the data of a bulk transfer.
 *
 * @nob:      number of bytes to checksum
 * @pg_count: number of entries in @pga (must be greater than zero)
 * @pga:      pages of the transfer, in request order
 *
 * Each page is mapped temporarily and crc32_le() is run over its range,
 * limited to the bytes still outstanding in @nob.  Two fault-injection hooks
 * are visible: OBD_FAIL_OSC_CHECKSUM_RECEIVE overwrites up to four bytes of
 * data with "bad1" before checksumming, and OBD_FAIL_OSC_CHECKSUM_SEND
 * affects only the computed checksum. */
900 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
901 struct brw_page **pga)
906 LASSERT (pg_count > 0);
907 while (nob > 0 && pg_count > 0) {
908 char *ptr = cfs_kmap(pga[i]->pg);
909 int off = pga[i]->off & ~CFS_PAGE_MASK;
910 int count = pga[i]->count > nob ? nob : pga[i]->count;
912 /* corrupt the data before we compute the checksum, to
913 * simulate an OST->client data error */
915 OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
916 memcpy(ptr + off, "bad1", min(4, nob));
917 cksum = crc32_le(cksum, ptr + off, count);
918 cfs_kunmap(pga[i]->pg);
919 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
922 nob -= pga[i]->count;
926 /* For sending we only compute the wrong checksum instead
927 * of corrupting the data so it is still correct on a redo */
928 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
/* osc_brw_prep_request: build a bulk read or write request.
 *
 * @cmd:        OBD_BRW_* command; OBD_BRW_WRITE selects the write path
 * @cli:        client obd that supplies the import and the request pool
 * @oa:         obdo of the object; its checksum fields are updated here
 * @lsm:        stripe metadata (not referenced by the statements shown here)
 * @page_count: number of entries in @pga (must be greater than zero)
 * @pga:        pages to transfer, sorted by strictly increasing offset
 * @reqp:       out parameter for the prepared request (its assignment is
 *              not shown here)
 * @ocapa:      capability copied into the request
 *
 * Request buffers: ptlrpc_body, ost_body, obd_ioobj, one niobuf_remote per
 * run of mergeable pages, and a capability.  A bulk descriptor is attached
 * as BULK_GET_SOURCE for writes and BULK_PUT_SINK otherwise.  The cache and
 * grant state is announced in the body's obdo.  For writes with checksums
 * enabled the bulk checksum is computed and stored in both the body and
 * @oa, and the reply is sized for one return code per niobuf; the other
 * path sizes the reply for the body alone.  Bookkeeping for the reply
 * handler is stored in req->rq_async_args. */
934 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
935 struct lov_stripe_md *lsm, obd_count page_count,
936 struct brw_page **pga,
937 struct ptlrpc_request **reqp,
938 struct obd_capa *ocapa)
940 struct ptlrpc_request *req;
941 struct ptlrpc_bulk_desc *desc;
942 struct ost_body *body;
943 struct obd_ioobj *ioobj;
944 struct niobuf_remote *niobuf;
945 int size[5] = { sizeof(struct ptlrpc_body), sizeof(*body) };
946 int niocount, i, requested_nob, opc, rc;
947 struct ptlrpc_request_pool *pool;
948 struct lustre_capa *capa;
949 struct osc_brw_async_args *aa;
952 if ((cmd & OBD_BRW_WRITE) != 0) {
954 pool = cli->cl_import->imp_rq_pool;
/* Walk adjacent page pairs, testing whether each pair can be merged. */
960 for (niocount = i = 1; i < page_count; i++) {
961 if (!can_merge_pages(pga[i - 1], pga[i]))
965 size[REQ_REC_OFF + 1] = sizeof(*ioobj);
966 size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
968 size[REQ_REC_OFF + 3] = sizeof(*capa);
970 OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM);
971 req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 5,
972 size, NULL, pool, NULL);
976 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
978 if (opc == OST_WRITE)
979 desc = ptlrpc_prep_bulk_imp (req, page_count,
980 BULK_GET_SOURCE, OST_BULK_PORTAL);
982 desc = ptlrpc_prep_bulk_imp (req, page_count,
983 BULK_PUT_SINK, OST_BULK_PORTAL);
985 GOTO(out, rc = -ENOMEM);
986 /* NB request now owns desc and will free it when it gets freed */
988 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
989 ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
990 niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
991 niocount * sizeof(*niobuf));
995 obdo_to_ioobj(oa, ioobj);
996 ioobj->ioo_bufcnt = niocount;
998 capa = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 3,
1000 capa_cpy(capa, ocapa);
1001 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
1004 LASSERT (page_count > 0);
1005 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1006 struct brw_page *pg = pga[i];
/* NOTE(review): on the first iteration (i == 0) this loads pga[-1]; the
 * value is only used behind "i == 0 ||" / "i > 0 &&" guards, but the load
 * itself is outside the array - confirm and consider guarding it. */
1007 struct brw_page *pg_prev = pga[i - 1];
1009 LASSERT(pg->count > 0);
/* A page's range must not cross a page boundary. */
1010 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1011 "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1012 pg->off, pg->count);
/* Offsets must be strictly increasing from one page to the next. */
1014 LASSERTF(i == 0 || pg->off > pg_prev->off,
1015 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1016 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1018 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1019 pg_prev->pg, page_private(pg_prev->pg),
1020 pg_prev->pg->index, pg_prev->off);
1022 LASSERTF(i == 0 || pg->off > pg_prev->off,
1023 "i %d p_c %u\n", i, page_count);
/* Every page must agree with the first page on OBD_BRW_SRVLOCK. */
1025 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1026 (pg->flag & OBD_BRW_SRVLOCK));
1028 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1030 requested_nob += pg->count;
/* A mergeable page extends a niobuf; otherwise a niobuf is filled in. */
1032 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1034 niobuf->len += pg->count;
1036 niobuf->offset = pg->off;
1037 niobuf->len = pg->count;
1038 niobuf->flags = pg->flag;
/* The niobuf cursor must have advanced by exactly niocount entries. */
1042 LASSERT((void *)(niobuf - niocount) ==
1043 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1044 niocount * sizeof(*niobuf)));
1045 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1047 /* size[REQ_REC_OFF] still sizeof (*body) */
1048 if (opc == OST_WRITE) {
1049 if (unlikely(cli->cl_checksum)) {
1050 body->oa.o_valid |= OBD_MD_FLCKSUM;
1051 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1053 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1055 /* save this in 'oa', too, for later checking */
1056 oa->o_valid |= OBD_MD_FLCKSUM;
1058 /* clear out the checksum flag, in case this is a
1059 * resend but cl_checksum is no longer set. b=11238 */
1060 oa->o_valid &= ~OBD_MD_FLCKSUM;
1062 oa->o_cksum = body->oa.o_cksum;
1063 /* 1 RC per niobuf */
1064 size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
1065 ptlrpc_req_set_repsize(req, 3, size);
1067 if (unlikely(cli->cl_checksum))
1068 body->oa.o_valid |= OBD_MD_FLCKSUM;
1069 /* 1 RC for the whole I/O */
1070 ptlrpc_req_set_repsize(req, 2, size);
1073 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1074 aa = (struct osc_brw_async_args *)&req->rq_async_args;
1076 aa->aa_requested_nob = requested_nob;
1077 aa->aa_nio_count = niocount;
1078 aa->aa_page_count = page_count;
1079 aa->aa_retries = 5; /*retry for checksum errors; lprocfs? */
1082 INIT_LIST_HEAD(&aa->aa_oaps);
1088 ptlrpc_req_finished (req);
/* check_write_checksum: compare client and server checksums of a write.
 *
 * @oa:           obdo used to identify the file and object in the message
 * @peer:         server the data was sent to; its NID is printed
 * @client_cksum: checksum computed by the client before sending
 * @server_cksum: checksum reported by the server
 * @nob:          number of bytes written
 * @page_count:   number of entries in @pga
 * @pga:          pages that were written
 *
 * Matching checksums are confirmed with a debug message.  On a mismatch the
 * checksum is recomputed over the pages as they are now, to classify the
 * failure: the data changed on the client after checksumming (new ==
 * server), changed in transit (new == original client value), or neither.
 * The outcome is reported on the console together with all three values. */
1092 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1093 __u32 client_cksum, __u32 server_cksum,
1094 int nob, obd_count page_count,
1095 struct brw_page **pga)
1100 if (server_cksum == client_cksum) {
1101 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1105 new_cksum = osc_checksum_bulk(nob, page_count, pga);
1107 if (new_cksum == server_cksum)
1108 msg = "changed on the client after we checksummed it - "
1109 "likely false positive due to mmap IO (bug 11742)";
1110 else if (new_cksum == client_cksum)
1111 msg = "changed in transit before arrival at OST";
1113 msg = "changed in transit AND doesn't match the original - "
1114 "likely false positive due to mmap IO (bug 11742)";
1116 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1117 LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1118 "["LPU64"-"LPU64"]\n",
1119 msg, libcfs_nid2str(peer->nid),
1120 oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1121 oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1124 oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1126 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1127 CERROR("original client csum %x, server csum %x, client csum now %x\n",
1128 client_cksum, server_cksum, new_cksum);
1132 /* Note rc enters this function as number of bytes transferred */
/* osc_brw_fini_request: process the reply of a bulk read or write.
 *
 * @req: completed request; its rq_async_args hold the osc_brw_async_args
 *       stored by osc_brw_prep_request()
 * @rc:  on entry, a negative error or the number of bytes transferred
 *
 * Common part: negative rc other than -EDQUOT is handled first; the reply
 * body is unpacked; for writes carrying user/group quota flags the quota
 * state is updated through lquota_setdq(); the extra grant in the reply is
 * added to the client's pool.
 *
 * Writes: a positive rc is unexpected; the bulk size must equal the
 * requested size; the write checksum is verified when one was sent; the
 * bulk is unwrapped and the per-niobuf return codes are checked.
 *
 * Reads: rc must not exceed the requested size and must equal the bytes
 * transferred; a short read is zero-filled; when the server sent a checksum
 * it is compared with one computed over the received pages, and a missing
 * checksum that had been requested is logged at power-of-two counts.
 *
 * At the end the reply's obdo is copied into *aa->aa_oa. */
1133 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1135 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1136 const lnet_process_id_t *peer =
1137 &req->rq_import->imp_connection->c_peer;
1138 struct client_obd *cli = aa->aa_cli;
1139 struct ost_body *body;
1140 __u32 client_cksum = 0;
1143 if (rc < 0 && rc != -EDQUOT)
1146 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1147 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1148 lustre_swab_ost_body);
1150 CERROR ("Can't unpack body\n");
1154 /* set/clear over quota flag for a uid/gid */
1155 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1156 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1157 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1158 body->oa.o_gid, body->oa.o_valid,
1164 if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
1165 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1167 osc_update_grant(cli, body);
1169 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1171 CERROR ("Unexpected +ve rc %d\n", rc);
1174 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1176 if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
1178 check_write_checksum(&body->oa, peer, client_cksum,
1180 aa->aa_requested_nob,
1185 sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk);
1187 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1188 aa->aa_page_count, aa->aa_ppga);
1192 /* The rest of this function executes only for OST_READs */
1193 if (rc > aa->aa_requested_nob) {
1194 CERROR("Unexpected rc %d (%d requested)\n", rc,
1195 aa->aa_requested_nob);
1199 if (rc != req->rq_bulk->bd_nob_transferred) {
1200 CERROR ("Unexpected rc %d (%d transferred)\n",
1201 rc, req->rq_bulk->bd_nob_transferred);
/* Fewer bytes than requested: zero-fill the part that was not returned. */
1205 if (rc < aa->aa_requested_nob)
1206 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1208 sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count, aa->aa_ppga);
1210 if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
1211 static int cksum_counter;
1212 __u32 server_cksum = body->oa.o_cksum;
1216 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
/* Compare the peer NID with the NID the bulk data was received from. */
1219 if (peer->nid == req->rq_bulk->bd_sender) {
1223 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1226 if (server_cksum == ~0 && rc > 0) {
1227 CERROR("Protocol error: server %s set the 'checksum' "
1228 "bit, but didn't send a checksum. Not fatal, "
1229 "but please tell CFS.\n",
1230 libcfs_nid2str(peer->nid));
1231 } else if (server_cksum != client_cksum) {
1232 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1233 "%s%s%s inum "LPU64"/"LPU64" object "
1234 LPU64"/"LPU64" extent "
1235 "["LPU64"-"LPU64"]\n",
1236 req->rq_import->imp_obd->obd_name,
1237 libcfs_nid2str(peer->nid),
1239 body->oa.o_valid & OBD_MD_FLFID ?
1240 body->oa.o_fid : (__u64)0,
1241 body->oa.o_valid & OBD_MD_FLFID ?
1242 body->oa.o_generation :(__u64)0,
1244 body->oa.o_valid & OBD_MD_FLGROUP ?
1245 body->oa.o_gr : (__u64)0,
1246 aa->aa_ppga[0]->off,
1247 aa->aa_ppga[aa->aa_page_count-1]->off +
1248 aa->aa_ppga[aa->aa_page_count-1]->count -
1250 CERROR("client %x, server %x\n",
1251 client_cksum, server_cksum);
1253 aa->aa_oa->o_cksum = client_cksum;
1257 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1260 } else if (unlikely(client_cksum)) {
1261 static int cksum_missed;
/* x & -x isolates the lowest set bit, so this is true only when
 * cksum_missed is zero or a power of two. */
1264 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1265 CERROR("Checksum %u requested from %s but not sent\n",
1266 cksum_missed, libcfs_nid2str(peer->nid));
1272 *aa->aa_oa = body->oa;
/* Synchronous bulk I/O for one RPC worth of pages.  The visible code builds
 * the request with osc_brw_prep_request(), sends it and waits with
 * ptlrpc_queue_wait(), hands the result to osc_brw_fini_request() and drops
 * the request reference with ptlrpc_req_finished().
 *
 * @cmd, @oa, @lsm, @page_count, @pga and @ocapa are passed straight through
 * to osc_brw_prep_request(); the client is taken from @exp.
 *
 * NOTE(review): a timed-out request that ptlrpc marked rq_resend and an
 * -EAGAIN result from osc_brw_fini_request() are both singled out below and
 * 'retries' starts at 5 -- presumably both paths resend the I/O, but the
 * statements doing so are not visible here; confirm before relying on it. */
1277 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1278 struct lov_stripe_md *lsm,
1279 obd_count page_count, struct brw_page **pga,
1280 struct obd_capa *ocapa)
1282 struct ptlrpc_request *req;
1283 int rc, retries = 5; /* lprocfs? */
1287 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1288 page_count, pga, &req, ocapa);
1292 rc = ptlrpc_queue_wait(req);
/* the bulk transfer timed out and the request is flagged for resend: the
 * reference on this request is dropped here */
1294 if (rc == -ETIMEDOUT && req->rq_resend) {
1295 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1296 ptlrpc_req_finished(req);
/* validate/post-process the reply; may turn rc into -EAGAIN */
1300 rc = osc_brw_fini_request(req, rc);
1302 ptlrpc_req_finished(req);
1303 if (rc == -EAGAIN) {
/* Re-issue a bulk RPC after a checksum failure (see the "redo for checksum
 * error" debug message below).  A fresh request is prepared from the client,
 * obdo and page array held in @aa; it inherits the interpret callback and
 * the async args of @req, takes over the oap list and any oap_request
 * references, and is added to the request set @req belonged to.
 *
 * aa->aa_retries is decremented on every call; once it has run out the
 * function logs "too many checksum retries, returning error". */
1311 int osc_brw_redo_request(struct ptlrpc_request *req,
1312 struct osc_brw_async_args *aa)
1314 struct ptlrpc_request *new_req;
1315 struct ptlrpc_request_set *set = req->rq_set;
1316 struct osc_brw_async_args *new_aa;
1317 struct osc_async_page *oap;
1321 if (aa->aa_retries-- <= 0) {
1322 CERROR("too many checksum retries, returning error\n");
1326 DEBUG_REQ(D_ERROR, req, "redo for checksum error");
/* any oap holding a request reference must reference @req; an interruption
 * already recorded on such an oap is pushed onto the request */
1327 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1328 if (oap->oap_request != NULL) {
1329 LASSERTF(req == oap->oap_request,
1330 "request %p != oap_request %p\n",
1331 req, oap->oap_request);
1332 if (oap->oap_interrupted) {
1333 ptlrpc_mark_interrupted(oap->oap_request);
1341 /* TODO-MERGE: and where to get ocapa?? */
1342 rc = osc_brw_prep_request(lustre_msg_get_opc(req->rq_reqmsg) ==
1343 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1344 aa->aa_cli, aa->aa_oa,
1345 NULL /* lsm unused by osc currently */,
1346 aa->aa_page_count, aa->aa_ppga, &new_req,
1351 /* New request takes over pga and oaps from old request.
1352 * Note that copying a list_head doesn't work, need to move it... */
1353 new_req->rq_interpret_reply = req->rq_interpret_reply;
1354 new_req->rq_async_args = req->rq_async_args;
1355 new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
1356 INIT_LIST_HEAD(&new_aa->aa_oaps);
1357 list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1358 INIT_LIST_HEAD(&aa->aa_oaps);
/* swap every held request reference from the old request to new_req */
1360 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1361 if (oap->oap_request) {
1362 ptlrpc_req_finished(oap->oap_request);
1363 oap->oap_request = ptlrpc_request_addref(new_req);
/* the replacement goes onto the same set the original request was on */
1367 ptlrpc_set_add_req(set, new_req);
/* Reply callback for requests queued by async_internal() (which installs it
 * as rq_interpret_reply).  @data is the request's osc_brw_async_args.
 *
 * The reply is post-processed by osc_brw_fini_request(); an -EAGAIN result
 * is handed to osc_brw_redo_request().  For rc >= 0, 'nob' is added to the
 * set's counter when the set has one.  Write grant is then released for
 * every page under cl_loi_list_lock and the page-pointer array is freed.
 *
 * NOTE(review): the declaration and assignment of 'nob' and 'i' are not
 * visible in this listing. */
1372 static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
1374 struct osc_brw_async_args *aa = data;
1379 rc = osc_brw_fini_request(req, rc);
1380 if (rc == -EAGAIN) {
1381 rc = osc_brw_redo_request(req, aa);
1385 if ((rc >= 0) && req->rq_set && req->rq_set->set_countp)
1386 atomic_add(nob, (atomic_t *)req->rq_set->set_countp);
/* last argument 1: the pages are released as "sent" */
1388 spin_lock(&aa->aa_cli->cl_loi_list_lock);
1389 for (i = 0; i < aa->aa_page_count; i++)
1390 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1391 spin_unlock(&aa->aa_cli->cl_loi_list_lock);
1393 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
/* Queue one bulk RPC for @pga on @set without waiting for the reply.
 *
 * For OBD_BRW_WRITE, grant is consumed page by page up front, but only while
 * cl_avail_grant still covers a full page.  After osc_brw_prep_request(),
 * one branch installs brw_interpret() as the reply callback and adds the
 * request to @set; the alternative branch gives the grant back for writes
 * with osc_release_write_grant(..., 0).
 *
 * NOTE(review): the condition selecting between those two branches is not
 * visible here -- presumably the result of osc_brw_prep_request(). */
1398 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1399 struct lov_stripe_md *lsm, obd_count page_count,
1400 struct brw_page **pga, struct ptlrpc_request_set *set,
1401 struct obd_capa *ocapa)
1403 struct ptlrpc_request *req;
1404 struct client_obd *cli = &exp->exp_obd->u.cli;
1408 /* Consume write credits even if doing a sync write -
1409 * otherwise we may run out of space on OST due to grant. */
1410 if (cmd == OBD_BRW_WRITE) {
1411 spin_lock(&cli->cl_loi_list_lock);
1412 for (i = 0; i < page_count; i++) {
1413 if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1414 osc_consume_write_grant(cli, pga[i]);
1416 spin_unlock(&cli->cl_loi_list_lock);
1419 rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
1422 req->rq_interpret_reply = brw_interpret;
1423 ptlrpc_set_add_req(set, req);
1424 } else if (cmd == OBD_BRW_WRITE) {
/* undo the grant accounting done above; 0 = pages were not sent */
1425 spin_lock(&cli->cl_loi_list_lock);
1426 for (i = 0; i < page_count; i++)
1427 osc_release_write_grant(cli, pga[i], 0);
1428 spin_unlock(&cli->cl_loi_list_lock);
/* Sort @array, holding @num brw_page pointers, in place by ascending ->off. */
1434 * ugh, we want disk allocation on the target to happen in offset order. we'll
1435 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1436 * fine for our small page arrays and doesn't require allocation. its an
1437 * insertion sort that swaps elements that are strides apart, shrinking the
1438 * stride down until its '1' and the array is sorted.
1440 static void sort_brw_pages(struct brw_page **array, int num)
1443 struct brw_page *tmp;
/* grow the stride through 1, 4, 13, ... until it is no longer below num */
1447 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1452 for (i = stride ; i < num ; i++) {
/* shift entries one stride up while they sort after tmp */
1455 while (j >= stride && array[j - stride]->off > tmp->off) {
1456 array[j] = array[j - stride];
1461 } while (stride > 1);
/* Walk the page array @pg (at most @pages entries, which must be > 0) and
 * report how many leading pages can go out together: the walk stops once the
 * pages run out, when a page's data ends short of CFS_PAGE_SIZE, or when the
 * next page does not start at in-page offset 0.  osc_brw() and
 * osc_brw_async() use the result as the page count of the next RPC.
 *
 * NOTE(review): the loop construct and the updates of 'i' and 'count' are
 * not visible in this listing. */
1464 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1470 LASSERT (pages > 0);
/* offset of the data within its page */
1471 offset = pg[i]->off & ~CFS_PAGE_MASK;
1475 if (pages == 0) /* that's all */
1478 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1479 return count; /* doesn't end on page boundary */
1482 offset = pg[i]->off & ~CFS_PAGE_MASK;
1483 if (offset != 0) /* doesn't start on page boundary */
/* Allocate an array of @count brw_page pointers for the pages in @pga.
 * The array is later freed with osc_release_ppga().
 *
 * NOTE(review): the loop body filling the array and the return statements
 * are not visible here; presumably entry i points at pga[i] -- confirm. */
1490 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1492 struct brw_page **ppga;
1495 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1499 for (i = 0; i < count; i++)
/* Free a page-pointer array of @count entries.  @ppga must not be NULL
 * (asserted); @count must match the allocation because the size handed to
 * OBD_FREE is computed from it. */
1504 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1506 LASSERT(ppga != NULL);
1507 OBD_FREE(ppga, sizeof(*ppga) * count);
/* Synchronous bulk I/O entry point.  With OBD_BRW_CHECK set in @cmd only the
 * import state is examined.  Otherwise the pages of @pga are referenced
 * through a pointer array, sorted by offset and sent through
 * osc_brw_internal() in chunks of at most cl_max_pages_per_rpc pages, each
 * chunk further trimmed by max_unfragmented_pages().
 *
 * When more than one chunk is needed, a copy of *oinfo->oi_oa is kept in
 * saved_oa and restored before each later chunk.  The pointer array and
 * saved_oa are released on the way out.  @oti is not referenced by the
 * visible code.
 *
 * NOTE(review): 'cli' is initialised through imp->imp_obd before the
 * 'imp == NULL' test in the OBD_BRW_CHECK branch, so a NULL import would be
 * dereferenced before that test can catch it.  osc_brw_async() avoids this
 * by deriving cli from exp->exp_obd.  Worth confirming and fixing. */
1510 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1511 obd_count page_count, struct brw_page *pga,
1512 struct obd_trans_info *oti)
1514 struct obdo *saved_oa = NULL;
1515 struct brw_page **ppga, **orig;
1516 struct obd_import *imp = class_exp2cliimp(exp);
1517 struct client_obd *cli = &imp->imp_obd->u.cli;
1518 int rc, page_count_orig;
1521 if (cmd & OBD_BRW_CHECK) {
1522 /* The caller just wants to know if there's a chance that this
1523 * I/O can succeed */
1525 if (imp == NULL || imp->imp_invalid)
1530 /* test_brw with a failed create can trip this, maybe others. */
1531 LASSERT(cli->cl_max_pages_per_rpc);
/* 'orig' keeps the start of the array for the final free; 'ppga' advances */
1535 orig = ppga = osc_build_ppga(pga, page_count);
1538 page_count_orig = page_count;
1540 sort_brw_pages(ppga, page_count);
1541 while (page_count) {
1542 obd_count pages_per_brw;
1544 if (page_count > cli->cl_max_pages_per_rpc)
1545 pages_per_brw = cli->cl_max_pages_per_rpc;
1547 pages_per_brw = page_count;
1549 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1551 if (saved_oa != NULL) {
1552 /* restore previously saved oa */
1553 *oinfo->oi_oa = *saved_oa;
1554 } else if (page_count > pages_per_brw) {
1555 /* save a copy of oa (brw will clobber it) */
1556 OBDO_ALLOC(saved_oa);
1557 if (saved_oa == NULL)
1558 GOTO(out, rc = -ENOMEM);
1559 *saved_oa = *oinfo->oi_oa;
1562 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1563 pages_per_brw, ppga, oinfo->oi_capa);
/* move on to the pages that were not part of this chunk */
1568 page_count -= pages_per_brw;
1569 ppga += pages_per_brw;
1573 osc_release_ppga(orig, page_count_orig);
1575 if (saved_oa != NULL)
1576 OBDO_FREE(saved_oa);
/* Asynchronous counterpart of osc_brw(): the sorted pages are split into
 * chunks of at most cl_max_pages_per_rpc pages, trimmed by
 * max_unfragmented_pages(), and each chunk is queued on @set through
 * async_internal() instead of being waited for.  With OBD_BRW_CHECK set in
 * @cmd only the import state is examined.
 *
 * Unless the whole I/O goes out as one RPC, each chunk gets its own copy of
 * its slice of the pointer array; per the comment below, async_internal()
 * becomes responsible for releasing what it was handed.  @oti is not
 * referenced by the visible code.
 *
 * NOTE(review): the conditions guarding OBD_FREE(copy, ...) and the final
 * osc_release_ppga() are not visible in this listing. */
1581 static int osc_brw_async(int cmd, struct obd_export *exp,
1582 struct obd_info *oinfo, obd_count page_count,
1583 struct brw_page *pga, struct obd_trans_info *oti,
1584 struct ptlrpc_request_set *set)
1586 struct brw_page **ppga, **orig;
1587 struct client_obd *cli = &exp->exp_obd->u.cli;
1588 int page_count_orig;
1592 if (cmd & OBD_BRW_CHECK) {
1593 struct obd_import *imp = class_exp2cliimp(exp);
1594 /* The caller just wants to know if there's a chance that this
1595 * I/O can succeed */
1597 if (imp == NULL || imp->imp_invalid)
1602 orig = ppga = osc_build_ppga(pga, page_count);
1605 page_count_orig = page_count;
1607 sort_brw_pages(ppga, page_count);
1608 while (page_count) {
1609 struct brw_page **copy;
1610 obd_count pages_per_brw;
1612 pages_per_brw = min_t(obd_count, page_count,
1613 cli->cl_max_pages_per_rpc);
1615 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1617 /* use ppga only if single RPC is going to fly */
1618 if (pages_per_brw != page_count_orig || ppga != orig) {
1619 OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
1621 GOTO(out, rc = -ENOMEM);
1622 memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
1626 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1627 pages_per_brw, copy, set, oinfo->oi_capa);
1631 OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
1635 /* we passed it to async_internal() which is
1636 * now responsible for releasing memory */
1640 page_count -= pages_per_brw;
1641 ppga += pages_per_brw;
1645 osc_release_ppga(orig, page_count_orig);
1649 static void osc_check_rpcs(struct client_obd *cli);
/* Releases the write grant held for @oap's brw_page; @sent is passed through
 * unchanged to osc_release_write_grant(). */
1651 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1652 * the dirty accounting. Writeback completes or truncate happens before
1653 * writing starts. Must be called with the loi lock held. */
1654 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1657 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
/* Decide whether the pages pending on @lop justify building an RPC for @cmd.
 * The checks run in this order: nothing pending; missing or invalid import
 * (queued pages are to be drained); an urgent page queued; for writes,
 * cache waiters present; finally the pending count against 'optimal', which
 * starts out as cl_max_pages_per_rpc.
 *
 * NOTE(review): the value returned by each check is not visible in this
 * listing; loi_list_maint() and osc_check_rpcs() treat the result as a
 * boolean. */
1661 /* This maintains the lists of pending pages to read/write for a given object
1662 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1663 * to quickly find objects that are ready to send an RPC. */
1664 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1670 if (lop->lop_num_pending == 0)
1673 /* if we have an invalid import we want to drain the queued pages
1674 * by forcing them through rpcs that immediately fail and complete
1675 * the pages. recovery relies on this to empty the queued pages
1676 * before canceling the locks and evicting down the llite pages */
1677 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1680 /* stream rpcs in queue order as long as there is an urgent page
1681 * queued. this is our cheap solution for good batching in the case
1682 * where writepage marks some random page in the middle of the file
1683 * as urgent because of, say, memory pressure */
1684 if (!list_empty(&lop->lop_urgent)) {
1685 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1688 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1689 optimal = cli->cl_max_pages_per_rpc;
1690 if (cmd & OBD_BRW_WRITE) {
1691 /* trigger a write rpc stream as long as there are dirtiers
1692 * waiting for space. as they're waiting, they're not going to
1693 * create more pages to coalesce with what's waiting.. */
1694 if (!list_empty(&cli->cl_cache_waiters)) {
1695 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1698 /* +16 to avoid triggering rpcs that would want to include pages
1699 * that are being queued but which can't be made ready until
1700 * the queuer finishes with the page. this is a wart for
1701 * llite::commit_write() */
1704 if (lop->lop_num_pending >= optimal)
/* Make @item's membership of @list match should_be_on: an unlinked item is
 * appended to the tail of @list when it should be on it, a linked item is
 * unlinked and re-initialised when it should not.  Nothing happens when the
 * current state already matches. */
1710 static void on_list(struct list_head *item, struct list_head *list,
1713 if (list_empty(item) && should_be_on)
1714 list_add_tail(item, list);
1715 else if (!list_empty(item) && !should_be_on)
1716 list_del_init(item);
/* Three memberships are refreshed through on_list(): the ready list when
 * either the write or the read queue of @loi would make an RPC, the write
 * list when writes are pending, and the read list when reads are pending. */
1719 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1720 * can find pages to build into rpcs quickly */
1721 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1723 on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1724 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1725 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1727 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1728 loi->loi_write_lop.lop_num_pending);
1730 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1731 loi->loi_read_lop.lop_num_pending);
/* Adjust the pending-page count of @lop by @delta (may be negative) and
 * mirror the change in the client-wide counter: cl_pending_w_pages when
 * @cmd has OBD_BRW_WRITE set, cl_pending_r_pages otherwise. */
1734 static void lop_update_pending(struct client_obd *cli,
1735 struct loi_oap_pages *lop, int cmd, int delta)
1737 lop->lop_num_pending += delta;
1738 if (cmd & OBD_BRW_WRITE)
1739 cli->cl_pending_w_pages += delta;
1741 cli->cl_pending_r_pages += delta;
/* Interruption callback; osc_prep_async_page() installs it as
 * oap_occ.occ_interrupted.  Runs under the client's cl_loi_list_lock. */
1744 /* this is called when a sync waiter receives an interruption. Its job is to
1745 * get the caller woken as soon as possible. If its page hasn't been put in an
1746 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
1747 * desiring interruption which will forcefully complete the rpc once the rpc
1749 static void osc_occ_interrupted(struct oig_callback_context *occ)
1751 struct osc_async_page *oap;
1752 struct loi_oap_pages *lop;
1753 struct lov_oinfo *loi;
1756 /* XXX member_of() */
1757 oap = list_entry(occ, struct osc_async_page, oap_occ);
1759 client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1761 oap->oap_interrupted = 1;
1763 /* ok, it's been put in an rpc. only one oap gets a request reference */
1764 if (oap->oap_request != NULL) {
1765 ptlrpc_mark_interrupted(oap->oap_request);
1766 ptlrpcd_wake(oap->oap_request);
1770 /* we don't get interruption callbacks until osc_trigger_group_io()
1771 * has been called and put the sync oaps in the pending/urgent lists.*/
1772 if (!list_empty(&oap->oap_pending_item)) {
1773 list_del_init(&oap->oap_pending_item);
1774 list_del_init(&oap->oap_urgent_item);
/* pick the write or read queue according to the page's command and take
 * the page out of its pending accounting */
1777 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1778 &loi->loi_write_lop : &loi->loi_read_lop;
1779 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1780 loi_list_maint(oap->oap_cli, oap->oap_loi);
/* complete the page towards its group with -EINTR and detach it */
1782 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
1783 oap->oap_oig = NULL;
1787 client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
/* Updates the async-error record @ar for a completed write.  The visible
 * code sets ar_force_sync and samples the next xid into ar_min_xid, and
 * clears ar_force_sync again once a request with an xid of at least
 * ar_min_xid is processed.
 *
 * NOTE(review): the condition under which force_sync is set (presumably a
 * non-zero completion code) is not visible in this listing. */
1790 /* this is trying to propagate async writeback errors back up to the
1791 * application. As an async write fails we record the error code for later if
1792 * the app does an fsync. As long as errors persist we force future rpcs to be
1793 * sync so that the app can get a sync error and break the cycle of queueing
1794 * pages for which writeback will fail. */
1795 static void osc_process_ar(struct osc_async_rc *ar, struct ptlrpc_request *req,
1802 ar->ar_force_sync = 1;
1803 ar->ar_min_xid = ptlrpc_sample_next_xid();
/* @req may be NULL; force_sync is only cleared when a request is supplied */
1808 if (ar->ar_force_sync && req && (ptlrpc_req_xid(req) >= ar->ar_min_xid))
1809 ar->ar_force_sync = 0;
/* Queue @oap on its object's write or read queue, chosen by oap_cmd: the
 * page is appended to lop_pending, additionally put at the head of
 * lop_urgent when ASYNC_URGENT is set, and the pending counters are bumped
 * by one. */
1812 static void osc_oap_to_pending(struct osc_async_page *oap)
1814 struct loi_oap_pages *lop;
1816 if (oap->oap_cmd & OBD_BRW_WRITE)
1817 lop = &oap->oap_loi->loi_write_lop;
1819 lop = &oap->oap_loi->loi_read_lop;
1821 if (oap->oap_async_flags & ASYNC_URGENT)
1822 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1823 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1824 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
/* Complete one async page with result @rc.  The page's async flags and
 * interrupted mark are cleared, write completions feed the per-client and
 * per-object async error records, and a held request reference is dropped.
 * When @rc is 0 and @oa is given, the blocks/mtime/atime/ctime fields that
 * @oa marks valid are copied into the object's lvb.
 *
 * Two completion paths are visible: one releases the cache accounting and
 * completes the page through oig_complete_one(); the other calls the
 * caller's ap_completion() hook and afterwards either puts the page back on
 * the pending queue or releases the cache accounting.  @sent is forwarded to
 * osc_exit_cache().
 *
 * NOTE(review): the conditions choosing between these paths are not visible
 * in this listing. */
1827 /* this must be called holding the loi list lock to give coverage to exit_cache,
1828 * async_flag maintenance, and oap_request */
1829 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1830 struct osc_async_page *oap, int sent, int rc)
1833 oap->oap_async_flags = 0;
1834 oap->oap_interrupted = 0;
1836 if (oap->oap_cmd & OBD_BRW_WRITE) {
1837 osc_process_ar(&cli->cl_ar, oap->oap_request, rc);
1838 osc_process_ar(&oap->oap_loi->loi_ar, oap->oap_request, rc);
1841 if (oap->oap_request != NULL) {
1842 ptlrpc_req_finished(oap->oap_request);
1843 oap->oap_request = NULL;
/* refresh the cached attributes from the reply obdo on success */
1846 if (rc == 0 && oa != NULL) {
1847 if (oa->o_valid & OBD_MD_FLBLOCKS)
1848 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
1849 if (oa->o_valid & OBD_MD_FLMTIME)
1850 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
1851 if (oa->o_valid & OBD_MD_FLATIME)
1852 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
1853 if (oa->o_valid & OBD_MD_FLCTIME)
1854 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
1858 osc_exit_cache(cli, oap, sent);
1859 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
1860 oap->oap_oig = NULL;
1865 rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
1866 oap->oap_cmd, oa, rc);
1868 /* ll_ap_completion (from llite) drops PG_locked. so, a new
1869 * I/O on the page could start, but OSC calls it under lock
1870 * and thus we can add oap back to pending safely */
1872 /* upper layer wants to leave the page on pending queue */
1873 osc_oap_to_pending(oap);
1875 osc_exit_cache(cli, oap, sent);
/* Reply callback for RPCs launched by osc_send_oap_rpc() (which installs it
 * as rq_interpret_reply).  @data is the request's osc_brw_async_args.
 *
 * The reply is post-processed by osc_brw_fini_request(); an -EAGAIN result
 * is handed to osc_brw_redo_request().  Under cl_loi_list_lock the matching
 * in-flight counter is decremented, every page of the RPC is unlinked and
 * completed through osc_ap_completion() with sent = 1, cache waiters are
 * woken and osc_check_rpcs() looks for more work.  Finally the obdo and the
 * page-pointer array carried in @aa are freed. */
1879 static int brw_interpret_oap(struct ptlrpc_request *req, void *data, int rc)
1881 struct osc_async_page *oap, *tmp;
1882 struct osc_brw_async_args *aa = data;
1883 struct client_obd *cli;
1886 rc = osc_brw_fini_request(req, rc);
1887 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1888 if (rc == -EAGAIN) {
1889 rc = osc_brw_redo_request(req, aa);
1897 client_obd_list_lock(&cli->cl_loi_list_lock);
1899 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1900 * is called so we know whether to go to sync BRWs or wait for more
1901 * RPCs to complete */
1902 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1903 cli->cl_w_in_flight--;
1905 cli->cl_r_in_flight--;
1907 /* the caller may re-use the oap after the completion call so
1908 * we need to clean it up a little */
1909 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
1910 list_del_init(&oap->oap_rpc_item);
1911 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
1914 osc_wake_cache_waiters(cli);
1915 osc_check_rpcs(cli);
1917 client_obd_list_unlock(&cli->cl_loi_list_lock);
1919 OBDO_FREE(aa->aa_oa);
1922 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
/* Build a bulk request for the @page_count pages linked on @rpc_list.
 *
 * A page-pointer array is allocated and filled from the oaps, recording each
 * page's file offset (oap_obj_off + oap_page_off).  The caller's
 * ap_fill_obdo() and ap_lookup_capa() hooks supply the obdo contents and the
 * capability, the pages are sorted by offset and osc_brw_prep_request()
 * creates the request.  Timestamps are refreshed through ap_update_obdo()
 * after the request exists, and the oaps are moved from @rpc_list into the
 * request's async args, leaving @rpc_list empty.
 *
 * Failures are reported as ERR_PTR() values: -ENOMEM for the allocations
 * visible here and the osc_brw_prep_request() code otherwise.  @rpc_list
 * must not be empty (asserted). */
1926 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
1927 struct list_head *rpc_list,
1928 int page_count, int cmd)
1930 struct ptlrpc_request *req;
1931 struct brw_page **pga = NULL;
1932 struct osc_brw_async_args *aa;
1933 struct obdo *oa = NULL;
1934 struct obd_async_page_ops *ops = NULL;
1935 void *caller_data = NULL;
1936 struct obd_capa *ocapa;
1937 struct osc_async_page *oap;
1941 LASSERT(!list_empty(rpc_list));
1943 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1945 RETURN(ERR_PTR(-ENOMEM));
1949 GOTO(out, req = ERR_PTR(-ENOMEM));
1952 list_for_each_entry(oap, rpc_list, oap_rpc_item) {
1954 ops = oap->oap_caller_ops;
1955 caller_data = oap->oap_caller_data;
1957 pga[i] = &oap->oap_brw_page;
1958 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1959 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1960 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
1964 /* always get the data for the obdo for the rpc */
1965 LASSERT(ops != NULL);
1966 ops->ap_fill_obdo(caller_data, cmd, oa);
1967 ocapa = ops->ap_lookup_capa(caller_data, cmd);
1969 sort_brw_pages(pga, page_count);
1970 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
1974 CERROR("prep_req failed: %d\n", rc);
1975 GOTO(out, req = ERR_PTR(rc));
1978 /* Need to update the timestamps after the request is built in case
1979 * we race with setattr (locally or in queue at OST). If OST gets
1980 * later setattr before earlier BRW (as determined by the request xid),
1981 * the OST will not use BRW timestamps. Sadly, there is no obvious
1982 * way to do this in a single call. bug 10150 */
1983 ops->ap_update_obdo(caller_data, cmd, oa,
1984 OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
/* the async args live inside the request; the compile-time assertion checks
 * that they fit.  The oap list is moved, not copied, into them. */
1986 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1987 aa = (struct osc_brw_async_args *)&req->rq_async_args;
1988 INIT_LIST_HEAD(&aa->aa_oaps);
1989 list_splice(rpc_list, &aa->aa_oaps);
1990 INIT_LIST_HEAD(rpc_list);
1997 OBD_FREE(pga, sizeof(*pga) * page_count);
/* Build and launch one bulk RPC for @cmd from the pages pending on @lop.
 *
 * Pages are taken off lop_pending in queue order and collected on a local
 * list.  Collection stops when the RPC is full (cl_max_pages_per_rpc), when
 * a page ends on a PTLRPC_MAX_BRW_SIZE boundary, or when a page would leave
 * a gap in the transfer; a page whose count is <= 0 is completed on the spot
 * rather than sent.  The request is built by osc_build_req() with
 * cl_loi_list_lock dropped.  If that fails, every collected page is
 * completed with an error and PTR_ERR(req) is returned.  Otherwise the RPC
 * is tallied in the lprocfs histograms, counted as in flight, given
 * brw_interpret_oap() as reply callback and handed to ptlrpcd. */
2002 /* the loi lock is held across this function but it's allowed to release
2003 * and reacquire it during its work */
2004 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2005 int cmd, struct loi_oap_pages *lop)
2007 struct ptlrpc_request *req;
2008 obd_count page_count = 0;
2009 struct osc_async_page *oap = NULL, *tmp;
2010 struct osc_brw_async_args *aa;
2011 struct obd_async_page_ops *ops;
2012 CFS_LIST_HEAD(rpc_list);
2013 unsigned int ending_offset;
2014 unsigned starting_offset = 0;
2017 /* first we find the pages we're allowed to work with */
2018 list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2020 ops = oap->oap_caller_ops;
2022 LASSERT(oap->oap_magic == OAP_MAGIC);
2024 /* in llite being 'ready' equates to the page being locked
2025 * until completion unlocks it. commit_write submits a page
2026 * as not ready because its unlock will happen unconditionally
2027 * as the call returns. if we race with commit_write giving
2028 * us that page we don't want to create a hole in the page
2029 * stream, so we stop and leave the rpc to be fired by
2030 * another dirtier or kupdated interval (the not ready page
2031 * will still be on the dirty list). we could call in
2032 * at the end of ll_file_write to process the queue again. */
2033 if (!(oap->oap_async_flags & ASYNC_READY)) {
2034 int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2036 CDEBUG(D_INODE, "oap %p page %p returned %d "
2037 "instead of ready\n", oap,
2041 /* llite is telling us that the page is still
2042 * in commit_write and that we should try
2043 * and put it in an rpc again later. we
2044 * break out of the loop so we don't create
2045 * a hole in the sequence of pages in the rpc
2050 /* the io isn't needed.. tell the checks
2051 * below to complete the rpc with EINTR */
2052 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2053 oap->oap_count = -EINTR;
2056 oap->oap_async_flags |= ASYNC_READY;
2059 LASSERTF(0, "oap %p page %p returned %d "
2060 "from make_ready\n", oap,
2068 * Page submitted for IO has to be locked. Either by
2069 * ->ap_make_ready() or by higher layers.
2071 * XXX nikita: this assertion should be adjusted when lustre
2072 * starts using PG_writeback for pages being written out.
2074 #if defined(__KERNEL__) && defined(__LINUX__)
2075 LASSERT(PageLocked(oap->oap_page));
2077 /* If there is a gap at the start of this page, it can't merge
2078 * with any previous page, so we'll hand the network a
2079 * "fragmented" page array that it can't transfer in 1 RDMA */
2080 if (page_count != 0 && oap->oap_page_off != 0)
2083 /* take the page out of our book-keeping */
2084 list_del_init(&oap->oap_pending_item);
2085 lop_update_pending(cli, lop, cmd, -1);
2086 list_del_init(&oap->oap_urgent_item);
/* offset of the first page within a PTLRPC_MAX_BRW_SIZE window; only used
 * for the offset histograms further down */
2088 if (page_count == 0)
2089 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2090 (PTLRPC_MAX_BRW_SIZE - 1);
2092 /* ask the caller for the size of the io as the rpc leaves. */
2093 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2095 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2096 if (oap->oap_count <= 0) {
2097 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2099 osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2103 /* now put the page back in our accounting */
2104 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2105 if (++page_count >= cli->cl_max_pages_per_rpc)
2108 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2109 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2110 * have the same alignment as the initial writes that allocated
2111 * extents on the server. */
2112 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2113 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2114 if (ending_offset == 0)
2117 /* If there is a gap at the end of this page, it can't merge
2118 * with any subsequent pages, so we'll hand the network a
2119 * "fragmented" page array that it can't transfer in 1 RDMA */
2120 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2124 osc_wake_cache_waiters(cli);
2126 if (page_count == 0)
2129 loi_list_maint(cli, loi);
/* the request is built without the list lock held */
2131 client_obd_list_unlock(&cli->cl_loi_list_lock);
2133 req = osc_build_req(cli, &rpc_list, page_count, cmd);
2135 /* this should happen rarely and is pretty bad, it makes the
2136 * pending list not follow the dirty order */
2137 client_obd_list_lock(&cli->cl_loi_list_lock);
2138 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2139 list_del_init(&oap->oap_rpc_item);
2141 /* queued sync pages can be torn down while the pages
2142 * were between the pending list and the rpc */
2143 if (oap->oap_interrupted) {
2144 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2145 osc_ap_completion(cli, NULL, oap, 0,
2149 osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2151 loi_list_maint(cli, loi);
2152 RETURN(PTR_ERR(req));
2155 aa = (struct osc_brw_async_args *)&req->rq_async_args;
/* statistics: pages per RPC, RPCs in flight and starting page offset */
2157 if (cmd == OBD_BRW_READ) {
2158 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2159 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2160 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2161 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2162 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2164 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2165 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2166 cli->cl_w_in_flight);
2167 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2168 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2169 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2172 client_obd_list_lock(&cli->cl_loi_list_lock);
2174 if (cmd == OBD_BRW_READ)
2175 cli->cl_r_in_flight++;
2177 cli->cl_w_in_flight++;
2179 /* queued sync pages can be torn down while the pages
2180 * were between the pending list and the rpc */
2182 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2183 /* only one oap gets a request reference */
2186 if (oap->oap_interrupted && !req->rq_intr) {
2187 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2189 ptlrpc_mark_interrupted(req);
/* NOTE(review): how 'tmp' is chosen before this store is not visible in
 * this listing */
2193 tmp->oap_request = ptlrpc_request_addref(req);
2195 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2196 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2198 req->rq_interpret_reply = brw_interpret_oap;
2199 ptlrpcd_add_req(req);
/* Debug helper: logs at D_INODE whether LOI is linked through loi_cli_item,
 * followed by the pending count and urgent-list state of its write queue
 * and of its read queue, then the caller's format STR. */
2203 #define LOI_DEBUG(LOI, STR, args...) \
2204 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2205 !list_empty(&(LOI)->loi_cli_item), \
2206 (LOI)->loi_write_lop.lop_num_pending, \
2207 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2208 (LOI)->loi_read_lop.lop_num_pending, \
2209 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
/* Returns the first object, in this order of preference, from: the ready
 * list; the write list when there are cache waiters; and, when the import is
 * missing or invalid, the write list and then the read list.
 *
 * NOTE(review): the value returned when none of these applies is not visible
 * in this listing; osc_check_rpcs() stops looping on NULL. */
2212 /* This is called by osc_check_rpcs() to find which objects have pages that
2213 * we could be sending. These lists are maintained by lop_makes_rpc(). */
2214 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2217 /* first return all objects which we already know to have
2218 * pages ready to be stuffed into rpcs */
2219 if (!list_empty(&cli->cl_loi_ready_list))
2220 RETURN(list_entry(cli->cl_loi_ready_list.next,
2221 struct lov_oinfo, loi_cli_item));
2223 /* then if we have cache waiters, return all objects with queued
2224 * writes. This is especially important when many small files
2225 * have filled up the cache and not been fired into rpcs because
2226 * they don't pass the nr_pending/object threshold */
2227 if (!list_empty(&cli->cl_cache_waiters) &&
2228 !list_empty(&cli->cl_loi_write_list))
2229 RETURN(list_entry(cli->cl_loi_write_list.next,
2230 struct lov_oinfo, loi_write_item));
2232 /* then return all queued objects when we have an invalid import
2233 * so that they get flushed */
2234 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2235 if (!list_empty(&cli->cl_loi_write_list))
2236 RETURN(list_entry(cli->cl_loi_write_list.next,
2237 struct lov_oinfo, loi_write_item));
2238 if (!list_empty(&cli->cl_loi_read_list))
2239 RETURN(list_entry(cli->cl_loi_read_list.next,
2240 struct lov_oinfo, loi_read_item));
/* Main dispatch loop: while osc_next_loi() yields an object and the number
 * of RPCs in flight is below cl_max_rpcs_in_flight, try a write RPC and then
 * a read RPC for it through osc_send_oap_rpc(), whenever lop_makes_rpc()
 * says the respective queue warrants one.  The object is then unlinked from
 * the ready, write and read lists and re-filed by loi_list_maint(), so the
 * next iteration can pick a different object.  race_counter bounds the
 * looping described in the last comment below.
 *
 * NOTE(review): the handling of osc_send_oap_rpc()'s return value and the
 * updates of race_counter are not visible in this listing. */
2245 /* called with the loi list lock held */
2246 static void osc_check_rpcs(struct client_obd *cli)
2248 struct lov_oinfo *loi;
2249 int rc = 0, race_counter = 0;
2252 while ((loi = osc_next_loi(cli)) != NULL) {
2253 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2255 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2258 /* attempt some read/write balancing by alternating between
2259 * reads and writes in an object. The makes_rpc checks here
2260 * would be redundant if we were getting read/write work items
2261 * instead of objects. we don't want send_oap_rpc to drain a
2262 * partial read pending queue when we're given this object to
2263 * do io on writes while there are cache waiters */
2264 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2265 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2266 &loi->loi_write_lop);
2274 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2275 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2276 &loi->loi_read_lop);
2285 /* attempt some inter-object balancing by issuing rpcs
2286 * for each object in turn */
2287 if (!list_empty(&loi->loi_cli_item))
2288 list_del_init(&loi->loi_cli_item);
2289 if (!list_empty(&loi->loi_write_item))
2290 list_del_init(&loi->loi_write_item);
2291 if (!list_empty(&loi->loi_read_item))
2292 list_del_init(&loi->loi_read_item);
2294 loi_list_maint(cli, loi);
2296 /* send_oap_rpc fails with 0 when make_ready tells it to
2297 * back off. llite's make_ready does this when it tries
2298 * to lock a page queued for write that is already locked.
2299 * we want to try sending rpcs from many objects, but we
2300 * don't want to spin failing with 0. */
2301 if (race_counter == 10)
/* Wake-up condition used by osc_enter_cache() in l_wait_event().  Evaluated
 * under cl_loi_list_lock, it is true once @ocw is no longer linked on a
 * list, or when no RPCs are in flight any more. */
2307 /* we're trying to queue a page in the osc so we're subject to the
2308 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2309 * If the osc's queued pages are already at that limit, then we want to sleep
2310 * until there is space in the osc's queue for us. We also may be waiting for
2311 * write credits from the OST if there are RPCs in flight that may return some
2312 * before we fall back to sync writes.
2314 * We need this to know our allocation was granted in the presence of signals */
2315 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2319 client_obd_list_lock(&cli->cl_loi_list_lock);
2320 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2321 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Account @oap as a dirty page of the client cache, waiting if necessary.
 *
 * Three situations are distinguished by the visible code: sync I/O is forced
 * (cl_dirty_max below one page, or a force_sync mark on the client or the
 * object); cache space and grant are available, in which case write grant is
 * consumed for the page immediately; or write RPCs are in flight, in which
 * case the caller is queued on cl_cache_waiters, RPCs are kicked and the
 * caller sleeps, with the lock dropped, until ocw_granted() holds.
 *
 * NOTE(review): the return values of the individual paths are not visible in
 * this listing. */
2325 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2326 * grant or cache space. */
2327 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2328 struct osc_async_page *oap)
2330 struct osc_cache_waiter ocw;
2331 struct l_wait_info lwi = { 0 };
2335 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2336 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2337 cli->cl_dirty_max, obd_max_dirty_pages,
2338 cli->cl_lost_grant, cli->cl_avail_grant);
2340 /* force the caller to try sync io. this can jump the list
2341 * of queued writes and create a discontiguous rpc stream */
2342 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2343 loi->loi_ar.ar_force_sync)
2346 /* Hopefully normal case - cache space and write credits available */
2347 if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2348 (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2349 (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2350 /* account for ourselves */
2351 osc_consume_write_grant(cli, &oap->oap_brw_page);
2355 /* Make sure that there are write rpcs in flight to wait for. This
2356 * is a little silly as this object may not have any pending but
2357 * other objects sure might. */
2358 if (cli->cl_w_in_flight) {
2359 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2360 cfs_waitq_init(&ocw.ocw_waitq);
2364 loi_list_maint(cli, loi);
2365 osc_check_rpcs(cli);
2366 client_obd_list_unlock(&cli->cl_loi_list_lock);
2368 CDEBUG(D_CACHE, "sleeping for cache space\n");
2369 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2371 client_obd_list_lock(&cli->cl_loi_list_lock);
/* still linked: the on-stack waiter is taken off the list here */
2372 if (!list_empty(&ocw.ocw_entry)) {
2373 list_del(&ocw.ocw_entry);
/* Initialise the osc_async_page that tracks @page at object offset @offset:
 * magic, owning client (from @exp), the caller's callbacks @ops and their
 * @data, the page and its offset, empty pending/urgent/rpc list links, and
 * osc_occ_interrupted() as interruption callback.
 *
 * One path returns size_round(sizeof(*oap)) instead, i.e. the space an
 * osc_async_page needs.
 *
 * NOTE(review): the condition for that size query, the source of the 'oap'
 * pointer (presumably @res) and the final return value are not visible in
 * this listing. */
2382 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2383 struct lov_oinfo *loi, cfs_page_t *page,
2384 obd_off offset, struct obd_async_page_ops *ops,
2385 void *data, void **res)
2387 struct osc_async_page *oap;
2391 return size_round(sizeof(*oap));
2394 oap->oap_magic = OAP_MAGIC;
2395 oap->oap_cli = &exp->exp_obd->u.cli;
2398 oap->oap_caller_ops = ops;
2399 oap->oap_caller_data = data;
2401 oap->oap_page = page;
2402 oap->oap_obj_off = offset;
2404 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2405 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2406 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2408 oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2410 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
/*
 * oap_from_cookie(): interpret the opaque @cookie as an osc_async_page and
 * validate it by checking oap_magic against OAP_MAGIC.
 *
 * Returns ERR_PTR(-EINVAL) on a magic mismatch.  The success-path return is
 * on a line omitted from this listing (presumably the oap itself -- callers
 * below pass the result to PTR_ERR() on failure).
 */
2414 struct osc_async_page *oap_from_cookie(void *cookie)
2416 struct osc_async_page *oap = cookie;
2417 if (oap->oap_magic != OAP_MAGIC)
2418 return ERR_PTR(-EINVAL);
/*
 * osc_queue_async_io(): queue the async page identified by @cookie for I/O.
 *
 * Visible steps, in order:
 *  - validate the cookie with oap_from_cookie();
 *  - test for a missing or invalid import;
 *  - test that the oap is not already on the pending, urgent or rpc list;
 *  - with HAVE_QUOTA_SUPPORT, for writes not flagged OBD_BRW_NOQUOTA, fill
 *    an obdo through the caller's ap_fill_obdo() and consult lquota_chkdq();
 *  - under cl_loi_list_lock: record page offset, count, brw flags and async
 *    flags, reserve cache/grant for writes via osc_enter_cache(), move the
 *    oap to the pending list, run loi_list_maint() and osc_check_rpcs().
 *
 * NOTE(review): the error returns, the allocation of oa and the condition
 * under which @loi is replaced by lsm->lsm_oinfo[0] are on lines omitted
 * from this listing.
 */
2422 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2423 struct lov_oinfo *loi, void *cookie,
2424 int cmd, obd_off off, int count,
2425 obd_flag brw_flags, enum async_flags async_flags)
2427 struct client_obd *cli = &exp->exp_obd->u.cli;
2428 struct osc_async_page *oap;
2432 oap = oap_from_cookie(cookie);
2434 RETURN(PTR_ERR(oap));
2436 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2439 if (!list_empty(&oap->oap_pending_item) ||
2440 !list_empty(&oap->oap_urgent_item) ||
2441 !list_empty(&oap->oap_rpc_item))
2444 /* check if the file's owner/group is over quota */
2445 #ifdef HAVE_QUOTA_SUPPORT
2446 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2447 struct obd_async_page_ops *ops;
2454 ops = oap->oap_caller_ops;
2455 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2456 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2467 loi = lsm->lsm_oinfo[0];
2469 client_obd_list_lock(&cli->cl_loi_list_lock);
2472 oap->oap_page_off = off;
2473 oap->oap_count = count;
2474 oap->oap_brw_flags = brw_flags;
2475 oap->oap_async_flags = async_flags;
2477 if (cmd & OBD_BRW_WRITE) {
2478 rc = osc_enter_cache(cli, loi, oap);
2480 client_obd_list_unlock(&cli->cl_loi_list_lock);
2485 osc_oap_to_pending(oap);
2486 loi_list_maint(cli, loi);
2488 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2491 osc_check_rpcs(cli);
2492 client_obd_list_unlock(&cli->cl_loi_list_lock);
/*
 * SETTING(was, now, flag): true when @flag is clear in @was and set in @now,
 * i.e. the flag is being newly turned on.
 * The macro arguments are not individually parenthesised in the expansion,
 * so callers should pass simple expressions only.
 */
2497 /* aka (~was & now & flag), but this is more clear :) */
2498 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
/*
 * osc_set_async_flags(): apply @async_flags to an async page that is already
 * queued.
 *
 * Visible behaviour:
 *  - the cookie is validated, and the page must not carry OBD_BRW_SRVLOCK;
 *  - the read or write loi_oap_pages is selected from oap_cmd;
 *  - under cl_loi_list_lock, an oap whose oap_pending_item is empty yields
 *    -EINVAL;
 *  - ASYNC_READY is OR-ed into oap_async_flags when it is newly requested;
 *  - when ASYNC_URGENT is newly requested and oap_rpc_item is empty, the oap
 *    is added to lop_urgent and loi_list_maint() is run;
 *  - osc_check_rpcs() is called before the lock is dropped.
 *
 * NOTE(review): some lines are omitted from this listing (returns, the "out"
 * label, the branch taken when all requested flags are already set, and the
 * condition guarding the loi reassignment).
 */
2500 static int osc_set_async_flags(struct obd_export *exp,
2501 struct lov_stripe_md *lsm,
2502 struct lov_oinfo *loi, void *cookie,
2503 obd_flag async_flags)
2505 struct client_obd *cli = &exp->exp_obd->u.cli;
2506 struct loi_oap_pages *lop;
2507 struct osc_async_page *oap;
2511 oap = oap_from_cookie(cookie);
2513 RETURN(PTR_ERR(oap));
2516 * bug 7311: OST-side locking is only supported for liblustre for now
2517 * (and liblustre never calls obd_set_async_flags(). I hope.), generic
2518 * implementation has to handle case where OST-locked page was picked
2519 * up by, e.g., ->writepage().
2521 LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2522 LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2525 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2529 loi = lsm->lsm_oinfo[0];
2531 if (oap->oap_cmd & OBD_BRW_WRITE) {
2532 lop = &loi->loi_write_lop;
2534 lop = &loi->loi_read_lop;
2537 client_obd_list_lock(&cli->cl_loi_list_lock);
2539 if (list_empty(&oap->oap_pending_item))
2540 GOTO(out, rc = -EINVAL);
2542 if ((oap->oap_async_flags & async_flags) == async_flags)
2545 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2546 oap->oap_async_flags |= ASYNC_READY;
2548 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2549 if (list_empty(&oap->oap_rpc_item)) {
2550 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2551 loi_list_maint(cli, loi);
2555 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2556 oap->oap_async_flags);
2558 osc_check_rpcs(cli);
2559 client_obd_list_unlock(&cli->cl_loi_list_lock);
/*
 * osc_queue_group_io(): park an async page on the object's group-pending
 * list so that it can be released later as part of an I/O group.
 *
 * Visible steps: validate the cookie, test the import, test that the oap is
 * not already on any list, then under cl_loi_list_lock record page
 * offset/count/flags, pick the read or write loi_oap_pages from @cmd and
 * append the oap to lop_pending_group.  When ASYNC_GROUP_SYNC is set the
 * oap's callback context is registered with @oig through oig_add_one().
 *
 * NOTE(review): the brw_flags parameter declaration, the error returns and
 * the "else" selecting the read lop are on lines omitted from this listing.
 */
2563 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2564 struct lov_oinfo *loi,
2565 struct obd_io_group *oig, void *cookie,
2566 int cmd, obd_off off, int count,
2568 obd_flag async_flags)
2570 struct client_obd *cli = &exp->exp_obd->u.cli;
2571 struct osc_async_page *oap;
2572 struct loi_oap_pages *lop;
2576 oap = oap_from_cookie(cookie);
2578 RETURN(PTR_ERR(oap));
2580 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2583 if (!list_empty(&oap->oap_pending_item) ||
2584 !list_empty(&oap->oap_urgent_item) ||
2585 !list_empty(&oap->oap_rpc_item))
2589 loi = lsm->lsm_oinfo[0];
2591 client_obd_list_lock(&cli->cl_loi_list_lock);
2594 oap->oap_page_off = off;
2595 oap->oap_count = count;
2596 oap->oap_brw_flags = brw_flags;
2597 oap->oap_async_flags = async_flags;
2599 if (cmd & OBD_BRW_WRITE)
2600 lop = &loi->loi_write_lop;
2602 lop = &loi->loi_read_lop;
2604 list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2605 if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2607 rc = oig_add_one(oig, &oap->oap_occ);
2610 LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2611 oap, oap->oap_page, rc);
2613 client_obd_list_unlock(&cli->cl_loi_list_lock);
/*
 * osc_group_to_pending(): drain @lop->lop_pending_group, unlinking each
 * osc_async_page and handing it to osc_oap_to_pending(), then refresh the
 * object's list membership with loi_list_maint().
 *
 * list_for_each_safe() is used because entries are removed while iterating.
 * @cmd is not referenced by any line visible in this listing.
 */
2618 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2619 struct loi_oap_pages *lop, int cmd)
2621 struct list_head *pos, *tmp;
2622 struct osc_async_page *oap;
2624 list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2625 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2626 list_del(&oap->oap_pending_item);
2627 osc_oap_to_pending(oap);
2629 loi_list_maint(cli, loi);
/*
 * osc_trigger_group_io(): release all group-pending pages of an object.
 *
 * Under cl_loi_list_lock, moves the write and then the read group-pending
 * pages to the regular pending lists via osc_group_to_pending() and calls
 * osc_check_rpcs().
 *
 * @oig is not referenced by any line visible in this listing.  The
 * condition guarding the loi reassignment and the return statement are on
 * omitted lines.
 */
2632 static int osc_trigger_group_io(struct obd_export *exp,
2633 struct lov_stripe_md *lsm,
2634 struct lov_oinfo *loi,
2635 struct obd_io_group *oig)
2637 struct client_obd *cli = &exp->exp_obd->u.cli;
2641 loi = lsm->lsm_oinfo[0];
2643 client_obd_list_lock(&cli->cl_loi_list_lock);
2645 osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2646 osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2648 osc_check_rpcs(cli);
2649 client_obd_list_unlock(&cli->cl_loi_list_lock);
/*
 * osc_teardown_async_page(): detach an async page from the OSC queues.
 *
 * Under cl_loi_list_lock:
 *  - an oap whose oap_rpc_item is non-empty cannot be torn down: -EBUSY;
 *  - otherwise its cache accounting is released via osc_exit_cache() and
 *    cache waiters are woken;
 *  - it is unlinked from the urgent list (clearing ASYNC_URGENT) and from
 *    the pending list (decrementing the pending count through
 *    lop_update_pending()), followed by loi_list_maint().
 *
 * NOTE(review): the "out" label, the return statement and the condition
 * guarding the loi reassignment are on lines omitted from this listing.
 */
2654 static int osc_teardown_async_page(struct obd_export *exp,
2655 struct lov_stripe_md *lsm,
2656 struct lov_oinfo *loi, void *cookie)
2658 struct client_obd *cli = &exp->exp_obd->u.cli;
2659 struct loi_oap_pages *lop;
2660 struct osc_async_page *oap;
2664 oap = oap_from_cookie(cookie);
2666 RETURN(PTR_ERR(oap));
2669 loi = lsm->lsm_oinfo[0];
2671 if (oap->oap_cmd & OBD_BRW_WRITE) {
2672 lop = &loi->loi_write_lop;
2674 lop = &loi->loi_read_lop;
2677 client_obd_list_lock(&cli->cl_loi_list_lock);
2679 if (!list_empty(&oap->oap_rpc_item))
2680 GOTO(out, rc = -EBUSY);
2682 osc_exit_cache(cli, oap, 0);
2683 osc_wake_cache_waiters(cli);
2685 if (!list_empty(&oap->oap_urgent_item)) {
2686 list_del_init(&oap->oap_urgent_item);
2687 oap->oap_async_flags &= ~ASYNC_URGENT;
2689 if (!list_empty(&oap->oap_pending_item)) {
2690 list_del_init(&oap->oap_pending_item);
2691 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2693 loi_list_maint(cli, loi);
2695 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2697 client_obd_list_unlock(&cli->cl_loi_list_lock);
/*
 * osc_set_data_with_check(): attach @data to the lock behind @lockh as its
 * l_ast_data, and propagate LDLM_FL_NO_LRU from the caller's flags.
 *
 * The lock is looked up with ldlm_handle2lock() (an error is logged on the
 * failure path), updated under lock_res_and_lock() and released with
 * LDLM_LOCK_PUT().  If the lock already carries different l_ast_data, both
 * values are treated as inodes and the code asserts that the old inode is
 * in the I_FREEING state.
 *
 * NOTE(review): the third parameter (flags), the NULL check on the lock and
 * any preprocessor guard around the inode check are on lines omitted from
 * this listing.
 */
2701 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2704 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2707 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2710 lock_res_and_lock(lock);
2713 /* Liang XXX: Darwin and Winnt checking should be added */
2714 if (lock->l_ast_data && lock->l_ast_data != data) {
2715 struct inode *new_inode = data;
2716 struct inode *old_inode = lock->l_ast_data;
2717 if (!(old_inode->i_state & I_FREEING))
2718 LDLM_ERROR(lock, "inconsistent l_ast_data found");
2719 LASSERTF(old_inode->i_state & I_FREEING,
2720 "Found existing inode %p/%lu/%u state %lu in lock: "
2721 "setting data to %p/%lu/%u\n", old_inode,
2722 old_inode->i_ino, old_inode->i_generation,
2724 new_inode, new_inode->i_ino, new_inode->i_generation);
2728 lock->l_ast_data = data;
2729 lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2730 unlock_res_and_lock(lock);
2731 LDLM_LOCK_PUT(lock);
/*
 * osc_change_cbdata(): run the @replace iterator, with @data as its
 * argument, over the locks of the resource that corresponds to @lsm.
 *
 * The resource id is built from the object id (name[0]) and the object
 * group (name[2]); iteration is delegated to ldlm_resource_iterate() on the
 * device namespace.  The return statement is on a line omitted from this
 * listing.
 */
2734 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2735 ldlm_iterator_t replace, void *data)
2737 struct ldlm_res_id res_id = { .name = {0} };
2738 struct obd_device *obd = class_exp2obd(exp);
2740 res_id.name[0] = lsm->lsm_object_id;
2741 res_id.name[2] = lsm->lsm_object_gr;
2743 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
/*
 * osc_enqueue_fini(): OSC-level completion of a lock enqueue.
 *
 * When the enqueue result is ELDLM_LOCK_ABORTED, the DLM reply is read from
 * the request and a non-zero lock_policy_res1 replaces rc.  On success, or
 * on an aborted intent enqueue, the lvb size/blocks/mtime are logged.
 * Finally the caller's oi_cb_up() callback is invoked with rc and its result
 * becomes the new rc.
 *
 * NOTE(review): the remaining parameters (intent, rc), an apparent guard
 * around the reply inspection and the return statement are on lines omitted
 * from this listing.
 */
2747 static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
2753 /* The request was created before ldlm_cli_enqueue call. */
2754 if (rc == ELDLM_LOCK_ABORTED) {
2755 struct ldlm_reply *rep;
2757 /* swabbed by ldlm_cli_enqueue() */
2758 LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF);
2759 rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
2761 LASSERT(rep != NULL);
2762 if (rep->lock_policy_res1)
2763 rc = rep->lock_policy_res1;
2767 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2768 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2769 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
2770 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
2771 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
2774 /* Call the update callback. */
2775 rc = oinfo->oi_cb_up(oinfo, rc);
/*
 * osc_enqueue_interpret(): reply-interpret callback for an asynchronous
 * lock enqueue (installed by osc_enqueue() as rq_interpret_reply).
 *
 * Sequence visible below:
 *  1. take a lock reference through ldlm_handle2lock();
 *  2. finish the DLM side with ldlm_cli_enqueue_fini(), filling the lvb;
 *  3. finish the OSC side with osc_enqueue_fini();
 *  4. if the handle is in use and rc is ELDLM_OK, drop the mode reference
 *     with ldlm_lock_decref();
 *  5. assert the lock pointer from step 1 is non-NULL and put it.
 *
 * NOTE(review): one argument line of the ldlm_cli_enqueue_fini() call and
 * the return statement are omitted from this listing.
 */
2779 static int osc_enqueue_interpret(struct ptlrpc_request *req,
2780 struct osc_enqueue_args *aa, int rc)
2782 int intent = aa->oa_ei->ei_flags & LDLM_FL_HAS_INTENT;
2783 struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
2784 struct ldlm_lock *lock;
2786 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2788 lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
2790 /* Complete obtaining the lock procedure. */
2791 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2793 &aa->oa_ei->ei_flags,
2794 &lsm->lsm_oinfo[0]->loi_lvb,
2795 sizeof(lsm->lsm_oinfo[0]->loi_lvb),
2796 lustre_swab_ost_lvb,
2797 aa->oa_oi->oi_lockh, rc);
2799 /* Complete osc stuff. */
2800 rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
2802 /* Release the lock for async request. */
2803 if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
2804 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
2806 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2807 aa->oa_oi->oi_lockh, req, aa);
2808 LDLM_LOCK_PUT(lock);
/*
 * osc_enqueue(): obtain an extent lock on the object described by
 * @oinfo->oi_md, reusing a cached lock when one already covers the extent.
 *
 * Visible flow:
 *  - build the resource id from object id (name[0]) and group (name[2]);
 *  - widen the requested extent to page boundaries;
 *  - try ldlm_lock_match() for the requested mode; on a match the callback
 *    data is attached, oi_cb_up() is called with ELDLM_OK and, for async
 *    callers (ei_rqset set), the reference is dropped again;
 *  - for LCK_PR requests, also try to match an existing LCK_PW lock and
 *    convert the reference (addref PR unless async, decref PW);
 *  - otherwise call ldlm_cli_enqueue(); with ei_rqset set the request is
 *    added to the set with osc_enqueue_interpret() as its reply handler,
 *    else osc_enqueue_fini() is run directly.
 *
 * NOTE(review): many lines are omitted from this listing (match-result
 * tests, "goto no_match", the size[] declaration, returns), so the precise
 * control flow between these steps must be confirmed in the full file.
 */
2812 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2813 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2814 * other synchronous requests, however keeping some locks and trying to obtain
2815 * others may take a considerable amount of time in a case of ost failure; and
2816 * when other sync requests do not get released lock from a client, the client
2817 * is excluded from the cluster -- such scenarios make the life difficult, so
2818 * release locks just after they are obtained. */
2819 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2820 struct obd_enqueue_info *einfo)
2822 struct ldlm_res_id res_id = { .name = {0} };
2823 struct obd_device *obd = exp->exp_obd;
2824 struct ldlm_reply *rep;
2825 struct ptlrpc_request *req = NULL;
2826 int intent = einfo->ei_flags & LDLM_FL_HAS_INTENT;
2830 res_id.name[0] = oinfo->oi_md->lsm_object_id;
2831 res_id.name[2] = oinfo->oi_md->lsm_object_gr;
2833 /* Filesystem lock extents are extended to page boundaries so that
2834 * dealing with the page cache is a little smoother. */
2835 oinfo->oi_policy.l_extent.start -=
2836 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
2837 oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
2839 if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
2842 /* Next, search for already existing extent locks that will cover us */
2843 rc = ldlm_lock_match(obd->obd_namespace,
2844 einfo->ei_flags | LDLM_FL_LVB_READY, &res_id,
2845 einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode,
2848 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
2851 /* I would like to be able to ASSERT here that rss <=
2852 * kms, but I can't, for reasons which are explained in
2856 /* We already have a lock, and it's referenced */
2857 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2859 /* For async requests, decref the lock. */
2860 if (einfo->ei_rqset)
2861 ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
2866 /* If we're trying to read, we also search for an existing PW lock. The
2867 * VFS and page cache already protect us locally, so lots of readers/
2868 * writers can share a single PW lock.
2870 * There are problems with conversion deadlocks, so instead of
2871 * converting a read lock to a write lock, we'll just enqueue a new
2874 * At some point we should cancel the read lock instead of making them
2875 * send us a blocking callback, but there are problems with canceling
2876 * locks out from other users right now, too. */
2878 if (einfo->ei_mode == LCK_PR) {
2879 rc = ldlm_lock_match(obd->obd_namespace,
2880 einfo->ei_flags | LDLM_FL_LVB_READY,
2881 &res_id, einfo->ei_type, &oinfo->oi_policy,
2882 LCK_PW, oinfo->oi_lockh);
2884 /* FIXME: This is not incredibly elegant, but it might
2885 * be more elegant than adding another parameter to
2886 * lock_match. I want a second opinion. */
2887 /* addref the lock only if not async requests. */
2888 if (!einfo->ei_rqset)
2889 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
2890 osc_set_data_with_check(oinfo->oi_lockh,
2893 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2894 ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
2902 [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
2903 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
2904 [DLM_LOCKREQ_OFF + 1] = 0 };
2906 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
2910 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
2911 size[DLM_REPLY_REC_OFF] =
2912 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
2913 ptlrpc_req_set_repsize(req, 3, size);
2916 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2917 einfo->ei_flags &= ~LDLM_FL_BLOCK_GRANTED;
2919 rc = ldlm_cli_enqueue(exp, &req, &res_id, einfo->ei_type,
2920 &oinfo->oi_policy, einfo->ei_mode,
2921 &einfo->ei_flags, einfo->ei_cb_bl,
2922 einfo->ei_cb_cp, einfo->ei_cb_gl,
2924 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2925 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
2926 lustre_swab_ost_lvb, oinfo->oi_lockh,
2927 einfo->ei_rqset ? 1 : 0);
2928 if (einfo->ei_rqset) {
2930 struct osc_enqueue_args *aa;
2931 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2932 aa = (struct osc_enqueue_args *)&req->rq_async_args;
2937 req->rq_interpret_reply = osc_enqueue_interpret;
2938 ptlrpc_set_add_req(einfo->ei_rqset, req);
2939 } else if (intent) {
2940 ptlrpc_req_finished(req);
2945 rc = osc_enqueue_fini(req, oinfo, intent, rc);
2947 ptlrpc_req_finished(req);
/*
 * osc_match(): look for an already-held extent lock that covers @policy on
 * the object described by @lsm, without sending an enqueue.
 *
 * The extent is first widened to page boundaries (start rounded down, end
 * OR-ed with ~CFS_PAGE_MASK; note that this modifies the caller's @policy).
 * ldlm_lock_match() is then tried with LDLM_FL_LVB_READY added to the
 * caller's flags.  For LCK_PR requests a second match against LCK_PW is
 * attempted; when it succeeds and LDLM_FL_TEST_LOCK is not set, the callback
 * data is attached and the PW reference is exchanged for a PR reference.
 *
 * OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO) can make the function return
 * -EIO early (presumably a test hook -- confirm).
 *
 * NOTE(review): the declaration of rc, the test on the first match result
 * and the return statements are on lines omitted from this listing.
 */
2952 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
2953 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2954 int *flags, void *data, struct lustre_handle *lockh)
2956 struct ldlm_res_id res_id = { .name = {0} };
2957 struct obd_device *obd = exp->exp_obd;
2959 int lflags = *flags;
2962 res_id.name[0] = lsm->lsm_object_id;
2963 res_id.name[2] = lsm->lsm_object_gr;
2965 OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
2967 /* Filesystem lock extents are extended to page boundaries so that
2968 * dealing with the page cache is a little smoother */
2969 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2970 policy->l_extent.end |= ~CFS_PAGE_MASK;
2972 /* Next, search for already existing extent locks that will cover us */
2973 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
2974 &res_id, type, policy, mode, lockh);
2976 //if (!(*flags & LDLM_FL_TEST_LOCK))
2977 osc_set_data_with_check(lockh, data, lflags);
2980 /* If we're trying to read, we also search for an existing PW lock. The
2981 * VFS and page cache already protect us locally, so lots of readers/
2982 * writers can share a single PW lock. */
2983 if (mode == LCK_PR) {
2984 rc = ldlm_lock_match(obd->obd_namespace,
2985 lflags | LDLM_FL_LVB_READY, &res_id,
2986 type, policy, LCK_PW, lockh);
2987 if (rc == 1 && !(lflags & LDLM_FL_TEST_LOCK)) {
2988 /* FIXME: This is not incredibly elegant, but it might
2989 * be more elegant than adding another parameter to
2990 * lock_match. I want a second opinion. */
2991 osc_set_data_with_check(lockh, data, lflags);
2992 ldlm_lock_addref(lockh, LCK_PR);
2993 ldlm_lock_decref(lockh, LCK_PW);
/*
 * osc_cancel(): drop one reference of @mode on the lock behind @lockh.
 *
 * LCK_GROUP locks go through ldlm_lock_decref_and_cancel(); the other
 * visible call is a plain ldlm_lock_decref().
 * NOTE(review): the "else" joining the two calls and the return statement
 * are on lines omitted from this listing.  @exp and @md are not referenced
 * by any visible line.
 */
2999 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3000 __u32 mode, struct lustre_handle *lockh)
3004 if (unlikely(mode == LCK_GROUP))
3005 ldlm_lock_decref_and_cancel(lockh, mode);
3007 ldlm_lock_decref(lockh, mode);
/*
 * osc_cancel_unused(): cancel unused locks via ldlm_cli_cancel_unused() on
 * the device namespace.
 *
 * A resource id is built from the object id (name[0]) and group (name[2])
 * of @lsm, and resp (initially NULL) is what gets passed down.
 * NOTE(review): the last parameter (opaque), the condition guarding the use
 * of @lsm and the assignment of resp are on lines omitted from this listing;
 * presumably a NULL @lsm means "all resources" -- confirm.
 */
3012 static int osc_cancel_unused(struct obd_export *exp,
3013 struct lov_stripe_md *lsm, int flags,
3016 struct obd_device *obd = class_exp2obd(exp);
3017 struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3020 res_id.name[0] = lsm->lsm_object_id;
3021 res_id.name[2] = lsm->lsm_object_gr;
3025 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
/*
 * osc_join_lru(): forward @join to ldlm_cli_join_lru() on the device
 * namespace, for the resource built from @lsm's object id (name[0]) and
 * group (name[2]).
 *
 * NOTE(review): the condition guarding the use of @lsm and the assignment of
 * resp (initially NULL) are on lines omitted from this listing.
 */
3028 static int osc_join_lru(struct obd_export *exp,
3029 struct lov_stripe_md *lsm, int join)
3031 struct obd_device *obd = class_exp2obd(exp);
3032 struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3035 res_id.name[0] = lsm->lsm_object_id;
3036 res_id.name[2] = lsm->lsm_object_gr;
3040 return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
/*
 * osc_statfs_interpret(): reply-interpret callback for the asynchronous
 * OST_STATFS request issued by osc_statfs_async().
 *
 * Unpacks (and byte-swaps if needed) the obd_statfs reply buffer, copies it
 * into the caller's oi_osfs and reports the result through oi_cb_up().
 * An unpack failure is logged and turned into -EPROTO.
 *
 * NOTE(review): the initial rc test, the NULL check on msfs, the "out" label
 * and the return statement are on lines omitted from this listing.
 */
3043 static int osc_statfs_interpret(struct ptlrpc_request *req,
3044 struct osc_async_args *aa, int rc)
3046 struct obd_statfs *msfs;
3052 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3053 lustre_swab_obd_statfs);
3055 CERROR("Can't unpack obd_statfs\n");
3056 GOTO(out, rc = -EPROTO);
3059 memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
3061 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * osc_statfs_async(): issue an OST_STATFS request without waiting for it.
 *
 * The request is prepared on the client import, sized for an obd_statfs
 * reply, directed at OST_CREATE_PORTAL, given osc_statfs_interpret() as its
 * reply handler and added to @rqset.  @max_age is not referenced by any
 * visible line (the existing comment below explains why).
 *
 * NOTE(review): the NULL check on req, the initialisation of the async
 * arguments and the return statement are on lines omitted from this listing.
 */
3065 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3066 __u64 max_age, struct ptlrpc_request_set *rqset)
3068 struct ptlrpc_request *req;
3069 struct osc_async_args *aa;
3070 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
3073 /* We could possibly pass max_age in the request (as an absolute
3074 * timestamp or a "seconds.usec ago") so the target can avoid doing
3075 * extra calls into the filesystem if that isn't necessary (e.g.
3076 * during mount that would help a bit). Having relative timestamps
3077 * is not so great if request processing is slow, while absolute
3078 * timestamps are not ideal because they need time synchronization. */
3079 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3080 OST_STATFS, 1, NULL, NULL);
3084 ptlrpc_req_set_repsize(req, 2, size);
3085 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3087 req->rq_interpret_reply = osc_statfs_interpret;
3088 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3089 aa = (struct osc_async_args *)&req->rq_async_args;
3092 ptlrpc_set_add_req(rqset, req);
/*
 * osc_statfs(): synchronous OST_STATFS.
 *
 * Prepares the request on the client import, sends it with
 * ptlrpc_queue_wait(), unpacks the obd_statfs reply (logging and returning
 * -EPROTO on failure), copies it into @osfs and releases the request with
 * ptlrpc_req_finished().
 *
 * NOTE(review): the last parameter, the NULL check on req, the rc test after
 * ptlrpc_queue_wait(), the "out" label and the return statement are on lines
 * omitted from this listing.
 */
3096 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3099 struct obd_statfs *msfs;
3100 struct ptlrpc_request *req;
3101 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
3104 /* We could possibly pass max_age in the request (as an absolute
3105 * timestamp or a "seconds.usec ago") so the target can avoid doing
3106 * extra calls into the filesystem if that isn't necessary (e.g.
3107 * during mount that would help a bit). Having relative timestamps
3108 * is not so great if request processing is slow, while absolute
3109 * timestamps are not ideal because they need time synchronization. */
3110 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3111 OST_STATFS, 1, NULL, NULL);
3115 ptlrpc_req_set_repsize(req, 2, size);
3116 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3118 rc = ptlrpc_queue_wait(req);
3122 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3123 lustre_swab_obd_statfs);
3125 CERROR("Can't unpack obd_statfs\n");
3126 GOTO(out, rc = -EPROTO);
3129 memcpy(osfs, msfs, sizeof(*osfs));
3133 ptlrpc_req_finished(req);
/*
 * osc_getstripe(): copy the striping description of a single-object file to
 * the user buffer @lump.
 *
 * The user's lov_user_md header is copied in and its magic is compared with
 * LOV_USER_MAGIC.  If the user asked for at least one stripe slot, a
 * kernel copy with room for one lmm_objects[] entry is allocated and filled
 * with the object id and group; otherwise only the header (sizeof(lum)) is
 * returned.  The object id, group and a stripe count of 1 are stored before
 * copy_to_user().
 *
 * NOTE(review): the error returns, the non-allocating branch that sets lumk,
 * and any guard around OBD_FREE() are on lines omitted from this listing.
 */
3137 /* Retrieve object striping information.
3139 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3140 * the maximum number of OST indices which will fit in the user buffer.
3141 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3143 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3145 struct lov_user_md lum, *lumk;
3146 int rc = 0, lum_size;
3152 if (copy_from_user(&lum, lump, sizeof(lum)))
3155 if (lum.lmm_magic != LOV_USER_MAGIC)
3158 if (lum.lmm_stripe_count > 0) {
3159 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3160 OBD_ALLOC(lumk, lum_size);
3164 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3165 lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
3167 lum_size = sizeof(lum);
3171 lumk->lmm_object_id = lsm->lsm_object_id;
3172 lumk->lmm_object_gr = lsm->lsm_object_gr;
3173 lumk->lmm_stripe_count = 1;
3175 if (copy_to_user(lump, lumk, lum_size))
3179 OBD_FREE(lumk, lum_size);
/*
 * osc_iocontrol(): ioctl dispatcher for an OSC device.
 *
 * A module reference is taken on entry and dropped on exit (the exact call
 * depends on the kernel version, see the #if blocks).  Commands visible in
 * this listing:
 *  - OBD_IOC_LOV_GET_CONFIG: fetch the ioctl data from user space, check
 *    that inline buffer 1 can hold a lov_desc and buffer 2 a uuid, fill in a
 *    one-target descriptor and copy the buffer back;
 *  - LL_IOC_LOV_SETSTRIPE: obd_alloc_memmd();
 *  - LL_IOC_LOV_GETSTRIPE: osc_getstripe();
 *  - OBD_IOC_CLIENT_RECOVER: ptlrpc_recover_import();
 *  - IOC_OSC_SET_ACTIVE: ptlrpc_set_import_active();
 *  - OBD_IOC_POLL_QUOTACHECK: lquota_poll_check();
 *  - anything else: logged and rejected with -ENOTTY.
 *
 * NOTE(review): the "Can't get module" CERROR format string has no trailing
 * newline, unlike the other messages in this file.  The switch statement
 * itself, several argument lines and the returns are on lines omitted from
 * this listing.
 */
3185 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3186 void *karg, void *uarg)
3188 struct obd_device *obd = exp->exp_obd;
3189 struct obd_ioctl_data *data = karg;
3193 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3196 if (!try_module_get(THIS_MODULE)) {
3197 CERROR("Can't get module. Is it alive?");
3202 case OBD_IOC_LOV_GET_CONFIG: {
3204 struct lov_desc *desc;
3205 struct obd_uuid uuid;
3209 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3210 GOTO(out, err = -EINVAL);
3212 data = (struct obd_ioctl_data *)buf;
3214 if (sizeof(*desc) > data->ioc_inllen1) {
3215 obd_ioctl_freedata(buf, len);
3216 GOTO(out, err = -EINVAL);
3219 if (data->ioc_inllen2 < sizeof(uuid)) {
3220 obd_ioctl_freedata(buf, len);
3221 GOTO(out, err = -EINVAL);
3224 desc = (struct lov_desc *)data->ioc_inlbuf1;
3225 desc->ld_tgt_count = 1;
3226 desc->ld_active_tgt_count = 1;
3227 desc->ld_default_stripe_count = 1;
3228 desc->ld_default_stripe_size = 0;
3229 desc->ld_default_stripe_offset = 0;
3230 desc->ld_pattern = 0;
3231 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3233 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3235 err = copy_to_user((void *)uarg, buf, len);
3238 obd_ioctl_freedata(buf, len);
3241 case LL_IOC_LOV_SETSTRIPE:
3242 err = obd_alloc_memmd(exp, karg);
3246 case LL_IOC_LOV_GETSTRIPE:
3247 err = osc_getstripe(karg, uarg);
3249 case OBD_IOC_CLIENT_RECOVER:
3250 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3255 case IOC_OSC_SET_ACTIVE:
3256 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3259 case OBD_IOC_POLL_QUOTACHECK:
3260 err = lquota_poll_check(quota_interface, exp,
3261 (struct if_quotacheck *)karg);
3264 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3265 cmd, cfs_curproc_comm());
3266 GOTO(out, err = -ENOTTY);
3269 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3272 module_put(THIS_MODULE);
/*
 * osc_get_info(): answer a keyed query.
 *
 * Both @vallen and @val must be non-NULL.  Keys visible in this listing:
 *  - "lock_to_stripe": *vallen is set to sizeof(__u32); @val is treated as a
 *    __u32 stripe number (the store itself is on an omitted line);
 *  - "last_id": an OST_GET_INFO request carrying the key is sent
 *    synchronously, the reply is unpacked with lustre_swab_ost_last_id and
 *    stored through @val as an obd_id; -EPROTO on unpack failure.
 *
 * NOTE(review): the two key tests use different length comparisons
 * (keylen > strlen(...) versus keylen >= strlen(...)); confirm whether that
 * asymmetry is intended.  No visible line checks that *vallen is at least
 * sizeof(obd_id) before the "last_id" store.  Returns and the "out" label
 * are on lines omitted from this listing.
 */
3277 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3278 void *key, __u32 *vallen, void *val)
3281 if (!vallen || !val)
3284 if (keylen > strlen("lock_to_stripe") &&
3285 strcmp(key, "lock_to_stripe") == 0) {
3286 __u32 *stripe = val;
3287 *vallen = sizeof(*stripe);
3290 } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
3291 struct ptlrpc_request *req;
3293 char *bufs[2] = { NULL, key };
3294 int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };
3296 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3297 OST_GET_INFO, 2, size, bufs);
3301 size[REPLY_REC_OFF] = *vallen;
3302 ptlrpc_req_set_repsize(req, 2, size);
3303 rc = ptlrpc_queue_wait(req);
3307 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3308 lustre_swab_ost_last_id);
3309 if (reply == NULL) {
3310 CERROR("Can't unpack OST last ID\n");
3311 GOTO(out, rc = -EPROTO);
3313 *((obd_id *)val) = *reply;
3315 ptlrpc_req_finished(req);
/*
 * osc_setinfo_mds_conn_interpret(): reply-interpret callback for the
 * "mds_conn" set_info request (installed by osc_set_info_async()).
 *
 * Looks up the LLOG_MDS_OST_ORIG_CTXT context of the import's device and
 * connects it with llog_initiator_connect(), logging a failure.  Then, under
 * imp_lock, marks the import with imp_server_timeout and imp_pingable.
 *
 * NOTE(review): the remaining parameters, the tests on rc and ctxt, and the
 * return statements are on lines omitted from this listing.
 */
3321 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3324 struct llog_ctxt *ctxt;
3325 struct obd_import *imp = req->rq_import;
3331 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3334 rc = llog_initiator_connect(ctxt);
3336 CERROR("cannot establish connection for "
3337 "ctxt %p: %d\n", ctxt, rc);
3340 spin_lock(&imp->imp_lock);
3341 imp->imp_server_timeout = 1;
3342 imp->imp_pingable = 1;
3343 spin_unlock(&imp->imp_lock);
3344 CDEBUG(D_HA, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
/*
 * osc_set_info_async(): apply a keyed setting, locally when possible and
 * otherwise by sending OST_SET_INFO to the target.
 *
 * Keys handled locally in the visible code:
 *  - KEY_NEXT_ID: @val must be an obd_id; oscc_next_id becomes *val + 1;
 *  - "unlinked": clears OSCC_FLAG_NOSPC under oscc_lock;
 *  - KEY_INIT_RECOV: @val must be an int; stored in imp_initial_recov;
 *  - "checksum": @val must be an int; cl_checksum is set to 0 or 1;
 *  - KEY_FLUSH_CTX: sptlrpc_import_flush_my_ctx().
 * Any other key is packed with its value into an OST_SET_INFO request that
 * is added to @set.  For "mds_conn" the creator's object group is taken
 * from @val and osc_setinfo_mds_conn_interpret() is installed as the reply
 * handler.
 *
 * NOTE(review): the returns after each local key, the NULL checks on @set
 * and req, and the definition of KEY_IS are not visible in this listing.
 */
3349 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3350 void *key, obd_count vallen, void *val,
3351 struct ptlrpc_request_set *set)
3353 struct ptlrpc_request *req;
3354 struct obd_device *obd = exp->exp_obd;
3355 struct obd_import *imp = class_exp2cliimp(exp);
3356 int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
3357 char *bufs[3] = { NULL, key, val };
3360 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3362 if (KEY_IS(KEY_NEXT_ID)) {
3363 if (vallen != sizeof(obd_id))
3365 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3366 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3367 exp->exp_obd->obd_name,
3368 obd->u.cli.cl_oscc.oscc_next_id);
3373 if (KEY_IS("unlinked")) {
3374 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3375 spin_lock(&oscc->oscc_lock);
3376 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3377 spin_unlock(&oscc->oscc_lock);
3381 if (KEY_IS(KEY_INIT_RECOV)) {
3382 if (vallen != sizeof(int))
3384 spin_lock(&imp->imp_lock);
3385 imp->imp_initial_recov = *(int *)val;
3386 spin_unlock(&imp->imp_lock);
3387 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3388 exp->exp_obd->obd_name,
3389 imp->imp_initial_recov);
3393 if (KEY_IS("checksum")) {
3394 if (vallen != sizeof(int))
3396 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3400 if (KEY_IS(KEY_FLUSH_CTX)) {
3401 sptlrpc_import_flush_my_ctx(imp);
3408 /* We pass all other commands directly to OST. Since nobody calls osc
3409 methods directly and everybody is supposed to go through LOV, we
3410 assume lov checked invalid values for us.
3411 The only recognised values so far are evict_by_nid and mds_conn.
3412 Even if something bad goes through, we'd get a -EINVAL from OST
3415 req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
3420 if (KEY_IS("mds_conn")) {
3421 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3423 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3424 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3425 LASSERT(oscc->oscc_oa.o_gr > 0);
3426 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3429 ptlrpc_req_set_repsize(req, 1, NULL);
3430 ptlrpc_set_add_req(set, req);
3431 ptlrpc_check_set(set);
/*
 * llog operations passed to llog_setup() for LLOG_SIZE_REPL_CTXT in
 * osc_llog_init(); only the cancel method is provided.  The initialiser
 * uses the old GNU "field:" designated-initialiser syntax.
 */
3437 static struct llog_operations osc_size_repl_logops = {
3438 lop_cancel: llog_obd_repl_cancel
/*
 * osc_llog_init(): set up the two llog contexts used by the OSC.
 *
 * osc_mds_ost_orig_logops is a file-scope table that is filled in on first
 * use, under obd_dev_lock: it starts as a copy of llog_lvfs_ops and has its
 * setup/cleanup/add/connect methods overridden with the origin variants.
 * The lop_setup comparison serves as the "already initialised" test.
 *
 * Contexts created with llog_setup():
 *  - LLOG_MDS_OST_ORIG_CTXT, using the catalog id from @catid;
 *  - LLOG_SIZE_REPL_CTXT, using osc_size_repl_logops.
 * Failures are logged with CERROR.
 *
 * NOTE(review): @uuid is not referenced by any visible line.  The rc tests,
 * any guard around the final two CERROR calls, and the return statement are
 * on lines omitted from this listing.
 */
3441 static struct llog_operations osc_mds_ost_orig_logops;
3442 static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
3443 struct obd_device *tgt, int count,
3444 struct llog_catid *catid, struct obd_uuid *uuid)
3449 spin_lock(&obd->obd_dev_lock);
3450 if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3451 osc_mds_ost_orig_logops = llog_lvfs_ops;
3452 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3453 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3454 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3455 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3457 spin_unlock(&obd->obd_dev_lock);
3459 rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3460 &catid->lci_logid, &osc_mds_ost_orig_logops);
3462 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3466 rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3467 &osc_size_repl_logops);
3469 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3472 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3473 obd->obd_name, tgt->obd_name, count, catid, rc);
3474 CERROR("logid "LPX64":0x%x\n",
3475 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
/*
 * osc_llog_finish(): tear down the two llog contexts created by
 * osc_llog_init() -- LLOG_MDS_OST_ORIG_CTXT first, then
 * LLOG_SIZE_REPL_CTXT -- keeping the two llog_cleanup() results in rc and
 * rc2 respectively.
 *
 * NOTE(review): any NULL checks on ctxt and the way rc and rc2 are combined
 * for the return value are on lines omitted from this listing.  @count is
 * not referenced by any visible line.
 */
3480 static int osc_llog_finish(struct obd_device *obd, int count)
3482 struct llog_ctxt *ctxt;
3483 int rc = 0, rc2 = 0;
3486 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3488 rc = llog_cleanup(ctxt);
3490 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3492 rc2 = llog_cleanup(ctxt);
/*
 * osc_reconnect(): fill in the grant request of the connect data before a
 * reconnect.
 *
 * Only acts when @data is non-NULL and carries OBD_CONNECT_GRANT.  Under
 * cl_loi_list_lock it requests the currently available grant, or -- when
 * that is zero -- two RPCs worth of pages (2 * cl_max_pages_per_rpc pages,
 * converted to bytes), and resets cl_lost_grant to 0 after saving the old
 * value for the debug message.
 *
 * NOTE(review): the declaration of lost_grant and the return statement are
 * on lines omitted from this listing.  @exp and @cluuid are not referenced
 * by any visible line.
 */
3499 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3500 struct obd_uuid *cluuid,
3501 struct obd_connect_data *data)
3503 struct client_obd *cli = &obd->u.cli;
3505 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3508 client_obd_list_lock(&cli->cl_loi_list_lock);
3509 data->ocd_grant = cli->cl_avail_grant ?:
3510 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3511 lost_grant = cli->cl_lost_grant;
3512 cli->cl_lost_grant = 0;
3513 client_obd_list_unlock(&cli->cl_loi_list_lock);
3515 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3516 "cl_lost_grant: %ld\n", data->ocd_grant,
3517 cli->cl_avail_grant, lost_grant);
3518 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3519 " ocd_grant: %d\n", data->ocd_connect_flags,
3520 data->ocd_version, data->ocd_grant);
/*
 * osc_disconnect(): disconnect @exp from the target.
 *
 * When this is the last connection (cl_conn_count == 1), the
 * LLOG_SIZE_REPL_CTXT log is synced first so that remaining cancel messages
 * reach the target; then client_disconnect_export() does the disconnect.
 * The declaration of rc and the return statement are on lines omitted from
 * this listing.
 */
3526 static int osc_disconnect(struct obd_export *exp)
3528 struct obd_device *obd = class_exp2obd(exp);
3529 struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3532 if (obd->u.cli.cl_conn_count == 1)
3533 /* flush any remaining cancel messages out to the target */
3534 llog_sync(ctxt, exp);
3536 rc = client_disconnect_export(exp);
/*
 * osc_import_event(): react to state changes of the client import.
 *
 * Events handled in the visible code:
 *  - IMP_EVENT_DISCON: if imp_server_timeout is set, flag the object
 *    creator as OSCC_FLAG_RECOVERING; then zero cl_avail_grant and
 *    cl_lost_grant under cl_loi_list_lock;
 *  - IMP_EVENT_INACTIVE: notify the observer with OBD_NOTIFY_INACTIVE;
 *  - IMP_EVENT_INVALIDATE: run osc_check_rpcs() under the list lock, then
 *    clean the namespace with LDLM_FL_LOCAL_ONLY;
 *  - IMP_EVENT_ACTIVE: if imp_server_timeout is set, clear OSCC_FLAG_NOSPC;
 *    notify the observer with OBD_NOTIFY_ACTIVE;
 *  - IMP_EVENT_OCD: initialise grant when OBD_CONNECT_GRANT was negotiated,
 *    switch the request portal when OBD_CONNECT_REQPORTAL was negotiated,
 *    and notify the observer with OBD_NOTIFY_OCD;
 *  - anything else is logged as an unknown event.
 *
 * NOTE(review): the switch statement, the assignments of cli, the break
 * statements and the return are on lines omitted from this listing.
 */
3540 static int osc_import_event(struct obd_device *obd,
3541 struct obd_import *imp,
3542 enum obd_import_event event)
3544 struct client_obd *cli;
3548 LASSERT(imp->imp_obd == obd);
3551 case IMP_EVENT_DISCON: {
3552 /* Only do this on the MDS OSC's */
3553 if (imp->imp_server_timeout) {
3554 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3556 spin_lock(&oscc->oscc_lock);
3557 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3558 spin_unlock(&oscc->oscc_lock);
3561 client_obd_list_lock(&cli->cl_loi_list_lock);
3562 cli->cl_avail_grant = 0;
3563 cli->cl_lost_grant = 0;
3564 client_obd_list_unlock(&cli->cl_loi_list_lock);
3567 case IMP_EVENT_INACTIVE: {
3568 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3571 case IMP_EVENT_INVALIDATE: {
3572 struct ldlm_namespace *ns = obd->obd_namespace;
3576 client_obd_list_lock(&cli->cl_loi_list_lock);
3577 /* all pages go to failing rpcs due to the invalid import */
3578 osc_check_rpcs(cli);
3579 client_obd_list_unlock(&cli->cl_loi_list_lock);
3581 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3585 case IMP_EVENT_ACTIVE: {
3586 /* Only do this on the MDS OSC's */
3587 if (imp->imp_server_timeout) {
3588 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3590 spin_lock(&oscc->oscc_lock);
3591 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3592 spin_unlock(&oscc->oscc_lock);
3594 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3597 case IMP_EVENT_OCD: {
3598 struct obd_connect_data *ocd = &imp->imp_connect_data;
3600 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3601 osc_init_grant(&obd->u.cli, ocd);
3604 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3605 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3607 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3611 CERROR("Unknown import event %d\n", event);
/*
 * osc_setup(): device setup method for an OSC.
 *
 * Takes a ptlrpcd reference, runs the generic client_obd_setup(), registers
 * the /proc entries (lprocfs_obd_setup() and, on success, the seqstat and
 * ptlrpc entries), and creates a request pool on the import sized at
 * cl_max_rpcs_in_flight + 2 requests.
 *
 * NOTE(review): the rc tests, the error path taken when client_obd_setup()
 * fails, one argument of ptlrpc_init_rq_pool() and the return statement are
 * on lines omitted from this listing.
 */
3617 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3623 rc = ptlrpcd_addref();
3627 rc = client_obd_setup(obd, lcfg);
3631 struct lprocfs_static_vars lvars;
3632 struct client_obd *cli = &obd->u.cli;
3634 lprocfs_init_vars(osc, &lvars);
3635 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3636 lproc_osc_attach_seqstat(obd);
3637 ptlrpc_lprocfs_register_obd(obd);
3641 /* We need to allocate a few requests more, because
3642 brw_interpret_oap tries to create new requests before freeing
3643 previous ones. Ideally we want to have 2x max_rpcs_in_flight
3644 reserved, but I'm afraid that might be too much wasted RAM
3645 in fact, so 2 is just my guess and still should work. */
3646 cli->cl_import->imp_rq_pool =
3647 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3649 ptlrpc_add_rqs_to_pool);
/*
 * Pre-cleanup hook for an OSC device; the work done depends on the
 * cleanup stage passed in.
 *
 * NOTE(review): the listing skips lines (3655 -> 3661, 3669 -> 3672,
 * 3678 -> 3680, 3688 -> 3690, ...), so the switch header, the break
 * statements, the last CDEBUG argument, the test guarding the CERROR and
 * the return are not visible here -- confirm against the complete file.
 */
3655 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
/* Early stage: deactivate the client import and clear its imp_pingable
 * flag; the flag is written while holding imp_lock. */
3661 case OBD_CLEANUP_EARLY: {
3662 struct obd_import *imp;
3663 imp = obd->u.cli.cl_import;
3664 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3665 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3666 ptlrpc_deactivate_import(imp);
3667 spin_lock(&imp->imp_lock);
3668 imp->imp_pingable = 0;
3669 spin_unlock(&imp->imp_lock);
/* Exports stage: if cl_import is still set, invalidate it, free its
 * request pool, destroy it, and clear the pointer in the client obd. */
3672 case OBD_CLEANUP_EXPORTS: {
3673 /* If we set up but never connected, the
3674 client import will not have been cleaned. */
3675 if (obd->u.cli.cl_import) {
3676 struct obd_import *imp;
3677 imp = obd->u.cli.cl_import;
3678 CDEBUG(D_CONFIG, "%s: client import never connected\n",
3680 ptlrpc_invalidate_import(imp);
3681 ptlrpc_free_rq_pool(imp->imp_rq_pool);
3682 class_destroy_import(imp);
3683 obd->u.cli.cl_import = NULL;
/* Self-export stage: rc takes the result of obd_llog_finish(obd, 0).
 * NOTE(review): the CERROR is presumably reached only when rc is
 * non-zero; the guarding test is elided from this listing. */
3687 case OBD_CLEANUP_SELF_EXP:
3688 rc = obd_llog_finish(obd, 0);
3690 CERROR("failed to cleanup llogging subsystems\n");
/* No action is visible for this stage. */
3692 case OBD_CLEANUP_OBD:
/*
 * Tear down an OSC device: unregister its lprocfs entries, mark the
 * object creator state as exiting, release the quota cache, then run the
 * generic client cleanup and store its result in rc.
 *
 * NOTE(review): the declaration of rc and everything after the
 * client_obd_cleanup() call are elided from this listing (numbering jumps
 * 3700 -> 3704 and stops at 3715) -- confirm against the complete file.
 */
3698 int osc_cleanup(struct obd_device *obd)
3700 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
/* Remove the ptlrpc and obd lprocfs entries. */
3704 ptlrpc_lprocfs_unregister_obd(obd);
3705 lprocfs_obd_cleanup(obd);
/* Under oscc_lock: clear OSCC_FLAG_RECOVERING and set OSCC_FLAG_EXITING. */
3707 spin_lock(&oscc->oscc_lock);
3708 oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3709 oscc->oscc_flags |= OSCC_FLAG_EXITING;
3710 spin_unlock(&oscc->oscc_lock);
3712 /* free memory of osc quota cache */
3713 lquota_cleanup(quota_interface, obd);
/* Generic client-side cleanup; its result is kept in rc. */
3715 rc = client_obd_cleanup(obd);
/*
 * Process a configuration record for an OSC device.
 *
 * buf is treated as a struct lustre_cfg and handed, together with the OSC
 * lprocfs obd variable table, to class_process_proc_param() with
 * PARAM_OSC; the result is stored in rc. len is not referenced in the
 * visible lines.
 *
 * NOTE(review): the declaration of rc and the return statement are elided
 * from this listing (numbering jumps 3724 -> 3727 and stops at 3729) --
 * confirm against the complete file.
 */
3721 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3723 struct lustre_cfg *lcfg = buf;
3724 struct lprocfs_static_vars lvars;
/* Fill lvars with the OSC lprocfs variable tables. */
3727 lprocfs_init_vars(osc, &lvars);
3729 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
/*
 * OBD method table for the OSC device type. osc_init() passes this table
 * to init_obd_quota_ops() and to class_register_type().
 *
 * Connection add/del and connect use the generic client_* helpers; the
 * remaining slots point at osc_* functions.
 *
 * NOTE(review): the embedded numbering skips 3754 and 3763 and the
 * closing of the initializer is not shown, so some entries may be missing
 * from this listing -- confirm against the complete file.
 */
3733 struct obd_ops osc_obd_ops = {
3734 .o_owner = THIS_MODULE,
/* Device lifecycle. */
3735 .o_setup = osc_setup,
3736 .o_precleanup = osc_precleanup,
3737 .o_cleanup = osc_cleanup,
/* Connection management. */
3738 .o_add_conn = client_import_add_conn,
3739 .o_del_conn = client_import_del_conn,
3740 .o_connect = client_connect_import,
3741 .o_reconnect = osc_reconnect,
3742 .o_disconnect = osc_disconnect,
3743 .o_statfs = osc_statfs,
3744 .o_statfs_async = osc_statfs_async,
/* Object metadata packing and object operations. */
3745 .o_packmd = osc_packmd,
3746 .o_unpackmd = osc_unpackmd,
3747 .o_precreate = osc_precreate,
3748 .o_create = osc_create,
3749 .o_destroy = osc_destroy,
3750 .o_getattr = osc_getattr,
3751 .o_getattr_async = osc_getattr_async,
3752 .o_setattr = osc_setattr,
3753 .o_setattr_async = osc_setattr_async,
/* Asynchronous bulk and page I/O. */
3755 .o_brw_async = osc_brw_async,
3756 .o_prep_async_page = osc_prep_async_page,
3757 .o_queue_async_io = osc_queue_async_io,
3758 .o_set_async_flags = osc_set_async_flags,
3759 .o_queue_group_io = osc_queue_group_io,
3760 .o_trigger_group_io = osc_trigger_group_io,
3761 .o_teardown_async_page = osc_teardown_async_page,
3762 .o_punch = osc_punch,
/* Lock enqueue, match and cancel. */
3764 .o_enqueue = osc_enqueue,
3765 .o_match = osc_match,
3766 .o_change_cbdata = osc_change_cbdata,
3767 .o_cancel = osc_cancel,
3768 .o_cancel_unused = osc_cancel_unused,
3769 .o_join_lru = osc_join_lru,
/* Control, info, import events, llog and configuration. */
3770 .o_iocontrol = osc_iocontrol,
3771 .o_get_info = osc_get_info,
3772 .o_set_info_async = osc_set_info_async,
3773 .o_import_event = osc_import_event,
3774 .o_llog_init = osc_llog_init,
3775 .o_llog_finish = osc_llog_finish,
3776 .o_process_config = osc_process_config,
/*
 * Module initialisation for the OSC.
 *
 * Visible steps: fill the OSC lprocfs variable tables, request the
 * "lquota" module, look up the quota interface symbol, initialise quota
 * support and hook it into osc_obd_ops, then register the OSC obd type
 * under LUSTRE_OSC_NAME and store the result in rc.
 *
 * NOTE(review): the declaration of rc, the test on rc after registration
 * and the return statements are elided from this listing (numbering jumps
 * 3781 -> 3785 and 3793 -> 3795) -- confirm against the complete file.
 */
3779 int __init osc_init(void)
3781 struct lprocfs_static_vars lvars;
3785 lprocfs_init_vars(osc, &lvars);
/* The return value of request_module() is not used here;
 * quota_interface is whatever PORTAL_SYMBOL_GET yields. */
3787 request_module("lquota");
3788 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3789 lquota_init(quota_interface);
3790 init_obd_quota_ops(quota_interface, &osc_obd_ops);
/* Register the OSC device type with its method table and module-level
 * lprocfs variables. */
3792 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3793 LUSTRE_OSC_NAME, NULL);
/* Drop the quota interface symbol reference if one was obtained.
 * NOTE(review): presumably only on registration failure; the enclosing
 * condition is not visible in this listing. */
3795 if (quota_interface)
3796 PORTAL_SYMBOL_PUT(osc_quota_interface);
/*
 * Module exit for the OSC: shut down quota support, drop the quota
 * interface symbol reference if one is held, and unregister the OSC obd
 * type registered under LUSTRE_OSC_NAME.
 */
3804 static void /*__exit*/ osc_exit(void)
3806 lquota_exit(quota_interface);
/* The symbol reference is only put when quota_interface is non-NULL. */
3807 if (quota_interface)
3808 PORTAL_SYMBOL_PUT(osc_quota_interface);
3810 class_unregister_type(LUSTRE_OSC_NAME);
/* Module metadata, followed by the cfs_module() declaration that is given
 * osc_init and osc_exit along with the Lustre version string. */
3813 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3814 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3815 MODULE_LICENSE("GPL");
3817 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);