1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5 * Author Peter Braam <braam@clusterfs.com>
7 * This file is part of the Lustre file system, http://www.lustre.org
8 * Lustre is a trademark of Cluster File Systems, Inc.
10 * You may have signed or agreed to another license before downloading
11 * this software. If so, you are bound by the terms and conditions
12 * of that agreement, and the following does not apply to you. See the
13 * LICENSE file included with this distribution for more information.
15 * If you did not agree to a different license, then this copy of Lustre
16 * is open source software; you can redistribute it and/or modify it
17 * under the terms of version 2 of the GNU General Public License as
18 * published by the Free Software Foundation.
20 * In either case, Lustre is distributed in the hope that it will be
21 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23 * license text for more details.
25 * For testing and management it is treated as an obd_device,
26 * although it does not export a full OBD method table (the
27 * requests are coming in over the wire, so object target modules
28 * do not have a full method table.)
33 # define EXPORT_SYMTAB
35 #define DEBUG_SUBSYSTEM S_OSC
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
43 # include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include "osc_internal.h"
60 static quota_interface_t *quota_interface = NULL;
61 extern quota_interface_t osc_quota_interface;
63 /* Pack OSC object metadata for disk storage (LE byte order). */
64 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
65 struct lov_stripe_md *lsm)
70 lmm_size = sizeof(**lmmp);
75 OBD_FREE(*lmmp, lmm_size);
81 OBD_ALLOC(*lmmp, lmm_size);
87 LASSERT(lsm->lsm_object_id);
88 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
94 /* Unpack OSC object metadata from disk storage (LE byte order). */
95 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
96 struct lov_mds_md *lmm, int lmm_bytes)
102 if (lmm_bytes < sizeof (*lmm)) {
103 CERROR("lov_mds_md too small: %d, need %d\n",
104 lmm_bytes, (int)sizeof(*lmm));
107 /* XXX LOV_MAGIC etc check? */
109 if (lmm->lmm_object_id == 0) {
110 CERROR("lov_mds_md: zero lmm_object_id\n");
115 lsm_size = lov_stripe_md_size(1);
119 if (*lsmp != NULL && lmm == NULL) {
120 OBD_FREE(*lsmp, lsm_size);
126 OBD_ALLOC(*lsmp, lsm_size);
129 loi_init((*lsmp)->lsm_oinfo);
133 /* XXX zero *lsmp? */
134 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
135 LASSERT((*lsmp)->lsm_object_id);
138 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
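/* Reply callback for osc_getattr_async(): unpack the ost_body from the
 * reply and copy the returned attributes into the caller's obdo. */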
143 static int osc_getattr_interpret(struct ptlrpc_request *req,
144 struct osc_getattr_async_args *aa, int rc)
146 struct ost_body *body;
152 body = lustre_swab_repbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
154 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
155 memcpy(aa->aa_oa, &body->oa, sizeof(*aa->aa_oa));
157 /* This should really be sent by the OST */
158 aa->aa_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
159 aa->aa_oa->o_valid |= OBD_MD_FLBLKSZ;
161 CERROR("can't unpack ost_body\n");
163 aa->aa_oa->o_valid = 0;
169 static int osc_getattr_async(struct obd_export *exp, struct obdo *oa,
170 struct lov_stripe_md *md,
171 struct ptlrpc_request_set *set)
173 struct ptlrpc_request *request;
174 struct ost_body *body;
175 int size = sizeof(*body);
176 struct osc_getattr_async_args *aa;
179 request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
180 OST_GETATTR, 1, &size, NULL);
184 body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
185 memcpy(&body->oa, oa, sizeof(*oa));
187 request->rq_replen = lustre_msg_size(1, &size);
188 request->rq_interpret_reply = osc_getattr_interpret;
190 LASSERT (sizeof (*aa) <= sizeof (request->rq_async_args));
191 aa = (struct osc_getattr_async_args *)&request->rq_async_args;
194 ptlrpc_set_add_req (set, request);
198 static int osc_getattr(struct obd_export *exp, struct obdo *oa,
199 struct lov_stripe_md *md)
201 struct ptlrpc_request *request;
202 struct ost_body *body;
203 int rc, size = sizeof(*body);
206 request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
207 OST_GETATTR, 1, &size, NULL);
211 body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
212 memcpy(&body->oa, oa, sizeof(*oa));
214 request->rq_replen = lustre_msg_size(1, &size);
216 rc = ptlrpc_queue_wait(request);
218 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
222 body = lustre_swab_repbuf(request, 0, sizeof (*body),
223 lustre_swab_ost_body);
225 CERROR ("can't unpack ost_body\n");
226 GOTO (out, rc = -EPROTO);
229 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
230 memcpy(oa, &body->oa, sizeof(*oa));
232 /* This should really be sent by the OST */
233 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
234 oa->o_valid |= OBD_MD_FLBLKSZ;
238 ptlrpc_req_finished(request);
242 static int osc_setattr(struct obd_export *exp, struct obdo *oa,
243 struct lov_stripe_md *md, struct obd_trans_info *oti)
245 struct ptlrpc_request *request;
246 struct ost_body *body;
247 int rc, size = sizeof(*body);
250 request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
251 OST_SETATTR, 1, &size, NULL);
255 body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof(*body));
256 memcpy(&body->oa, oa, sizeof(*oa));
258 request->rq_replen = lustre_msg_size(1, &size);
260 rc = ptlrpc_queue_wait(request);
264 body = lustre_swab_repbuf(request, 0, sizeof(*body),
265 lustre_swab_ost_body);
267 GOTO(out, rc = -EPROTO);
269 memcpy(oa, &body->oa, sizeof(*oa));
273 ptlrpc_req_finished(request);
277 static int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
278 struct lov_stripe_md *md,
279 struct obd_trans_info *oti)
281 struct ptlrpc_request *request;
282 struct ost_body *body;
283 int rc = 0, size = sizeof(*body);
288 request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
289 OST_SETATTR, 1, &size, NULL);
293 body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof(*body));
295 if (oa->o_valid & OBD_MD_FLCOOKIE)
296 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
297 sizeof(*oti->oti_logcookies));
299 memcpy(&body->oa, oa, sizeof(*oa));
300 request->rq_replen = lustre_msg_size(1, &size);
301 /* do mds-to-ost setattr asynchronously */
302 ptlrpcd_add_req(request);
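/* Create an object on the OST.  The new object id is recorded in the stripe
 * metadata, and the DELORPHAN variant used during MDS/OST orphan cleanup is
 * marked so it is never resent or delayed. */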
307 int osc_real_create(struct obd_export *exp, struct obdo *oa,
308 struct lov_stripe_md **ea, struct obd_trans_info *oti)
310 struct ptlrpc_request *request;
311 struct ost_body *body;
312 struct lov_stripe_md *lsm;
313 int rc, size = sizeof(*body);
321 rc = obd_alloc_memmd(exp, &lsm);
326 request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
327 OST_CREATE, 1, &size, NULL);
329 GOTO(out, rc = -ENOMEM);
331 body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
332 memcpy(&body->oa, oa, sizeof(body->oa));
334 request->rq_replen = lustre_msg_size(1, &size);
335 if (oa->o_valid & OBD_MD_FLINLINE) {
336 LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
337 oa->o_flags == OBD_FL_DELORPHAN);
338 DEBUG_REQ(D_HA, request,
339 "delorphan from OST integration");
340 /* Don't resend the delorphan request */
341 request->rq_no_resend = request->rq_no_delay = 1;
344 rc = ptlrpc_queue_wait(request);
348 body = lustre_swab_repbuf(request, 0, sizeof(*body),
349 lustre_swab_ost_body);
351 CERROR ("can't unpack ost_body\n");
352 GOTO (out_req, rc = -EPROTO);
355 memcpy(oa, &body->oa, sizeof(*oa));
357 /* This should really be sent by the OST */
358 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
359 oa->o_valid |= OBD_MD_FLBLKSZ;
361 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
362 * have valid lsm_oinfo data structs, so don't go touching that.
363 * This needs to be fixed in a big way.
365 lsm->lsm_object_id = oa->o_id;
369 oti->oti_transno = request->rq_repmsg->transno;
371 if (oa->o_valid & OBD_MD_FLCOOKIE) {
372 if (!oti->oti_logcookies)
373 oti_alloc_cookies(oti, 1);
374 memcpy(oti->oti_logcookies, obdo_logcookie(oa),
375 sizeof(oti->oti_onecookie));
379 CDEBUG(D_HA, "transno: "LPD64"\n", request->rq_repmsg->transno);
382 ptlrpc_req_finished(request);
385 obd_free_memmd(exp, &lsm);
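/* Truncate (punch) the byte range [start, end] of an object on the OST;
 * start/end travel in the overloaded o_size/o_blocks fields of the obdo. */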
389 static int osc_punch(struct obd_export *exp, struct obdo *oa,
390 struct lov_stripe_md *md, obd_size start,
391 obd_size end, struct obd_trans_info *oti)
393 struct ptlrpc_request *request;
394 struct ost_body *body;
395 int rc, size = sizeof(*body);
403 request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
404 OST_PUNCH, 1, &size, NULL);
408 body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
409 memcpy(&body->oa, oa, sizeof(*oa));
411 /* overload the size and blocks fields in the oa with start/end */
412 body->oa.o_size = start;
413 body->oa.o_blocks = end;
414 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
416 request->rq_replen = lustre_msg_size(1, &size);
418 rc = ptlrpc_queue_wait(request);
422 body = lustre_swab_repbuf (request, 0, sizeof (*body),
423 lustre_swab_ost_body);
425 CERROR ("can't unpack ost_body\n");
426 GOTO (out, rc = -EPROTO);
429 memcpy(oa, &body->oa, sizeof(*oa));
433 ptlrpc_req_finished(request);
437 static int osc_sync(struct obd_export *exp, struct obdo *oa,
438 struct lov_stripe_md *md, obd_size start, obd_size end)
440 struct ptlrpc_request *request;
441 struct ost_body *body;
442 int rc, size = sizeof(*body);
450 request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
451 OST_SYNC, 1, &size, NULL);
455 body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
456 memcpy(&body->oa, oa, sizeof(*oa));
458 /* overload the size and blocks fields in the oa with start/end */
459 body->oa.o_size = start;
460 body->oa.o_blocks = end;
461 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
463 request->rq_replen = lustre_msg_size(1, &size);
465 rc = ptlrpc_queue_wait(request);
469 body = lustre_swab_repbuf(request, 0, sizeof(*body),
470 lustre_swab_ost_body);
472 CERROR ("can't unpack ost_body\n");
473 GOTO (out, rc = -EPROTO);
476 memcpy(oa, &body->oa, sizeof(*oa));
480 ptlrpc_req_finished(request);
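/* Destroy an object on the OST, passing along the unlink llog cookie (when
 * present) so the OST can cancel the matching llog record. */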
484 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
485 struct lov_stripe_md *ea, struct obd_trans_info *oti,
486 struct obd_export *md_export)
488 struct ptlrpc_request *request;
489 struct ost_body *body;
490 int rc, size = sizeof(*body);
498 request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
499 OST_DESTROY, 1, &size, NULL);
503 body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
505 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
506 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
507 sizeof(*oti->oti_logcookies));
508 oti->oti_logcookies++;
511 memcpy(&body->oa, oa, sizeof(*oa));
512 request->rq_replen = lustre_msg_size(1, &size);
514 rc = ptlrpc_queue_wait(request);
520 body = lustre_swab_repbuf(request, 0, sizeof(*body),
521 lustre_swab_ost_body);
523 CERROR ("Can't unpack body\n");
524 GOTO (out, rc = -EPROTO);
527 memcpy(oa, &body->oa, sizeof(*oa));
531 ptlrpc_req_finished(request);
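/* Fill in the obdo with this client's dirty page and grant accounting so
 * every BRW tells the OST how much cache and grant we are consuming. */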
535 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
538 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
540 LASSERT(!(oa->o_valid & bits));
543 client_obd_list_lock(&cli->cl_loi_list_lock);
544 oa->o_dirty = cli->cl_dirty;
545 if (cli->cl_dirty > cli->cl_dirty_max) {
546 CERROR("dirty %lu > dirty_max %lu\n",
547 cli->cl_dirty, cli->cl_dirty_max);
549 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
550 CERROR("dirty_max %lu - dirty %lu too big???\n",
551 cli->cl_dirty_max, cli->cl_dirty);
554 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
555 (cli->cl_max_rpcs_in_flight + 1);
556 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
558 oa->o_grant = cli->cl_avail_grant;
559 oa->o_dropped = cli->cl_lost_grant;
560 cli->cl_lost_grant = 0;
561 client_obd_list_unlock(&cli->cl_loi_list_lock);
562 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
563 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
566 /* caller must hold loi_list_lock */
567 static void osc_consume_write_grant(struct client_obd *cli,
568 struct osc_async_page *oap)
570 cli->cl_dirty += CFS_PAGE_SIZE;
571 cli->cl_avail_grant -= CFS_PAGE_SIZE;
572 oap->oap_brw_flags |= OBD_BRW_FROM_GRANT;
573 CDEBUG(D_CACHE, "using %lu grant credits for oap %p\n", CFS_PAGE_SIZE, oap);
574 LASSERT(cli->cl_avail_grant >= 0);
577 static unsigned long rpcs_in_flight(struct client_obd *cli)
579 return cli->cl_r_in_flight + cli->cl_w_in_flight;
582 /* caller must hold loi_list_lock */
583 void osc_wake_cache_waiters(struct client_obd *cli)
585 struct list_head *l, *tmp;
586 struct osc_cache_waiter *ocw;
589 list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
590 /* if we can't dirty more, we must wait until some is written */
591 if (cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) {
592 CDEBUG(D_CACHE, "no dirty room: dirty: %ld max %ld\n",
593 cli->cl_dirty, cli->cl_dirty_max);
597 /* if there is still dirty cache but no grant, wait for pending RPCs that
598 * may yet return us some grant before doing sync writes */
599 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
600 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
601 cli->cl_w_in_flight);
605 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
606 list_del_init(&ocw->ocw_entry);
607 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
608 /* no more RPCs in flight to return grant, do sync IO */
609 ocw->ocw_rc = -EDQUOT;
610 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
612 osc_consume_write_grant(cli, ocw->ocw_oap);
615 cfs_waitq_signal(&ocw->ocw_waitq);
621 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
623 client_obd_list_lock(&cli->cl_loi_list_lock);
624 cli->cl_avail_grant = ocd->ocd_grant;
625 client_obd_list_unlock(&cli->cl_loi_list_lock);
627 CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
628 cli->cl_avail_grant, cli->cl_lost_grant);
629 LASSERT(cli->cl_avail_grant >= 0);
632 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
634 client_obd_list_lock(&cli->cl_loi_list_lock);
635 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
636 cli->cl_avail_grant += body->oa.o_grant;
637 /* waiters are woken in brw_interpret_oap */
638 client_obd_list_unlock(&cli->cl_loi_list_lock);
641 /* We assume that the reason this OSC got a short read is that it read
642 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
643 * via the LOV, and it _knows_ it's reading inside the file, it's just that
644 * this stripe never got written at or beyond this stripe offset yet. */
645 static void handle_short_read(int nob_read, obd_count page_count,
646 struct brw_page *pga)
650 /* skip bytes read OK */
651 while (nob_read > 0) {
652 LASSERT (page_count > 0);
654 if (pga->count > nob_read) {
655 /* EOF inside this page */
656 ptr = cfs_kmap(pga->pg) + (pga->off & ~CFS_PAGE_MASK);
657 memset(ptr + nob_read, 0, pga->count - nob_read);
664 nob_read -= pga->count;
669 /* zero remaining pages */
670 while (page_count-- > 0) {
671 ptr = cfs_kmap(pga->pg) + (pga->off & ~CFS_PAGE_MASK);
672 memset(ptr, 0, pga->count);
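/* Check the per-niobuf return codes in an OST_WRITE reply and verify that
 * the bulk transferred exactly the number of bytes we asked for. */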
678 static int check_write_rcs(struct ptlrpc_request *request,
679 int requested_nob, int niocount,
680 obd_count page_count, struct brw_page *pga)
684 /* return error if any niobuf was in error */
685 remote_rcs = lustre_swab_repbuf(request, 1,
686 sizeof(*remote_rcs) * niocount, NULL);
687 if (remote_rcs == NULL) {
688 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
691 if (lustre_msg_swabbed(request->rq_repmsg))
692 for (i = 0; i < niocount; i++)
693 __swab32s(&remote_rcs[i]);
695 for (i = 0; i < niocount; i++) {
696 if (remote_rcs[i] < 0)
697 return(remote_rcs[i]);
699 if (remote_rcs[i] != 0) {
700 CERROR("rc[%d] invalid (%d) req %p\n",
701 i, remote_rcs[i], request);
706 if (request->rq_bulk->bd_nob_transferred != requested_nob) {
707 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
708 request->rq_bulk->bd_nob_transferred, requested_nob);
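/* Two brw_pages can share a single remote niobuf only when they are
 * byte-contiguous and carry compatible flags. */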
715 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
717 if (p1->flag != p2->flag) {
718 unsigned mask = ~OBD_BRW_FROM_GRANT;
720 /* warn if we try to combine flags that we don't know to be
722 if ((p1->flag & mask) != (p2->flag & mask))
723 CERROR("is it ok to have flags 0x%x and 0x%x in the "
724 "same brw?\n", p1->flag, p2->flag);
728 return (p1->off + p1->count == p2->off);
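/* Accumulate a CRC32 over the first 'nob' bytes described by the page
 * array; used for BRW checksum debugging. */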
731 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
732 struct brw_page *pga)
736 LASSERT (pg_count > 0);
737 while (nob > 0 && pg_count > 0) {
738 char *ptr = cfs_kmap(pga->pg);
739 int off = pga->off & ~CFS_PAGE_MASK;
740 int count = pga->count > nob ? nob : pga->count;
742 cksum = crc32_le(cksum, ptr + off, count);
744 LL_CDEBUG_PAGE(D_PAGE, pga->pg, "off %d checksum %x\n",
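/* Build an OST_READ/OST_WRITE request and its bulk descriptor from a sorted
 * page array, merging byte-contiguous pages into shared niobufs. */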
755 static int osc_brw_prep_request(int cmd, struct obd_import *imp,struct obdo *oa,
756 struct lov_stripe_md *lsm, obd_count page_count,
757 struct brw_page *pga, int *requested_nobp,
758 int *niocountp, struct ptlrpc_request **reqp)
760 struct ptlrpc_request *req;
761 struct ptlrpc_bulk_desc *desc;
762 struct client_obd *cli = &imp->imp_obd->u.cli;
763 struct ost_body *body;
764 struct obd_ioobj *ioobj;
765 struct niobuf_remote *niobuf;
772 struct ptlrpc_request_pool *pool;
775 opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
776 pool = ((cmd & OBD_BRW_WRITE) != 0) ? imp->imp_rq_pool : NULL;
778 for (niocount = i = 1; i < page_count; i++)
779 if (!can_merge_pages(&pga[i - 1], &pga[i]))
782 size[0] = sizeof(*body);
783 size[1] = sizeof(*ioobj);
784 size[2] = niocount * sizeof(*niobuf);
786 OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM);
787 req = ptlrpc_prep_req_pool(imp, LUSTRE_OST_VERSION, opc, 3,
792 /* FIXME bug 249. Also see bug 7198 */
793 if (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_REQPORTAL)
794 req->rq_request_portal = OST_IO_PORTAL;
796 if (opc == OST_WRITE)
797 desc = ptlrpc_prep_bulk_imp (req, page_count,
798 BULK_GET_SOURCE, OST_BULK_PORTAL);
800 desc = ptlrpc_prep_bulk_imp (req, page_count,
801 BULK_PUT_SINK, OST_BULK_PORTAL);
803 GOTO(out, rc = -ENOMEM);
804 /* NB request now owns desc and will free it when it gets freed */
806 body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
807 ioobj = lustre_msg_buf(req->rq_reqmsg, 1, sizeof(*ioobj));
808 niobuf = lustre_msg_buf(req->rq_reqmsg, 2, niocount * sizeof(*niobuf));
810 memcpy(&body->oa, oa, sizeof(*oa));
812 obdo_to_ioobj(oa, ioobj);
813 ioobj->ioo_bufcnt = niocount;
815 LASSERT (page_count > 0);
816 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
817 struct brw_page *pg = &pga[i];
818 struct brw_page *pg_prev = pg - 1;
820 LASSERT(pg->count > 0);
821 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
822 "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
825 LASSERTF(i == 0 || pg->off > pg_prev->off,
826 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
827 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
829 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
830 pg_prev->pg, page_private(pg_prev->pg),
831 pg_prev->pg->index, pg_prev->off);
833 LASSERTF(i == 0 || pg->off > pg_prev->off,
834 "i %d p_c %u\n", i, page_count);
836 LASSERT((pga[0].flag & OBD_BRW_SRVLOCK) ==
837 (pg->flag & OBD_BRW_SRVLOCK));
839 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
841 requested_nob += pg->count;
843 if (i > 0 && can_merge_pages(pg_prev, pg)) {
845 niobuf->len += pg->count;
847 niobuf->offset = pg->off;
848 niobuf->len = pg->count;
849 niobuf->flags = pg->flag;
853 LASSERT((void *)(niobuf - niocount) ==
854 lustre_msg_buf(req->rq_reqmsg, 2, niocount * sizeof(*niobuf)));
855 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
857 /* size[0] still sizeof (*body) */
858 if (opc == OST_WRITE) {
859 if (unlikely(cli->cl_checksum)) {
860 body->oa.o_valid |= OBD_MD_FLCKSUM;
861 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
863 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
865 /* save this in 'oa', too, for later checking */
866 oa->o_valid |= OBD_MD_FLCKSUM;
867 oa->o_cksum = body->oa.o_cksum;
869 /* 1 RC per niobuf */
870 size[1] = sizeof(__u32) * niocount;
871 req->rq_replen = lustre_msg_size(2, size);
873 if (unlikely(cli->cl_checksum))
874 body->oa.o_valid |= OBD_MD_FLCKSUM;
875 /* 1 RC for the whole I/O */
876 req->rq_replen = lustre_msg_size(1, size);
879 *niocountp = niocount;
880 *requested_nobp = requested_nob;
885 ptlrpc_req_finished (req);
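/* The server reported a write checksum mismatch; re-checksum the local
 * pages to decide whether they were modified before or after being sent. */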
889 static void check_write_csum(__u32 cli, __u32 srv, int requested_nob,
890 obd_count page_count, struct brw_page *pga)
895 CDEBUG(D_PAGE, "checksum %x confirmed\n", cli);
899 new_csum = osc_checksum_bulk(requested_nob, page_count, pga);
901 if (new_csum == srv) {
902 CERROR("BAD CHECKSUM (WRITE): pages were mutated on the client "
903 "after we checksummed them (original client csum:"
904 " %x; server csum: %x; client csum now: %x)\n",
909 if (new_csum == cli) {
910 CERROR("BAD CHECKSUM (WRITE): pages were mutated in transit "
911 "(original client csum: %x; server csum: %x; client "
912 "csum now: %x)\n", cli, srv, new_csum);
916 CERROR("BAD CHECKSUM (WRITE): pages were mutated in transit, and the "
917 "current page contents don't match the originals OR what the "
918 "server received (original client csum: %x; server csum: %x; "
919 "client csum now: %x)\n", cli, srv, new_csum);
922 static int osc_brw_fini_request(struct ptlrpc_request *req, struct obdo *oa,
923 int requested_nob, int niocount,
924 obd_count page_count, struct brw_page *pga,
927 const lnet_process_id_t *peer =
928 &req->rq_import->imp_connection->c_peer;
929 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
930 struct ost_body *body;
931 __u32 client_cksum = 0;
934 if (rc < 0 && rc != -EDQUOT)
937 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
938 body = lustre_swab_repbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
940 CERROR ("Can't unpack body\n");
944 /* set/clear over quota flag for a uid/gid */
945 if (req->rq_reqmsg->opc == OST_WRITE &&
946 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
947 lquota_setdq(quota_interface, cli, body->oa.o_uid,
948 body->oa.o_gid, body->oa.o_valid,
954 if (unlikely(oa->o_valid & OBD_MD_FLCKSUM))
955 client_cksum = oa->o_cksum; /* save for later */
957 osc_update_grant(cli, body);
958 memcpy(oa, &body->oa, sizeof(*oa));
960 if (req->rq_reqmsg->opc == OST_WRITE) {
962 CERROR ("Unexpected +ve rc %d\n", rc);
965 LASSERT (req->rq_bulk->bd_nob == requested_nob);
967 if (unlikely((oa->o_valid & OBD_MD_FLCKSUM) &&
969 check_write_csum(client_cksum, oa->o_cksum,
970 requested_nob, page_count, pga);
973 RETURN(check_write_rcs(req, requested_nob, niocount,
977 /* The rest of this function executes only for OST_READs */
978 if (rc > requested_nob) {
979 CERROR("Unexpected rc %d (%d requested)\n", rc, requested_nob);
983 if (rc != req->rq_bulk->bd_nob_transferred) {
984 CERROR ("Unexpected rc %d (%d transferred)\n",
985 rc, req->rq_bulk->bd_nob_transferred);
989 if (rc < requested_nob)
990 handle_short_read(rc, page_count, pga);
992 if (unlikely(oa->o_valid & OBD_MD_FLCKSUM)) {
993 static int cksum_counter;
994 __u32 cksum = osc_checksum_bulk(rc, page_count, pga);
995 __u32 server_cksum = oa->o_cksum;
997 if (server_cksum == ~0 && rc > 0) {
998 CERROR("Protocol error: server %s set the 'checksum' "
999 "bit, but didn't send a checksum. Not fatal, "
1000 "but please tell CFS.\n",
1001 libcfs_nid2str(peer->nid));
1007 if (server_cksum != cksum) {
1008 CERROR("Bad checksum from %s: server %x != client %x\n",
1009 libcfs_nid2str(peer->nid), server_cksum, cksum);
1011 oa->o_cksum = cksum;
1012 } else if ((cksum_counter & (-cksum_counter)) == cksum_counter){
1013 CWARN("Checksum %u from %s OK: %x\n",
1014 cksum_counter, libcfs_nid2str(peer->nid), cksum);
1016 CDEBUG(D_PAGE, "checksum %x confirmed\n", cksum);
1017 } else if (unlikely(client_cksum)) {
1018 static int cksum_missed;
1021 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1022 CERROR("Checksum %u requested from %s but not sent\n",
1023 cksum_missed, libcfs_nid2str(peer->nid));
1029 static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
1030 struct lov_stripe_md *lsm,
1031 obd_count page_count, struct brw_page *pga)
1035 struct ptlrpc_request *request;
1040 rc = osc_brw_prep_request(cmd, class_exp2cliimp(exp), oa, lsm,
1041 page_count, pga, &requested_nob, &niocount,
1046 rc = ptlrpc_queue_wait(request);
1048 if (rc == -ETIMEDOUT && request->rq_resend) {
1049 DEBUG_REQ(D_HA, request, "BULK TIMEOUT");
1050 ptlrpc_req_finished(request);
1054 rc = osc_brw_fini_request(request, oa, requested_nob, niocount,
1055 page_count, pga, rc);
1057 ptlrpc_req_finished(request);
1061 static int brw_interpret(struct ptlrpc_request *request,
1062 struct osc_brw_async_args *aa, int rc)
1064 struct obdo *oa = aa->aa_oa;
1065 int requested_nob = aa->aa_requested_nob;
1066 int niocount = aa->aa_nio_count;
1067 obd_count page_count = aa->aa_page_count;
1068 struct brw_page *pga = aa->aa_pga;
1071 rc = osc_brw_fini_request(request, oa, requested_nob, niocount,
1072 page_count, pga, rc);
1076 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1077 struct lov_stripe_md *lsm, obd_count page_count,
1078 struct brw_page *pga, struct ptlrpc_request_set *set)
1080 struct ptlrpc_request *request;
1083 struct osc_brw_async_args *aa;
1087 rc = osc_brw_prep_request(cmd, class_exp2cliimp(exp), oa, lsm,
1088 page_count, pga, &requested_nob, &nio_count,
1092 LASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
1093 aa = (struct osc_brw_async_args *)&request->rq_async_args;
1095 aa->aa_requested_nob = requested_nob;
1096 aa->aa_nio_count = nio_count;
1097 aa->aa_page_count = page_count;
1100 request->rq_interpret_reply = brw_interpret;
1101 ptlrpc_set_add_req(set, request);
1107 #define min_t(type,x,y) \
1108 ({ type __x = (x); type __y = (y); __x < __y ? __x: __y; })
1112 * ugh, we want disk allocation on the target to happen in offset order. we'll
1113 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1114 * fine for our small page arrays and doesn't require allocation. it's an
1115 * insertion sort that swaps elements that are strides apart, shrinking the
1116 * stride down until it's '1' and the array is sorted.
1118 static void sort_brw_pages(struct brw_page *array, int num)
1121 struct brw_page tmp;
1125 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1130 for (i = stride ; i < num ; i++) {
1133 while (j >= stride && array[j - stride].off > tmp.off) {
1134 array[j] = array[j - stride];
1139 } while (stride > 1);
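/* Count how many leading pages of the (sorted) array form one contiguous
 * byte range that the bulk layer can transfer without fragmentation. */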
1142 static obd_count max_unfragmented_pages(struct brw_page *pg, obd_count pages)
1147 LASSERT (pages > 0);
1148 offset = pg->off & (CFS_PAGE_SIZE - 1);
1152 if (pages == 0) /* that's all */
1155 if (offset + pg->count < CFS_PAGE_SIZE)
1156 return count; /* doesn't end on page boundary */
1159 offset = pg->off & (CFS_PAGE_SIZE - 1);
1160 if (offset != 0) /* doesn't start on page boundary */
1167 static int osc_brw(int cmd, struct obd_export *exp, struct obdo *oa,
1168 struct lov_stripe_md *md, obd_count page_count,
1169 struct brw_page *pga, struct obd_trans_info *oti)
1171 struct obdo *saved_oa = NULL;
1175 if (cmd & OBD_BRW_CHECK) {
1176 /* The caller just wants to know if there's a chance that this
1177 * I/O can succeed */
1178 struct obd_import *imp = class_exp2cliimp(exp);
1180 if (imp == NULL || imp->imp_invalid)
1187 while (page_count) {
1188 obd_count pages_per_brw;
1190 if (page_count > PTLRPC_MAX_BRW_PAGES)
1191 pages_per_brw = PTLRPC_MAX_BRW_PAGES;
1193 pages_per_brw = page_count;
1195 sort_brw_pages(pga, pages_per_brw);
1196 pages_per_brw = max_unfragmented_pages(pga, pages_per_brw);
1198 if (saved_oa != NULL) {
1199 /* restore previously saved oa */
1201 } else if (page_count > pages_per_brw) {
1202 /* save a copy of oa (brw will clobber it) */
1203 saved_oa = obdo_alloc();
1204 if (saved_oa == NULL)
1209 rc = osc_brw_internal(cmd, exp, oa, md, pages_per_brw, pga);
1214 page_count -= pages_per_brw;
1215 pga += pages_per_brw;
1218 if (saved_oa != NULL)
1219 obdo_free(saved_oa);
1224 static int osc_brw_async(int cmd, struct obd_export *exp, struct obdo *oa,
1225 struct lov_stripe_md *md, obd_count page_count,
1226 struct brw_page *pga, struct ptlrpc_request_set *set,
1227 struct obd_trans_info *oti)
1231 if (cmd & OBD_BRW_CHECK) {
1232 /* The caller just wants to know if there's a chance that this
1233 * I/O can succeed */
1234 struct obd_import *imp = class_exp2cliimp(exp);
1236 if (imp == NULL || imp->imp_invalid)
1241 while (page_count) {
1242 obd_count pages_per_brw;
1245 if (page_count > PTLRPC_MAX_BRW_PAGES)
1246 pages_per_brw = PTLRPC_MAX_BRW_PAGES;
1248 pages_per_brw = page_count;
1250 sort_brw_pages(pga, pages_per_brw);
1251 pages_per_brw = max_unfragmented_pages(pga, pages_per_brw);
1253 rc = async_internal(cmd, exp, oa, md, pages_per_brw, pga, set);
1258 page_count -= pages_per_brw;
1259 pga += pages_per_brw;
1264 static void osc_check_rpcs(struct client_obd *cli);
1265 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1268 /* This maintains the lists of pending pages to read/write for a given object
1269 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1270 * to quickly find objects that are ready to send an RPC. */
1271 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1277 if (lop->lop_num_pending == 0)
1280 /* if we have an invalid import we want to drain the queued pages
1281 * by forcing them through rpcs that immediately fail and complete
1282 * the pages. recovery relies on this to empty the queued pages
1283 * before canceling the locks and evicting down the llite pages */
1284 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1287 /* stream rpcs in queue order as long as there is an urgent page
1288 * queued. this is our cheap solution for good batching in the case
1289 * where writepage marks some random page in the middle of the file
1290 * as urgent because of, say, memory pressure */
1291 if (!list_empty(&lop->lop_urgent))
1294 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1295 optimal = cli->cl_max_pages_per_rpc;
1296 if (cmd & OBD_BRW_WRITE) {
1297 /* trigger a write rpc stream as long as there are dirtiers
1298 * waiting for space. as they're waiting, they're not going to
1299 * create more pages to coalesce with what's waiting. */
1300 if (!list_empty(&cli->cl_cache_waiters))
1303 /* +16 to avoid triggering rpcs that would want to include pages
1304 * that are being queued but which can't be made ready until
1305 * the queuer finishes with the page. this is a wart for
1306 * llite::commit_write() */
1309 if (lop->lop_num_pending >= optimal)
1315 static void on_list(struct list_head *item, struct list_head *list,
1318 if (list_empty(item) && should_be_on)
1319 list_add_tail(item, list);
1320 else if (!list_empty(item) && !should_be_on)
1321 list_del_init(item);
1324 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1325 * can find pages to build into rpcs quickly */
1326 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1328 on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1329 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1330 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1332 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1333 loi->loi_write_lop.lop_num_pending);
1335 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1336 loi->loi_read_lop.lop_num_pending);
1339 static void lop_update_pending(struct client_obd *cli,
1340 struct loi_oap_pages *lop, int cmd, int delta)
1342 lop->lop_num_pending += delta;
1343 if (cmd & OBD_BRW_WRITE)
1344 cli->cl_pending_w_pages += delta;
1346 cli->cl_pending_r_pages += delta;
1349 /* this is called when a sync waiter receives an interruption. Its job is to
1350 * get the caller woken as soon as possible. If its page hasn't been put in an
1351 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
1352 * desiring interruption which will forcefully complete the rpc once the rpc
1354 static void osc_occ_interrupted(struct oig_callback_context *occ)
1356 struct osc_async_page *oap;
1357 struct loi_oap_pages *lop;
1358 struct lov_oinfo *loi;
1361 /* XXX member_of() */
1362 oap = list_entry(occ, struct osc_async_page, oap_occ);
1364 client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1366 oap->oap_interrupted = 1;
1368 /* ok, it's been put in an rpc. */
1369 if (oap->oap_request != NULL) {
1370 ptlrpc_mark_interrupted(oap->oap_request);
1371 ptlrpcd_wake(oap->oap_request);
1375 /* we don't get interruption callbacks until osc_trigger_group_io()
1376 * has been called and put the sync oaps in the pending/urgent lists.*/
1377 if (!list_empty(&oap->oap_pending_item)) {
1378 list_del_init(&oap->oap_pending_item);
1379 list_del_init(&oap->oap_urgent_item);
1382 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1383 &loi->loi_write_lop : &loi->loi_read_lop;
1384 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1385 loi_list_maint(oap->oap_cli, oap->oap_loi);
1387 oig_complete_one(oap->oap_oig, &oap->oap_occ, 0);
1388 oap->oap_oig = NULL;
1392 client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
1395 /* this is trying to propagate async writeback errors back up to the
1396 * application. As an async write fails we record the error code for later if
1397 * the app does an fsync. As long as errors persist we force future rpcs to be
1398 * sync so that the app can get a sync error and break the cycle of queueing
1399 * pages for which writeback will fail. */
1400 static void osc_process_ar(struct osc_async_rc *ar, struct ptlrpc_request *req,
1407 ar->ar_force_sync = 1;
1408 ar->ar_min_xid = ptlrpc_sample_next_xid();
1413 if (ar->ar_force_sync && req && (ptlrpc_req_xid(req) >= ar->ar_min_xid))
1414 ar->ar_force_sync = 0;
1417 /* this must be called holding the loi list lock to give coverage to exit_cache,
1418 * async_flag maintenance, and oap_request */
1419 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1420 struct osc_async_page *oap, int sent, int rc)
1423 osc_exit_cache(cli, oap, sent);
1424 oap->oap_async_flags = 0;
1425 oap->oap_interrupted = 0;
1427 if (oap->oap_cmd & OBD_BRW_WRITE) {
1428 osc_process_ar(&cli->cl_ar, oap->oap_request, rc);
1429 osc_process_ar(&oap->oap_loi->loi_ar, oap->oap_request, rc);
1432 if (oap->oap_request != NULL) {
1433 ptlrpc_req_finished(oap->oap_request);
1434 oap->oap_request = NULL;
1437 if (rc == 0 && oa != NULL) {
1438 if (oa->o_valid & OBD_MD_FLBLOCKS)
1439 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
1440 if (oa->o_valid & OBD_MD_FLMTIME)
1441 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
1442 if (oa->o_valid & OBD_MD_FLATIME)
1443 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
1444 if (oa->o_valid & OBD_MD_FLCTIME)
1445 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
1449 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
1450 oap->oap_oig = NULL;
1455 oap->oap_caller_ops->ap_completion(oap->oap_caller_data, oap->oap_cmd,
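/* Interpret callback for BRWs built from cached pages: finish the request,
 * complete each oap, wake cache waiters and kick off any further RPCs. */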
1460 static int brw_interpret_oap(struct ptlrpc_request *request,
1461 struct osc_brw_async_args *aa, int rc)
1463 struct osc_async_page *oap;
1464 struct client_obd *cli;
1465 struct list_head *pos, *n;
1468 rc = osc_brw_fini_request(request, aa->aa_oa, aa->aa_requested_nob,
1469 aa->aa_nio_count, aa->aa_page_count,
1472 CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
1476 client_obd_list_lock(&cli->cl_loi_list_lock);
1478 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1479 * is called so we know whether to go to sync BRWs or wait for more
1480 * RPCs to complete */
1481 if (request->rq_reqmsg->opc == OST_WRITE)
1482 cli->cl_w_in_flight--;
1484 cli->cl_r_in_flight--;
1486 /* the caller may re-use the oap after the completion call so
1487 * we need to clean it up a little */
1488 list_for_each_safe(pos, n, &aa->aa_oaps) {
1489 oap = list_entry(pos, struct osc_async_page, oap_rpc_item);
1491 //CDEBUG(D_INODE, "page %p index %lu oap %p\n",
1492 //oap->oap_page, oap->oap_page->index, oap);
1494 list_del_init(&oap->oap_rpc_item);
1495 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
1498 osc_wake_cache_waiters(cli);
1499 osc_check_rpcs(cli);
1501 client_obd_list_unlock(&cli->cl_loi_list_lock);
1503 obdo_free(aa->aa_oa);
1504 OBD_FREE(aa->aa_pga, aa->aa_page_count * sizeof(struct brw_page));
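/* Turn a list of oaps into a brw_page array and prep a BRW request for it;
 * returns the prepared request or an ERR_PTR() on failure. */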
1509 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
1510 struct list_head *rpc_list,
1511 int page_count, int cmd)
1513 struct ptlrpc_request *req;
1514 struct brw_page *pga = NULL;
1515 int requested_nob, nio_count;
1516 struct osc_brw_async_args *aa;
1517 struct obdo *oa = NULL;
1518 struct obd_async_page_ops *ops = NULL;
1519 void *caller_data = NULL;
1520 struct list_head *pos;
1524 LASSERT(!list_empty(rpc_list));
1526 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1528 RETURN(ERR_PTR(-ENOMEM));
1532 GOTO(out, req = ERR_PTR(-ENOMEM));
1535 list_for_each(pos, rpc_list) {
1536 struct osc_async_page *oap;
1538 oap = list_entry(pos, struct osc_async_page, oap_rpc_item);
1540 ops = oap->oap_caller_ops;
1541 caller_data = oap->oap_caller_data;
1543 pga[i].off = oap->oap_obj_off + oap->oap_page_off;
1544 pga[i].pg = oap->oap_page;
1545 pga[i].count = oap->oap_count;
1546 pga[i].flag = oap->oap_brw_flags;
1547 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1548 pga[i].pg, cfs_page_index(oap->oap_page), oap, pga[i].flag);
1552 /* always get the data for the obdo for the rpc */
1553 LASSERT(ops != NULL);
1554 ops->ap_fill_obdo(caller_data, cmd, oa);
1556 sort_brw_pages(pga, page_count);
1557 rc = osc_brw_prep_request(cmd, cli->cl_import, oa, NULL, page_count,
1558 pga, &requested_nob, &nio_count, &req);
1560 CERROR("prep_req failed: %d\n", rc);
1561 GOTO(out, req = ERR_PTR(rc));
1564 LASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1565 aa = (struct osc_brw_async_args *)&req->rq_async_args;
1567 aa->aa_requested_nob = requested_nob;
1568 aa->aa_nio_count = nio_count;
1569 aa->aa_page_count = page_count;
1578 OBD_FREE(pga, sizeof(*pga) * page_count);
1583 /* the loi lock is held across this function but it's allowed to release
1584 * and reacquire it during its work */
1585 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
1586 int cmd, struct loi_oap_pages *lop)
1588 struct ptlrpc_request *request;
1589 obd_count page_count = 0;
1590 struct list_head *tmp, *pos;
1591 struct osc_async_page *oap = NULL;
1592 struct osc_brw_async_args *aa;
1593 struct obd_async_page_ops *ops;
1594 CFS_LIST_HEAD(rpc_list);
1595 unsigned int ending_offset;
1596 unsigned starting_offset = 0;
1599 /* first we find the pages we're allowed to work with */
1600 list_for_each_safe(pos, tmp, &lop->lop_pending) {
1601 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
1602 ops = oap->oap_caller_ops;
1604 LASSERT(oap->oap_magic == OAP_MAGIC);
1606 /* in llite being 'ready' equates to the page being locked
1607 * until completion unlocks it. commit_write submits a page
1608 * as not ready because its unlock will happen unconditionally
1609 * as the call returns. if we race with commit_write giving
1610 * us that page, we don't want to create a hole in the page
1611 * stream, so we stop and leave the rpc to be fired by
1612 * another dirtier or kupdated interval (the not ready page
1613 * will still be on the dirty list). we could call in
1614 * at the end of ll_file_write to process the queue again. */
1615 if (!(oap->oap_async_flags & ASYNC_READY)) {
1616 int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
1618 CDEBUG(D_INODE, "oap %p page %p returned %d "
1619 "instead of ready\n", oap,
1623 /* llite is telling us that the page is still
1624 * in commit_write and that we should try
1625 * and put it in an rpc again later. we
1626 * break out of the loop so we don't create
1627 * a hole in the sequence of pages in the rpc
1632 /* the io isn't needed.. tell the checks
1633 * below to complete the rpc with EINTR */
1634 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
1635 oap->oap_count = -EINTR;
1638 oap->oap_async_flags |= ASYNC_READY;
1641 LASSERTF(0, "oap %p page %p returned %d "
1642 "from make_ready\n", oap,
1650 * Page submitted for IO has to be locked. Either by
1651 * ->ap_make_ready() or by higher layers.
1653 * XXX nikita: this assertion should be adjusted when lustre
1654 * starts using PG_writeback for pages being written out.
1656 #if defined(__KERNEL__) && defined(__LINUX__)
1657 LASSERT(PageLocked(oap->oap_page));
1659 /* If there is a gap at the start of this page, it can't merge
1660 * with any previous page, so we'll hand the network a
1661 * "fragmented" page array that it can't transfer in 1 RDMA */
1662 if (page_count != 0 && oap->oap_page_off != 0)
1665 /* take the page out of our book-keeping */
1666 list_del_init(&oap->oap_pending_item);
1667 lop_update_pending(cli, lop, cmd, -1);
1668 list_del_init(&oap->oap_urgent_item);
1670 if (page_count == 0)
1671 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
1672 (PTLRPC_MAX_BRW_SIZE - 1);
1674 /* ask the caller for the size of the io as the rpc leaves. */
1675 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
1677 ops->ap_refresh_count(oap->oap_caller_data,cmd);
1678 if (oap->oap_count <= 0) {
1679 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
1681 osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
1685 /* now put the page back in our accounting */
1686 list_add_tail(&oap->oap_rpc_item, &rpc_list);
1687 if (++page_count >= cli->cl_max_pages_per_rpc)
1690 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
1691 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
1692 * have the same alignment as the initial writes that allocated
1693 * extents on the server. */
1694 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
1695 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
1696 if (ending_offset == 0)
1699 /* If there is a gap at the end of this page, it can't merge
1700 * with any subsequent pages, so we'll hand the network a
1701 * "fragmented" page array that it can't transfer in 1 RDMA */
1702 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
1706 osc_wake_cache_waiters(cli);
1708 if (page_count == 0)
1711 loi_list_maint(cli, loi);
1713 client_obd_list_unlock(&cli->cl_loi_list_lock);
1715 request = osc_build_req(cli, &rpc_list, page_count, cmd);
1716 if (IS_ERR(request)) {
1717 /* this should happen rarely and is pretty bad, it makes the
1718 * pending list not follow the dirty order */
1719 client_obd_list_lock(&cli->cl_loi_list_lock);
1720 list_for_each_safe(pos, tmp, &rpc_list) {
1721 oap = list_entry(pos, struct osc_async_page,
1723 list_del_init(&oap->oap_rpc_item);
1725 /* queued sync pages can be torn down while the pages
1726 * were between the pending list and the rpc */
1727 if (oap->oap_interrupted) {
1728 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
1729 osc_ap_completion(cli, NULL, oap, 0,
1733 osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(request));
1735 /* put the page back in the loi/lop lists */
1736 list_add_tail(&oap->oap_pending_item,
1738 lop_update_pending(cli, lop, cmd, 1);
1739 if (oap->oap_async_flags & ASYNC_URGENT)
1740 list_add(&oap->oap_urgent_item,
1743 loi_list_maint(cli, loi);
1744 RETURN(PTR_ERR(request));
1747 LASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
1748 aa = (struct osc_brw_async_args *)&request->rq_async_args;
1749 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1750 list_splice(&rpc_list, &aa->aa_oaps);
1751 CFS_INIT_LIST_HEAD(&rpc_list);
1753 if (cmd == OBD_BRW_READ) {
1754 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1755 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1756 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1757 starting_offset/CFS_PAGE_SIZE + 1);
1759 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1760 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1761 cli->cl_w_in_flight);
1762 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1763 starting_offset/CFS_PAGE_SIZE + 1);
1766 client_obd_list_lock(&cli->cl_loi_list_lock);
1768 if (cmd == OBD_BRW_READ)
1769 cli->cl_r_in_flight++;
1771 cli->cl_w_in_flight++;
1773 /* queued sync pages can be torn down while the pages
1774 * were between the pending list and the rpc */
1775 list_for_each(pos, &aa->aa_oaps) {
1776 oap = list_entry(pos, struct osc_async_page, oap_rpc_item);
1777 if (oap->oap_interrupted) {
1778 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
1780 ptlrpc_mark_interrupted(request);
1785 CDEBUG(D_INODE, "req %p: %d pages, aa %p. now %dr/%dw in flight\n",
1786 request, page_count, aa, cli->cl_r_in_flight,
1787 cli->cl_w_in_flight);
1789 oap->oap_request = ptlrpc_request_addref(request);
1790 request->rq_interpret_reply = brw_interpret_oap;
1791 ptlrpcd_add_req(request);
1795 #define LOI_DEBUG(LOI, STR, args...) \
1796 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
1797 !list_empty(&(LOI)->loi_cli_item), \
1798 (LOI)->loi_write_lop.lop_num_pending, \
1799 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
1800 (LOI)->loi_read_lop.lop_num_pending, \
1801 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
1804 /* This is called by osc_check_rpcs() to find which objects have pages that
1805 * we could be sending. These lists are maintained by lop_makes_rpc(). */
1806 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
1809 /* first return all objects which we already know to have
1810 * pages ready to be stuffed into rpcs */
1811 if (!list_empty(&cli->cl_loi_ready_list))
1812 RETURN(list_entry(cli->cl_loi_ready_list.next,
1813 struct lov_oinfo, loi_cli_item));
1815 /* then if we have cache waiters, return all objects with queued
1816 * writes. This is especially important when many small files
1817 * have filled up the cache and not been fired into rpcs because
1818 * they don't pass the nr_pending/object threshold */
1819 if (!list_empty(&cli->cl_cache_waiters) &&
1820 !list_empty(&cli->cl_loi_write_list))
1821 RETURN(list_entry(cli->cl_loi_write_list.next,
1822 struct lov_oinfo, loi_write_item));
1824 /* then return all queued objects when we have an invalid import
1825 * so that they get flushed */
1826 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
1827 if (!list_empty(&cli->cl_loi_write_list))
1828 RETURN(list_entry(cli->cl_loi_write_list.next,
1829 struct lov_oinfo, loi_write_item));
1830 if (!list_empty(&cli->cl_loi_read_list))
1831 RETURN(list_entry(cli->cl_loi_read_list.next,
1832 struct lov_oinfo, loi_read_item));
1837 /* called with the loi list lock held */
1838 static void osc_check_rpcs(struct client_obd *cli)
1840 struct lov_oinfo *loi;
1841 int rc = 0, race_counter = 0;
1844 while ((loi = osc_next_loi(cli)) != NULL) {
1845 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
1847 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
1850 /* attempt some read/write balancing by alternating between
1851 * reads and writes in an object. The makes_rpc checks here
1852 * would be redundant if we were getting read/write work items
1853 * instead of objects. we don't want send_oap_rpc to drain a
1854 * partial read pending queue when we're given this object to
1855 * do io on writes while there are cache waiters */
1856 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
1857 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
1858 &loi->loi_write_lop);
1866 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
1867 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
1868 &loi->loi_read_lop);
1877 /* attempt some inter-object balancing by issuing rpcs
1878 * for each object in turn */
1879 if (!list_empty(&loi->loi_cli_item))
1880 list_del_init(&loi->loi_cli_item);
1881 if (!list_empty(&loi->loi_write_item))
1882 list_del_init(&loi->loi_write_item);
1883 if (!list_empty(&loi->loi_read_item))
1884 list_del_init(&loi->loi_read_item);
1886 loi_list_maint(cli, loi);
1888 /* send_oap_rpc fails with 0 when make_ready tells it to
1889 * back off. llite's make_ready does this when it tries
1890 * to lock a page queued for write that is already locked.
1891 * we want to try sending rpcs from many objects, but we
1892 * don't want to spin failing with 0. */
1893 if (race_counter == 10)
1899 /* we're trying to queue a page in the osc so we're subject to the
1900 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
1901 * If the osc's queued pages are already at that limit, then we want to sleep
1902 * until there is space in the osc's queue for us. We also may be waiting for
1903 * write credits from the OST if there are RPCs in flight that may return some
1904 * before we fall back to sync writes.
1906 * We need this to know whether our allocation was granted in the presence of signals */
1907 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
1911 client_obd_list_lock(&cli->cl_loi_list_lock);
1912 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
1913 client_obd_list_unlock(&cli->cl_loi_list_lock);
1917 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
1918 * grant or cache space. */
1919 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
1920 struct osc_async_page *oap)
1922 struct osc_cache_waiter ocw;
1923 struct l_wait_info lwi = { 0 };
1926 CDEBUG(D_CACHE, "dirty: %ld dirty_max: %ld dropped: %lu grant: %lu\n",
1927 cli->cl_dirty, cli->cl_dirty_max, cli->cl_lost_grant,
1928 cli->cl_avail_grant);
1930 /* force the caller to try sync io. this can jump the list
1931 * of queued writes and create a discontiguous rpc stream */
1932 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
1933 loi->loi_ar.ar_force_sync)
1936 /* Hopefully normal case - cache space and write credits available */
1937 if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
1938 cli->cl_avail_grant >= CFS_PAGE_SIZE) {
1939 /* account for ourselves */
1940 osc_consume_write_grant(cli, oap);
1944 /* Make sure that there are write rpcs in flight to wait for. This
1945 * is a little silly as this object may not have any pending but
1946 * other objects sure might. */
1947 if (cli->cl_w_in_flight) {
1948 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
1949 cfs_waitq_init(&ocw.ocw_waitq);
1953 loi_list_maint(cli, loi);
1954 osc_check_rpcs(cli);
1955 client_obd_list_unlock(&cli->cl_loi_list_lock);
1957 CDEBUG(D_CACHE, "sleeping for cache space\n");
1958 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
1960 client_obd_list_lock(&cli->cl_loi_list_lock);
1961 if (!list_empty(&ocw.ocw_entry)) {
1962 list_del(&ocw.ocw_entry);
1971 /* the companion to enter_cache, called when an oap is no longer part of the
1972 * dirty accounting.. so writeback completes or truncate happens before writing
1973 * starts. must be called with the loi lock held. */
1974 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1977 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
1980 if (!(oap->oap_brw_flags & OBD_BRW_FROM_GRANT)) {
1985 oap->oap_brw_flags &= ~OBD_BRW_FROM_GRANT;
1986 cli->cl_dirty -= CFS_PAGE_SIZE;
1988 cli->cl_lost_grant += CFS_PAGE_SIZE;
1989 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
1990 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
1991 } else if (CFS_PAGE_SIZE != blocksize && oap->oap_count != CFS_PAGE_SIZE) {
1992 /* For short writes we shouldn't count parts of pages that
1993 * span a whole block on the OST side, or our accounting goes
1994 * wrong. Should match the code in filter_grant_check. */
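/* For example, with a 1024-byte OST block size and a 512-byte write at page
 * offset 0, 'count' rounds up to one block (1024), so the remaining
 * CFS_PAGE_SIZE - 1024 bytes of this page's grant are reported as lost. */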
1995 int offset = (oap->oap_obj_off +oap->oap_page_off) & ~CFS_PAGE_MASK;
1996 int count = oap->oap_count + (offset & (blocksize - 1));
1997 int end = (offset + oap->oap_count) & (blocksize - 1);
1999 count += blocksize - end;
2001 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
2002 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
2003 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
2004 cli->cl_avail_grant, cli->cl_dirty);
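/* Set up the per-page osc_async_page bookkeeping for one page of an object
 * so that it can later be queued for async or group IO. */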
2010 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2011 struct lov_oinfo *loi, cfs_page_t *page,
2012 obd_off offset, struct obd_async_page_ops *ops,
2013 void *data, void **res)
2015 struct osc_async_page *oap;
2019 return size_round(sizeof(*oap));
2022 oap->oap_magic = OAP_MAGIC;
2023 oap->oap_cli = &exp->exp_obd->u.cli;
2026 oap->oap_caller_ops = ops;
2027 oap->oap_caller_data = data;
2029 oap->oap_page = page;
2030 oap->oap_obj_off = offset;
2032 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2033 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2034 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2036 oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2038 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
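/* Convert the opaque cookie handed out by osc_prep_async_page() back into
 * an osc_async_page, validating its magic. */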
2042 struct osc_async_page *oap_from_cookie(void *cookie)
2044 struct osc_async_page *oap = cookie;
2045 if (oap->oap_magic != OAP_MAGIC)
2046 return ERR_PTR(-EINVAL);
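/* Queue a page for asynchronous IO: writes first reserve cache space and
 * grant via osc_enter_cache(), then the oap goes on the object's pending
 * (and possibly urgent) list and osc_check_rpcs() is poked. */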
2050 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2051 struct lov_oinfo *loi, void *cookie,
2052 int cmd, obd_off off, int count,
2053 obd_flag brw_flags, enum async_flags async_flags)
2055 struct client_obd *cli = &exp->exp_obd->u.cli;
2056 struct osc_async_page *oap;
2057 struct loi_oap_pages *lop;
2061 oap = oap_from_cookie(cookie);
2063 RETURN(PTR_ERR(oap));
2065 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2068 if (!list_empty(&oap->oap_pending_item) ||
2069 !list_empty(&oap->oap_urgent_item) ||
2070 !list_empty(&oap->oap_rpc_item))
2073 /* check if the file's owner/group is over quota */
2074 #ifdef HAVE_QUOTA_SUPPORT
2075 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2076 struct obd_async_page_ops *ops;
2083 ops = oap->oap_caller_ops;
2084 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2085 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2096 loi = &lsm->lsm_oinfo[0];
2098 client_obd_list_lock(&cli->cl_loi_list_lock);
2101 oap->oap_page_off = off;
2102 oap->oap_count = count;
2103 oap->oap_brw_flags = brw_flags;
2104 oap->oap_async_flags = async_flags;
2106 if (cmd & OBD_BRW_WRITE) {
2107 rc = osc_enter_cache(cli, loi, oap);
2109 client_obd_list_unlock(&cli->cl_loi_list_lock);
2112 lop = &loi->loi_write_lop;
2114 lop = &loi->loi_read_lop;
2117 if (oap->oap_async_flags & ASYNC_URGENT)
2118 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2119 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2120 lop_update_pending(cli, lop, cmd, 1);
2122 loi_list_maint(cli, loi);
2124 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2127 osc_check_rpcs(cli);
2128 client_obd_list_unlock(&cli->cl_loi_list_lock);
2133 /* aka (~was & now & flag), but this is more clear :) */
2134 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
2136 static int osc_set_async_flags(struct obd_export *exp,
2137 struct lov_stripe_md *lsm,
2138 struct lov_oinfo *loi, void *cookie,
2139 obd_flag async_flags)
2141 struct client_obd *cli = &exp->exp_obd->u.cli;
2142 struct loi_oap_pages *lop;
2143 struct osc_async_page *oap;
2147 oap = oap_from_cookie(cookie);
2149 RETURN(PTR_ERR(oap));
2152 * bug 7311: OST-side locking is only supported for liblustre for now
2153 * (and liblustre never calls obd_set_async_flags(). I hope.), a generic
2154 * implementation has to handle the case where an OST-locked page was picked
2155 * up by, e.g., ->writepage().
2157 LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2158 LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2161 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2165 loi = &lsm->lsm_oinfo[0];
2167 if (oap->oap_cmd & OBD_BRW_WRITE) {
2168 lop = &loi->loi_write_lop;
2170 lop = &loi->loi_read_lop;
2173 client_obd_list_lock(&cli->cl_loi_list_lock);
2175 if (list_empty(&oap->oap_pending_item))
2176 GOTO(out, rc = -EINVAL);
2178 if ((oap->oap_async_flags & async_flags) == async_flags)
2181 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2182 oap->oap_async_flags |= ASYNC_READY;
2184 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2185 if (list_empty(&oap->oap_rpc_item)) {
2186 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2187 loi_list_maint(cli, loi);
2191 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2192 oap->oap_async_flags);
2194 osc_check_rpcs(cli);
2195 client_obd_list_unlock(&cli->cl_loi_list_lock);
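/* Like osc_queue_async_io(), but the page is added to lop_pending_group for
 * an obd_io_group and is not considered for RPC generation until the group
 * is triggered by osc_trigger_group_io(). */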
2199 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2200 struct lov_oinfo *loi,
2201 struct obd_io_group *oig, void *cookie,
2202 int cmd, obd_off off, int count,
2204 obd_flag async_flags)
2206 struct client_obd *cli = &exp->exp_obd->u.cli;
2207 struct osc_async_page *oap;
2208 struct loi_oap_pages *lop;
2211 oap = oap_from_cookie(cookie);
2213 RETURN(PTR_ERR(oap));
2215 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2218 if (!list_empty(&oap->oap_pending_item) ||
2219 !list_empty(&oap->oap_urgent_item) ||
2220 !list_empty(&oap->oap_rpc_item))
2224 loi = &lsm->lsm_oinfo[0];
2226 client_obd_list_lock(&cli->cl_loi_list_lock);
2229 oap->oap_page_off = off;
2230 oap->oap_count = count;
2231 oap->oap_brw_flags = brw_flags;
2232 oap->oap_async_flags = async_flags;
2234 if (cmd & OBD_BRW_WRITE)
2235 lop = &loi->loi_write_lop;
2237 lop = &loi->loi_read_lop;
2239 list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2240 if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2242 oig_add_one(oig, &oap->oap_occ);
2245 LOI_DEBUG(loi, "oap %p page %p on group pending\n", oap, oap->oap_page);
2247 client_obd_list_unlock(&cli->cl_loi_list_lock);
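/* Move every page from the group-pending list onto the regular pending (and,
 * when ASYNC_URGENT, urgent) list, updating the pending page counts. */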
2252 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2253 struct loi_oap_pages *lop, int cmd)
2255 struct list_head *pos, *tmp;
2256 struct osc_async_page *oap;
2258 list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2259 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2260 list_del(&oap->oap_pending_item);
2261 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2262 if (oap->oap_async_flags & ASYNC_URGENT)
2263 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2264 lop_update_pending(cli, lop, cmd, 1);
2266 loi_list_maint(cli, loi);
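/* Start I/O for a group: promote its read and write pages to the pending
 * lists and let osc_check_rpcs() generate RPCs, all under the loi list lock. */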
2269 static int osc_trigger_group_io(struct obd_export *exp,
2270 struct lov_stripe_md *lsm,
2271 struct lov_oinfo *loi,
2272 struct obd_io_group *oig)
2274 struct client_obd *cli = &exp->exp_obd->u.cli;
2278 loi = &lsm->lsm_oinfo[0];
2280 client_obd_list_lock(&cli->cl_loi_list_lock);
2282 osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2283 osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2285 osc_check_rpcs(cli);
2286 client_obd_list_unlock(&cli->cl_loi_list_lock);
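/* Detach a page from async I/O. Returns -EBUSY if the page is already part
 * of an RPC; otherwise the page is removed from the urgent and pending lists
 * and its cache/grant accounting is released. */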
2291 static int osc_teardown_async_page(struct obd_export *exp,
2292 struct lov_stripe_md *lsm,
2293 struct lov_oinfo *loi, void *cookie)
2295 struct client_obd *cli = &exp->exp_obd->u.cli;
2296 struct loi_oap_pages *lop;
2297 struct osc_async_page *oap;
2301 oap = oap_from_cookie(cookie);
2303 RETURN(PTR_ERR(oap));
2306 loi = &lsm->lsm_oinfo[0];
2308 if (oap->oap_cmd & OBD_BRW_WRITE) {
2309 lop = &loi->loi_write_lop;
2311 lop = &loi->loi_read_lop;
2314 client_obd_list_lock(&cli->cl_loi_list_lock);
2316 if (!list_empty(&oap->oap_rpc_item))
2317 GOTO(out, rc = -EBUSY);
2319 osc_exit_cache(cli, oap, 0);
2320 osc_wake_cache_waiters(cli);
2322 if (!list_empty(&oap->oap_urgent_item)) {
2323 list_del_init(&oap->oap_urgent_item);
2324 oap->oap_async_flags &= ~ASYNC_URGENT;
2326 if (!list_empty(&oap->oap_pending_item)) {
2327 list_del_init(&oap->oap_pending_item);
2328 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2330 loi_list_maint(cli, loi);
2332 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2334 client_obd_list_unlock(&cli->cl_loi_list_lock);
2338 /* Note: caller will lock/unlock, and set uptodate on the pages */
2339 #if defined(__KERNEL__) && (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
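/* SAN read path: send an OST_SAN_READ request to obtain the on-disk block
 * mapping for each page, then read those blocks directly from the shared SAN
 * device (cl_sandev) through buffer heads instead of over the network. */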
2340 static int sanosc_brw_read(struct obd_export *exp, struct obdo *oa,
2341 struct lov_stripe_md *lsm, obd_count page_count,
2342 struct brw_page *pga)
2344 struct ptlrpc_request *request = NULL;
2345 struct ost_body *body;
2346 struct niobuf_remote *nioptr;
2347 struct obd_ioobj *iooptr;
2348 int rc, size[3] = {sizeof(*body)}, mapped = 0;
2349 struct obd_import *imp = class_exp2cliimp(exp);
2353 /* XXX does not handle 'new' brw protocol */
2355 size[1] = sizeof(struct obd_ioobj);
2356 size[2] = page_count * sizeof(*nioptr);
2358 request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
2359 OST_SAN_READ, 3, size, NULL);
2365 if (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2366 request->rq_request_portal = OST_IO_PORTAL;
2368 body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof(*body));
2369 iooptr = lustre_msg_buf(request->rq_reqmsg, 1, sizeof(*iooptr));
2370 nioptr = lustre_msg_buf(request->rq_reqmsg, 2,
2371 sizeof(*nioptr) * page_count);
2373 memcpy(&body->oa, oa, sizeof(body->oa));
2375 obdo_to_ioobj(oa, iooptr);
2376 iooptr->ioo_bufcnt = page_count;
2378 for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
2379 LASSERT(PageLocked(pga[mapped].pg));
2380 LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);
2382 nioptr->offset = pga[mapped].off;
2383 nioptr->len = pga[mapped].count;
2384 nioptr->flags = pga[mapped].flag;
2387 size[1] = page_count * sizeof(*nioptr);
2388 request->rq_replen = lustre_msg_size(2, size);
2390 rc = ptlrpc_queue_wait(request);
2394 body = lustre_swab_repbuf(request, 0, sizeof(*body),
2395 lustre_swab_ost_body);
2397 CERROR("Can't unpack body\n");
2398 GOTO(out_req, rc = -EPROTO);
2401 memcpy(oa, &body->oa, sizeof(*oa));
2403 swab = lustre_msg_swabbed(request->rq_repmsg);
2404 LASSERT_REPSWAB(request, 1);
2405 nioptr = lustre_msg_buf(request->rq_repmsg, 1, size[1]);
2407 /* nioptr missing or short */
2408 GOTO(out_req, rc = -EPROTO);
2412 for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
2413 struct page *page = pga[mapped].pg;
2414 struct buffer_head *bh;
2418 lustre_swab_niobuf_remote(nioptr);
2420 /* get the SAN device associated with this export */
2421 LASSERT(exp->exp_obd != NULL);
2422 dev = exp->exp_obd->u.cli.cl_sandev;
2425 if (!nioptr->offset) {
2426 CDEBUG(D_PAGE, "hole at ino %lu; index %ld\n",
2427 page->mapping->host->i_ino,
2429 memset(page_address(page), 0, CFS_PAGE_SIZE);
2433 if (!page->buffers) {
2434 create_empty_buffers(page, dev, CFS_PAGE_SIZE);
2437 clear_bit(BH_New, &bh->b_state);
2438 set_bit(BH_Mapped, &bh->b_state);
2439 bh->b_blocknr = (unsigned long)nioptr->offset;
2441 clear_bit(BH_Uptodate, &bh->b_state);
2443 ll_rw_block(READ, 1, &bh);
2447 /* if the buffer already existed, it must be the
2448 * one we mapped before; check it */
2449 LASSERT(!test_bit(BH_New, &bh->b_state));
2450 LASSERT(test_bit(BH_Mapped, &bh->b_state));
2451 LASSERT(bh->b_blocknr == (unsigned long)nioptr->offset);
2453 /* wait for its I/O completion */
2454 if (test_bit(BH_Lock, &bh->b_state))
2457 if (!test_bit(BH_Uptodate, &bh->b_state))
2458 ll_rw_block(READ, 1, &bh);
2462 /* must do synchronous write here */
2464 if (!buffer_uptodate(bh)) {
2472 ptlrpc_req_finished(request);
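/* SAN write path: obtain the block mapping from the OST with OST_SAN_WRITE,
 * then write the pages synchronously to the SAN device via buffer heads. */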
2476 static int sanosc_brw_write(struct obd_export *exp, struct obdo *oa,
2477 struct lov_stripe_md *lsm, obd_count page_count,
2478 struct brw_page *pga)
2480 struct ptlrpc_request *request = NULL;
2481 struct ost_body *body;
2482 struct niobuf_remote *nioptr;
2483 struct obd_ioobj *iooptr;
2484 struct obd_import *imp = class_exp2cliimp(exp);
2485 int rc, size[3] = {sizeof(*body)}, mapped = 0;
2489 size[1] = sizeof(struct obd_ioobj);
2490 size[2] = page_count * sizeof(*nioptr);
2492 request = ptlrpc_prep_req_pool(class_exp2cliimp(exp),
2493 LUSTRE_OST_VERSION, OST_SAN_WRITE,
2494 3, size, NULL, imp->imp_rq_pool);
2500 if (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2501 request->rq_request_portal = OST_IO_PORTAL;
2503 body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
2504 iooptr = lustre_msg_buf(request->rq_reqmsg, 1, sizeof (*iooptr));
2505 nioptr = lustre_msg_buf(request->rq_reqmsg, 2,
2506 sizeof (*nioptr) * page_count);
2508 memcpy(&body->oa, oa, sizeof(body->oa));
2510 obdo_to_ioobj(oa, iooptr);
2511 iooptr->ioo_bufcnt = page_count;
2514 for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
2515 LASSERT(PageLocked(pga[mapped].pg));
2516 LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);
2518 nioptr->offset = pga[mapped].off;
2519 nioptr->len = pga[mapped].count;
2520 nioptr->flags = pga[mapped].flag;
2523 size[1] = page_count * sizeof(*nioptr);
2524 request->rq_replen = lustre_msg_size(2, size);
2526 rc = ptlrpc_queue_wait(request);
2530 swab = lustre_msg_swabbed(request->rq_repmsg);
2531 LASSERT_REPSWAB(request, 1);
2532 nioptr = lustre_msg_buf(request->rq_repmsg, 1, size[1]);
2534 CERROR("absent/short niobuf array\n");
2535 GOTO(out_req, rc = -EPROTO);
2539 for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
2540 struct page *page = pga[mapped].pg;
2541 struct buffer_head *bh;
2545 lustre_swab_niobuf_remote(nioptr);
2547 /* get the SAN device associated with this export */
2548 LASSERT(exp->exp_obd != NULL);
2549 dev = exp->exp_obd->u.cli.cl_sandev;
2551 if (!page->buffers) {
2552 create_empty_buffers(page, dev, CFS_PAGE_SIZE);
2555 LASSERT(!test_bit(BH_New, &page->buffers->b_state));
2556 LASSERT(test_bit(BH_Mapped, &page->buffers->b_state));
2557 LASSERT(page->buffers->b_blocknr ==
2558 (unsigned long)nioptr->offset);
2564 /* if the buffer is locked, wait for its I/O completion */
2565 if (test_bit(BH_Lock, &bh->b_state))
2568 clear_bit(BH_New, &bh->b_state);
2569 set_bit(BH_Mapped, &bh->b_state);
2571 /* override the block nr */
2572 bh->b_blocknr = (unsigned long)nioptr->offset;
2574 /* we are about to write it, so mark it uptodate and dirty; the
2576 * page lock should guarantee no race condition here */
2577 set_bit(BH_Uptodate, &bh->b_state);
2578 set_bit(BH_Dirty, &bh->b_state);
2580 ll_rw_block(WRITE, 1, &bh);
2582 /* must do synchronous write here */
2584 if (!buffer_uptodate(bh) || test_bit(BH_Dirty, &bh->b_state)) {
2592 ptlrpc_req_finished(request);
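/* Split a SAN brw into chunks of at most PTLRPC_MAX_BRW_PAGES pages and hand
 * each chunk to sanosc_brw_read() or sanosc_brw_write(). */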
2596 static int sanosc_brw(int cmd, struct obd_export *exp, struct obdo *oa,
2597 struct lov_stripe_md *lsm, obd_count page_count,
2598 struct brw_page *pga, struct obd_trans_info *oti)
2602 while (page_count) {
2603 obd_count pages_per_brw;
2606 if (page_count > PTLRPC_MAX_BRW_PAGES)
2607 pages_per_brw = PTLRPC_MAX_BRW_PAGES;
2609 pages_per_brw = page_count;
2611 if (cmd & OBD_BRW_WRITE)
2612 rc = sanosc_brw_write(exp, oa, lsm, pages_per_brw, pga);
2614 rc = sanosc_brw_read(exp, oa, lsm, pages_per_brw, pga);
2619 page_count -= pages_per_brw;
2620 pga += pages_per_brw;
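/* Attach caller data (an inode) to a lock found by handle, warning if the
 * lock already carries different ast data that is not being freed. */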
2626 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2629 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2632 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2635 l_lock(&lock->l_resource->lr_namespace->ns_lock);
2638 /* Liang XXX: Darwin and Winnt checking should be added */
2639 if (lock->l_ast_data && lock->l_ast_data != data) {
2640 struct inode *new_inode = data;
2641 struct inode *old_inode = lock->l_ast_data;
2642 if (!(old_inode->i_state & I_FREEING))
2643 LDLM_ERROR(lock, "inconsistent l_ast_data found");
2644 LASSERTF(old_inode->i_state & I_FREEING,
2645 "Found existing inode %p/%lu/%u state %lu in lock: "
2646 "setting data to %p/%lu/%u\n", old_inode,
2647 old_inode->i_ino, old_inode->i_generation,
2649 new_inode, new_inode->i_ino, new_inode->i_generation);
2653 lock->l_ast_data = data;
2654 lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2655 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
2656 LDLM_LOCK_PUT(lock);
2659 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2660 ldlm_iterator_t replace, void *data)
2662 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2663 struct obd_device *obd = class_exp2obd(exp);
2665 ldlm_change_cbdata(obd->obd_namespace, &res_id, replace, data);
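/* Enqueue an extent lock on an object, first trying to match an existing
 * compatible lock (including a PW lock when only PR was requested) before
 * sending an LDLM_ENQUEUE request, optionally with an intent and an ost_lvb
 * reply buffer carrying size/blocks/mtime. */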
2669 static int osc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
2670 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2671 int *flags, void *bl_cb, void *cp_cb, void *gl_cb,
2672 void *data, __u32 lvb_len, void *lvb_swabber,
2673 struct lustre_handle *lockh)
2675 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2676 struct obd_device *obd = exp->exp_obd;
2678 struct ldlm_reply *rep;
2679 struct ptlrpc_request *req = NULL;
2683 /* Filesystem lock extents are extended to page boundaries so that
2684 * dealing with the page cache is a little smoother. */
2685 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2686 policy->l_extent.end |= ~CFS_PAGE_MASK;
2688 if (lsm->lsm_oinfo->loi_kms_valid == 0)
2691 /* Next, search for already existing extent locks that will cover us */
2692 rc = ldlm_lock_match(obd->obd_namespace, *flags, &res_id, type, policy,
2695 osc_set_data_with_check(lockh, data, *flags);
2696 if (*flags & LDLM_FL_HAS_INTENT) {
2697 /* I would like to be able to ASSERT here that rss <=
2698 * kms, but I can't, for reasons which are explained in
2701 /* We already have a lock, and it's referenced */
2705 /* If we're trying to read, we also search for an existing PW lock. The
2706 * VFS and page cache already protect us locally, so lots of readers/
2707 * writers can share a single PW lock.
2709 * There are problems with conversion deadlocks, so instead of
2710 converting a read lock to a write lock, we'll just enqueue a new one.
2713 * At some point we should cancel the read lock instead of making them
2714 * send us a blocking callback, but there are problems with canceling
2715 * locks out from other users right now, too. */
2717 if (mode == LCK_PR) {
2718 rc = ldlm_lock_match(obd->obd_namespace, *flags, &res_id, type,
2719 policy, LCK_PW, lockh);
2721 /* FIXME: This is not incredibly elegant, but it might
2722 * be more elegant than adding another parameter to
2723 * lock_match. I want a second opinion. */
2724 ldlm_lock_addref(lockh, LCK_PR);
2725 ldlm_lock_decref(lockh, LCK_PW);
2726 osc_set_data_with_check(lockh, data, *flags);
2732 if (*flags & LDLM_FL_HAS_INTENT) {
2733 int size[2] = {sizeof(struct ldlm_request), sizeof(lvb)};
2735 req = ptlrpc_prep_req(class_exp2cliimp(exp),
2736 LUSTRE_DLM_VERSION, LDLM_ENQUEUE, 1,
2741 size[0] = sizeof(*rep);
2742 req->rq_replen = lustre_msg_size(2, size);
2745 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2746 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2748 rc = ldlm_cli_enqueue(exp, req, obd->obd_namespace, res_id, type,
2749 policy, mode, flags, bl_cb, cp_cb, gl_cb, data,
2750 &lvb, sizeof(lvb), lustre_swab_ost_lvb, lockh);
2753 if (rc == ELDLM_LOCK_ABORTED) {
2754 /* swabbed by ldlm_cli_enqueue() */
2755 LASSERT_REPSWABBED(req, 0);
2756 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*rep));
2757 LASSERT(rep != NULL);
2758 if (rep->lock_policy_res1)
2759 rc = rep->lock_policy_res1;
2761 ptlrpc_req_finished(req);
2764 if ((*flags & LDLM_FL_HAS_INTENT && rc == ELDLM_LOCK_ABORTED) || !rc) {
2765 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2766 lvb.lvb_size, lvb.lvb_blocks, lvb.lvb_mtime);
2767 lsm->lsm_oinfo->loi_lvb = lvb;
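/* Match an existing lock for the extent without enqueueing a new one; as in
 * osc_enqueue(), a PW lock may satisfy a PR request. */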
2773 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
2774 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2775 int *flags, void *data, struct lustre_handle *lockh)
2777 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2778 struct obd_device *obd = exp->exp_obd;
2782 OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
2784 /* Filesystem lock extents are extended to page boundaries so that
2785 * dealing with the page cache is a little smoother */
2786 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2787 policy->l_extent.end |= ~CFS_PAGE_MASK;
2789 /* Next, search for already existing extent locks that will cover us */
2790 rc = ldlm_lock_match(obd->obd_namespace, *flags, &res_id, type,
2791 policy, mode, lockh);
2793 //if (!(*flags & LDLM_FL_TEST_LOCK))
2794 osc_set_data_with_check(lockh, data, *flags);
2797 /* If we're trying to read, we also search for an existing PW lock. The
2798 * VFS and page cache already protect us locally, so lots of readers/
2799 * writers can share a single PW lock. */
2800 if (mode == LCK_PR) {
2801 rc = ldlm_lock_match(obd->obd_namespace, *flags, &res_id, type,
2802 policy, LCK_PW, lockh);
2803 if (rc == 1 && !(*flags & LDLM_FL_TEST_LOCK)) {
2804 /* FIXME: This is not incredibly elegant, but it might
2805 * be more elegant than adding another parameter to
2806 * lock_match. I want a second opinion. */
2807 osc_set_data_with_check(lockh, data, *flags);
2808 ldlm_lock_addref(lockh, LCK_PR);
2809 ldlm_lock_decref(lockh, LCK_PW);
2815 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2816 __u32 mode, struct lustre_handle *lockh)
2820 if (unlikely(mode == LCK_GROUP))
2821 ldlm_lock_decref_and_cancel(lockh, mode);
2823 ldlm_lock_decref(lockh, mode);
2828 static int osc_cancel_unused(struct obd_export *exp,
2829 struct lov_stripe_md *lsm,
2830 int flags, void *opaque)
2832 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2833 struct obd_device *obd = class_exp2obd(exp);
2835 return ldlm_cli_cancel_unused(obd->obd_namespace, &res_id,
2839 static int osc_join_lru(struct obd_export *exp,
2840 struct lov_stripe_md *lsm, int join)
2842 struct obd_device *obd = class_exp2obd(exp);
2843 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2845 return ldlm_cli_join_lru(obd->obd_namespace, &res_id, join);
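/* Query filesystem statistics from the OST with an OST_STATFS request and
 * copy the reply into the caller's obd_statfs. */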
2848 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
2851 struct obd_statfs *msfs;
2852 struct ptlrpc_request *request;
2853 int rc, size = sizeof(*osfs);
2856 /* We could possibly pass max_age in the request (as an absolute
2857 * timestamp or a "seconds.usec ago") so the target can avoid doing
2858 * extra calls into the filesystem if that isn't necessary (e.g.
2859 * during mount that would help a bit). Having relative timestamps
2860 * is not so great if request processing is slow, while absolute
2861 * timestamps are not ideal because they need time synchronization. */
2862 request = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
2863 OST_STATFS, 0, NULL, NULL);
2867 request->rq_replen = lustre_msg_size(1, &size);
2868 request->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
2870 rc = ptlrpc_queue_wait(request);
2874 msfs = lustre_swab_repbuf(request, 0, sizeof(*msfs),
2875 lustre_swab_obd_statfs);
2877 CERROR("Can't unpack obd_statfs\n");
2878 GOTO(out, rc = -EPROTO);
2881 memcpy(osfs, msfs, sizeof(*osfs));
2885 ptlrpc_req_finished(request);
2889 /* Retrieve object striping information.
2891 * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
2892 * the maximum number of OST indices which will fit in the user buffer.
2893 * lmm_magic must be LOV_USER_MAGIC (we only use 1 slot here).
2895 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2897 struct lov_user_md lum, *lumk;
2898 int rc = 0, lum_size;
2904 if (copy_from_user(&lum, lump, sizeof(lum)))
2907 if (lum.lmm_magic != LOV_USER_MAGIC)
2910 if (lum.lmm_stripe_count > 0) {
2911 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
2912 OBD_ALLOC(lumk, lum_size);
2916 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
2918 lum_size = sizeof(lum);
2922 lumk->lmm_object_id = lsm->lsm_object_id;
2923 lumk->lmm_stripe_count = 1;
2925 if (copy_to_user(lump, lumk, lum_size))
2929 OBD_FREE(lumk, lum_size);
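/* Handle ioctls aimed at a single OSC: fake a one-target LOV descriptor for
 * OBD_IOC_LOV_GET_CONFIG, get/set striping, recover the import or toggle it
 * active, or poll an outstanding quota check. */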
2935 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2936 void *karg, void *uarg)
2938 struct obd_device *obd = exp->exp_obd;
2939 struct obd_ioctl_data *data = karg;
2943 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2946 if (!try_module_get(THIS_MODULE)) {
2947 CERROR("Can't get module. Is it alive?");
2952 case OBD_IOC_LOV_GET_CONFIG: {
2954 struct lov_desc *desc;
2955 struct obd_uuid uuid;
2959 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2960 GOTO(out, err = -EINVAL);
2962 data = (struct obd_ioctl_data *)buf;
2964 if (sizeof(*desc) > data->ioc_inllen1) {
2965 obd_ioctl_freedata(buf, len);
2966 GOTO(out, err = -EINVAL);
2969 if (data->ioc_inllen2 < sizeof(uuid)) {
2970 obd_ioctl_freedata(buf, len);
2971 GOTO(out, err = -EINVAL);
2974 desc = (struct lov_desc *)data->ioc_inlbuf1;
2975 desc->ld_tgt_count = 1;
2976 desc->ld_active_tgt_count = 1;
2977 desc->ld_default_stripe_count = 1;
2978 desc->ld_default_stripe_size = 0;
2979 desc->ld_default_stripe_offset = 0;
2980 desc->ld_pattern = 0;
2981 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2983 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2985 err = copy_to_user((void *)uarg, buf, len);
2988 obd_ioctl_freedata(buf, len);
2991 case LL_IOC_LOV_SETSTRIPE:
2992 err = obd_alloc_memmd(exp, karg);
2996 case LL_IOC_LOV_GETSTRIPE:
2997 err = osc_getstripe(karg, uarg);
2999 case OBD_IOC_CLIENT_RECOVER:
3000 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3005 case IOC_OSC_SET_ACTIVE:
3006 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3009 case OBD_IOC_POLL_QUOTACHECK:
3010 err = lquota_poll_check(quota_interface, exp,
3011 (struct if_quotacheck *)karg);
3014 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3015 cmd, cfs_curproc_comm());
3016 GOTO(out, err = -ENOTTY);
3019 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3022 module_put(THIS_MODULE);
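/* obd_get_info handler: "lock_to_stripe" is answered locally (a single OSC
 * has only one stripe), while "last_id" is fetched from the OST with an
 * OST_GET_INFO request. */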
3027 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3028 void *key, __u32 *vallen, void *val)
3031 if (!vallen || !val)
3034 if (keylen > strlen("lock_to_stripe") &&
3035 strcmp(key, "lock_to_stripe") == 0) {
3036 __u32 *stripe = val;
3037 *vallen = sizeof(*stripe);
3040 } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
3041 struct ptlrpc_request *req;
3043 char *bufs[1] = {key};
3045 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3046 OST_GET_INFO, 1, &keylen, bufs);
3050 req->rq_replen = lustre_msg_size(1, vallen);
3051 rc = ptlrpc_queue_wait(req);
3055 reply = lustre_swab_repbuf(req, 0, sizeof(*reply),
3056 lustre_swab_ost_last_id);
3057 if (reply == NULL) {
3058 CERROR("Can't unpack OST last ID\n");
3059 GOTO(out, rc = -EPROTO);
3061 *((obd_id *)val) = *reply;
3063 ptlrpc_req_finished(req);
3069 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3072 struct llog_ctxt *ctxt;
3073 struct obd_import *imp = req->rq_import;
3079 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3082 rc = llog_initiator_connect(ctxt);
3084 CERROR("cannot establish connection for "
3085 "ctxt %p: %d\n", ctxt, rc);
3088 imp->imp_server_timeout = 1;
3089 CDEBUG(D_HA, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3090 imp->imp_pingable = 1;
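/* obd_set_info handler: a few keys (next_id, unlinked, init_recov, checksum)
 * are handled locally; everything else is forwarded to the OST in an
 * OST_SET_INFO request added to the caller's request set. */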
3095 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3096 void *key, obd_count vallen, void *val,
3097 struct ptlrpc_request_set *set)
3099 struct ptlrpc_request *req;
3100 struct obd_device *obd = exp->exp_obd;
3101 struct obd_import *imp = class_exp2cliimp(exp);
3102 int size[2] = {keylen, vallen};
3103 char *bufs[2] = {key, val};
3106 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3108 if (KEY_IS(KEY_NEXT_ID)) {
3109 if (vallen != sizeof(obd_id))
3111 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3112 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3113 exp->exp_obd->obd_name,
3114 obd->u.cli.cl_oscc.oscc_next_id);
3119 if (KEY_IS("unlinked")) {
3120 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3121 spin_lock(&oscc->oscc_lock);
3122 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3123 spin_unlock(&oscc->oscc_lock);
3127 if (KEY_IS(KEY_INIT_RECOV)) {
3128 if (vallen != sizeof(int))
3130 imp->imp_initial_recov = *(int *)val;
3131 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3132 exp->exp_obd->obd_name,
3133 imp->imp_initial_recov);
3137 if (KEY_IS("checksum")) {
3138 if (vallen != sizeof(int))
3140 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3147 /* We pass all other commands directly to the OST. Since nobody calls OSC
3148 methods directly and everybody is supposed to go through LOV, we assume
3149 LOV has already checked the values for us.
3150 The only recognised values so far are evict_by_nid and mds_conn.
3151 Even if something bad goes through, we'd get a -EINVAL from the OST anyway.
3154 req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO,
3159 req->rq_replen = lustre_msg_size(0, NULL);
3161 if (KEY_IS("mds_conn"))
3162 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3163 ptlrpc_set_add_req(set, req);
3164 ptlrpc_check_set(set);
3170 static struct llog_operations osc_size_repl_logops = {
3171 lop_cancel: llog_obd_repl_cancel
3174 static struct llog_operations osc_mds_ost_orig_logops;
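/* Set up the OSC's llog contexts: the MDS->OST originator context (based on
 * llog_lvfs_ops with origin setup/cleanup/add/connect handlers) and the size
 * replicator context used for cancel processing. */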
3175 static int osc_llog_init(struct obd_device *obd, struct obd_device *tgt,
3176 int count, struct llog_catid *catid)
3181 osc_mds_ost_orig_logops = llog_lvfs_ops;
3182 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3183 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3184 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3185 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3187 rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3188 &catid->lci_logid, &osc_mds_ost_orig_logops);
3192 rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3193 &osc_size_repl_logops);
3197 static int osc_llog_finish(struct obd_device *obd, int count)
3199 struct llog_ctxt *ctxt;
3200 int rc = 0, rc2 = 0;
3203 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3205 rc = llog_cleanup(ctxt);
3207 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3209 rc2 = llog_cleanup(ctxt);
3216 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3217 struct obd_uuid *cluuid,
3218 struct obd_connect_data *data)
3220 struct client_obd *cli = &obd->u.cli;
3222 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3225 client_obd_list_lock(&cli->cl_loi_list_lock);
3226 data->ocd_grant = cli->cl_avail_grant ?:
3227 2 * cli->cl_max_pages_per_rpc << PAGE_SHIFT;
3228 lost_grant = cli->cl_lost_grant;
3229 cli->cl_lost_grant = 0;
3230 client_obd_list_unlock(&cli->cl_loi_list_lock);
3232 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3233 "cl_lost_grant: %ld\n", data->ocd_grant,
3234 cli->cl_avail_grant, lost_grant);
3235 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3236 " ocd_grant: %d\n", data->ocd_connect_flags,
3237 data->ocd_version, data->ocd_grant);
3243 static int osc_disconnect(struct obd_export *exp)
3245 struct obd_device *obd = class_exp2obd(exp);
3246 struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3249 if (obd->u.cli.cl_conn_count == 1)
3250 /* flush any remaining cancel messages out to the target */
3251 llog_sync(ctxt, exp);
3253 rc = client_disconnect_export(exp);
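/* React to import state changes: mark the object creator as recovering on
 * disconnect, drop grants and locks on invalidation, clear the no-space flag
 * on reactivation, and pick up grant/portal settings from the connect data. */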
3257 static int osc_import_event(struct obd_device *obd,
3258 struct obd_import *imp,
3259 enum obd_import_event event)
3261 struct client_obd *cli;
3265 LASSERT(imp->imp_obd == obd);
3268 case IMP_EVENT_DISCON: {
3269 /* Only do this on the MDS OSCs */
3270 if (imp->imp_server_timeout) {
3271 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3273 spin_lock(&oscc->oscc_lock);
3274 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3275 spin_unlock(&oscc->oscc_lock);
3280 case IMP_EVENT_INACTIVE: {
3281 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3284 case IMP_EVENT_INVALIDATE: {
3285 struct ldlm_namespace *ns = obd->obd_namespace;
3289 client_obd_list_lock(&cli->cl_loi_list_lock);
3290 cli->cl_avail_grant = 0;
3291 cli->cl_lost_grant = 0;
3292 /* all pages go to failing RPCs due to the invalid import */
3293 osc_check_rpcs(cli);
3294 client_obd_list_unlock(&cli->cl_loi_list_lock);
3296 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3300 case IMP_EVENT_ACTIVE: {
3301 /* Only do this on the MDS OSCs */
3302 if (imp->imp_server_timeout) {
3303 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3305 spin_lock(&oscc->oscc_lock);
3306 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3307 spin_unlock(&oscc->oscc_lock);
3309 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3312 case IMP_EVENT_OCD: {
3313 struct obd_connect_data *ocd = &imp->imp_connect_data;
3315 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3316 osc_init_grant(&obd->u.cli, ocd);
3319 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3320 imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3322 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3326 CERROR("Unknown import event %d\n", event);
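/* obd setup: take a ptlrpcd reference, do generic client setup, register
 * lprocfs entries, and pre-allocate a small request pool for brw RPCs. */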
3332 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3338 rc = ptlrpcd_addref();
3342 rc = client_obd_setup(obd, lcfg);
3346 struct lprocfs_static_vars lvars;
3347 struct client_obd *cli = &obd->u.cli;
3349 lprocfs_init_vars(osc, &lvars);
3350 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3351 lproc_osc_attach_seqstat(obd);
3352 ptlrpc_lprocfs_register_obd(obd);
3356 /* We need to allocate a few more requests, because
3357 brw_interpret_oap tries to create new requests before freeing
3358 previous ones. Ideally we would like 2x max_rpcs_in_flight
3359 reserved, but I'm afraid that might be too much wasted RAM
3360 in practice, so 2 extra is just my guess and should still work. */
3361 cli->cl_import->imp_rq_pool =
3362 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3364 ptlrpc_add_rqs_to_pool);
3370 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3376 case OBD_CLEANUP_EARLY: {
3377 struct obd_import *imp;
3378 imp = obd->u.cli.cl_import;
3379 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3380 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3381 ptlrpc_deactivate_import(imp);
3384 case OBD_CLEANUP_EXPORTS:
3386 case OBD_CLEANUP_SELF_EXP:
3387 rc = obd_llog_finish(obd, 0);
3389 CERROR("failed to cleanup llogging subsystems\n");
3391 case OBD_CLEANUP_OBD:
3397 int osc_cleanup(struct obd_device *obd)
3399 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3403 ptlrpc_lprocfs_unregister_obd(obd);
3404 lprocfs_obd_cleanup(obd);
3406 spin_lock(&oscc->oscc_lock);
3407 oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3408 oscc->oscc_flags |= OSCC_FLAG_EXITING;
3409 spin_unlock(&oscc->oscc_lock);
3411 /* free the memory of the OSC quota cache */
3412 lquota_cleanup(quota_interface, obd);
3414 rc = client_obd_cleanup(obd);
3421 struct obd_ops osc_obd_ops = {
3422 .o_owner = THIS_MODULE,
3423 .o_setup = osc_setup,
3424 .o_precleanup = osc_precleanup,
3425 .o_cleanup = osc_cleanup,
3426 .o_add_conn = client_import_add_conn,
3427 .o_del_conn = client_import_del_conn,
3428 .o_connect = client_connect_import,
3429 .o_reconnect = osc_reconnect,
3430 .o_disconnect = osc_disconnect,
3431 .o_statfs = osc_statfs,
3432 .o_packmd = osc_packmd,
3433 .o_unpackmd = osc_unpackmd,
3434 .o_create = osc_create,
3435 .o_destroy = osc_destroy,
3436 .o_getattr = osc_getattr,
3437 .o_getattr_async = osc_getattr_async,
3438 .o_setattr = osc_setattr,
3439 .o_setattr_async = osc_setattr_async,
3441 .o_brw_async = osc_brw_async,
3442 .o_prep_async_page = osc_prep_async_page,
3443 .o_queue_async_io = osc_queue_async_io,
3444 .o_set_async_flags = osc_set_async_flags,
3445 .o_queue_group_io = osc_queue_group_io,
3446 .o_trigger_group_io = osc_trigger_group_io,
3447 .o_teardown_async_page = osc_teardown_async_page,
3448 .o_punch = osc_punch,
3450 .o_enqueue = osc_enqueue,
3451 .o_match = osc_match,
3452 .o_change_cbdata = osc_change_cbdata,
3453 .o_cancel = osc_cancel,
3454 .o_cancel_unused = osc_cancel_unused,
3455 .o_join_lru = osc_join_lru,
3456 .o_iocontrol = osc_iocontrol,
3457 .o_get_info = osc_get_info,
3458 .o_set_info_async = osc_set_info_async,
3459 .o_import_event = osc_import_event,
3460 .o_llog_init = osc_llog_init,
3461 .o_llog_finish = osc_llog_finish,
3464 #if defined(__KERNEL__) && (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3465 struct obd_ops sanosc_obd_ops = {
3466 .o_owner = THIS_MODULE,
3467 .o_setup = client_sanobd_setup,
3468 .o_precleanup = osc_precleanup,
3469 .o_cleanup = osc_cleanup,
3470 .o_add_conn = client_import_add_conn,
3471 .o_del_conn = client_import_del_conn,
3472 .o_connect = client_connect_import,
3473 .o_reconnect = osc_reconnect,
3474 .o_disconnect = client_disconnect_export,
3475 .o_statfs = osc_statfs,
3476 .o_packmd = osc_packmd,
3477 .o_unpackmd = osc_unpackmd,
3478 .o_create = osc_real_create,
3479 .o_destroy = osc_destroy,
3480 .o_getattr = osc_getattr,
3481 .o_getattr_async = osc_getattr_async,
3482 .o_setattr = osc_setattr,
3483 .o_brw = sanosc_brw,
3484 .o_punch = osc_punch,
3486 .o_enqueue = osc_enqueue,
3487 .o_match = osc_match,
3488 .o_change_cbdata = osc_change_cbdata,
3489 .o_cancel = osc_cancel,
3490 .o_cancel_unused = osc_cancel_unused,
3491 .o_join_lru = osc_join_lru,
3492 .o_iocontrol = osc_iocontrol,
3493 .o_import_event = osc_import_event,
3494 .o_llog_init = osc_llog_init,
3495 .o_llog_finish = osc_llog_finish,
3499 extern quota_interface_t osc_quota_interface;
3501 int __init osc_init(void)
3503 struct lprocfs_static_vars lvars;
3504 #if defined(__KERNEL__) && (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3505 struct lprocfs_static_vars sanlvars;
3510 lprocfs_init_vars(osc, &lvars);
3511 #if defined(__KERNEL__) && (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3512 lprocfs_init_vars(osc, &sanlvars);
3515 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3516 lquota_init(quota_interface);
3517 init_obd_quota_ops(quota_interface, &osc_obd_ops);
3519 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3520 LUSTRE_OSC_NAME, NULL);
3522 if (quota_interface)
3523 PORTAL_SYMBOL_PUT(osc_quota_interface);
3527 #if defined(__KERNEL__) && (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3528 rc = class_register_type(&sanosc_obd_ops, NULL, sanlvars.module_vars,
3529 LUSTRE_SANOSC_NAME, NULL);
3531 class_unregister_type(LUSTRE_OSC_NAME);
3532 if (quota_interface)
3533 PORTAL_SYMBOL_PUT(osc_quota_interface);
3542 static void /*__exit*/ osc_exit(void)
3544 lquota_exit(quota_interface);
3545 if (quota_interface)
3546 PORTAL_SYMBOL_PUT(osc_quota_interface);
3548 #if defined(__KERNEL__) && (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3549 class_unregister_type(LUSTRE_SANOSC_NAME);
3551 class_unregister_type(LUSTRE_OSC_NAME);
3554 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3555 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3556 MODULE_LICENSE("GPL");
3558 cfs_module(osc, "1.0.0", osc_init, osc_exit);