1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5 * Author Peter Braam <braam@clusterfs.com>
7 * This file is part of the Lustre file system, http://www.lustre.org
8 * Lustre is a trademark of Cluster File Systems, Inc.
10 * You may have signed or agreed to another license before downloading
11 * this software. If so, you are bound by the terms and conditions
12 * of that agreement, and the following does not apply to you. See the
13 * LICENSE file included with this distribution for more information.
15 * If you did not agree to a different license, then this copy of Lustre
16 * is open source software; you can redistribute it and/or modify it
17 * under the terms of version 2 of the GNU General Public License as
18 * published by the Free Software Foundation.
20 * In either case, Lustre is distributed in the hope that it will be
21 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23 * license text for more details.
* For testing and management it is treated as an obd_device,
* although it does not export a full OBD method table (the
* requests are coming in over the wire, so object target modules
* do not have a full method table.)
33 # define EXPORT_SYMTAB
35 #define DEBUG_SUBSYSTEM S_OSC
38 # include <linux/version.h>
39 # include <linux/module.h>
40 # include <linux/mm.h>
41 # include <linux/highmem.h>
42 # include <linux/ctype.h>
43 # include <linux/init.h>
44 # if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
45 # include <linux/workqueue.h>
46 # include <linux/smp_lock.h>
48 # include <linux/locks.h>
50 #else /* __KERNEL__ */
51 # include <liblustre.h>
54 # include <linux/lustre_dlm.h>
55 #include <libcfs/kp30.h>
56 #include <linux/lustre_net.h>
57 #include <lustre/lustre_user.h>
58 #include <linux/obd_ost.h>
59 #include <linux/obd_lov.h>
65 #include <linux/lustre_ha.h>
66 #include <linux/lprocfs_status.h>
67 #include <linux/lustre_log.h>
68 #include <linux/lustre_debug.h>
69 #include "osc_internal.h"
/* Pack OSC object metadata for disk storage (LE byte order).
 * An OSC object has a single stripe, so the wire/disk form is just the
 * fixed-size lov_mds_md carrying the object id.  Follows the obd packmd
 * convention: the buffer at *lmmp may be freed or allocated on behalf of
 * the caller depending on the arguments (allocation paths elided here). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
        lmm_size = sizeof(**lmmp);
        /* caller asked us to release a previously packed buffer */
        OBD_FREE(*lmmp, lmm_size);
        /* allocate the packed buffer for the caller */
        OBD_ALLOC(*lmmp, lmm_size);
        /* object id 0 is never valid */
        LASSERT(lsm->lsm_object_id);
        /* on-disk/wire format is little-endian */
        (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
/* Unpack OSC object metadata from disk storage (LE byte order).
 * Inverse of osc_packmd(): validates the incoming lov_mds_md and fills a
 * single-stripe lov_stripe_md at *lsmp, allocating or freeing it per the
 * obd unpackmd convention. */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
        /* reject a buffer too small to hold the fixed-size header */
        if (lmm_bytes < sizeof (*lmm)) {
                CERROR("lov_mds_md too small: %d, need %d\n",
                       lmm_bytes, (int)sizeof(*lmm));
        /* XXX LOV_MAGIC etc check? */
        /* object id 0 is never valid on the wire */
        if (lmm->lmm_object_id == 0) {
                CERROR("lov_mds_md: zero lmm_object_id\n");
        /* OSC objects always have exactly one stripe */
        lsm_size = lov_stripe_md_size(1);
        /* lmm == NULL means "free the in-memory md" */
        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE(*lsmp, lsm_size);
        OBD_ALLOC(*lsmp, lsm_size);
        loi_init((*lsmp)->lsm_oinfo);
        /* XXX zero *lsmp? */
        (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
        LASSERT((*lsmp)->lsm_object_id);
        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/* Reply-interpret callback for osc_getattr_async(): unpack the ost_body
 * from the reply and copy the returned attributes into the caller's obdo
 * (stashed in the request's async args).  On unpack failure the obdo's
 * o_valid is cleared so the caller sees no attributes. */
static int osc_getattr_interpret(struct ptlrpc_request *req,
                                 struct osc_getattr_async_args *aa, int rc)
        struct ost_body *body;
        body = lustre_swab_repbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        memcpy(aa->aa_oa, &body->oa, sizeof(*aa->aa_oa));
        /* This should really be sent by the OST */
        aa->aa_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        aa->aa_oa->o_valid |= OBD_MD_FLBLKSZ;
        CERROR("can't unpack ost_body\n");
        /* unpack failed: invalidate all attribute fields */
        aa->aa_oa->o_valid = 0;
/* Queue an OST_GETATTR rpc on the given request set without waiting.
 * The reply is handled later by osc_getattr_interpret(), which finds the
 * caller's obdo via the request's async args. */
static int osc_getattr_async(struct obd_export *exp, struct obdo *oa,
                             struct lov_stripe_md *md,
                             struct ptlrpc_request_set *set)
        struct ptlrpc_request *request;
        struct ost_body *body;
        int size = sizeof(*body);
        struct osc_getattr_async_args *aa;
        request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                                  OST_GETATTR, 1, &size, NULL);
        body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
        memcpy(&body->oa, oa, sizeof(*oa));
        request->rq_replen = lustre_msg_size(1, &size);
        request->rq_interpret_reply = osc_getattr_interpret;
        /* async args live in the request itself; make sure ours fit */
        LASSERT (sizeof (*aa) <= sizeof (request->rq_async_args));
        aa = (struct osc_getattr_async_args *)&request->rq_async_args;
        ptlrpc_set_add_req (set, request);
/* Synchronous OST_GETATTR: send the obdo to the OST, wait for the reply,
 * and copy the returned attributes back into *oa. */
static int osc_getattr(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *md)
        struct ptlrpc_request *request;
        struct ost_body *body;
        int rc, size = sizeof(*body);
        request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                                  OST_GETATTR, 1, &size, NULL);
        body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
        memcpy(&body->oa, oa, sizeof(*oa));
        request->rq_replen = lustre_msg_size(1, &size);
        rc = ptlrpc_queue_wait(request);
        CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
        /* unpack (and byte-swap if needed) the reply body */
        body = lustre_swab_repbuf(request, 0, sizeof (*body),
                                  lustre_swab_ost_body);
        CERROR ("can't unpack ost_body\n");
        GOTO (out, rc = -EPROTO);
        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        memcpy(oa, &body->oa, sizeof(*oa));
        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;
        ptlrpc_req_finished(request);
/* Synchronous OST_SETATTR: push the attributes in *oa to the OST and copy
 * the (possibly updated) attributes from the reply back into *oa. */
static int osc_setattr(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *md, struct obd_trans_info *oti)
        struct ptlrpc_request *request;
        struct ost_body *body;
        int rc, size = sizeof(*body);
        request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                                  OST_SETATTR, 1, &size, NULL);
        body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof(*body));
        memcpy(&body->oa, oa, sizeof(*oa));
        request->rq_replen = lustre_msg_size(1, &size);
        rc = ptlrpc_queue_wait(request);
        body = lustre_swab_repbuf(request, 0, sizeof(*body),
                                  lustre_swab_ost_body);
        GOTO(out, rc = -EPROTO);
        memcpy(oa, &body->oa, sizeof(*oa));
        ptlrpc_req_finished(request);
/* Fire-and-forget OST_SETATTR via the ptlrpcd daemon (used for MDS->OST
 * attribute propagation).  If the caller supplied a llog cookie, it is
 * embedded in the obdo so the OST can cancel the matching llog record. */
static int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                             struct lov_stripe_md *md,
                             struct obd_trans_info *oti)
        struct ptlrpc_request *request;
        struct ost_body *body;
        int rc = 0, size = sizeof(*body);
        request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                                  OST_SETATTR, 1, &size, NULL);
        body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof(*body));
        /* pass the llog cancel cookie inside the obdo's inline area */
        if (oa->o_valid & OBD_MD_FLCOOKIE)
                memcpy(obdo_logcookie(oa), oti->oti_logcookies,
                       sizeof(*oti->oti_logcookies));
        memcpy(&body->oa, oa, sizeof(*oa));
        request->rq_replen = lustre_msg_size(1, &size);
        /* do mds to ost setattr asynchronously */
        ptlrpcd_add_req(request);
/* Create an object on the OST with a synchronous OST_CREATE rpc.
 * Allocates a single-stripe lsm for *ea if the caller didn't provide one,
 * fills it with the object id returned by the OST, and hands back the
 * transaction number and llog cookie through *oti. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
        struct ptlrpc_request *request;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        int rc, size = sizeof(*body);
        /* allocate the in-memory md if the caller didn't */
        rc = obd_alloc_memmd(exp, &lsm);
        request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                                  OST_CREATE, 1, &size, NULL);
        GOTO(out, rc = -ENOMEM);
        body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
        memcpy(&body->oa, oa, sizeof(body->oa));
        request->rq_replen = lustre_msg_size(1, &size);
        /* FLINLINE here marks an orphan-deletion request from recovery */
        if (oa->o_valid & OBD_MD_FLINLINE) {
                LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
                        oa->o_flags == OBD_FL_DELORPHAN);
                DEBUG_REQ(D_HA, request,
                          "delorphan from OST integration");
                /* Don't resend the delorphan request */
                request->rq_no_resend = request->rq_no_delay = 1;
        rc = ptlrpc_queue_wait(request);
        body = lustre_swab_repbuf(request, 0, sizeof(*body),
                                  lustre_swab_ost_body);
        CERROR ("can't unpack ost_body\n");
        GOTO (out_req, rc = -EPROTO);
        memcpy(oa, &body->oa, sizeof(*oa));
        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;
        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way. */
        lsm->lsm_object_id = oa->o_id;
        /* report the server transno so the caller can track recovery state */
        oti->oti_transno = request->rq_repmsg->transno;
        /* capture the llog cookie the OST returned inside the obdo */
        if (oa->o_valid & OBD_MD_FLCOOKIE) {
                if (!oti->oti_logcookies)
                        oti_alloc_cookies(oti, 1);
                memcpy(oti->oti_logcookies, obdo_logcookie(oa),
                       sizeof(oti->oti_onecookie));
        CDEBUG(D_HA, "transno: "LPD64"\n", request->rq_repmsg->transno);
        ptlrpc_req_finished(request);
        /* error path: release the md we allocated above */
        obd_free_memmd(exp, &lsm);
/* Synchronous OST_PUNCH (truncate/hole-punch) over [start, end].
 * The extent is smuggled in the obdo's size/blocks fields. */
static int osc_punch(struct obd_export *exp, struct obdo *oa,
                     struct lov_stripe_md *md, obd_size start,
                     obd_size end, struct obd_trans_info *oti)
        struct ptlrpc_request *request;
        struct ost_body *body;
        int rc, size = sizeof(*body);
        request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                                  OST_PUNCH, 1, &size, NULL);
        body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
        memcpy(&body->oa, oa, sizeof(*oa));
        /* overload the size and blocks fields in the oa with start/end */
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        request->rq_replen = lustre_msg_size(1, &size);
        rc = ptlrpc_queue_wait(request);
        body = lustre_swab_repbuf (request, 0, sizeof (*body),
                                   lustre_swab_ost_body);
        CERROR ("can't unpack ost_body\n");
        GOTO (out, rc = -EPROTO);
        memcpy(oa, &body->oa, sizeof(*oa));
        ptlrpc_req_finished(request);
/* Synchronous OST_SYNC: ask the OST to commit data in [start, end] to
 * stable storage.  Like osc_punch(), the extent rides in the obdo's
 * size/blocks fields. */
static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end)
        struct ptlrpc_request *request;
        struct ost_body *body;
        int rc, size = sizeof(*body);
        request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                                  OST_SYNC, 1, &size, NULL);
        body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
        memcpy(&body->oa, oa, sizeof(*oa));
        /* overload the size and blocks fields in the oa with start/end */
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        request->rq_replen = lustre_msg_size(1, &size);
        rc = ptlrpc_queue_wait(request);
        body = lustre_swab_repbuf(request, 0, sizeof(*body),
                                  lustre_swab_ost_body);
        CERROR ("can't unpack ost_body\n");
        GOTO (out, rc = -EPROTO);
        memcpy(oa, &body->oa, sizeof(*oa));
        ptlrpc_req_finished(request);
/* Synchronous OST_DESTROY of an object.  If the caller supplied llog
 * cookies (unlink log records), one is embedded in the obdo so the OST can
 * cancel the matching record, and the cookie cursor is advanced. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export)
        struct ptlrpc_request *request;
        struct ost_body *body;
        int rc, size = sizeof(*body);
        request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                                  OST_DESTROY, 1, &size, NULL);
        body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
        /* attach the llog cancel cookie and step to the next one */
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
                memcpy(obdo_logcookie(oa), oti->oti_logcookies,
                       sizeof(*oti->oti_logcookies));
                oti->oti_logcookies++;
        memcpy(&body->oa, oa, sizeof(*oa));
        request->rq_replen = lustre_msg_size(1, &size);
        rc = ptlrpc_queue_wait(request);
        body = lustre_swab_repbuf(request, 0, sizeof(*body),
                                  lustre_swab_ost_body);
        CERROR ("Can't unpack body\n");
        GOTO (out, rc = -EPROTO);
        memcpy(oa, &body->oa, sizeof(*oa));
        ptlrpc_req_finished(request);
/* Fill the dirty/grant accounting fields of an outgoing obdo so the OST
 * knows how much dirty cache this client holds and how much grant it has
 * consumed or lost.  Called under no lock; takes cl_loi_list_lock itself.
 * NOTE(review): the parameter list is elided in this view — a third
 * argument follows 'oa'; confirm against the full source. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
        /* caller must not have already set the fields we own here */
        LASSERT(!(oa->o_valid & bits));
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        /* sanity: dirty accounting should never exceed the configured max */
        if (cli->cl_dirty > cli->cl_dirty_max) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_max);
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                /* undirty is the amount we could still dirty: at least a
                 * full pipeline of max-sized rpcs */
                long max_in_flight = (cli->cl_max_pages_per_rpc << PAGE_SHIFT)*
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        /* lost grant is reported exactly once */
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* caller must hold loi_list_lock */
/* Account one page of dirty cache against the client's write grant and
 * mark the page as covered by grant. */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct osc_async_page *oap)
        cli->cl_dirty += PAGE_SIZE;
        cli->cl_avail_grant -= PAGE_SIZE;
        oap->oap_brw_flags |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for oap %p\n", PAGE_SIZE, oap);
        /* callers must check availability first; grant never goes negative */
        LASSERT(cli->cl_avail_grant >= 0);
/* Total BRW rpcs (reads + writes) currently in flight for this client. */
static unsigned long rpcs_in_flight(struct client_obd *cli)
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
/* caller must hold loi_list_lock */
/* Walk the list of threads waiting for dirty-cache room and wake each one
 * that can now proceed: either grant is available (consume it) or no write
 * rpcs remain in flight to return grant, in which case the waiter is woken
 * with -EDQUOT so it falls back to sync IO. */
void osc_wake_cache_waiters(struct client_obd *cli)
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if (cli->cl_dirty + PAGE_SIZE > cli->cl_dirty_max) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld max %ld\n",
                               cli->cl_dirty, cli->cl_dirty_max);
                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                        /* grant available: charge it to the waiter's page */
                        osc_consume_write_grant(cli, ocw->ocw_oap);
                wake_up(&ocw->ocw_waitq);
/* Initialize the client's available grant from the amount the server
 * advertised in its connect reply. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);
/* Add the extra grant carried in a reply body to the client's available
 * grant.  Cache waiters are not woken here; that happens later. */
static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
        spin_lock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
        cli->cl_avail_grant += body->oa.o_grant;
        /* waiters are woken in brw_interpret_oap */
        spin_unlock(&cli->cl_loi_list_lock);
/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet.
 *
 * Zero-fill the portion of the page array past nob_read so the caller
 * sees holes as zeros. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page *pga)
        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT (page_count > 0);
                if (pga->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga->pg) + (pga->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga->count - nob_read);
                nob_read -= pga->count;
        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga->pg) + (pga->off & ~PAGE_MASK);
                memset(ptr, 0, pga->count);
/* Validate the per-niobuf return-code vector in an OST_WRITE reply:
 * propagate the first negative rc, reject any unexpected non-zero rc, and
 * verify the bulk transfer moved exactly the requested number of bytes. */
static int check_write_rcs(struct ptlrpc_request *request,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page *pga)
        /* return error if any niobuf was in error */
        remote_rcs = lustre_swab_repbuf(request, 1,
                                        sizeof(*remote_rcs) * niocount, NULL);
        if (remote_rcs == NULL) {
                CERROR("Missing/short RC vector on BRW_WRITE reply\n");
        /* rc vector has no swab helper; fix byte order by hand if needed */
        if (lustre_msg_swabbed(request->rq_repmsg))
                for (i = 0; i < niocount; i++)
                        __swab32s(&remote_rcs[i]);
        for (i = 0; i < niocount; i++) {
                if (remote_rcs[i] < 0)
                        return(remote_rcs[i]);
                /* positive rcs are protocol violations for writes */
                if (remote_rcs[i] != 0) {
                        CERROR("rc[%d] invalid (%d) req %p\n",
                               i, remote_rcs[i], request);
        if (request->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       requested_nob, request->rq_bulk->bd_nob_transferred);
/* Two brw_pages can share one remote niobuf when p2 starts exactly where
 * p1 ends and their flags are compatible (OBD_BRW_FROM_GRANT is purely a
 * client-side accounting flag, so it is ignored for the comparison). */
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
        if (p1->flag != p2->flag) {
                unsigned mask = ~OBD_BRW_FROM_GRANT;
                /* warn if we try to combine flags that we don't know to be
                 * safe to merge */
                if ((p1->flag & mask) != (p2->flag & mask))
                        CERROR("is it ok to have flags 0x%x and 0x%x in the "
                               "same brw?\n", p1->flag, p2->flag);
        /* contiguous in file offset? */
        return (p1->off + p1->count == p2->off);
/* Compute a CRC32 checksum over the first 'nob' bytes of the page array,
 * honoring each page's in-page offset and count. */
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page *pga)
        LASSERT (pg_count > 0);
        while (nob > 0 && pg_count > 0) {
                char *ptr = kmap(pga->pg);
                int off = pga->off & ~PAGE_MASK;
                /* don't checksum past the requested byte count */
                int count = pga->count > nob ? nob : pga->count;
                cksum = crc32_le(cksum, ptr + off, count);
                LL_CDEBUG_PAGE(D_PAGE, pga->pg, "off %d checksum %x\n",
/* Build (but do not send) an OST_READ/OST_WRITE rpc with its bulk
 * descriptor for the given page array.
 *
 * The pages must be sorted by offset; contiguous runs with mergeable flags
 * are collapsed into single remote niobufs (see can_merge_pages()).  On
 * success returns the request in *reqp along with the total byte count
 * (*requested_nobp) and niobuf count (*niocountp) the caller needs for
 * reply checking. */
static int osc_brw_prep_request(int cmd, struct obd_import *imp,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page *pga, int *requested_nobp,
                                int *niocountp, struct ptlrpc_request **reqp)
        struct ptlrpc_request *req;
        struct ptlrpc_bulk_desc *desc;
        struct client_obd *cli = &imp->imp_obd->u.cli;
        struct ost_body *body;
        struct obd_ioobj *ioobj;
        struct niobuf_remote *niobuf;
        struct ptlrpc_request_pool *pool;
        /* writes draw from the pre-allocated request pool; reads do not */
        opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
        pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_rq_pool : NULL;
        /* count how many niobufs we need: one per non-mergeable run */
        for (niocount = i = 1; i < page_count; i++)
                if (!can_merge_pages(&pga[i - 1], &pga[i]))
        /* request message: ost_body, ioobj, then the niobuf array */
        size[0] = sizeof(*body);
        size[1] = sizeof(*ioobj);
        size[2] = niocount * sizeof(*niobuf);
        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM);
        req = ptlrpc_prep_req_pool(imp, LUSTRE_OST_VERSION, opc, 3,
        /* FIXME bug 249. Also see bug 7198 */
        if (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                req->rq_request_portal = OST_IO_PORTAL;
        /* writes: server GETs from us; reads: server PUTs to us */
        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_PUT_SINK, OST_BULK_PORTAL);
        GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */
        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
        ioobj = lustre_msg_buf(req->rq_reqmsg, 1, sizeof(*ioobj));
        niobuf = lustre_msg_buf(req->rq_reqmsg, 2, niocount * sizeof(*niobuf));
        memcpy(&body->oa, oa, sizeof(*oa));
        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        LASSERT (page_count > 0);
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = &pga[i];
                struct brw_page *pg_prev = pg - 1;
                LASSERT(pg->count > 0);
                /* each brw_page must fit within a single page */
                LASSERTF((pg->off & ~PAGE_MASK) + pg->count <= PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                /* pages must arrive strictly sorted by offset */
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         pg->pg, pg->pg->private, pg->pg->index, pg->off,
                         pg_prev->pg, pg_prev->pg->private, pg_prev->pg->index,
                /* SRVLOCK must be uniform across the whole rpc */
                LASSERT((pga[0].flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));
                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~PAGE_MASK,
                requested_nob += pg->count;
                /* extend the previous niobuf, or start a new one */
                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf->len += pg->count;
                        niobuf->offset = pg->off;
                        niobuf->len = pg->count;
                        niobuf->flags = pg->flag;
        /* we should have filled exactly the niobufs we counted */
        LASSERT((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, 2, niocount * sizeof(*niobuf)));
        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        /* size[0] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum)) {
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM;
                        oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                size[1] = sizeof(__u32) * niocount;
                req->rq_replen = lustre_msg_size(2, size);
                /* read: ask the server to checksum what it sends back */
                if (unlikely(cli->cl_checksum))
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                /* 1 RC for the whole I/O */
                req->rq_replen = lustre_msg_size(1, size);
        *niocountp = niocount;
        *requested_nobp = requested_nob;
        /* error path: drop the request (and the desc it owns) */
        ptlrpc_req_finished (req);
/* Diagnose a write-checksum mismatch by re-checksumming the pages now and
 * comparing against both the original client checksum ('cli') and the one
 * the server computed ('srv'), to distinguish pages mutated on the client
 * after checksumming, mutated in transit, or mutated beyond recognition. */
static void check_write_csum(__u32 cli, __u32 srv, int requested_nob,
                             obd_count page_count, struct brw_page *pga)
        CDEBUG(D_PAGE, "checksum %x confirmed\n", cli);
        new_csum = osc_checksum_bulk(requested_nob, page_count, pga);
        /* pages now match what the server saw: client changed them after
         * we checksummed */
        if (new_csum == srv) {
                CERROR("BAD CHECKSUM (WRITE): pages were mutated on the client"
                       "after we checksummed them (original client csum:"
                       " %x; server csum: %x; client csum now: %x)\n",
        /* pages still match our original checksum: corrupted in transit */
        if (new_csum == cli) {
                CERROR("BAD CHECKSUM (WRITE): pages were mutated in transit "
                       "(original client csum: %x; server csum: %x; client "
                       "csum now: %x)\n", cli, srv, new_csum);
        /* matches neither: corrupted in transit AND changed since */
        CERROR("BAD CHECKSUM (WRITE): pages were mutated in transit, and the "
               "current page contents don't match the originals OR what the "
               "server received (original client csum: %x; server csum: %x; "
               "client csum now: %x)\n", cli, srv, new_csum);
/* Post-process a completed BRW rpc: unpack the reply body, update quota
 * and grant state, verify per-niobuf rcs for writes, and for reads handle
 * short reads and verify the bulk checksum if one was requested. */
static int osc_brw_fini_request(struct ptlrpc_request *req, struct obdo *oa,
                                int requested_nob, int niocount,
                                obd_count page_count, struct brw_page *pga,
        const lnet_process_id_t *peer =
                &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct ost_body *body;
        __u32 client_cksum = 0;
        /* -EDQUOT replies still carry a body we must process */
        if (rc < 0 && rc != -EDQUOT)
        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = lustre_swab_repbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
        CERROR ("Can't unpack body\n");
        /* set/clear over quota flag for a uid/gid */
        if (req->rq_reqmsg->opc == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
                lquota_setdq(quota_interface, cli, body->oa.o_uid,
                             body->oa.o_gid, body->oa.o_valid,
        /* remember the checksum we sent before the reply overwrites *oa */
        if (unlikely(oa->o_valid & OBD_MD_FLCKSUM))
                client_cksum = oa->o_cksum; /* save for later */
        osc_update_grant(cli, body);
        memcpy(oa, &body->oa, sizeof(*oa));
        if (req->rq_reqmsg->opc == OST_WRITE) {
                /* writes never return a positive byte count */
                CERROR ("Unexpected +ve rc %d\n", rc);
                LASSERT (req->rq_bulk->bd_nob == requested_nob);
                /* server disagreed with our checksum: run diagnostics */
                if (unlikely((oa->o_valid & OBD_MD_FLCKSUM) &&
                        check_write_csum(client_cksum, oa->o_cksum,
                                         requested_nob, page_count, pga);
                RETURN(check_write_rcs(req, requested_nob, niocount,
        /* The rest of this function executes only for OST_READs */
        if (rc > requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc, requested_nob);
        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR ("Unexpected rc %d (%d transferred)\n",
                        rc, req->rq_bulk->bd_nob_transferred);
        /* short read: zero-fill the tail (sparse-file hole, see above) */
        if (rc < requested_nob)
                handle_short_read(rc, page_count, pga);
        if (unlikely(oa->o_valid & OBD_MD_FLCKSUM)) {
                static int cksum_counter;
                __u32 cksum = osc_checksum_bulk(rc, page_count, pga);
                __u32 server_cksum = oa->o_cksum;
                /* server set the flag but sent no checksum: protocol bug,
                 * not fatal */
                if (server_cksum == ~0 && rc > 0) {
                        CERROR("Protocol error: server %s set the 'checksum' "
                               "bit, but didn't send a checksum.  Not fatal, "
                               "but please tell CFS.\n",
                               libcfs_nid2str(peer->nid));
                if (server_cksum != cksum) {
                        CERROR("Bad checksum from %s: server %x != client %x\n",
                               libcfs_nid2str(peer->nid), server_cksum, cksum);
                        oa->o_cksum = cksum;
                } else if ((cksum_counter & (-cksum_counter)) == cksum_counter){
                        /* log only at power-of-two counts to limit noise */
                        CWARN("Checksum %u from %s OK: %x\n",
                              cksum_counter, libcfs_nid2str(peer->nid), cksum);
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", cksum);
        } else if (unlikely(client_cksum)) {
                /* we asked for a checksum but the server didn't send one */
                static int cksum_missed;
                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Checksum %u requested from %s but not sent\n",
                               cksum_missed, libcfs_nid2str(peer->nid));
/* Synchronous bulk read/write of one page array: build the rpc, wait for
 * it, retry from scratch on a bulk timeout that is marked for resend, then
 * finish/validate the reply. */
static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
                            struct lov_stripe_md *lsm,
                            obd_count page_count, struct brw_page *pga)
        struct ptlrpc_request *request;
        rc = osc_brw_prep_request(cmd, class_exp2cliimp(exp), oa, lsm,
                                  page_count, pga, &requested_nob, &niocount,
        rc = ptlrpc_queue_wait(request);
        /* bulk timed out: throw the request away and rebuild it */
        if (rc == -ETIMEDOUT && request->rq_resend) {
                DEBUG_REQ(D_HA, request, "BULK TIMEOUT");
                ptlrpc_req_finished(request);
        rc = osc_brw_fini_request(request, oa, requested_nob, niocount,
                                  page_count, pga, rc);
        ptlrpc_req_finished(request);
/* Reply-interpret callback for async BRW rpcs (see async_internal()):
 * recover the call parameters from the request's async args and run the
 * common reply post-processing. */
static int brw_interpret(struct ptlrpc_request *request,
                         struct osc_brw_async_args *aa, int rc)
        struct obdo *oa = aa->aa_oa;
        int requested_nob = aa->aa_requested_nob;
        int niocount = aa->aa_nio_count;
        obd_count page_count = aa->aa_page_count;
        struct brw_page *pga = aa->aa_pga;
        rc = osc_brw_fini_request(request, oa, requested_nob, niocount,
                                  page_count, pga, rc);
/* Asynchronous counterpart of osc_brw_internal(): build the BRW rpc, stash
 * the parameters brw_interpret() will need in the request's async args,
 * and add the request to the caller's set. */
static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                          struct lov_stripe_md *lsm, obd_count page_count,
                          struct brw_page *pga, struct ptlrpc_request_set *set)
        struct ptlrpc_request *request;
        struct osc_brw_async_args *aa;
        rc = osc_brw_prep_request(cmd, class_exp2cliimp(exp), oa, lsm,
                                  page_count, pga, &requested_nob, &nio_count,
        /* async args live inside the request; make sure ours fit */
        LASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
        aa = (struct osc_brw_async_args *)&request->rq_async_args;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = nio_count;
        aa->aa_page_count = page_count;
        request->rq_interpret_reply = brw_interpret;
        ptlrpc_set_add_req(set, request);
/* Typed minimum; statement-expression form evaluates each argument once. */
#define min_t(type,x,y) \
        ({ type __x = (x); type __y = (y); __x < __y ? __x: __y; })
* ugh, we want disk allocation on the target to happen in offset order.  we'll
* follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
* fine for our small page arrays and doesn't require allocation.  its an
* insertion sort that swaps elements that are strides apart, shrinking the
* stride down until its '1' and the array is sorted.
/* Sort the brw_page array in ascending file-offset order (shellsort with
 * the 3n+1 stride sequence). */
static void sort_brw_pages(struct brw_page *array, int num)
        struct brw_page tmp;
        /* largest 3n+1 stride below num */
        for (stride = 1; stride < num ; stride = (stride * 3) + 1)
                /* strided insertion sort pass */
                for (i = stride ; i < num ; i++) {
                        while (j >= stride && array[j - stride].off > tmp.off) {
                                array[j] = array[j - stride];
        } while (stride > 1);
/* Return the length of the longest prefix of 'pg' that forms a single
 * unfragmented extent: after the first page, every page must start and end
 * on a page boundary (except the last, which may end short). */
static obd_count max_unfragmented_pages(struct brw_page *pg, obd_count pages)
        LASSERT (pages > 0);
        offset = pg->off & (PAGE_SIZE - 1);
        if (pages == 0)          /* that's all */
        if (offset + pg->count < PAGE_SIZE)
                return count;   /* doesn't end on page boundary */
        offset = pg->off & (PAGE_SIZE - 1);
        if (offset != 0)        /* doesn't start on page boundary */
/* Top-level synchronous BRW entry point: split the page array into
 * offset-sorted, unfragmented chunks of at most PTLRPC_MAX_BRW_PAGES and
 * issue one osc_brw_internal() call per chunk. */
static int osc_brw(int cmd, struct obd_export *exp, struct obdo *oa,
                   struct lov_stripe_md *md, obd_count page_count,
                   struct brw_page *pga, struct obd_trans_info *oti)
        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */
                struct obd_import *imp = class_exp2cliimp(exp);
                if (imp == NULL || imp->imp_invalid)
        while (page_count) {
                obd_count pages_per_brw;
                /* cap each rpc at the wire maximum */
                if (page_count > PTLRPC_MAX_BRW_PAGES)
                        pages_per_brw = PTLRPC_MAX_BRW_PAGES;
                        pages_per_brw = page_count;
                sort_brw_pages(pga, pages_per_brw);
                pages_per_brw = max_unfragmented_pages(pga, pages_per_brw);
                rc = osc_brw_internal(cmd, exp, oa, md, pages_per_brw, pga);
                /* advance past the chunk we just sent */
                page_count -= pages_per_brw;
                pga += pages_per_brw;
/* Asynchronous counterpart of osc_brw(): same chunking (sorted,
 * unfragmented, <= PTLRPC_MAX_BRW_PAGES) but each chunk is queued on the
 * caller's request set via async_internal() instead of being waited on. */
static int osc_brw_async(int cmd, struct obd_export *exp, struct obdo *oa,
                         struct lov_stripe_md *md, obd_count page_count,
                         struct brw_page *pga, struct ptlrpc_request_set *set,
                         struct obd_trans_info *oti)
        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */
                struct obd_import *imp = class_exp2cliimp(exp);
                if (imp == NULL || imp->imp_invalid)
        while (page_count) {
                obd_count pages_per_brw;
                /* cap each rpc at the wire maximum */
                if (page_count > PTLRPC_MAX_BRW_PAGES)
                        pages_per_brw = PTLRPC_MAX_BRW_PAGES;
                        pages_per_brw = page_count;
                sort_brw_pages(pga, pages_per_brw);
                pages_per_brw = max_unfragmented_pages(pga, pages_per_brw);
                rc = async_internal(cmd, exp, oa, md, pages_per_brw, pga, set);
                /* advance past the chunk we just queued */
                page_count -= pages_per_brw;
                pga += pages_per_brw;
1247 static void osc_check_rpcs(struct client_obd *cli);
1248 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
/* This maintains the lists of pending pages to read/write for a given object
 * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
 * to quickly find objects that are ready to send an RPC.
 *
 * Decide whether this object's pending page list justifies building an rpc
 * right now: invalid import (drain), any urgent page, enough pages for an
 * optimally-sized rpc, or (for writes) threads waiting on cache space. */
static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
        /* nothing queued, nothing to send */
        if (lop->lop_num_pending == 0)
        /* if we have an invalid import we want to drain the queued pages
         * by forcing them through rpcs that immediately fail and complete
         * the pages.  recovery relies on this to empty the queued pages
         * before canceling the locks and evicting down the llite pages */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
        /* stream rpcs in queue order as long as as there is an urgent page
         * queued.  this is our cheap solution for good batching in the case
         * where writepage marks some random page in the middle of the file
         * as urgent because of, say, memory pressure */
        if (!list_empty(&lop->lop_urgent))
        /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
        optimal = cli->cl_max_pages_per_rpc;
        if (cmd & OBD_BRW_WRITE) {
                /* trigger a write rpc stream as long as there are dirtiers
                 * waiting for space.  as they're waiting, they're not going to
                 * create more pages to coallesce with what's waiting.. */
                if (!list_empty(&cli->cl_cache_waiters))
                /* +16 to avoid triggering rpcs that would want to include pages
                 * that are being queued but which can't be made ready until
                 * the queuer finishes with the page.  this is a wart for
                 * llite::commit_write() */
        if (lop->lop_num_pending >= optimal)
/* Idempotently add 'item' to 'list' when should_be_on is true, or remove
 * it when false; no-op if it is already in the desired state. */
static void on_list(struct list_head *item, struct list_head *list,
        if (list_empty(item) && should_be_on)
                list_add_tail(item, list);
        else if (!list_empty(item) && !should_be_on)
                list_del_init(item);
/* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
 * can find pages to build into rpcs quickly */
static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
        /* ready list: objects with enough pending work to make an rpc */
        on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
                lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
                lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
        /* write/read lists: objects with any pending pages of that kind */
        on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
                loi->loi_write_lop.lop_num_pending);
        on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
                loi->loi_read_lop.lop_num_pending);
/* Adjust the pending-page count on a lop by 'delta' (may be negative) and
 * mirror the change into the client-wide read/write pending counters. */
static void lop_update_pending(struct client_obd *cli,
                               struct loi_oap_pages *lop, int cmd, int delta)
        lop->lop_num_pending += delta;
        if (cmd & OBD_BRW_WRITE)
                cli->cl_pending_w_pages += delta;
                cli->cl_pending_r_pages += delta;
1332 /* this is called when a sync waiter receives an interruption.  Its job is to
1333 * get the caller woken as soon as possible. If its page hasn't been put in an
1334 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
1335 * desiring interruption which will forcefully complete the rpc once the rpc
1337 static void osc_occ_interrupted(struct oig_callback_context *occ)
1339 struct osc_async_page *oap;
1340 struct loi_oap_pages *lop;
1341 struct lov_oinfo *loi;
/* recover the containing oap from the embedded callback context */
1344 /* XXX member_of() */
1345 oap = list_entry(occ, struct osc_async_page, oap_occ);
1347 spin_lock(&oap->oap_cli->cl_loi_list_lock);
1349 oap->oap_interrupted = 1;
1351 /* ok, it's been put in an rpc. */
1352 if (oap->oap_request != NULL) {
1353 ptlrpc_mark_interrupted(oap->oap_request);
/* nudge the ptlrpc daemon so the interrupted rpc is processed promptly */
1354 ptlrpcd_wake(oap->oap_request);
1358 /* we don't get interruption callbacks until osc_trigger_sync_io()
1359 * has been called and put the sync oaps in the pending/urgent lists.*/
1360 if (!list_empty(&oap->oap_pending_item)) {
1361 list_del_init(&oap->oap_pending_item);
1362 if (oap->oap_async_flags & ASYNC_URGENT)
1363 list_del_init(&oap->oap_urgent_item);
/* NOTE(review): the assignment of 'loi' falls on a line elided from this
 * extract; presumably loi = oap->oap_loi — confirm against full source */
1366 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1367 &loi->loi_write_lop : &loi->loi_read_lop;
1368 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1369 loi_list_maint(oap->oap_cli, oap->oap_loi);
/* complete the group-io slot with rc 0 so the waiter wakes */
1371 oig_complete_one(oap->oap_oig, &oap->oap_occ, 0);
1372 oap->oap_oig = NULL;
1376 spin_unlock(&oap->oap_cli->cl_loi_list_lock);
1379 /* this is trying to propagate async writeback errors back up to the
1380 * application. As an async write fails we record the error code for later if
1381 * the app does an fsync. As long as errors persist we force future rpcs to be
1382 * sync so that the app can get a sync error and break the cycle of queueing
1383 * pages for which writeback will fail. */
1384 static void osc_process_ar(struct osc_async_rc *ar, struct ptlrpc_request *req,
/* NOTE(review): the 'rc' parameter line, braces and the error-recording
 * branch condition are elided from this extract. */
1391 ar->ar_force_sync = 1;
/* remember the xid boundary: only rpcs issued after this point count as
 * having seen the forced-sync state */
1392 ar->ar_min_xid = ptlrpc_sample_next_xid();
/* a successful rpc at or past the recorded xid clears the forced-sync
 * state again */
1397 if (ar->ar_force_sync && req && (ptlrpc_req_xid(req) >= ar->ar_min_xid))
1398 ar->ar_force_sync = 0;
1401 /* this must be called holding the loi list lock to give coverage to exit_cache,
1402 * async_flag maintenance, and oap_request */
1403 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1404 struct osc_async_page *oap, int sent, int rc)
/* release this page's dirty/grant accounting first */
1406 osc_exit_cache(cli, oap, sent);
1407 oap->oap_async_flags = 0;
1408 oap->oap_interrupted = 0;
/* record/clear async write error state on both client and object */
1410 if (oap->oap_cmd & OBD_BRW_WRITE) {
1411 osc_process_ar(&cli->cl_ar, oap->oap_request, rc);
1412 osc_process_ar(&oap->oap_loi->loi_ar, oap->oap_request, rc);
/* drop the reference taken when the oap was put into the rpc */
1415 if (oap->oap_request != NULL) {
1416 ptlrpc_req_finished(oap->oap_request);
1417 oap->oap_request = NULL;
/* on success, propagate size/time attributes from the reply obdo */
1420 if (rc == 0 && oa != NULL) {
1421 if (oa->o_valid & OBD_MD_FLBLOCKS)
1422 oap->oap_loi->loi_blocks = oa->o_blocks;
1423 if (oa->o_valid & OBD_MD_FLMTIME)
1424 oap->oap_loi->loi_mtime = oa->o_mtime;
/* group-io pages report completion through the oig instead of the
 * caller's ap_completion hook (branch condition elided in this extract) */
1428 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
1429 oap->oap_oig = NULL;
1434 oap->oap_caller_ops->ap_completion(oap->oap_caller_data, oap->oap_cmd,
/* rpc interpret callback for an async brw: finish the bulk request, update
 * in-flight accounting, complete every oap that rode in this rpc, then try
 * to launch more rpcs and free the async-args resources. */
1438 static int brw_interpret_oap(struct ptlrpc_request *request,
1439 struct osc_brw_async_args *aa, int rc)
1441 struct osc_async_page *oap;
1442 struct client_obd *cli;
1443 struct list_head *pos, *n;
1446 rc = osc_brw_fini_request(request, aa->aa_oa, aa->aa_requested_nob,
1447 aa->aa_nio_count, aa->aa_page_count,
1450 CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
1454 spin_lock(&cli->cl_loi_list_lock);
1456 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1457 * is called so we know whether to go to sync BRWs or wait for more
1458 * RPCs to complete */
1459 if (request->rq_reqmsg->opc == OST_WRITE)
1460 cli->cl_w_in_flight--;
1462 cli->cl_r_in_flight--;
1464 /* the caller may re-use the oap after the completion call so
1465 * we need to clean it up a little */
1466 list_for_each_safe(pos, n, &aa->aa_oaps) {
1467 oap = list_entry(pos, struct osc_async_page, oap_rpc_item);
1469 //CDEBUG(D_INODE, "page %p index %lu oap %p\n",
1470 //oap->oap_page, oap->oap_page->index, oap);
1472 list_del_init(&oap->oap_rpc_item);
1473 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
/* some cache space may have been freed; wake waiters and refill the
 * rpc pipeline while still holding the loi list lock */
1476 osc_wake_cache_waiters(cli);
1477 osc_check_rpcs(cli);
1479 spin_unlock(&cli->cl_loi_list_lock);
1481 obdo_free(aa->aa_oa);
1482 OBD_FREE(aa->aa_pga, aa->aa_page_count * sizeof(struct brw_page));
/* Build a brw ptlrpc request from the oaps on @rpc_list: allocate a brw_page
 * array, fill one entry per oap, fill the obdo via the caller's hook, sort
 * the pages by offset and prep the request.  Returns the request or an
 * ERR_PTR on allocation/prep failure.  NOTE(review): the oa allocation, the
 * loop index 'i' and several error-path lines are elided from this extract. */
1487 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
1488 struct list_head *rpc_list,
1489 int page_count, int cmd)
1491 struct ptlrpc_request *req;
1492 struct brw_page *pga = NULL;
1493 int requested_nob, nio_count;
1494 struct osc_brw_async_args *aa;
1495 struct obdo *oa = NULL;
1496 struct obd_async_page_ops *ops = NULL;
1497 void *caller_data = NULL;
1498 struct list_head *pos;
1501 LASSERT(!list_empty(rpc_list));
1503 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1505 RETURN(ERR_PTR(-ENOMEM));
1509 GOTO(out, req = ERR_PTR(-ENOMEM));
/* translate each oap into a brw_page entry */
1512 list_for_each(pos, rpc_list) {
1513 struct osc_async_page *oap;
1515 oap = list_entry(pos, struct osc_async_page, oap_rpc_item);
1517 ops = oap->oap_caller_ops;
1518 caller_data = oap->oap_caller_data;
1520 pga[i].off = oap->oap_obj_off + oap->oap_page_off;
1521 pga[i].pg = oap->oap_page;
1522 pga[i].count = oap->oap_count;
1523 pga[i].flag = oap->oap_brw_flags;
1524 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1525 pga[i].pg, oap->oap_page->index, oap, pga[i].flag);
1529 /* always get the data for the obdo for the rpc */
1530 LASSERT(ops != NULL);
1531 ops->ap_fill_obdo(caller_data, cmd, oa);
1533 sort_brw_pages(pga, page_count);
1534 rc = osc_brw_prep_request(cmd, cli->cl_import, oa, NULL, page_count,
1535 pga, &requested_nob, &nio_count, &req);
1537 CERROR("prep_req failed: %d\n", rc);
1538 GOTO(out, req = ERR_PTR(rc));
/* stash bookkeeping in the request for brw_interpret_oap() */
1541 LASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1542 aa = (struct osc_brw_async_args *)&req->rq_async_args;
1544 aa->aa_requested_nob = requested_nob;
1545 aa->aa_nio_count = nio_count;
1546 aa->aa_page_count = page_count;
1555 OBD_FREE(pga, sizeof(*pga) * page_count);
1560 /* the loi lock is held across this function but it's allowed to release
1561 * and reacquire it during its work */
/* Collect up to cl_max_pages_per_rpc ready pages from @lop, build one brw
 * rpc and hand it to ptlrpcd.  Returns the number of pages sent, 0 when
 * make_ready asked us to back off, or a negative errno. */
1562 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
1563 int cmd, struct loi_oap_pages *lop)
1565 struct ptlrpc_request *request;
1566 obd_count page_count = 0;
1567 struct list_head *tmp, *pos;
1568 struct osc_async_page *oap = NULL;
1569 struct osc_brw_async_args *aa;
1570 struct obd_async_page_ops *ops;
1571 LIST_HEAD(rpc_list);
1572 unsigned int ending_offset;
1573 unsigned starting_offset = 0;
1576 /* first we find the pages we're allowed to work with */
1577 list_for_each_safe(pos, tmp, &lop->lop_pending) {
1578 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
1579 ops = oap->oap_caller_ops;
1581 LASSERT(oap->oap_magic == OAP_MAGIC);
1583 /* in llite being 'ready' equates to the page being locked
1584 * until completion unlocks it. commit_write submits a page
1585 * as not ready because its unlock will happen unconditionally
1586 * as the call returns. if we race with commit_write giving
1587 * us that page we don't want to create a hole in the page
1588 * stream, so we stop and leave the rpc to be fired by
1589 * another dirtier or kupdated interval (the not ready page
1590 * will still be on the dirty list). we could call in
1591 * at the end of ll_file_write to process the queue again. */
1592 if (!(oap->oap_async_flags & ASYNC_READY)) {
1593 int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
1595 CDEBUG(D_INODE, "oap %p page %p returned %d "
1596 "instead of ready\n", oap,
/* NOTE(review): the switch on rc is elided here; the cases below handle
 * -EAGAIN (back off), -EINTR (complete with EINTR) and 0 (ready) */
1600 /* llite is telling us that the page is still
1601 * in commit_write and that we should try
1602 * and put it in an rpc again later. we
1603 * break out of the loop so we don't create
1604 * a hole in the sequence of pages in the rpc
1609 /* the io isn't needed.. tell the checks
1610 * below to complete the rpc with EINTR */
1611 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
1612 oap->oap_count = -EINTR;
1615 oap->oap_async_flags |= ASYNC_READY;
1618 LASSERTF(0, "oap %p page %p returned %d "
1619 "from make_ready\n", oap,
1627 * Page submitted for IO has to be locked. Either by
1628 * ->ap_make_ready() or by higher layers.
1630 * XXX nikita: this assertion should be adjusted when lustre
1631 * starts using PG_writeback for pages being written out.
1633 #if defined(__KERNEL__)
1634 LASSERT(PageLocked(oap->oap_page));
1636 /* If there is a gap at the start of this page, it can't merge
1637 * with any previous page, so we'll hand the network a
1638 * "fragmented" page array that it can't transfer in 1 RDMA */
1639 if (page_count != 0 && oap->oap_page_off != 0)
1642 /* take the page out of our book-keeping */
1643 list_del_init(&oap->oap_pending_item);
1644 lop_update_pending(cli, lop, cmd, -1);
1645 list_del_init(&oap->oap_urgent_item);
1647 if (page_count == 0)
1648 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
1649 (PTLRPC_MAX_BRW_SIZE - 1);
1651 /* ask the caller for the size of the io as the rpc leaves. */
1652 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
1654 ops->ap_refresh_count(oap->oap_caller_data,cmd);
/* zero or negative count means no io is needed; complete now */
1655 if (oap->oap_count <= 0) {
1656 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
1658 osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
1662 /* now put the page back in our accounting */
1663 list_add_tail(&oap->oap_rpc_item, &rpc_list);
1664 if (++page_count >= cli->cl_max_pages_per_rpc)
1667 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
1668 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
1669 * have the same alignment as the initial writes that allocated
1670 * extents on the server. */
1671 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
1672 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
1673 if (ending_offset == 0)
1676 /* If there is a gap at the end of this page, it can't merge
1677 * with any subsequent pages, so we'll hand the network a
1678 * "fragmented" page array that it can't transfer in 1 RDMA */
1679 if (oap->oap_page_off + oap->oap_count < PAGE_SIZE)
/* pages were pulled off the pending lists above, so some cache
 * waiters may now be able to proceed */
1683 osc_wake_cache_waiters(cli);
1685 if (page_count == 0)
1688 loi_list_maint(cli, loi);
/* drop the list lock while building the request; it is retaken below */
1690 spin_unlock(&cli->cl_loi_list_lock);
1692 request = osc_build_req(cli, &rpc_list, page_count, cmd);
1693 if (IS_ERR(request)) {
1694 /* this should happen rarely and is pretty bad, it makes the
1695 * pending list not follow the dirty order */
1696 spin_lock(&cli->cl_loi_list_lock);
1697 list_for_each_safe(pos, tmp, &rpc_list) {
1698 oap = list_entry(pos, struct osc_async_page,
1700 list_del_init(&oap->oap_rpc_item);
1702 /* queued sync pages can be torn down while the pages
1703 * were between the pending list and the rpc */
1704 if (oap->oap_interrupted) {
1705 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
1706 osc_ap_completion(cli, NULL, oap, 0,
1710 osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(request));
1712 /* put the page back in the loi/lop lists */
1713 list_add_tail(&oap->oap_pending_item,
1715 lop_update_pending(cli, lop, cmd, 1);
1716 if (oap->oap_async_flags & ASYNC_URGENT)
1717 list_add(&oap->oap_urgent_item,
1720 loi_list_maint(cli, loi);
1721 RETURN(PTR_ERR(request));
/* move the collected oaps into the request's async args so the
 * interpret callback can complete them */
1724 LASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
1725 aa = (struct osc_brw_async_args *)&request->rq_async_args;
1726 INIT_LIST_HEAD(&aa->aa_oaps);
1727 list_splice(&rpc_list, &aa->aa_oaps);
1728 INIT_LIST_HEAD(&rpc_list);
/* update /proc histograms for rpc size, concurrency and offset */
1730 if (cmd == OBD_BRW_READ) {
1731 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1732 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1733 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1734 starting_offset/PAGE_SIZE + 1);
1736 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1737 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1738 cli->cl_w_in_flight);
1739 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1740 starting_offset/PAGE_SIZE + 1);
1743 spin_lock(&cli->cl_loi_list_lock);
1745 if (cmd == OBD_BRW_READ)
1746 cli->cl_r_in_flight++;
1748 cli->cl_w_in_flight++;
1750 /* queued sync pages can be torn down while the pages
1751 * were between the pending list and the rpc */
1752 list_for_each(pos, &aa->aa_oaps) {
1753 oap = list_entry(pos, struct osc_async_page, oap_rpc_item);
1754 if (oap->oap_interrupted) {
1755 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
1757 ptlrpc_mark_interrupted(request);
1762 CDEBUG(D_INODE, "req %p: %d pages, aa %p. now %dr/%dw in flight\n",
1763 request, page_count, aa, cli->cl_r_in_flight,
1764 cli->cl_w_in_flight);
/* NOTE(review): only the last-visited oap gets the request reference
 * here — confirm against the full source whether elided lines set
 * oap_request for every oap in the rpc */
1766 oap->oap_request = ptlrpc_request_addref(request);
1767 request->rq_interpret_reply = brw_interpret_oap;
1768 ptlrpcd_add_req(request);
/* Debug helper: dump one loi's state — whether it is on the ready list, and
 * the pending/urgent counts for its write and read lops — before a
 * caller-supplied message. */
1772 #define LOI_DEBUG(LOI, STR, args...) \
1773 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
1774 !list_empty(&(LOI)->loi_cli_item), \
1775 (LOI)->loi_write_lop.lop_num_pending, \
1776 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
1777 (LOI)->loi_read_lop.lop_num_pending, \
1778 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
1781 /* This is called by osc_check_rpcs() to find which objects have pages that
1782 * we could be sending. These lists are maintained by loi_list_maint(),
 * driven by lop_makes_rpc(). */
1783 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
1786 /* first return all objects which we already know to have
1787 * pages ready to be stuffed into rpcs */
1788 if (!list_empty(&cli->cl_loi_ready_list))
1789 RETURN(list_entry(cli->cl_loi_ready_list.next,
1790 struct lov_oinfo, loi_cli_item));
1792 /* then if we have cache waiters, return all objects with queued
1793 * writes. This is especially important when many small files
1794 * have filled up the cache and not been fired into rpcs because
1795 * they don't pass the nr_pending/object threshold */
1796 if (!list_empty(&cli->cl_cache_waiters) &&
1797 !list_empty(&cli->cl_loi_write_list))
1798 RETURN(list_entry(cli->cl_loi_write_list.next,
1799 struct lov_oinfo, loi_write_item));
1801 /* then return all queued objects when we have an invalid import
1802 * so that they get flushed */
1803 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
1804 if (!list_empty(&cli->cl_loi_write_list))
1805 RETURN(list_entry(cli->cl_loi_write_list.next,
1806 struct lov_oinfo, loi_write_item));
1807 if (!list_empty(&cli->cl_loi_read_list))
1808 RETURN(list_entry(cli->cl_loi_read_list.next,
1809 struct lov_oinfo, loi_read_item));
1814 /* called with the loi list lock held */
/* Launch brw rpcs for the next eligible objects until the in-flight limit
 * is hit, alternating writes and reads per object for fairness. */
1815 static void osc_check_rpcs(struct client_obd *cli)
1817 struct lov_oinfo *loi;
1818 int rc = 0, race_counter = 0;
1821 while ((loi = osc_next_loi(cli)) != NULL) {
1822 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
1824 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
1827 /* attempt some read/write balancing by alternating between
1828 * reads and writes in an object. The makes_rpc checks here
1829 * would be redundant if we were getting read/write work items
1830 * instead of objects. we don't want send_oap_rpc to drain a
1831 * partial read pending queue when we're given this object to
1832 * do io on writes while there are cache waiters */
1833 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
1834 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
1835 &loi->loi_write_lop);
1843 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
1844 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
1845 &loi->loi_read_lop);
1854 /* attempt some inter-object balancing by issuing rpcs
1855 * for each object in turn */
1856 if (!list_empty(&loi->loi_cli_item))
1857 list_del_init(&loi->loi_cli_item);
1858 if (!list_empty(&loi->loi_write_item))
1859 list_del_init(&loi->loi_write_item);
1860 if (!list_empty(&loi->loi_read_item))
1861 list_del_init(&loi->loi_read_item);
1863 loi_list_maint(cli, loi);
1865 /* send_oap_rpc fails with 0 when make_ready tells it to
1866 * back off. llite's make_ready does this when it tries
1867 * to lock a page queued for write that is already locked.
1868 * we want to try sending rpcs from many objects, but we
1869 * don't want to spin failing with 0. */
1870 if (race_counter == 10)
1876 /* we're trying to queue a page in the osc so we're subject to the
1877 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
1878 * If the osc's queued pages are already at that limit, then we want to sleep
1879 * until there is space in the osc's queue for us. We also may be waiting for
1880 * write credits from the OST if there are RPCs in flight that may return some
1881 * before we fall back to sync writes.
1883 * We need this to know our allocation was granted in the presence of signals */
/* wait condition for osc_enter_cache(): true when the waiter has been
 * removed from the cache-waiters list (granted) or no rpcs remain in
 * flight (nothing left to wait for) */
1884 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
1888 spin_lock(&cli->cl_loi_list_lock);
1889 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
1890 spin_unlock(&cli->cl_loi_list_lock);
1894 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
1895 * grant or cache space. */
/* Account one page into the osc dirty cache, or block waiting for space /
 * write grant, or fail so the caller falls back to sync io.
 * NOTE(review): return statements fall on lines elided from this extract. */
1896 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
1897 struct osc_async_page *oap)
1899 struct osc_cache_waiter ocw;
1900 struct l_wait_info lwi = { 0 };
1902 CDEBUG(D_CACHE, "dirty: %ld dirty_max: %ld dropped: %lu grant: %lu\n",
1903 cli->cl_dirty, cli->cl_dirty_max, cli->cl_lost_grant,
1904 cli->cl_avail_grant);
1906 /* force the caller to try sync io. this can jump the list
1907 * of queued writes and create a discontiguous rpc stream */
1908 if (cli->cl_dirty_max < PAGE_SIZE || cli->cl_ar.ar_force_sync ||
1909 loi->loi_ar.ar_force_sync)
1912 /* Hopefully normal case - cache space and write credits available */
1913 if (cli->cl_dirty + PAGE_SIZE <= cli->cl_dirty_max &&
1914 cli->cl_avail_grant >= PAGE_SIZE) {
1915 /* account for ourselves */
1916 osc_consume_write_grant(cli, oap);
1920 /* Make sure that there are write rpcs in flight to wait for. This
1921 * is a little silly as this object may not have any pending but
1922 * other objects sure might. */
1923 if (cli->cl_w_in_flight) {
1924 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
1925 init_waitqueue_head(&ocw.ocw_waitq);
1929 loi_list_maint(cli, loi);
1930 osc_check_rpcs(cli);
/* drop the lock while sleeping; ocw_granted() retakes it to test */
1931 spin_unlock(&cli->cl_loi_list_lock);
1933 CDEBUG(D_CACHE, "sleeping for cache space\n");
1934 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
1936 spin_lock(&cli->cl_loi_list_lock);
/* still on the waiters list means we woke without being granted */
1937 if (!list_empty(&ocw.ocw_entry)) {
1938 list_del(&ocw.ocw_entry);
1947 /* the companion to enter_cache, called when an oap is no longer part of the
1948 * dirty accounting.. so writeback completes or truncate happens before writing
1949 * starts. must be called with the loi lock held. */
/* NOTE(review): the 'sent' parameter line and several branch/return lines
 * are elided from this extract. */
1950 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
/* OST block size, defaulting to 4096 when the server reports none */
1953 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
/* pages that never consumed grant have nothing to return */
1956 if (!(oap->oap_brw_flags & OBD_BRW_FROM_GRANT)) {
1961 oap->oap_brw_flags &= ~OBD_BRW_FROM_GRANT;
1962 cli->cl_dirty -= PAGE_SIZE;
/* unsent page: its whole grant is lost back to the server */
1964 cli->cl_lost_grant += PAGE_SIZE;
1965 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
1966 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
1967 } else if (PAGE_SIZE != blocksize && oap->oap_count != PAGE_SIZE) {
1968 /* For short writes we shouldn't count parts of pages that
1969 * span a whole block on the OST side, or our accounting goes
1970 * wrong. Should match the code in filter_grant_check. */
1971 int offset = (oap->oap_obj_off +oap->oap_page_off) & ~PAGE_MASK;
1972 int count = oap->oap_count + (offset & (blocksize - 1));
1973 int end = (offset + oap->oap_count) & (blocksize - 1);
1975 count += blocksize - end;
1977 cli->cl_lost_grant += PAGE_SIZE - count;
1978 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
1979 PAGE_SIZE - count, cli->cl_lost_grant,
1980 cli->cl_avail_grant, cli->cl_dirty);
/* Initialize an osc_async_page cookie for @page in caller-provided storage
 * (*res).  When called without storage it returns the size the caller must
 * allocate (the size_round() early return below).  The cookie is later
 * handed back through the queue/teardown entry points. */
1986 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
1987 struct lov_oinfo *loi, struct page *page,
1988 obd_off offset, struct obd_async_page_ops *ops,
1989 void *data, void **res)
1991 struct osc_async_page *oap;
1995 return size_round(sizeof(*oap));
1998 oap->oap_magic = OAP_MAGIC;
1999 oap->oap_cli = &exp->exp_obd->u.cli;
2002 oap->oap_caller_ops = ops;
2003 oap->oap_caller_data = data;
2005 oap->oap_page = page;
2006 oap->oap_obj_off = offset;
2008 INIT_LIST_HEAD(&oap->oap_pending_item);
2009 INIT_LIST_HEAD(&oap->oap_urgent_item);
2010 INIT_LIST_HEAD(&oap->oap_rpc_item);
/* hook for sync-io interruption delivery (see osc_occ_interrupted) */
2012 oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2014 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
/* Validate and convert an opaque page cookie back into its oap; returns
 * ERR_PTR(-EINVAL) when the magic check fails. */
2018 struct osc_async_page *oap_from_cookie(void *cookie)
2020 struct osc_async_page *oap = cookie;
2021 if (oap->oap_magic != OAP_MAGIC)
2022 return ERR_PTR(-EINVAL);
/* Queue one prepared page for async read or write: validate the cookie,
 * check quota for writes, enter the dirty cache (writes), link the oap on
 * the appropriate pending/urgent lists and kick the rpc engine. */
2026 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2027 struct lov_oinfo *loi, void *cookie,
2028 int cmd, obd_off off, int count,
2029 obd_flag brw_flags, enum async_flags async_flags)
2031 struct client_obd *cli = &exp->exp_obd->u.cli;
2032 struct osc_async_page *oap;
2033 struct loi_oap_pages *lop;
2037 oap = oap_from_cookie(cookie);
2039 RETURN(PTR_ERR(oap));
/* refuse new io against a dead import */
2041 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
/* an oap may only be queued once at a time */
2044 if (!list_empty(&oap->oap_pending_item) ||
2045 !list_empty(&oap->oap_urgent_item) ||
2046 !list_empty(&oap->oap_rpc_item))
2049 /* check if the file's owner/group is over quota */
2050 #ifdef HAVE_QUOTA_SUPPORT
2051 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2052 struct obd_async_page_ops *ops;
2059 ops = oap->oap_caller_ops;
2060 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2061 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
/* default to stripe 0 when the caller gave no loi (elided branch) */
2072 loi = &lsm->lsm_oinfo[0];
2074 spin_lock(&cli->cl_loi_list_lock);
2077 oap->oap_page_off = off;
2078 oap->oap_count = count;
2079 oap->oap_brw_flags = brw_flags;
2080 oap->oap_async_flags = async_flags;
2082 if (cmd & OBD_BRW_WRITE) {
/* may drop and retake the list lock while waiting for cache space */
2083 rc = osc_enter_cache(cli, loi, oap);
2085 spin_unlock(&cli->cl_loi_list_lock);
2088 lop = &loi->loi_write_lop;
2090 lop = &loi->loi_read_lop;
2093 if (oap->oap_async_flags & ASYNC_URGENT)
2094 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2095 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2096 lop_update_pending(cli, lop, cmd, 1);
2098 loi_list_maint(cli, loi);
2100 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2103 osc_check_rpcs(cli);
2104 spin_unlock(&cli->cl_loi_list_lock);
2109 /* aka (~was & now & flag), but this is more clear :) */
/* true iff @flag is newly being set: absent in @was, present in @now */
2110 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
/* Add async flags to an already-queued oap; newly-set ASYNC_URGENT moves it
 * onto the urgent list, and the rpc engine is kicked afterwards. */
2112 static int osc_set_async_flags(struct obd_export *exp,
2113 struct lov_stripe_md *lsm,
2114 struct lov_oinfo *loi, void *cookie,
2115 obd_flag async_flags)
2117 struct client_obd *cli = &exp->exp_obd->u.cli;
2118 struct loi_oap_pages *lop;
2119 struct osc_async_page *oap;
2123 oap = oap_from_cookie(cookie);
2125 RETURN(PTR_ERR(oap));
2128 * bug 7311: OST-side locking is only supported for liblustre for now
2129 * (and liblustre never calls obd_set_async_flags(). I hope.), generic
2130 * implementation has to handle case where OST-locked page was picked
2131 * up by, e.g., ->writepage().
2133 LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2134 LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2137 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
/* default to stripe 0 when the caller gave no loi (elided branch) */
2141 loi = &lsm->lsm_oinfo[0];
2143 if (oap->oap_cmd & OBD_BRW_WRITE) {
2144 lop = &loi->loi_write_lop;
2146 lop = &loi->loi_read_lop;
2149 spin_lock(&cli->cl_loi_list_lock);
/* only flags on a still-pending oap can be changed */
2151 if (list_empty(&oap->oap_pending_item))
2152 GOTO(out, rc = -EINVAL);
/* nothing to do when every requested flag is already set */
2154 if ((oap->oap_async_flags & async_flags) == async_flags)
2157 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2158 oap->oap_async_flags |= ASYNC_READY;
2160 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2161 if (list_empty(&oap->oap_rpc_item)) {
2162 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2163 loi_list_maint(cli, loi);
2167 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2168 oap->oap_async_flags);
2170 osc_check_rpcs(cli);
2171 spin_unlock(&cli->cl_loi_list_lock);
/* Queue a page as part of a group (sync) io: it goes on the lop's
 * pending_group list and, for ASYNC_GROUP_SYNC, registers with the oig so
 * completion can be waited on; it is not submitted until
 * osc_trigger_group_io() moves it to the live pending/urgent lists. */
2175 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2176 struct lov_oinfo *loi,
2177 struct obd_io_group *oig, void *cookie,
2178 int cmd, obd_off off, int count,
2180 obd_flag async_flags)
2182 struct client_obd *cli = &exp->exp_obd->u.cli;
2183 struct osc_async_page *oap;
2184 struct loi_oap_pages *lop;
2187 oap = oap_from_cookie(cookie);
2189 RETURN(PTR_ERR(oap));
2191 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
/* an oap may only be queued once at a time */
2194 if (!list_empty(&oap->oap_pending_item) ||
2195 !list_empty(&oap->oap_urgent_item) ||
2196 !list_empty(&oap->oap_rpc_item))
/* default to stripe 0 when the caller gave no loi (elided branch) */
2200 loi = &lsm->lsm_oinfo[0];
2202 spin_lock(&cli->cl_loi_list_lock);
2205 oap->oap_page_off = off;
2206 oap->oap_count = count;
2207 oap->oap_brw_flags = brw_flags;
2208 oap->oap_async_flags = async_flags;
2210 if (cmd & OBD_BRW_WRITE)
2211 lop = &loi->loi_write_lop;
2213 lop = &loi->loi_read_lop;
2215 list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2216 if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2218 oig_add_one(oig, &oap->oap_occ);
2221 LOI_DEBUG(loi, "oap %p page %p on group pending\n", oap, oap->oap_page);
2223 spin_unlock(&cli->cl_loi_list_lock);
/* Move every oap parked on the lop's pending_group list onto the live
 * pending list (and the urgent list, so it is sent promptly), updating the
 * pending counters and list-membership invariants. */
2228 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2229 struct loi_oap_pages *lop, int cmd)
2231 struct list_head *pos, *tmp;
2232 struct osc_async_page *oap;
2234 list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2235 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2236 list_del(&oap->oap_pending_item);
2237 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2238 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2239 lop_update_pending(cli, lop, cmd, 1);
2241 loi_list_maint(cli, loi);
/* Release all group-queued pages for this object into the normal rpc
 * machinery and kick rpc generation. */
2244 static int osc_trigger_group_io(struct obd_export *exp,
2245 struct lov_stripe_md *lsm,
2246 struct lov_oinfo *loi,
2247 struct obd_io_group *oig)
2249 struct client_obd *cli = &exp->exp_obd->u.cli;
/* default to stripe 0 when the caller gave no loi (elided branch) */
2253 loi = &lsm->lsm_oinfo[0];
2255 spin_lock(&cli->cl_loi_list_lock);
2257 osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2258 osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2260 osc_check_rpcs(cli);
2261 spin_unlock(&cli->cl_loi_list_lock);
/* Remove a queued-but-not-in-flight oap from all osc bookkeeping: release
 * its cache/grant accounting, unlink it from the urgent and pending lists
 * and fix up the counters.  Fails with -EBUSY if the page is already part
 * of an rpc. */
2266 static int osc_teardown_async_page(struct obd_export *exp,
2267 struct lov_stripe_md *lsm,
2268 struct lov_oinfo *loi, void *cookie)
2270 struct client_obd *cli = &exp->exp_obd->u.cli;
2271 struct loi_oap_pages *lop;
2272 struct osc_async_page *oap;
2276 oap = oap_from_cookie(cookie);
2278 RETURN(PTR_ERR(oap));
/* default to stripe 0 when the caller gave no loi (elided branch) */
2281 loi = &lsm->lsm_oinfo[0];
2283 if (oap->oap_cmd & OBD_BRW_WRITE) {
2284 lop = &loi->loi_write_lop;
2286 lop = &loi->loi_read_lop;
2289 spin_lock(&cli->cl_loi_list_lock);
/* can't tear down a page that is riding in an rpc */
2291 if (!list_empty(&oap->oap_rpc_item))
2292 GOTO(out, rc = -EBUSY);
2294 osc_exit_cache(cli, oap, 0);
2295 osc_wake_cache_waiters(cli);
2297 if (!list_empty(&oap->oap_urgent_item)) {
2298 list_del_init(&oap->oap_urgent_item);
2299 oap->oap_async_flags &= ~ASYNC_URGENT;
2301 if (!list_empty(&oap->oap_pending_item)) {
2302 list_del_init(&oap->oap_pending_item);
2303 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2305 loi_list_maint(cli, loi);
2307 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2309 spin_unlock(&cli->cl_loi_list_lock);
2313 /* Note: caller will lock/unlock, and set uptodate on the pages */
2314 #if defined(__KERNEL__) && (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
/* SAN-attached read path (2.4 kernels only): ask the OST for block mappings
 * via OST_SAN_READ, then read the data directly from the shared block
 * device through buffer heads instead of transferring it over the net. */
2315 static int sanosc_brw_read(struct obd_export *exp, struct obdo *oa,
2316 struct lov_stripe_md *lsm, obd_count page_count,
2317 struct brw_page *pga)
2319 struct ptlrpc_request *request = NULL;
2320 struct ost_body *body;
2321 struct niobuf_remote *nioptr;
2322 struct obd_ioobj *iooptr;
2323 int rc, size[3] = {sizeof(*body)}, mapped = 0;
2324 struct obd_import *imp = class_exp2cliimp(exp);
2328 /* XXX does not handle 'new' brw protocol */
2330 size[1] = sizeof(struct obd_ioobj);
2331 size[2] = page_count * sizeof(*nioptr);
2333 request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
2334 OST_SAN_READ, 3, size, NULL);
2340 if (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2341 request->rq_request_portal = OST_IO_PORTAL;
2343 body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof(*body));
2344 iooptr = lustre_msg_buf(request->rq_reqmsg, 1, sizeof(*iooptr));
2345 nioptr = lustre_msg_buf(request->rq_reqmsg, 2,
2346 sizeof(*nioptr) * page_count);
2348 memcpy(&body->oa, oa, sizeof(body->oa));
2350 obdo_to_ioobj(oa, iooptr);
2351 iooptr->ioo_bufcnt = page_count;
/* describe every page; offsets must be strictly increasing */
2353 for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
2354 LASSERT(PageLocked(pga[mapped].pg));
2355 LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);
2357 nioptr->offset = pga[mapped].off;
2358 nioptr->len = pga[mapped].count;
2359 nioptr->flags = pga[mapped].flag;
2362 size[1] = page_count * sizeof(*nioptr);
2363 request->rq_replen = lustre_msg_size(2, size);
2365 rc = ptlrpc_queue_wait(request);
2369 body = lustre_swab_repbuf(request, 0, sizeof(*body),
2370 lustre_swab_ost_body);
2372 CERROR("Can't unpack body\n");
2373 GOTO(out_req, rc = -EPROTO);
2376 memcpy(oa, &body->oa, sizeof(*oa));
2378 swab = lustre_msg_swabbed(request->rq_repmsg);
2379 LASSERT_REPSWAB(request, 1);
2380 nioptr = lustre_msg_buf(request->rq_repmsg, 1, size[1]);
2382 /* nioptr missing or short */
2383 GOTO(out_req, rc = -EPROTO);
/* walk the returned block mappings and read each page from the device */
2387 for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
2388 struct page *page = pga[mapped].pg;
2389 struct buffer_head *bh;
2393 lustre_swab_niobuf_remote (nioptr);
2395 /* got san device associated */
2396 LASSERT(exp->exp_obd != NULL);
2397 dev = exp->exp_obd->u.cli.cl_sandev;
/* offset 0 marks a hole: no backing block, just zero-fill the page */
2400 if (!nioptr->offset) {
2401 CDEBUG(D_PAGE, "hole at ino %lu; index %ld\n",
2402 page->mapping->host->i_ino,
2404 memset(page_address(page), 0, PAGE_SIZE);
2408 if (!page->buffers) {
2409 create_empty_buffers(page, dev, PAGE_SIZE);
/* freshly-created buffer: map it to the returned block and read */
2412 clear_bit(BH_New, &bh->b_state);
2413 set_bit(BH_Mapped, &bh->b_state);
2414 bh->b_blocknr = (unsigned long)nioptr->offset;
2416 clear_bit(BH_Uptodate, &bh->b_state);
2418 ll_rw_block(READ, 1, &bh);
2422 /* if buffer already existed, it must be the
2423 * one we mapped before, check it */
2424 LASSERT(!test_bit(BH_New, &bh->b_state));
2425 LASSERT(test_bit(BH_Mapped, &bh->b_state));
2426 LASSERT(bh->b_blocknr == (unsigned long)nioptr->offset);
2428 /* wait for its io completion */
2429 if (test_bit(BH_Lock, &bh->b_state))
2432 if (!test_bit(BH_Uptodate, &bh->b_state))
2433 ll_rw_block(READ, 1, &bh);
2437 /* must do synchronous wait here */
2439 if (!buffer_uptodate(bh)) {
2447 ptlrpc_req_finished(request);
/* SAN-attached write path (2.4 kernels only): ask the OST to allocate blocks
 * via OST_SAN_WRITE, then write the page data directly to the shared block
 * device through buffer heads instead of transferring it over the net. */
2451 static int sanosc_brw_write(struct obd_export *exp, struct obdo *oa,
2452 struct lov_stripe_md *lsm, obd_count page_count,
2453 struct brw_page *pga)
2455 struct client_obd *cli = &exp->exp_obd->u.cli;
2456 struct ptlrpc_request *request = NULL;
2457 struct ost_body *body;
2458 struct niobuf_remote *nioptr;
2459 struct obd_ioobj *iooptr;
2460 struct obd_import *imp = class_exp2cliimp(exp);
2461 int rc, size[3] = {sizeof(*body)}, mapped = 0;
2465 size[1] = sizeof(struct obd_ioobj);
2466 size[2] = page_count * sizeof(*nioptr);
/* use the pre-allocated request pool so writes can proceed under memory
 * pressure */
2468 request = ptlrpc_prep_req_pool(class_exp2cliimp(exp),
2469 LUSTRE_OST_VERSION, OST_SAN_WRITE,
2470 3, size, NULL, cli->cl_rq_pool);
2476 if (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2477 request->rq_request_portal = OST_IO_PORTAL;
2479 body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
2480 iooptr = lustre_msg_buf(request->rq_reqmsg, 1, sizeof (*iooptr));
2481 nioptr = lustre_msg_buf(request->rq_reqmsg, 2,
2482 sizeof (*nioptr) * page_count);
2484 memcpy(&body->oa, oa, sizeof(body->oa));
2486 obdo_to_ioobj(oa, iooptr);
2487 iooptr->ioo_bufcnt = page_count;
/* describe every page; offsets must be strictly increasing */
2490 for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
2491 LASSERT(PageLocked(pga[mapped].pg));
2492 LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);
2494 nioptr->offset = pga[mapped].off;
2495 nioptr->len = pga[mapped].count;
2496 nioptr->flags = pga[mapped].flag;
2499 size[1] = page_count * sizeof(*nioptr);
2500 request->rq_replen = lustre_msg_size(2, size);
2502 rc = ptlrpc_queue_wait(request);
2506 swab = lustre_msg_swabbed (request->rq_repmsg);
2507 LASSERT_REPSWAB (request, 1);
2508 nioptr = lustre_msg_buf(request->rq_repmsg, 1, size[1]);
2510 CERROR("absent/short niobuf array\n");
2511 GOTO(out_req, rc = -EPROTO);
/* walk the returned block mappings and write each page to the device */
2515 for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
2516 struct page *page = pga[mapped].pg;
2517 struct buffer_head *bh;
2521 lustre_swab_niobuf_remote (nioptr);
2523 /* got san device associated */
2524 LASSERT(exp->exp_obd != NULL);
2525 dev = exp->exp_obd->u.cli.cl_sandev;
2527 if (!page->buffers) {
2528 create_empty_buffers(page, dev, PAGE_SIZE);
/* pre-existing buffer must match the mapping we established before */
2531 LASSERT(!test_bit(BH_New, &page->buffers->b_state));
2532 LASSERT(test_bit(BH_Mapped, &page->buffers->b_state));
2533 LASSERT(page->buffers->b_blocknr ==
2534 (unsigned long)nioptr->offset);
2540 /* if buffer locked, wait for its io completion */
2541 if (test_bit(BH_Lock, &bh->b_state))
2544 clear_bit(BH_New, &bh->b_state);
2545 set_bit(BH_Mapped, &bh->b_state);
2547 /* override the block nr */
2548 bh->b_blocknr = (unsigned long)nioptr->offset;
2550 /* we are about to write it, so set it
2552 * page lock should guarantee no race condition here */
2553 set_bit(BH_Uptodate, &bh->b_state);
2554 set_bit(BH_Dirty, &bh->b_state);
2556 ll_rw_block(WRITE, 1, &bh);
2558 /* must do synchronous write here */
2560 if (!buffer_uptodate(bh) || test_bit(BH_Dirty, &bh->b_state)) {
2568 ptlrpc_req_finished(request);
/*
 * SAN brw entry point: split the request into chunks of at most
 * PTLRPC_MAX_BRW_PAGES pages and dispatch each chunk to
 * sanosc_brw_write() or sanosc_brw_read() depending on cmd.
 */
2572 static int sanosc_brw(int cmd, struct obd_export *exp, struct obdo *oa,
2573 struct lov_stripe_md *lsm, obd_count page_count,
2574 struct brw_page *pga, struct obd_trans_info *oti)
2578 while (page_count) {
2579 obd_count pages_per_brw;
/* cap each RPC at the per-request page limit */
2582 if (page_count > PTLRPC_MAX_BRW_PAGES)
2583 pages_per_brw = PTLRPC_MAX_BRW_PAGES;
2585 pages_per_brw = page_count;
2587 if (cmd & OBD_BRW_WRITE)
2588 rc = sanosc_brw_write(exp, oa, lsm, pages_per_brw,pga);
2590 rc = sanosc_brw_read(exp, oa, lsm, pages_per_brw, pga);
/* advance to the next chunk */
2595 page_count -= pages_per_brw;
2596 pga += pages_per_brw;
/*
 * Attach @data (an inode pointer) as l_ast_data of the lock behind
 * @lockh, sanity-checking that any previously attached inode is being
 * freed; otherwise it is a real inconsistency and we LASSERT.
 * Also propagates LDLM_FL_NO_LRU from @flags into the lock flags.
 */
2602 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2605 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
/* handle no longer resolves to a lock — likely evicted */
2608 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
/* serialize l_ast_data access under the namespace lock */
2611 l_lock(&lock->l_resource->lr_namespace->ns_lock);
2613 if (lock->l_ast_data && lock->l_ast_data != data) {
2614 struct inode *new_inode = data;
2615 struct inode *old_inode = lock->l_ast_data;
/* a stale pointer is tolerated only if the old inode is on its
 * way out (I_FREEING); anything else is a bug */
2616 if (!(old_inode->i_state & I_FREEING))
2617 LDLM_ERROR(lock, "inconsistent l_ast_data found");
2618 LASSERTF(old_inode->i_state & I_FREEING,
2619 "Found existing inode %p/%lu/%u state %lu in lock: "
2620 "setting data to %p/%lu/%u\n", old_inode,
2621 old_inode->i_ino, old_inode->i_generation,
2623 new_inode, new_inode->i_ino, new_inode->i_generation);
2626 lock->l_ast_data = data;
2627 lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2628 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
/* drop the reference taken by ldlm_handle2lock() */
2629 LDLM_LOCK_PUT(lock);
/*
 * Run @replace over all locks on the object's resource so their
 * callback data can be swapped; thin wrapper around ldlm_change_cbdata().
 */
2632 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2633 ldlm_iterator_t replace, void *data)
2635 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2636 struct obd_device *obd = class_exp2obd(exp);
2638 ldlm_change_cbdata(obd->obd_namespace, &res_id, replace, data);
/*
 * Acquire an extent lock on the object described by @lsm.
 *
 * First tries to match an already-granted local lock (including reusing
 * a PW lock when only PR is requested); only if no match is found does
 * it enqueue a new lock on the server via ldlm_cli_enqueue().  On
 * success (or an aborted intent enqueue that still returned an LVB) the
 * object's cached size/mtime/blocks are refreshed from the lock value
 * block.
 */
2642 static int osc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
2643 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2644 int *flags, void *bl_cb, void *cp_cb, void *gl_cb,
2645 void *data, __u32 lvb_len, void *lvb_swabber,
2646 struct lustre_handle *lockh)
2648 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2649 struct obd_device *obd = exp->exp_obd;
2651 struct ldlm_reply *rep;
2652 struct ptlrpc_request *req = NULL;
2656 /* Filesystem lock extents are extended to page boundaries so that
2657 * dealing with the page cache is a little smoother. */
2658 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2659 policy->l_extent.end |= ~PAGE_MASK;
2661 if (lsm->lsm_oinfo->loi_kms_valid == 0)
2664 /* Next, search for already existing extent locks that will cover us */
2665 rc = ldlm_lock_match(obd->obd_namespace, 0, &res_id, type, policy, mode,
2668 osc_set_data_with_check(lockh, data, *flags);
2669 if (*flags & LDLM_FL_HAS_INTENT) {
2670 /* I would like to be able to ASSERT here that rss <=
2671 * kms, but I can't, for reasons which are explained in
2674 /* We already have a lock, and it's referenced */
2678 /* If we're trying to read, we also search for an existing PW lock. The
2679 * VFS and page cache already protect us locally, so lots of readers/
2680 * writers can share a single PW lock.
2682 * There are problems with conversion deadlocks, so instead of
2683 * converting a read lock to a write lock, we'll just enqueue a new
2686 * At some point we should cancel the read lock instead of making them
2687 * send us a blocking callback, but there are problems with canceling
2688 * locks out from other users right now, too. */
2690 if (mode == LCK_PR) {
2691 rc = ldlm_lock_match(obd->obd_namespace, 0, &res_id, type,
2692 policy, LCK_PW, lockh);
2694 /* FIXME: This is not incredibly elegant, but it might
2695 * be more elegant than adding another parameter to
2696 * lock_match. I want a second opinion. */
/* convert the matched PW reference into a PR reference */
2697 ldlm_lock_addref(lockh, LCK_PR);
2698 ldlm_lock_decref(lockh, LCK_PW);
2699 osc_set_data_with_check(lockh, data, *flags);
/* no local match: build an enqueue request (with intent if asked) */
2705 if (*flags & LDLM_FL_HAS_INTENT) {
2706 int size[2] = {sizeof(struct ldlm_request), sizeof(lvb)};
2708 req = ptlrpc_prep_req(class_exp2cliimp(exp),
2709 LUSTRE_DLM_VERSION, LDLM_ENQUEUE, 1,
2714 size[0] = sizeof(*rep);
2715 req->rq_replen = lustre_msg_size(2, size);
2718 rc = ldlm_cli_enqueue(exp, req, obd->obd_namespace, res_id, type,
2719 policy, mode, flags, bl_cb, cp_cb, gl_cb, data,
2720 &lvb, sizeof(lvb), lustre_swab_ost_lvb, lockh);
2723 if (rc == ELDLM_LOCK_ABORTED) {
2724 /* swabbed by ldlm_cli_enqueue() */
2725 LASSERT_REPSWABBED(req, 0);
2726 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*rep));
2727 LASSERT(rep != NULL);
/* intent execution result replaces the enqueue status */
2728 if (rep->lock_policy_res1)
2729 rc = rep->lock_policy_res1;
2731 ptlrpc_req_finished(req);
/* refresh cached attributes from the lock value block */
2734 if ((*flags & LDLM_FL_HAS_INTENT && rc == ELDLM_LOCK_ABORTED) || !rc) {
2735 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2736 lvb.lvb_size, lvb.lvb_blocks, lvb.lvb_mtime);
2737 lsm->lsm_oinfo->loi_rss = lvb.lvb_size;
2738 lsm->lsm_oinfo->loi_mtime = lvb.lvb_mtime;
2739 lsm->lsm_oinfo->loi_blocks = lvb.lvb_blocks;
/*
 * Match an already-granted lock covering the given extent without
 * enqueueing a new one.  Like osc_enqueue(), a PR request may also be
 * satisfied by an existing PW lock (readers piggyback on writers).
 */
2745 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
2746 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2747 int *flags, void *data, struct lustre_handle *lockh)
2749 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2750 struct obd_device *obd = exp->exp_obd;
2754 OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
2756 /* Filesystem lock extents are extended to page boundaries so that
2757 * dealing with the page cache is a little smoother */
2758 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2759 policy->l_extent.end |= ~PAGE_MASK;
2761 /* Next, search for already existing extent locks that will cover us */
2762 rc = ldlm_lock_match(obd->obd_namespace, *flags, &res_id, type,
2763 policy, mode, lockh);
2765 //if (!(*flags & LDLM_FL_TEST_LOCK))
2766 osc_set_data_with_check(lockh, data, *flags);
2769 /* If we're trying to read, we also search for an existing PW lock. The
2770 * VFS and page cache already protect us locally, so lots of readers/
2771 * writers can share a single PW lock. */
2772 if (mode == LCK_PR) {
2773 rc = ldlm_lock_match(obd->obd_namespace, *flags, &res_id, type,
2774 policy, LCK_PW, lockh);
2775 if (rc == 1 && !(*flags & LDLM_FL_TEST_LOCK)) {
2776 /* FIXME: This is not incredibly elegant, but it might
2777 * be more elegant than adding another parameter to
2778 * lock_match. I want a second opinion. */
2779 osc_set_data_with_check(lockh, data, *flags);
/* hold the lock as PR and release the PW reference we matched */
2780 ldlm_lock_addref(lockh, LCK_PR);
2781 ldlm_lock_decref(lockh, LCK_PW);
/*
 * Drop one reference on the lock; group locks are cancelled eagerly
 * (decref-and-cancel) instead of being left on the LRU.
 */
2787 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2788 __u32 mode, struct lustre_handle *lockh)
2792 if (unlikely(mode == LCK_GROUP))
2793 ldlm_lock_decref_and_cancel(lockh, mode);
2795 ldlm_lock_decref(lockh, mode);
/*
 * Cancel all unused locks on this object's resource; thin wrapper
 * around ldlm_cli_cancel_unused().
 */
2800 static int osc_cancel_unused(struct obd_export *exp,
2801 struct lov_stripe_md *lsm, int flags, void *opaque)
2803 struct obd_device *obd = class_exp2obd(exp);
2804 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2806 return ldlm_cli_cancel_unused(obd->obd_namespace, &res_id, flags,
/*
 * Move this object's locks into/out of the lock LRU depending on @join;
 * thin wrapper around ldlm_cli_join_lru().
 */
2810 static int osc_join_lru(struct obd_export *exp,
2811 struct lov_stripe_md *lsm, int join)
2813 struct obd_device *obd = class_exp2obd(exp);
2814 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2816 return ldlm_cli_join_lru(obd->obd_namespace, &res_id, join);
/*
 * Fetch filesystem statistics from the OST via an OST_STATFS RPC and
 * copy them into @osfs.  @max_age is currently unused on the wire (see
 * comment below).  Returns 0 or a negative errno.
 */
2819 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
2820 unsigned long max_age)
2822 struct obd_statfs *msfs;
2823 struct ptlrpc_request *request;
2824 int rc, size = sizeof(*osfs);
2827 /* We could possibly pass max_age in the request (as an absolute
2828 * timestamp or a "seconds.usec ago") so the target can avoid doing
2829 * extra calls into the filesystem if that isn't necessary (e.g.
2830 * during mount that would help a bit). Having relative timestamps
2831 * is not so great if request processing is slow, while absolute
2832 * timestamps are not ideal because they need time synchronization. */
2833 request = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
2834 OST_STATFS,0,NULL,NULL);
2838 request->rq_replen = lustre_msg_size(1, &size);
2839 request->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
2841 rc = ptlrpc_queue_wait(request);
/* unpack (and byte-swap if needed) the statfs reply */
2845 msfs = lustre_swab_repbuf(request, 0, sizeof(*msfs),
2846 lustre_swab_obd_statfs);
2848 CERROR("Can't unpack obd_statfs\n");
2849 GOTO(out, rc = -EPROTO);
2852 memcpy(osfs, msfs, sizeof(*osfs));
2856 ptlrpc_req_finished(request);
2860 /* Retrieve object striping information.
2862 * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
2863 * the maximum number of OST indices which will fit in the user buffer.
2864 * lmm_magic must be LOV_USER_MAGIC (we only use 1 slot here).
/*
 * Copy a single-stripe lov_user_md describing @lsm out to the
 * user-space buffer @lump.  Validates the user header (magic, stripe
 * count) first; allocates a kernel copy only when the caller left room
 * for the object array.  Returns 0 or a negative errno.
 */
2866 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2868 struct lov_user_md lum, *lumk;
2869 int rc = 0, lum_size;
2875 if (copy_from_user(&lum, lump, sizeof(lum)))
2878 if (lum.lmm_magic != LOV_USER_MAGIC)
/* caller provided space for at least one object entry */
2881 if (lum.lmm_stripe_count > 0) {
2882 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
2883 OBD_ALLOC(lumk, lum_size);
2887 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
2889 lum_size = sizeof(lum);
/* an OSC object always has exactly one stripe */
2893 lumk->lmm_object_id = lsm->lsm_object_id;
2894 lumk->lmm_stripe_count = 1;
2896 if (copy_to_user(lump, lumk, lum_size))
2900 OBD_FREE(lumk, lum_size);
/*
 * ioctl dispatcher for the OSC device.  Handles LOV-compatibility
 * queries (an OSC presents itself as a one-target LOV), stripe
 * get/set, import recovery/activation, and quota polling; anything
 * else returns -ENOTTY.  Holds a module reference for the duration.
 */
2906 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2907 void *karg, void *uarg)
2909 struct obd_device *obd = exp->exp_obd;
2910 struct obd_ioctl_data *data = karg;
2914 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
/* pin the module while the ioctl runs */
2917 if (!try_module_get(THIS_MODULE)) {
2918 CERROR("Can't get module. Is it alive?");
2923 case OBD_IOC_LOV_GET_CONFIG: {
2925 struct lov_desc *desc;
2926 struct obd_uuid uuid;
2930 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2931 GOTO(out, err = -EINVAL);
2933 data = (struct obd_ioctl_data *)buf;
/* caller's buffers must be big enough for a desc and a uuid */
2935 if (sizeof(*desc) > data->ioc_inllen1) {
2936 obd_ioctl_freedata(buf, len);
2937 GOTO(out, err = -EINVAL);
2940 if (data->ioc_inllen2 < sizeof(uuid)) {
2941 obd_ioctl_freedata(buf, len);
2942 GOTO(out, err = -EINVAL);
/* fabricate a one-target LOV descriptor for this OSC */
2945 desc = (struct lov_desc *)data->ioc_inlbuf1;
2946 desc->ld_tgt_count = 1;
2947 desc->ld_active_tgt_count = 1;
2948 desc->ld_default_stripe_count = 1;
2949 desc->ld_default_stripe_size = 0;
2950 desc->ld_default_stripe_offset = 0;
2951 desc->ld_pattern = 0;
2952 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2954 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2956 err = copy_to_user((void *)uarg, buf, len);
2959 obd_ioctl_freedata(buf, len);
2962 case LL_IOC_LOV_SETSTRIPE:
2963 err = obd_alloc_memmd(exp, karg);
2967 case LL_IOC_LOV_GETSTRIPE:
2968 err = osc_getstripe(karg, uarg);
2970 case OBD_IOC_CLIENT_RECOVER:
2971 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2976 case IOC_OSC_SET_ACTIVE:
2977 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2980 case OBD_IOC_POLL_QUOTACHECK:
2981 err = lquota_poll_check(quota_interface, exp,
2982 (struct if_quotacheck *)karg);
2985 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2986 cmd, current->comm);
2987 GOTO(out, err = -ENOTTY);
2990 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2993 module_put(THIS_MODULE);
/*
 * obd_get_info handler.  "lock_to_stripe" is answered locally (stripe
 * index is always 0 for an OSC); "last_id" is fetched from the OST via
 * an OST_GET_INFO RPC.  Returns 0 or a negative errno.
 */
2998 static int osc_get_info(struct obd_export *exp, obd_count keylen,
2999 void *key, __u32 *vallen, void *val)
3002 if (!vallen || !val)
3005 if (keylen > strlen("lock_to_stripe") &&
3006 strcmp(key, "lock_to_stripe") == 0) {
3007 __u32 *stripe = val;
3008 *vallen = sizeof(*stripe);
3011 } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
3012 struct ptlrpc_request *req;
3014 char *bufs[1] = {key};
3016 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3017 OST_GET_INFO, 1, &keylen, bufs);
3021 req->rq_replen = lustre_msg_size(1, vallen);
3022 rc = ptlrpc_queue_wait(req);
/* unpack (and byte-swap if needed) the returned object id */
3026 reply = lustre_swab_repbuf(req, 0, sizeof(*reply),
3027 lustre_swab_ost_last_id);
3028 if (reply == NULL) {
3029 CERROR("Can't unpack OST last ID\n");
3030 GOTO(out, rc = -EPROTO);
3032 *((obd_id *)val) = *reply;
3034 ptlrpc_req_finished(req);
/*
 * obd_set_info handler.  Most keys are consumed locally (next object
 * id, grant-space flag reset, initial-recovery and checksum toggles);
 * KEY_MDS_CONN / "evict_by_nid" are forwarded to the OST via an
 * OST_SET_INFO RPC, after which the MDS-side llog connection and
 * pinger state are set up.  Returns 0 or a negative errno.
 */
3040 static int osc_set_info(struct obd_export *exp, obd_count keylen,
3041 void *key, obd_count vallen, void *val)
3043 struct ptlrpc_request *req;
3044 struct obd_device *obd = exp->exp_obd;
3045 struct obd_import *imp = class_exp2cliimp(exp);
3046 struct llog_ctxt *ctxt;
3047 int rc, size[2] = {keylen, vallen};
3048 char *bufs[2] = {key, val};
3051 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
/* precreation hint: next object id to hand out */
3053 if (KEY_IS(KEY_NEXT_ID)) {
3054 if (vallen != sizeof(obd_id))
3056 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3057 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3058 exp->exp_obd->obd_name,
3059 obd->u.cli.cl_oscc.oscc_next_id);
/* space was freed on the OST: clear the no-space flag */
3064 if (KEY_IS("unlinked")) {
3065 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3066 spin_lock(&oscc->oscc_lock);
3067 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3068 spin_unlock(&oscc->oscc_lock);
3072 if (KEY_IS("initial_recov")) {
3073 struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
3074 if (vallen != sizeof(int))
3076 imp->imp_initial_recov = *(int *)val;
3077 CDEBUG(D_HA, "%s: set imp_no_init_recov = %d\n",
3078 exp->exp_obd->obd_name,
3079 imp->imp_initial_recov);
3083 if (KEY_IS("checksum")) {
3084 if (vallen != sizeof(int))
3086 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
/* only the remaining two keys go over the wire */
3090 if (!KEY_IS(KEY_MDS_CONN) && !KEY_IS("evict_by_nid"))
3094 req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO,
3099 req->rq_replen = lustre_msg_size(0, NULL);
3100 rc = ptlrpc_queue_wait(req);
3101 ptlrpc_req_finished(req);
/* this is an MDS-side OSC: connect the origin llog initiator */
3103 ctxt = llog_get_context(exp->exp_obd, LLOG_MDS_OST_ORIG_CTXT);
3106 rc = llog_initiator_connect(ctxt);
3108 CERROR("cannot establish connection for ctxt %p: %d\n",
/* the MDS pings its OSTs and expects server-style timeouts */
3112 imp->imp_server_timeout = 1;
3113 CDEBUG(D_HA, "pinging OST %s\n", imp->imp_target_uuid.uuid);
3114 imp->imp_pingable = 1;
/* size-replication llog: only cancel is needed on the client side */
3120 static struct llog_operations osc_size_repl_logops = {
3121 lop_cancel: llog_obd_repl_cancel
3124 static struct llog_operations osc_mds_ost_orig_logops;
/*
 * Set up the two llog contexts this OSC uses: the MDS->OST origin
 * catalog (lvfs ops with origin setup/cleanup/add/connect overrides)
 * and the size-replication context.  Returns 0 or a negative errno.
 */
3125 static int osc_llog_init(struct obd_device *obd, struct obd_device *tgt,
3126 int count, struct llog_catid *catid)
/* clone lvfs ops, then override the origin-specific entry points;
 * NOTE(review): this re-initializes a file-static on every call —
 * presumably harmless since the values are constant */
3131 osc_mds_ost_orig_logops = llog_lvfs_ops;
3132 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3133 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3134 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3135 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3137 rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3138 &catid->lci_logid, &osc_mds_ost_orig_logops);
3142 rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3143 &osc_size_repl_logops);
/*
 * Tear down both llog contexts created by osc_llog_init(); the first
 * failure code (if any) is preserved in rc/rc2.
 */
3147 static int osc_llog_finish(struct obd_device *obd, int count)
3149 struct llog_ctxt *ctxt;
3150 int rc = 0, rc2 = 0;
3153 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3155 rc = llog_cleanup(ctxt);
3157 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3159 rc2 = llog_cleanup(ctxt);
/*
 * Reconnect hook: when the server supports grants, report our current
 * available grant back in the connect data (or request a fresh default
 * of two max-sized RPCs worth) and reset the lost-grant counter.
 */
3166 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3167 struct obd_uuid *cluuid,
3168 struct obd_connect_data *data)
3170 struct client_obd *cli = &obd->u.cli;
3172 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
/* grant state is protected by the loi list lock */
3175 spin_lock(&cli->cl_loi_list_lock);
3176 data->ocd_grant = cli->cl_avail_grant ?:
3177 2 * cli->cl_max_pages_per_rpc << PAGE_SHIFT;
3178 lost_grant = cli->cl_lost_grant;
3179 cli->cl_lost_grant = 0;
3180 spin_unlock(&cli->cl_loi_list_lock);
3182 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3183 "cl_lost_grant: %ld\n", data->ocd_grant,
3184 cli->cl_avail_grant, lost_grant);
3185 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3186 " ocd_grant: %d\n", data->ocd_connect_flags,
3187 data->ocd_version, data->ocd_grant);
/*
 * Disconnect hook: on the last connection, flush pending llog cancel
 * records to the target before running the generic client disconnect.
 */
3193 static int osc_disconnect(struct obd_export *exp)
3195 struct obd_device *obd = class_exp2obd(exp);
3196 struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3199 if (obd->u.cli.cl_conn_count == 1)
3200 /* flush any remaining cancel messages out to the target */
3201 llog_sync(ctxt, exp);
3203 rc = client_disconnect_export(exp);
/*
 * React to import state changes: mark the object creator as recovering
 * on disconnect, drop all grants/locks on invalidation, clear no-space
 * on (re)activation, and pick up connect-data options (grant support,
 * request portal) once the OCD arrives.  Observer obds are notified of
 * active/inactive/ocd transitions.
 */
3207 static int osc_import_event(struct obd_device *obd,
3208 struct obd_import *imp,
3209 enum obd_import_event event)
3211 struct client_obd *cli;
3214 LASSERT(imp->imp_obd == obd);
3217 case IMP_EVENT_DISCON: {
3218 /* Only do this on the MDS OSC's */
3219 if (imp->imp_server_timeout) {
3220 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3222 spin_lock(&oscc->oscc_lock);
3223 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3224 spin_unlock(&oscc->oscc_lock);
3229 case IMP_EVENT_INACTIVE: {
3230 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3233 case IMP_EVENT_INVALIDATE: {
3234 struct ldlm_namespace *ns = obd->obd_namespace;
/* grants are meaningless against an invalidated import */
3238 spin_lock(&cli->cl_loi_list_lock);
3239 cli->cl_avail_grant = 0;
3240 cli->cl_lost_grant = 0;
3241 /* all pages go to failing rpcs due to the invalid import */
3242 osc_check_rpcs(cli);
3243 spin_unlock(&cli->cl_loi_list_lock);
3245 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3249 case IMP_EVENT_ACTIVE: {
3250 /* Only do this on the MDS OSC's */
3251 if (imp->imp_server_timeout) {
3252 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3254 spin_lock(&oscc->oscc_lock);
3255 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3256 spin_unlock(&oscc->oscc_lock);
3258 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3261 case IMP_EVENT_OCD: {
3262 struct obd_connect_data *ocd = &imp->imp_connect_data;
3264 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3265 osc_init_grant(&obd->u.cli, ocd);
3268 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3269 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3271 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3275 CERROR("Unknown import event %d\n", event);
/*
 * Device setup: take a ptlrpcd reference, run the generic client setup,
 * register /proc entries, and pre-allocate the brw request pool used by
 * sanosc_brw_write()/async brw.  Returns 0 or a negative errno.
 */
3281 int osc_setup(struct obd_device *obd, obd_count len, void *buf)
3285 rc = ptlrpcd_addref();
3289 rc = client_obd_setup(obd, len, buf);
3293 struct lprocfs_static_vars lvars;
3294 struct client_obd *cli = &obd->u.cli;
3296 lprocfs_init_vars(osc, &lvars);
3297 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3298 lproc_osc_attach_seqstat(obd);
3299 ptlrpc_lprocfs_register_obd(obd);
3303 /* We need to allocate a few requests more, because
3304 brw_interpret_oap tries to create new requests before freeing
3305 previous ones. Ideally we want to have 2x max_rpcs_in_flight
3306 reserved, but I afraid that might be too much wasted RAM
3307 in fact, so 2 is just my guess and still should work. */
3308 cli->cl_rq_pool = ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3310 ptlrpc_add_rqs_to_pool);
/*
 * Staged pre-cleanup: at EARLY, deactivate the import so in-flight RPCs
 * are aborted; at SELF_EXP, tear down the llog subsystems.
 */
3316 static int osc_precleanup(struct obd_device *obd, int stage)
3322 case OBD_CLEANUP_EARLY: {
3323 struct obd_import *imp;
3324 imp = obd->u.cli.cl_import;
3325 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3326 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3327 ptlrpc_deactivate_import(imp);
3330 case OBD_CLEANUP_SELF_EXP:
3331 rc = obd_llog_finish(obd, 0);
3333 CERROR("failed to cleanup llogging subsystems\n");
/*
 * Final device teardown: unregister /proc, flag the object creator as
 * exiting, release quota caches, run the generic client cleanup, and
 * free the pre-allocated request pool.
 */
3338 int osc_cleanup(struct obd_device *obd)
3340 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3341 struct client_obd *cli = &obd->u.cli;
3344 ptlrpc_lprocfs_unregister_obd(obd);
3345 lprocfs_obd_cleanup(obd);
/* stop the creator: no longer recovering, now exiting */
3347 spin_lock(&oscc->oscc_lock);
3348 oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3349 oscc->oscc_flags |= OSCC_FLAG_EXITING;
3350 spin_unlock(&oscc->oscc_lock);
3352 /* free memory of osc quota cache */
3353 lquota_cleanup(quota_interface, obd);
3355 rc = client_obd_cleanup(obd);
3357 ptlrpc_free_rq_pool(cli->cl_rq_pool);
/* obd method table for the regular (network) OSC device */
3364 struct obd_ops osc_obd_ops = {
3365 .o_owner = THIS_MODULE,
3366 .o_setup = osc_setup,
3367 .o_precleanup = osc_precleanup,
3368 .o_cleanup = osc_cleanup,
3369 .o_add_conn = client_import_add_conn,
3370 .o_del_conn = client_import_del_conn,
3371 .o_connect = client_connect_import,
3372 .o_reconnect = osc_reconnect,
3373 .o_disconnect = osc_disconnect,
3374 .o_statfs = osc_statfs,
3375 .o_packmd = osc_packmd,
3376 .o_unpackmd = osc_unpackmd,
3377 .o_create = osc_create,
3378 .o_destroy = osc_destroy,
3379 .o_getattr = osc_getattr,
3380 .o_getattr_async = osc_getattr_async,
3381 .o_setattr = osc_setattr,
3382 .o_setattr_async = osc_setattr_async,
3384 .o_brw_async = osc_brw_async,
3385 .o_prep_async_page = osc_prep_async_page,
3386 .o_queue_async_io = osc_queue_async_io,
3387 .o_set_async_flags = osc_set_async_flags,
3388 .o_queue_group_io = osc_queue_group_io,
3389 .o_trigger_group_io = osc_trigger_group_io,
3390 .o_teardown_async_page = osc_teardown_async_page,
3391 .o_punch = osc_punch,
3393 .o_enqueue = osc_enqueue,
3394 .o_match = osc_match,
3395 .o_change_cbdata = osc_change_cbdata,
3396 .o_cancel = osc_cancel,
3397 .o_cancel_unused = osc_cancel_unused,
3398 .o_join_lru = osc_join_lru,
3399 .o_iocontrol = osc_iocontrol,
3400 .o_get_info = osc_get_info,
3401 .o_set_info = osc_set_info,
3402 .o_import_event = osc_import_event,
3403 .o_llog_init = osc_llog_init,
3404 .o_llog_finish = osc_llog_finish,
/* obd method table for the SAN variant (2.4 kernels only): shares most
 * handlers with the regular OSC but uses sanosc_brw for I/O and the
 * generic client setup/cleanup/disconnect */
3407 #if defined(__KERNEL__) && (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3408 struct obd_ops sanosc_obd_ops = {
3409 .o_owner = THIS_MODULE,
3410 .o_cleanup = client_obd_cleanup,
3411 .o_add_conn = client_import_add_conn,
3412 .o_del_conn = client_import_del_conn,
3413 .o_connect = client_connect_import,
3414 .o_reconnect = osc_reconnect,
3415 .o_disconnect = client_disconnect_export,
3416 .o_statfs = osc_statfs,
3417 .o_packmd = osc_packmd,
3418 .o_unpackmd = osc_unpackmd,
3419 .o_create = osc_real_create,
3420 .o_destroy = osc_destroy,
3421 .o_getattr = osc_getattr,
3422 .o_getattr_async = osc_getattr_async,
3423 .o_setattr = osc_setattr,
3424 .o_setup = client_sanobd_setup,
3425 .o_brw = sanosc_brw,
3426 .o_punch = osc_punch,
3428 .o_enqueue = osc_enqueue,
3429 .o_match = osc_match,
3430 .o_change_cbdata = osc_change_cbdata,
3431 .o_cancel = osc_cancel,
3432 .o_cancel_unused = osc_cancel_unused,
3433 .o_join_lru = osc_join_lru,
3434 .o_iocontrol = osc_iocontrol,
3435 .o_import_event = osc_import_event,
3436 .o_llog_init = osc_llog_init,
3437 .o_llog_finish = osc_llog_finish,
3441 static quota_interface_t *quota_interface;
3442 extern quota_interface_t osc_quota_interface;
/*
 * Module init: hook up the quota interface, register the OSC obd type
 * (and the SANOSC type on 2.4 kernels).  On registration failure the
 * quota symbol reference is released; if SANOSC registration fails the
 * already-registered OSC type is unregistered too.
 */
3444 int __init osc_init(void)
3446 struct lprocfs_static_vars lvars;
3447 #if defined(__KERNEL__) && (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3448 struct lprocfs_static_vars sanlvars;
3453 lprocfs_init_vars(osc, &lvars);
3454 #if defined(__KERNEL__) && (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3455 lprocfs_init_vars(osc, &sanlvars);
/* grab the quota interface symbol and wire quota ops into osc_obd_ops */
3458 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3459 lquota_init(quota_interface);
3460 init_obd_quota_ops(quota_interface, &osc_obd_ops);
3462 rc = class_register_type(&osc_obd_ops, lvars.module_vars,
/* error path: drop the quota symbol reference */
3465 if (quota_interface)
3466 PORTAL_SYMBOL_PUT(osc_quota_interface);
3470 #if defined(__KERNEL__) && (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3471 rc = class_register_type(&sanosc_obd_ops, sanlvars.module_vars,
3472 LUSTRE_SANOSC_NAME);
/* undo the OSC registration if SANOSC registration failed */
3474 class_unregister_type(LUSTRE_OSC_NAME);
3475 if (quota_interface)
3476 PORTAL_SYMBOL_PUT(osc_quota_interface);
/*
 * Module exit: release the quota interface and unregister the obd
 * type(s) registered by osc_init().
 */
3485 static void /*__exit*/ osc_exit(void)
3487 lquota_exit(quota_interface);
3488 if (quota_interface)
3489 PORTAL_SYMBOL_PUT(osc_quota_interface);
3491 #if defined(__KERNEL__) && (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3492 class_unregister_type(LUSTRE_SANOSC_NAME);
3494 class_unregister_type(LUSTRE_OSC_NAME);
3497 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3498 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3499 MODULE_LICENSE("GPL");
3501 module_init(osc_init);
3502 module_exit(osc_exit);