/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
 * Author Peter Braam <braam@clusterfs.com>
 *
 * This file is part of the Lustre file system, http://www.lustre.org
 * Lustre is a trademark of Cluster File Systems, Inc.
 *
 * You may have signed or agreed to another license before downloading
 * this software. If so, you are bound by the terms and conditions
 * of that agreement, and the following does not apply to you. See the
 * LICENSE file included with this distribution for more information.
 *
 * If you did not agree to a different license, then this copy of Lustre
 * is open source software; you can redistribute it and/or modify it
 * under the terms of version 2 of the GNU General Public License as
 * published by the Free Software Foundation.
 *
 * In either case, Lustre is distributed in the hope that it will be
 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * license text for more details.
 *
 * For testing and management it is treated as an obd_device,
 * although it does not export a full OBD method table (the
 * requests are coming in over the wire, so object target modules
 * do not have a full method table.)
 */
#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif

#define DEBUG_SUBSYSTEM S_OSC

#ifdef __KERNEL__
# include <libcfs/libcfs.h>
#else /* __KERNEL__ */
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <libcfs/kp30.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>

#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include "osc_internal.h"
static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;

static void osc_release_ppga(struct brw_page **ppga, obd_count count);
/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
        lmm_size = sizeof(**lmmp);
                OBD_FREE(*lmmp, lmm_size);
                OBD_ALLOC(*lmmp, lmm_size);
                LASSERT(lsm->lsm_object_id);
                LASSERT(lsm->lsm_object_gr);
                (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
                (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
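
/*
 * Illustrative sketch (not part of the original file): how a caller could
 * round-trip object metadata through osc_packmd()/osc_unpackmd(). The
 * helper name is hypothetical and error handling is abbreviated.
 */
#if 0
static int osc_md_roundtrip_sketch(struct obd_export *exp,
                                   struct lov_stripe_md *lsm)
{
        struct lov_mds_md *lmm = NULL;
        struct lov_stripe_md *lsm2 = NULL;
        int lmm_size, lsm_size;

        lmm_size = osc_packmd(exp, &lmm, lsm);              /* CPU -> LE */
        if (lmm_size < 0)
                return lmm_size;
        lsm_size = osc_unpackmd(exp, &lsm2, lmm, lmm_size); /* LE -> CPU */
        if (lsm_size < 0)
                return lsm_size;
        /* ids survive the byte-order conversion unchanged */
        LASSERT(lsm2->lsm_object_id == lsm->lsm_object_id);
        return 0;
}
#endif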
/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
        if (lmm_bytes < sizeof(*lmm)) {
                CERROR("lov_mds_md too small: %d, need %d\n",
                       lmm_bytes, (int)sizeof(*lmm));
        /* XXX LOV_MAGIC etc check? */
        if (lmm->lmm_object_id == 0) {
                CERROR("lov_mds_md: zero lmm_object_id\n");
        lsm_size = lov_stripe_md_size(1);
        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE(*lsmp, lsm_size);
                OBD_ALLOC(*lsmp, lsm_size);
                loi_init((*lsmp)->lsm_oinfo);
        /* XXX zero *lsmp? */
        (*lsmp)->lsm_object_id = le64_to_cpu(lmm->lmm_object_id);
        (*lsmp)->lsm_object_gr = le64_to_cpu(lmm->lmm_object_gr);
        LASSERT((*lsmp)->lsm_object_id);
        LASSERT((*lsmp)->lsm_object_gr);
        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
static inline void osc_pack_capa(struct ptlrpc_request *req, int offset,
                                 struct ost_body *body, void *capa)
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        c = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*c));
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
static inline void osc_pack_req_body(struct ptlrpc_request *req, int offset,
                                     struct obd_info *oinfo)
        struct ost_body *body;

        body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body));
        body->oa = *oinfo->oi_oa;
        osc_pack_capa(req, offset + 1, body, oinfo->oi_capa);
static int osc_getattr_interpret(struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
        struct ost_body *body;

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));

        /* This should really be sent by the OST */
        aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        CERROR("can't unpack ost_body\n");
        aa->aa_oi->oi_oa->o_valid = 0;

        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
        struct ptlrpc_request *req;
        struct ost_body *body;
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        struct osc_async_args *aa;

        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_GETATTR, 3, size, NULL);

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);

        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_interpret_reply = osc_getattr_interpret;

        LASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;

        ptlrpc_set_add_req(set, req);
static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };

        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_GETATTR, 3, size, NULL);

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);
        CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        CERROR("can't unpack ost_body\n");
        GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        ptlrpc_req_finished(req);
static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };

        LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
                oinfo->oi_oa->o_gr > 0);
        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SETATTR, 3, size, NULL);

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        GOTO(out, rc = -EPROTO);

        memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));

        ptlrpc_req_finished(req);
static int osc_setattr_interpret(struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
        struct ost_body *body;

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        CERROR("can't unpack ost_body\n");
        GOTO(out, rc = -EPROTO);

        memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));

        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
        struct ptlrpc_request *req;
        struct ost_body *body;
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        struct osc_async_args *aa;

        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SETATTR, 3, size, NULL);

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);
        if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
                body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
                                      sizeof(*body));
                memcpy(obdo_logcookie(oinfo->oi_oa), oti->oti_logcookies,
                       sizeof(*oti->oti_logcookies));

        ptlrpc_req_set_repsize(req, 2, size);
        /* do mds-to-ost setattr asynchronously */
                /* Do not wait for response. */
                ptlrpcd_add_req(req);

        req->rq_interpret_reply = osc_setattr_interpret;

        LASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;

        ptlrpc_set_add_req(rqset, req);
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };

        rc = obd_alloc_memmd(exp, &lsm);

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_CREATE, 2, size, NULL);
                GOTO(out, rc = -ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));

        ptlrpc_req_set_repsize(req, 2, size);
        if (oa->o_valid & OBD_MD_FLINLINE) {
                LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
                        oa->o_flags == OBD_FL_DELORPHAN);
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;

        rc = ptlrpc_queue_wait(req);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        CERROR("can't unpack ost_body\n");
        GOTO(out_req, rc = -EPROTO);

        memcpy(oa, &body->oa, sizeof(*oa));

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way. */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;

        oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

        if (oa->o_valid & OBD_MD_FLCOOKIE) {
                if (!oti->oti_logcookies)
                        oti_alloc_cookies(oti, 1);
                memcpy(oti->oti_logcookies, obdo_logcookie(oa),
                       sizeof(oti->oti_onecookie));

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));

        ptlrpc_req_finished(req);

        obd_free_memmd(exp, &lsm);
static int osc_punch_interpret(struct ptlrpc_request *req,
                               struct osc_async_args *aa, int rc)
        struct ost_body *body;

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        CERROR("can't unpack ost_body\n");
        GOTO(out, rc = -EPROTO);

        memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));

        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        struct ost_body *body;
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };

        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_PUNCH, 3, size, NULL);

        /* FIXME bug 249. Also see bug 7198 */
        if (class_exp2cliimp(exp)->imp_connect_data.ocd_connect_flags &
            OBD_CONNECT_REQPORTAL)
                req->rq_request_portal = OST_IO_PORTAL;

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);

        /* overload the size and blocks fields in the oa with start/end */
        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        body->oa.o_size = oinfo->oi_policy.l_extent.start;
        body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        ptlrpc_req_set_repsize(req, 2, size);

        req->rq_interpret_reply = osc_punch_interpret;
        LASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;

        ptlrpc_set_add_req(rqset, req);
static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end,
                    void *capa)
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };

        size[REQ_REC_OFF + 1] = capa ? sizeof(struct lustre_capa) : 0;

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SYNC, 3, size, NULL);

        /* overload the size and blocks fields in the oa with start/end */
        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        osc_pack_capa(req, REQ_REC_OFF + 1, body, capa);

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        CERROR("can't unpack ost_body\n");
        GOTO(out, rc = -EPROTO);

        memcpy(oa, &body->oa, sizeof(*oa));

        ptlrpc_req_finished(req);
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a failed destroy.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then send the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export)
        struct ptlrpc_request *req;
        struct ost_body *body;
        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_DESTROY, 2, size, NULL);

        /* FIXME bug 249. Also see bug 7198 */
        if (class_exp2cliimp(exp)->imp_connect_data.ocd_connect_flags &
            OBD_CONNECT_REQPORTAL)
                req->rq_request_portal = OST_IO_PORTAL;

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                memcpy(obdo_logcookie(oa), oti->oti_logcookies,
                       sizeof(*oti->oti_logcookies));

        ptlrpc_req_set_repsize(req, 2, size);

        ptlrpcd_add_req(req);
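
/*
 * Illustrative note (not in the original): the fire-and-forget path above
 * means a destroy's llog cookie travels MDS -> client (in oti) -> OST (in
 * the obdo), and returns to the MDS only as a log cancellation once the
 * OST commits the destroy, per the recovery scheme described above.
 */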
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
        obd_flag bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty > cli->cl_dirty_max) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_max);
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
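
/*
 * Worked example for osc_announce_cached() (illustrative, not in the
 * original): with 4KB pages, cl_max_pages_per_rpc = 32 and
 * cl_max_rpcs_in_flight = 8, max_in_flight = (32 << 12) * (8 + 1) =
 * 1179648 bytes, so o_undirty advertises at least ~1.1 MB of headroom
 * even when cl_dirty_max is smaller.
 */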
/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct osc_async_page *oap)
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        oap->oap_brw_flags |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for oap %p\n",
               CFS_PAGE_SIZE, oap);
        LASSERT(cli->cl_avail_grant >= 0);
static unsigned long rpcs_in_flight(struct client_obd *cli)
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if (cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld max %ld\n",
                               cli->cl_dirty, cli->cl_dirty_max);

                /* if there is still dirty cache but no grant, wait for pending
                 * RPCs that may yet return us some grant before doing sync
                 * writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                        osc_consume_write_grant(cli, ocw->ocw_oap);

                cfs_waitq_signal(&ocw->ocw_waitq);
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);
static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
        client_obd_list_lock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
        cli->cl_avail_grant += body->oa.o_grant;
        /* waiters are woken in brw_interpret_oap */
        client_obd_list_unlock(&cli->cl_loi_list_lock);
/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) +
                              (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);

                nob_read -= pga[i]->count;

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
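
/*
 * Worked example for handle_short_read() (illustrative, not in the
 * original): a 3-page read of 4KB pages that returns nob_read = 5000
 * leaves page 0 intact, zero-fills page 1 from byte 904 onward (EOF
 * landed inside that page), and zero-fills page 2 entirely.
 */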
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
        /* return error if any niobuf was in error */
        remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
                                        sizeof(*remote_rcs) * niocount, NULL);
        if (remote_rcs == NULL) {
                CERROR("Missing/short RC vector on BRW_WRITE reply\n");

        if (lustre_msg_swabbed(req->rq_repmsg))
                for (i = 0; i < niocount; i++)
                        __swab32s(&remote_rcs[i]);

        for (i = 0; i < niocount; i++) {
                if (remote_rcs[i] < 0)
                        return(remote_rcs[i]);

                if (remote_rcs[i] != 0) {
                        CERROR("rc[%d] invalid (%d) req %p\n",
                               i, remote_rcs[i], req);

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
        if (p1->flag != p2->flag) {
                unsigned mask = ~OBD_BRW_FROM_GRANT;

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if ((p1->flag & mask) != (p2->flag & mask))
                        CERROR("is it ok to have flags 0x%x and 0x%x in the "
                               "same brw?\n", p1->flag, p2->flag);

        return (p1->off + p1->count == p2->off);
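
/*
 * Illustrative sketch (not part of the original file): can_merge_pages()
 * for contiguous vs. gapped pages. Offsets are file-relative bytes; the
 * initializer values are hypothetical.
 */
#if 0
static void can_merge_pages_example(void)
{
        struct brw_page a = { .off = 0,    .count = 4096, .flag = 0 };
        struct brw_page b = { .off = 4096, .count = 4096, .flag = 0 };
        struct brw_page c = { .off = 8192, .count = 4096, .flag = 0 };

        LASSERT(can_merge_pages(&a, &b));   /* contiguous: same niobuf */
        LASSERT(!can_merge_pages(&a, &c));  /* gap: starts a new niobuf */
}
#endif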
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga)
        LASSERT(pg_count > 0);
        while (nob > 0 && pg_count > 0) {
                char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                cksum = crc32_le(cksum, ptr + off, count);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                nob -= pga[i]->count;
static int osc_brw_prep_request(int cmd, struct obd_import *imp, struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga, int *requested_nobp,
                                int *niocountp, struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa)
        struct ptlrpc_request *req;
        struct ptlrpc_bulk_desc *desc;
        struct client_obd *cli = &imp->imp_obd->u.cli;
        struct ost_body *body;
        struct obd_ioobj *ioobj;
        struct niobuf_remote *niobuf;
        int size[5] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        int niocount, i, requested_nob, opc, rc;
        struct ptlrpc_request_pool *pool;
        struct lustre_capa *capa;

        opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
        pool = ((cmd & OBD_BRW_WRITE) != 0) ? imp->imp_rq_pool : NULL;

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))

        size[REQ_REC_OFF + 1] = sizeof(*ioobj);
        size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
                size[REQ_REC_OFF + 3] = sizeof(*capa);

        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM);
        req = ptlrpc_prep_req_pool(imp, LUSTRE_OST_VERSION, opc, 5, size, NULL,
                                   pool);

        /* FIXME bug 249. Also see bug 7198 */
        if (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                req->rq_request_portal = OST_IO_PORTAL;

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
        niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                                niocount * sizeof(*niobuf));

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;

        capa = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 3,
                              sizeof(*capa));
        capa_cpy(capa, ocapa);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;

        LASSERT(page_count > 0);
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                struct brw_page *pg_prev = pga[i - 1];

                LASSERT(pg->count > 0);
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __KERNEL__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf->len += pg->count;
                        niobuf->offset = pg->off;
                        niobuf->len = pg->count;
                        niobuf->flags = pg->flag;

        LASSERT((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)));
        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum)) {
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM;
                        oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
                ptlrpc_req_set_repsize(req, 3, size);
                if (unlikely(cli->cl_checksum))
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                /* 1 RC for the whole I/O */
                ptlrpc_req_set_repsize(req, 2, size);

        *niocountp = niocount;
        *requested_nobp = requested_nob;

        ptlrpc_req_finished(req);
static void check_write_csum(__u32 cli, __u32 srv, int requested_nob,
                             obd_count page_count, struct brw_page **pga)
        CDEBUG(D_PAGE, "checksum %x confirmed\n", cli);

        new_csum = osc_checksum_bulk(requested_nob, page_count, pga);

        if (new_csum == srv) {
                CERROR("BAD CHECKSUM (WRITE): pages were mutated on the client "
                       "after we checksummed them (original client csum:"
                       " %x; server csum: %x; client csum now: %x)\n",
                       cli, srv, new_csum);

        if (new_csum == cli) {
                CERROR("BAD CHECKSUM (WRITE): pages were mutated in transit "
                       "(original client csum: %x; server csum: %x; client "
                       "csum now: %x)\n", cli, srv, new_csum);

        CERROR("BAD CHECKSUM (WRITE): pages were mutated in transit, and the "
               "current page contents don't match the originals OR what the "
               "server received (original client csum: %x; server csum: %x; "
               "client csum now: %x)\n", cli, srv, new_csum);
static int osc_brw_fini_request(struct ptlrpc_request *req, struct obdo *oa,
                                int requested_nob, int niocount,
                                obd_count page_count, struct brw_page **pga,
                                int rc)
        const lnet_process_id_t *peer =
                &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct ost_body *body;
        __u32 client_cksum = 0;

        if (rc < 0 && rc != -EDQUOT)

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        CERROR("Can't unpack body\n");

        /* set/clear over quota flag for a uid/gid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
                lquota_setdq(quota_interface, cli, body->oa.o_uid,
                             body->oa.o_gid, body->oa.o_valid,

        if (unlikely(oa->o_valid & OBD_MD_FLCKSUM))
                client_cksum = oa->o_cksum; /* save for later */

        osc_update_grant(cli, body);
        memcpy(oa, &body->oa, sizeof(*oa));

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                CERROR("Unexpected +ve rc %d\n", rc);
                LASSERT(req->rq_bulk->bd_nob == requested_nob);

                if (unlikely((oa->o_valid & OBD_MD_FLCKSUM) &&
                        check_write_csum(client_cksum, oa->o_cksum,
                                         requested_nob, page_count, pga);

                sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk);

                RETURN(check_write_rcs(req, requested_nob, niocount,
                                       page_count, pga));

        /* The rest of this function executes only for OST_READs */
        if (rc > requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc, requested_nob);

        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR("Unexpected rc %d (%d transferred)\n",
                       rc, req->rq_bulk->bd_nob_transferred);

        if (rc < requested_nob)
                handle_short_read(rc, page_count, pga);

        if (unlikely(oa->o_valid & OBD_MD_FLCKSUM)) {
                static int cksum_counter;
                __u32 cksum = osc_checksum_bulk(rc, page_count, pga);
                __u32 server_cksum = oa->o_cksum;

                if (server_cksum == ~0 && rc > 0) {
                        CERROR("Protocol error: server %s set the 'checksum' "
                               "bit, but didn't send a checksum. Not fatal, "
                               "but please tell CFS.\n",
                               libcfs_nid2str(peer->nid));

                if (server_cksum != cksum) {
                        CERROR("Bad checksum from %s: server %x != client %x\n",
                               libcfs_nid2str(peer->nid), server_cksum, cksum);
                        oa->o_cksum = cksum;
                } else if ((cksum_counter & (-cksum_counter)) == cksum_counter) {
                        CWARN("Checksum %u from %s OK: %x\n",
                              cksum_counter, libcfs_nid2str(peer->nid), cksum);
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", cksum);
        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Checksum %u requested from %s but not sent\n",
                               cksum_missed, libcfs_nid2str(peer->nid));

        sptlrpc_cli_unwrap_bulk_read(req, rc, page_count, pga);
static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                            struct lov_stripe_md *lsm,
                            obd_count page_count, struct brw_page **pga,
                            struct obd_capa *ocapa)
        struct ptlrpc_request *req;

        rc = osc_brw_prep_request(cmd, class_exp2cliimp(exp), oa, lsm,
                                  page_count, pga, &requested_nob, &niocount,
                                  &req, ocapa);

        rc = ptlrpc_queue_wait(req);

        if (rc == -ETIMEDOUT && req->rq_resend) {
                DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
                ptlrpc_req_finished(req);

        rc = osc_brw_fini_request(req, oa, requested_nob, niocount,
                                  page_count, pga, rc);

        ptlrpc_req_finished(req);
static int brw_interpret(struct ptlrpc_request *req,
                         struct osc_brw_async_args *aa, int rc)
        struct obdo *oa = aa->aa_oa;
        int requested_nob = aa->aa_requested_nob;
        int niocount = aa->aa_nio_count;
        obd_count page_count = aa->aa_page_count;
        struct brw_page **pga = aa->aa_ppga;

        rc = osc_brw_fini_request(req, oa, requested_nob, niocount,
                                  page_count, pga, rc);
        osc_release_ppga(pga, page_count);
static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                          struct lov_stripe_md *lsm, obd_count page_count,
                          struct brw_page **pga, struct ptlrpc_request_set *set,
                          struct obd_capa *ocapa)
        struct ptlrpc_request *req;
        struct osc_brw_async_args *aa;

        /* Consume write credits even if doing a sync write -
         * otherwise we may run out of space on OST due to grant. */
        spin_lock(&exp->exp_obd->u.cli.cl_loi_list_lock);
        for (nio_count = 0; nio_count < page_count; nio_count++) {
                if (exp->exp_obd->u.cli.cl_avail_grant >= CFS_PAGE_SIZE) {
                        exp->exp_obd->u.cli.cl_avail_grant -= CFS_PAGE_SIZE;
                        pga[nio_count]->flag |= OBD_BRW_FROM_GRANT;

        spin_unlock(&exp->exp_obd->u.cli.cl_loi_list_lock);

        rc = osc_brw_prep_request(cmd, class_exp2cliimp(exp), oa, lsm,
                                  page_count, pga, &requested_nob, &nio_count,
                                  &req, ocapa);

        LASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;

        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = nio_count;
        aa->aa_page_count = page_count;

        req->rq_interpret_reply = brw_interpret;
        ptlrpc_set_add_req(set, req);
/*
 * ugh, we want disk allocation on the target to happen in offset order. we'll
 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
 * fine for our small page arrays and doesn't require allocation. it's an
 * insertion sort that swaps elements that are strides apart, shrinking the
 * stride down until it's '1' and the array is sorted.
 */
static void sort_brw_pages(struct brw_page **array, int num)
        struct brw_page *tmp;

        for (stride = 1; stride < num; stride = (stride * 3) + 1)

        do {
                for (i = stride; i < num; i++) {
                        while (j >= stride && array[j - stride]->off > tmp->off) {
                                array[j] = array[j - stride];

        } while (stride > 1);
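
/*
 * Worked example (illustrative, not in the original; assumes the elided
 * shrink step divides stride by 3 each pass): for num = 50 the for loop
 * above grows stride 1 -> 4 -> 13 -> 40 -> 121, and the do/while then
 * sorts with strides 40, 13, 4, 1 -- the final stride-1 pass is a plain
 * insertion sort on an almost-sorted array.
 */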
static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
        LASSERT(pages > 0);
        offset = pg[i]->off & (CFS_PAGE_SIZE - 1);

                if (pages == 0)         /* that's all */
                        return count;

                if (offset + pg[i]->count < CFS_PAGE_SIZE)
                        return count;   /* doesn't end on page boundary */

                offset = pg[i]->off & (CFS_PAGE_SIZE - 1);
                if (offset != 0)        /* doesn't start on page boundary */
static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
        struct brw_page **ppga;

        OBD_ALLOC(ppga, sizeof(*ppga) * count);

        for (i = 0; i < count; i++)

static void osc_release_ppga(struct brw_page **ppga, obd_count count)
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
                   obd_count page_count, struct brw_page *pga,
                   struct obd_trans_info *oti)
        struct obdo *saved_oa = NULL;
        struct brw_page **ppga, **orig;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct client_obd *cli = &imp->imp_obd->u.cli;
        int rc, page_count_orig;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */
                if (imp == NULL || imp->imp_invalid)

        orig = ppga = osc_build_ppga(pga, page_count);

        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                obd_count pages_per_brw;

                if (page_count > cli->cl_max_pages_per_rpc)
                        pages_per_brw = cli->cl_max_pages_per_rpc;
                        pages_per_brw = page_count;

                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                if (saved_oa != NULL) {
                        /* restore previously saved oa */
                        *oinfo->oi_oa = *saved_oa;
                } else if (page_count > pages_per_brw) {
                        /* save a copy of oa (brw will clobber it) */
                        saved_oa = obdo_alloc();
                        if (saved_oa == NULL)
                                GOTO(out, rc = -ENOMEM);
                        *saved_oa = *oinfo->oi_oa;

                rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                      pages_per_brw, ppga, oinfo->oi_capa);

                page_count -= pages_per_brw;
                ppga += pages_per_brw;

        osc_release_ppga(orig, page_count_orig);

        if (saved_oa != NULL)
                obdo_free(saved_oa);
static int osc_brw_async(int cmd, struct obd_export *exp,
                         struct obd_info *oinfo, obd_count page_count,
                         struct brw_page *pga, struct obd_trans_info *oti,
                         struct ptlrpc_request_set *set)
        struct brw_page **ppga, **orig, **copy;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct client_obd *cli = &imp->imp_obd->u.cli;
        int page_count_orig;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */
                if (imp == NULL || imp->imp_invalid)

        orig = ppga = osc_build_ppga(pga, page_count);

        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                obd_count pages_per_brw;

                if (page_count > cli->cl_max_pages_per_rpc)
                        pages_per_brw = cli->cl_max_pages_per_rpc;
                        pages_per_brw = page_count;

                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                /* use ppga only if single RPC is going to fly */
                if (pages_per_brw != page_count_orig || ppga != orig) {
                        int size = sizeof(struct brw_page *) * pages_per_brw;

                        OBD_ALLOC(copy, size);
                                GOTO(out, rc = -ENOMEM);
                        memcpy(copy, ppga, size);

                rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                    pages_per_brw, copy, set, oinfo->oi_capa);

                /* we passed it to async_internal() which is
                 * now responsible for releasing memory */

                page_count -= pages_per_brw;
                ppga += pages_per_brw;

        osc_release_ppga(orig, page_count_orig);
static void osc_check_rpcs(struct client_obd *cli);
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
                           int sent);
/* This maintains the lists of pending pages to read/write for a given object
 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
 * to quickly find objects that are ready to send an RPC. */
static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
                         int cmd)
        if (lop->lop_num_pending == 0)

        /* if we have an invalid import we want to drain the queued pages
         * by forcing them through rpcs that immediately fail and complete
         * the pages. recovery relies on this to empty the queued pages
         * before canceling the locks and evicting down the llite pages */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)

        /* stream rpcs in queue order as long as there is an urgent page
         * queued. this is our cheap solution for good batching in the case
         * where writepage marks some random page in the middle of the file
         * as urgent because of, say, memory pressure */
        if (!list_empty(&lop->lop_urgent))

        /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
        optimal = cli->cl_max_pages_per_rpc;
        if (cmd & OBD_BRW_WRITE) {
                /* trigger a write rpc stream as long as there are dirtiers
                 * waiting for space. as they're waiting, they're not going to
                 * create more pages to coalesce with what's waiting.. */
                if (!list_empty(&cli->cl_cache_waiters))

                /* +16 to avoid triggering rpcs that would want to include pages
                 * that are being queued but which can't be made ready until
                 * the queuer finishes with the page. this is a wart for
                 * llite::commit_write() */

        if (lop->lop_num_pending >= optimal)
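
/*
 * Illustrative note (not in the original): with cl_max_pages_per_rpc = 32
 * and the +16 slack described above applied to writes, a write lop with no
 * urgent pages and no cache waiters only becomes RPC-worthy once 48 pages
 * are pending; urgent pages or waiters short-circuit that threshold.
 */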
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
        if (list_empty(item) && should_be_on)
                list_add_tail(item, list);
        else if (!list_empty(item) && !should_be_on)
                list_del_init(item);

/* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
 * can find pages to build into rpcs quickly */
static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
        on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
                lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
                lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));

        on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
                loi->loi_write_lop.lop_num_pending);

        on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
                loi->loi_read_lop.lop_num_pending);
static void lop_update_pending(struct client_obd *cli,
                               struct loi_oap_pages *lop, int cmd, int delta)
        lop->lop_num_pending += delta;
        if (cmd & OBD_BRW_WRITE)
                cli->cl_pending_w_pages += delta;
        else
                cli->cl_pending_r_pages += delta;
/* this is called when a sync waiter receives an interruption. Its job is to
 * get the caller woken as soon as possible. If its page hasn't been put in an
 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
 * desiring interruption, which will forcefully complete the rpc once the rpc
 * has completed. */
static void osc_occ_interrupted(struct oig_callback_context *occ)
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;

        /* XXX member_of() */
        oap = list_entry(occ, struct osc_async_page, oap_occ);

        client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);

        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. */
        if (oap->oap_request != NULL) {
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);

        /* we don't get interruption callbacks until osc_trigger_group_io()
         * has been called and put the sync oaps in the pending/urgent lists. */
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                list_del_init(&oap->oap_urgent_item);

                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);

                oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
                oap->oap_oig = NULL;

        client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
/* this is trying to propagate async writeback errors back up to the
 * application. As an async write fails we record the error code for later if
 * the app does an fsync. As long as errors persist we force future rpcs to be
 * sync so that the app can get a sync error and break the cycle of queueing
 * pages for which writeback will fail. */
static void osc_process_ar(struct osc_async_rc *ar, struct ptlrpc_request *req,
                ar->ar_force_sync = 1;
                ar->ar_min_xid = ptlrpc_sample_next_xid();

        if (ar->ar_force_sync && req && (ptlrpc_req_xid(req) >= ar->ar_min_xid))
                ar->ar_force_sync = 0;
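
/*
 * Illustrative note (not in the original): ar_min_xid records the next
 * request xid sampled at the time of the failure, so force_sync is lifted
 * only once a request issued *after* the failure (xid >= ar_min_xid)
 * completes successfully; older in-flight writes cannot clear the error
 * state.
 */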
static void osc_oap_to_pending(struct osc_async_page *oap)
        struct loi_oap_pages *lop;

        if (oap->oap_cmd & OBD_BRW_WRITE)
                lop = &oap->oap_loi->loi_write_lop;
        else
                lop = &oap->oap_loi->loi_read_lop;

        if (oap->oap_async_flags & ASYNC_URGENT)
                list_add(&oap->oap_urgent_item, &lop->lop_urgent);
        list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
        lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
/* this must be called holding the loi list lock to give coverage to exit_cache,
 * async_flag maintenance, and oap_request */
static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
        oap->oap_async_flags = 0;
        oap->oap_interrupted = 0;

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                osc_process_ar(&cli->cl_ar, oap->oap_request, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, oap->oap_request, rc);

        if (oap->oap_request != NULL) {
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;

        if (rc == 0 && oa != NULL) {
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;

                osc_exit_cache(cli, oap, sent);
                oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
                oap->oap_oig = NULL;

        rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
                                                oap->oap_cmd, oa, rc);

        /* ll_ap_completion (from llite) drops PG_locked. so, a new
         * I/O on the page could start, but OSC calls it under lock
         * and thus we can add oap back to pending safely */
        if (rc)
                /* upper layer wants to leave the page on pending queue */
                osc_oap_to_pending(oap);
        else
                osc_exit_cache(cli, oap, sent);
static int brw_interpret_oap(struct ptlrpc_request *req,
                             struct osc_brw_async_args *aa, int rc)
        struct osc_async_page *oap;
        struct client_obd *cli;
        struct list_head *pos, *n;

        rc = osc_brw_fini_request(req, aa->aa_oa, aa->aa_requested_nob,
                                  aa->aa_nio_count, aa->aa_page_count,
                                  aa->aa_ppga, rc);

        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        /* the caller may re-use the oap after the completion call so
         * we need to clean it up a little */
        list_for_each_safe(pos, n, &aa->aa_oaps) {
                oap = list_entry(pos, struct osc_async_page, oap_rpc_item);

                //CDEBUG(D_INODE, "page %p index %lu oap %p\n",
                //oap->oap_page, oap->oap_page->index, oap);

                list_del_init(&oap->oap_rpc_item);
                osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);

        osc_wake_cache_waiters(cli);
        osc_check_rpcs(cli);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        obdo_free(aa->aa_oa);
        OBD_FREE(aa->aa_ppga, aa->aa_page_count * sizeof(struct brw_page *));
static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
        struct ptlrpc_request *req;
        struct brw_page **pga = NULL;
        int requested_nob, nio_count;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct list_head *pos;
        struct obd_capa *ocapa;

        LASSERT(!list_empty(rpc_list));

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
                RETURN(ERR_PTR(-ENOMEM));

                GOTO(out, req = ERR_PTR(-ENOMEM));

        list_for_each(pos, rpc_list) {
                struct osc_async_page *oap;

                oap = list_entry(pos, struct osc_async_page, oap_rpc_item);

                ops = oap->oap_caller_ops;
                caller_data = oap->oap_caller_data;

                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                /*pga[i]->pg = oap->oap_page;
                pga[i]->count = oap->oap_count;
                pga[i]->flag = oap->oap_brw_flags;*/
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap,
                       pga[i]->flag);

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        ops->ap_fill_obdo(caller_data, cmd, oa);
        ocapa = ops->ap_lookup_capa(caller_data, cmd);

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli->cl_import, oa, NULL, page_count,
                                  pga, &requested_nob, &nio_count, &req, ocapa);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST). If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps. Sadly, there is no obvious
         * way to do this in a single call. bug 10150 */
        ops->ap_update_obdo(caller_data, cmd, oa,
                            OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);

        LASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;

        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = nio_count;
        aa->aa_page_count = page_count;

        OBD_FREE(pga, sizeof(*pga) * page_count);
/* the loi lock is held across this function but it's allowed to release
 * and reacquire it during its work */
static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                            int cmd, struct loi_oap_pages *lop)
        struct ptlrpc_request *req;
        obd_count page_count = 0;
        struct list_head *tmp, *pos;
        struct osc_async_page *oap = NULL;
        struct osc_brw_async_args *aa;
        struct obd_async_page_ops *ops;
        CFS_LIST_HEAD(rpc_list);
        unsigned int ending_offset;
        unsigned starting_offset = 0;

        /* first we find the pages we're allowed to work with */
        list_for_each_safe(pos, tmp, &lop->lop_pending) {
                oap = list_entry(pos, struct osc_async_page, oap_pending_item);
                ops = oap->oap_caller_ops;

                LASSERT(oap->oap_magic == OAP_MAGIC);

                /* in llite being 'ready' equates to the page being locked
                 * until completion unlocks it. commit_write submits a page
                 * as not ready because its unlock will happen unconditionally
                 * as the call returns. if we race with commit_write giving
                 * us that page we don't want to create a hole in the page
                 * stream, so we stop and leave the rpc to be fired by
                 * another dirtier or kupdated interval (the not ready page
                 * will still be on the dirty list). we could call in
                 * at the end of ll_file_write to process the queue again. */
                if (!(oap->oap_async_flags & ASYNC_READY)) {
                        int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);

                        CDEBUG(D_INODE, "oap %p page %p returned %d "
                               "instead of ready\n", oap,
                               oap->oap_page, rc);

                        /* llite is telling us that the page is still
                         * in commit_write and that we should try
                         * and put it in an rpc again later. we
                         * break out of the loop so we don't create
                         * a hole in the sequence of pages in the rpc
                         * stream. */

                        /* the io isn't needed.. tell the checks
                         * below to complete the rpc with EINTR */
                        oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                        oap->oap_count = -EINTR;

                        oap->oap_async_flags |= ASYNC_READY;

                        LASSERTF(0, "oap %p page %p returned %d "
                                 "from make_ready\n", oap,
                                 oap->oap_page, rc);

                /*
                 * Page submitted for IO has to be locked. Either by
                 * ->ap_make_ready() or by higher layers.
                 *
                 * XXX nikita: this assertion should be adjusted when lustre
                 * starts using PG_writeback for pages being written out.
                 */
#if defined(__KERNEL__) && defined(__LINUX__)
                LASSERT(PageLocked(oap->oap_page));
#endif
                /* If there is a gap at the start of this page, it can't merge
                 * with any previous page, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (page_count != 0 && oap->oap_page_off != 0)

                /* take the page out of our book-keeping */
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, cmd, -1);
                list_del_init(&oap->oap_urgent_item);

                if (page_count == 0)
                        starting_offset = (oap->oap_obj_off + oap->oap_page_off) &
                                          (PTLRPC_MAX_BRW_SIZE - 1);

                /* ask the caller for the size of the io as the rpc leaves. */
                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
                        oap->oap_count =
                                ops->ap_refresh_count(oap->oap_caller_data, cmd);
                if (oap->oap_count <= 0) {
                        CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
                               oap->oap_count);
                        osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);

                /* now put the page back in our accounting */
                list_add_tail(&oap->oap_rpc_item, &rpc_list);
                if (++page_count >= cli->cl_max_pages_per_rpc)

                /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
                 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
                 * have the same alignment as the initial writes that allocated
                 * extents on the server. */
                ending_offset = (oap->oap_obj_off + oap->oap_page_off +
                                 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
                if (ending_offset == 0)

                /* If there is a gap at the end of this page, it can't merge
                 * with any subsequent pages, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)

        osc_wake_cache_waiters(cli);

        if (page_count == 0)

        loi_list_maint(cli, loi);

        client_obd_list_unlock(&cli->cl_loi_list_lock);
        req = osc_build_req(cli, &rpc_list, page_count, cmd);
        if (IS_ERR(req)) {
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                list_for_each_safe(pos, tmp, &rpc_list) {
                        oap = list_entry(pos, struct osc_async_page,
                        list_del_init(&oap->oap_rpc_item);

                        /* queued sync pages can be torn down while the pages
                         * were between the pending list and the rpc */
                        if (oap->oap_interrupted) {
                                CDEBUG(D_INODE, "oap %p interrupted\n", oap);
                                osc_ap_completion(cli, NULL, oap, 0,

                        osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));

                loi_list_maint(cli, loi);
                RETURN(PTR_ERR(req));

        LASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(&rpc_list, &aa->aa_oaps);
        CFS_INIT_LIST_HEAD(&rpc_list);

        if (cmd == OBD_BRW_READ) {
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      starting_offset / CFS_PAGE_SIZE + 1);
        } else {
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                 cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      starting_offset / CFS_PAGE_SIZE + 1);

        client_obd_list_lock(&cli->cl_loi_list_lock);

        if (cmd == OBD_BRW_READ)
                cli->cl_r_in_flight++;
        else
                cli->cl_w_in_flight++;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        list_for_each(pos, &aa->aa_oaps) {
                oap = list_entry(pos, struct osc_async_page, oap_rpc_item);
                if (oap->oap_interrupted) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                               oap, req);
                        ptlrpc_mark_interrupted(req);

        CDEBUG(D_INODE, "req %p: %d pages, aa %p. now %dr/%dw in flight\n",
               req, page_count, aa, cli->cl_r_in_flight,
               cli->cl_w_in_flight);

        oap->oap_request = ptlrpc_request_addref(req);
        req->rq_interpret_reply = brw_interpret_oap;
        ptlrpcd_add_req(req);
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_cli_item),                        \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)
/* This is called by osc_check_rpcs() to find which objects have pages that
 * we could be sending. These lists are maintained by lop_makes_rpc(). */
struct lov_oinfo *osc_next_loi(struct client_obd *cli)
        /* first return all objects which we already know to have
         * pages ready to be stuffed into rpcs */
        if (!list_empty(&cli->cl_loi_ready_list))
                RETURN(list_entry(cli->cl_loi_ready_list.next,
                                  struct lov_oinfo, loi_cli_item));

        /* then if we have cache waiters, return all objects with queued
         * writes. This is especially important when many small files
         * have filled up the cache and not been fired into rpcs because
         * they don't pass the nr_pending/object threshold */
        if (!list_empty(&cli->cl_cache_waiters) &&
            !list_empty(&cli->cl_loi_write_list))
                RETURN(list_entry(cli->cl_loi_write_list.next,
                                  struct lov_oinfo, loi_write_item));

        /* then return all queued objects when we have an invalid import
         * so that they get flushed */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
                if (!list_empty(&cli->cl_loi_write_list))
                        RETURN(list_entry(cli->cl_loi_write_list.next,
                                          struct lov_oinfo, loi_write_item));
                if (!list_empty(&cli->cl_loi_read_list))
                        RETURN(list_entry(cli->cl_loi_read_list.next,
                                          struct lov_oinfo, loi_read_item));
/* called with the loi list lock held */
static void osc_check_rpcs(struct client_obd *cli)
        struct lov_oinfo *loi;
        int rc = 0, race_counter = 0;

        while ((loi = osc_next_loi(cli)) != NULL) {
                LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));

                if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)

                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object. The makes_rpc checks here
                 * would be redundant if we were getting read/write work items
                 * instead of objects. we don't want send_oap_rpc to drain a
                 * partial read pending queue when we're given this object to
                 * do io on writes while there are cache waiters */
                if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
                                              &loi->loi_write_lop);

                if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
                                              &loi->loi_read_lop);

                /* attempt some inter-object balancing by issuing rpcs
                 * for each object in turn */
                if (!list_empty(&loi->loi_cli_item))
                        list_del_init(&loi->loi_cli_item);
                if (!list_empty(&loi->loi_write_item))
                        list_del_init(&loi->loi_write_item);
                if (!list_empty(&loi->loi_read_item))
                        list_del_init(&loi->loi_read_item);

                loi_list_maint(cli, loi);

                /* send_oap_rpc fails with 0 when make_ready tells it to
                 * back off. llite's make_ready does this when it tries
                 * to lock a page queued for write that is already locked.
                 * we want to try sending rpcs from many objects, but we
                 * don't want to spin failing with 0. */
                if (race_counter == 10)
/* we're trying to queue a page in the osc so we're subject to the
 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
 * If the osc's queued pages are already at that limit, then we want to sleep
 * until there is space in the osc's queue for us. We also may be waiting for
 * write credits from the OST if there are RPCs in flight that may return some
 * before we fall back to sync writes.
 *
 * We need this to know our allocation was granted in the presence of signals */
static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
        client_obd_list_lock(&cli->cl_loi_list_lock);
        rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
2119 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2120 * grant or cache space. */
2121 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2122 struct osc_async_page *oap)
2124 struct osc_cache_waiter ocw;
2125 struct l_wait_info lwi = { 0 };
2128 CDEBUG(D_CACHE, "dirty: %ld dirty_max: %ld dropped: %lu grant: %lu\n",
2129 cli->cl_dirty, cli->cl_dirty_max, cli->cl_lost_grant,
2130 cli->cl_avail_grant);
2132 /* force the caller to try sync io. this can jump the list
2133 * of queued writes and create a discontiguous rpc stream */
2134 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2135 loi->loi_ar.ar_force_sync)
2138 /* Hopefully normal case - cache space and write credits available */
2139 if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2140 cli->cl_avail_grant >= CFS_PAGE_SIZE) {
2141 /* account for ourselves */
2142 osc_consume_write_grant(cli, oap);
2146 /* Make sure that there are write rpcs in flight to wait for. This
2147 * is a little silly as this object may not have any pending rpcs,
2148 * but other objects certainly might. */
2149 if (cli->cl_w_in_flight) {
2150 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2151 cfs_waitq_init(&ocw.ocw_waitq);
2155 loi_list_maint(cli, loi);
2156 osc_check_rpcs(cli);
2157 client_obd_list_unlock(&cli->cl_loi_list_lock);
2159 CDEBUG(D_CACHE, "sleeping for cache space\n");
2160 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2162 client_obd_list_lock(&cli->cl_loi_list_lock);
2163 if (!list_empty(&ocw.ocw_entry)) {
2164 list_del(&ocw.ocw_entry);
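/*
 * Toy model of the admission decision above (userspace sketch, invented
 * names; page_size stands in for CFS_PAGE_SIZE).  Assumption: when there
 * are no write rpcs in flight to wait for, the caller falls back to sync
 * io, matching the elided error path.
 */
enum toy_verdict { TOY_CACHE_OK, TOY_SYNC_IO, TOY_WAIT };

static enum toy_verdict toy_enter_cache(unsigned long dirty,
                                        unsigned long dirty_max,
                                        unsigned long avail_grant,
                                        unsigned long page_size,
                                        int force_sync,
                                        int writes_in_flight)
{
        if (dirty_max < page_size || force_sync)
                return TOY_SYNC_IO;     /* jump the queue, go sync */
        if (dirty + page_size <= dirty_max && avail_grant >= page_size)
                return TOY_CACHE_OK;    /* consume one page of grant */
        if (writes_in_flight)
                return TOY_WAIT;        /* sleep until ocw_granted() */
        return TOY_SYNC_IO;             /* nothing to wait for */
}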
2173 /* The companion to osc_enter_cache(), called when an oap is no longer part
2174 * of the dirty accounting: writeback completed, or a truncate happened
2175 * before writing started. Must be called with the loi list lock held. */
2176 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
2179 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
2182 if (!(oap->oap_brw_flags & OBD_BRW_FROM_GRANT)) {
2187 oap->oap_brw_flags &= ~OBD_BRW_FROM_GRANT;
2188 cli->cl_dirty -= CFS_PAGE_SIZE;
2190 cli->cl_lost_grant += CFS_PAGE_SIZE;
2191 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
2192 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
2193 } else if (CFS_PAGE_SIZE != blocksize && oap->oap_count != CFS_PAGE_SIZE) {
2194 /* For short writes we shouldn't count parts of pages that
2195 * span a whole block on the OST side, or our accounting goes
2196 * wrong. Should match the code in filter_grant_check. */
2197 int offset = (oap->oap_obj_off + oap->oap_page_off) & ~CFS_PAGE_MASK;
2198 int count = oap->oap_count + (offset & (blocksize - 1));
2199 int end = (offset + oap->oap_count) & (blocksize - 1);
2200 if (end)
2201 count += blocksize - end;
2203 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
2204 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
2205 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
2206 cli->cl_avail_grant, cli->cl_dirty);
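/*
 * Worked example of the rounding above (illustrative helper, not driver
 * code; the sample values assume 4096-byte pages and 1024-byte OST
 * blocks):
 */
static unsigned int toy_lost_grant(unsigned int page_size,
                                   unsigned int blocksize,
                                   unsigned int offset,
                                   unsigned int nob)
{
        unsigned int count = nob + (offset & (blocksize - 1));
        unsigned int end = (offset + nob) & (blocksize - 1);

        if (end)
                count += blocksize - end;
        /* grant was consumed for the whole page, but only 'count' bytes
         * of OST blocks get dirtied */
        return page_size - count;
}
/* e.g. toy_lost_grant(4096, 1024, 100, 200) == 3072: a 200-byte write
 * inside one 1024-byte block gives up 4096 - 1024 bytes of grant. */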
2212 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2213 struct lov_oinfo *loi, cfs_page_t *page,
2214 obd_off offset, struct obd_async_page_ops *ops,
2215 void *data, void **res)
2217 struct osc_async_page *oap;
2221 return size_round(sizeof(*oap));
2224 oap->oap_magic = OAP_MAGIC;
2225 oap->oap_cli = &exp->exp_obd->u.cli;
2228 oap->oap_caller_ops = ops;
2229 oap->oap_caller_data = data;
2231 oap->oap_page = page;
2232 oap->oap_obj_off = offset;
2234 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2235 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2236 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2238 oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2240 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2244 struct osc_async_page *oap_from_cookie(void *cookie)
2246 struct osc_async_page *oap = cookie;
2247 if (oap->oap_magic != OAP_MAGIC)
2248 return ERR_PTR(-EINVAL);
2252 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2253 struct lov_oinfo *loi, void *cookie,
2254 int cmd, obd_off off, int count,
2255 obd_flag brw_flags, enum async_flags async_flags)
2257 struct client_obd *cli = &exp->exp_obd->u.cli;
2258 struct osc_async_page *oap;
2262 oap = oap_from_cookie(cookie);
2264 RETURN(PTR_ERR(oap));
2266 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2269 if (!list_empty(&oap->oap_pending_item) ||
2270 !list_empty(&oap->oap_urgent_item) ||
2271 !list_empty(&oap->oap_rpc_item))
2274 /* check if the file's owner/group is over quota */
2275 #ifdef HAVE_QUOTA_SUPPORT
2276 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2277 struct obd_async_page_ops *ops;
2284 ops = oap->oap_caller_ops;
2285 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2286 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2297 loi = &lsm->lsm_oinfo[0];
2299 client_obd_list_lock(&cli->cl_loi_list_lock);
2302 oap->oap_page_off = off;
2303 oap->oap_count = count;
2304 oap->oap_brw_flags = brw_flags;
2305 oap->oap_async_flags = async_flags;
2307 if (cmd & OBD_BRW_WRITE) {
2308 rc = osc_enter_cache(cli, loi, oap);
2310 client_obd_list_unlock(&cli->cl_loi_list_lock);
2315 osc_oap_to_pending(oap);
2316 loi_list_maint(cli, loi);
2318 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2321 osc_check_rpcs(cli);
2322 client_obd_list_unlock(&cli->cl_loi_list_lock);
2327 /* aka (~was & now & flag), but this is clearer :) */
2328 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
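/* Example (illustrative flag values): with was = 0x1 and now = 0x1 | 0x2,
 *   SETTING(was, now, 0x2) == 1   -- 0x2 is newly set
 *   SETTING(was, now, 0x1) == 0   -- 0x1 was already set */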
2330 static int osc_set_async_flags(struct obd_export *exp,
2331 struct lov_stripe_md *lsm,
2332 struct lov_oinfo *loi, void *cookie,
2333 obd_flag async_flags)
2335 struct client_obd *cli = &exp->exp_obd->u.cli;
2336 struct loi_oap_pages *lop;
2337 struct osc_async_page *oap;
2341 oap = oap_from_cookie(cookie);
2343 RETURN(PTR_ERR(oap));
2346 * bug 7311: OST-side locking is only supported for liblustre for now
2347 * (and liblustre never calls obd_set_async_flags(). I hope.); a generic
2348 * implementation has to handle the case where an OST-locked page was
2349 * picked up by, e.g., ->writepage().
2351 LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2352 LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to tread here. */
2355 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2359 loi = &lsm->lsm_oinfo[0];
2361 if (oap->oap_cmd & OBD_BRW_WRITE) {
2362 lop = &loi->loi_write_lop;
2364 lop = &loi->loi_read_lop;
2367 client_obd_list_lock(&cli->cl_loi_list_lock);
2369 if (list_empty(&oap->oap_pending_item))
2370 GOTO(out, rc = -EINVAL);
2372 if ((oap->oap_async_flags & async_flags) == async_flags)
2375 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2376 oap->oap_async_flags |= ASYNC_READY;
2378 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2379 if (list_empty(&oap->oap_rpc_item)) {
2380 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2381 loi_list_maint(cli, loi);
2385 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2386 oap->oap_async_flags);
2388 osc_check_rpcs(cli);
2389 client_obd_list_unlock(&cli->cl_loi_list_lock);
2393 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2394 struct lov_oinfo *loi,
2395 struct obd_io_group *oig, void *cookie,
2396 int cmd, obd_off off, int count,
2398 obd_flag async_flags)
2400 struct client_obd *cli = &exp->exp_obd->u.cli;
2401 struct osc_async_page *oap;
2402 struct loi_oap_pages *lop;
2406 oap = oap_from_cookie(cookie);
2408 RETURN(PTR_ERR(oap));
2410 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2413 if (!list_empty(&oap->oap_pending_item) ||
2414 !list_empty(&oap->oap_urgent_item) ||
2415 !list_empty(&oap->oap_rpc_item))
2419 loi = &lsm->lsm_oinfo[0];
2421 client_obd_list_lock(&cli->cl_loi_list_lock);
2424 oap->oap_page_off = off;
2425 oap->oap_count = count;
2426 oap->oap_brw_flags = brw_flags;
2427 oap->oap_async_flags = async_flags;
2429 if (cmd & OBD_BRW_WRITE)
2430 lop = &loi->loi_write_lop;
2432 lop = &loi->loi_read_lop;
2434 list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2435 if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2437 rc = oig_add_one(oig, &oap->oap_occ);
2440 LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2441 oap, oap->oap_page, rc);
2443 client_obd_list_unlock(&cli->cl_loi_list_lock);
2448 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2449 struct loi_oap_pages *lop, int cmd)
2451 struct list_head *pos, *tmp;
2452 struct osc_async_page *oap;
2454 list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2455 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2456 list_del(&oap->oap_pending_item);
2457 osc_oap_to_pending(oap);
2459 loi_list_maint(cli, loi);
2462 static int osc_trigger_group_io(struct obd_export *exp,
2463 struct lov_stripe_md *lsm,
2464 struct lov_oinfo *loi,
2465 struct obd_io_group *oig)
2467 struct client_obd *cli = &exp->exp_obd->u.cli;
2471 loi = &lsm->lsm_oinfo[0];
2473 client_obd_list_lock(&cli->cl_loi_list_lock);
2475 osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2476 osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2478 osc_check_rpcs(cli);
2479 client_obd_list_unlock(&cli->cl_loi_list_lock);
2484 static int osc_teardown_async_page(struct obd_export *exp,
2485 struct lov_stripe_md *lsm,
2486 struct lov_oinfo *loi, void *cookie)
2488 struct client_obd *cli = &exp->exp_obd->u.cli;
2489 struct loi_oap_pages *lop;
2490 struct osc_async_page *oap;
2494 oap = oap_from_cookie(cookie);
2496 RETURN(PTR_ERR(oap));
2499 loi = &lsm->lsm_oinfo[0];
2501 if (oap->oap_cmd & OBD_BRW_WRITE) {
2502 lop = &loi->loi_write_lop;
2504 lop = &loi->loi_read_lop;
2507 client_obd_list_lock(&cli->cl_loi_list_lock);
2509 if (!list_empty(&oap->oap_rpc_item))
2510 GOTO(out, rc = -EBUSY);
2512 osc_exit_cache(cli, oap, 0);
2513 osc_wake_cache_waiters(cli);
2515 if (!list_empty(&oap->oap_urgent_item)) {
2516 list_del_init(&oap->oap_urgent_item);
2517 oap->oap_async_flags &= ~ASYNC_URGENT;
2519 if (!list_empty(&oap->oap_pending_item)) {
2520 list_del_init(&oap->oap_pending_item);
2521 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2523 loi_list_maint(cli, loi);
2525 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2527 client_obd_list_unlock(&cli->cl_loi_list_lock);
2531 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2534 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2537 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2540 lock_res_and_lock(lock);
2543 /* Liang XXX: Darwin and Winnt checking should be added */
2544 if (lock->l_ast_data && lock->l_ast_data != data) {
2545 struct inode *new_inode = data;
2546 struct inode *old_inode = lock->l_ast_data;
2547 if (!(old_inode->i_state & I_FREEING))
2548 LDLM_ERROR(lock, "inconsistent l_ast_data found");
2549 LASSERTF(old_inode->i_state & I_FREEING,
2550 "Found existing inode %p/%lu/%u state %lu in lock: "
2551 "setting data to %p/%lu/%u\n", old_inode,
2552 old_inode->i_ino, old_inode->i_generation,
2554 new_inode, new_inode->i_ino, new_inode->i_generation);
2558 lock->l_ast_data = data;
2559 lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2560 unlock_res_and_lock(lock);
2561 LDLM_LOCK_PUT(lock);
2564 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2565 ldlm_iterator_t replace, void *data)
2567 struct ldlm_res_id res_id = { .name = {0} };
2568 struct obd_device *obd = class_exp2obd(exp);
2570 res_id.name[0] = lsm->lsm_object_id;
2571 res_id.name[2] = lsm->lsm_object_gr;
2573 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2577 static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
2583 /* The request was created before ldlm_cli_enqueue call. */
2584 if (rc == ELDLM_LOCK_ABORTED) {
2585 struct ldlm_reply *rep;
2587 /* swabbed by ldlm_cli_enqueue() */
2588 LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF);
2589 rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
2591 LASSERT(rep != NULL);
2592 if (rep->lock_policy_res1)
2593 rc = rep->lock_policy_res1;
2597 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2598 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2599 oinfo->oi_md->lsm_oinfo->loi_lvb.lvb_size,
2600 oinfo->oi_md->lsm_oinfo->loi_lvb.lvb_blocks,
2601 oinfo->oi_md->lsm_oinfo->loi_lvb.lvb_mtime);
2604 /* Call the update callback. */
2605 rc = oinfo->oi_cb_up(oinfo, rc);
2609 static int osc_enqueue_interpret(struct ptlrpc_request *req,
2610 struct osc_enqueue_args *aa, int rc)
2612 int intent = aa->oa_ei->ei_flags & LDLM_FL_HAS_INTENT;
2613 struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
2614 struct ldlm_lock *lock;
2616 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2618 lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
2620 /* Complete obtaining the lock procedure. */
2621 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2623 &aa->oa_ei->ei_flags,
2624 &lsm->lsm_oinfo->loi_lvb,
2625 sizeof(lsm->lsm_oinfo->loi_lvb),
2626 lustre_swab_ost_lvb,
2627 aa->oa_oi->oi_lockh, rc);
2629 /* Complete osc stuff. */
2630 rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
2632 /* Release the lock for async request. */
2633 if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
2634 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
2636 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2637 aa->oa_oi->oi_lockh, req, aa);
2638 LDLM_LOCK_PUT(lock);
2642 /* When enqueuing asynchronously, locks are not ordered; we can obtain a lock
2643 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2644 * other synchronous requests, but holding some locks while trying to obtain
2645 * others may take a considerable amount of time if an OST fails; and when a
2646 * client does not release a lock that other sync requests are waiting for,
2647 * that client is excluded from the cluster -- such scenarios make life
2648 * difficult, so release locks just after they are obtained. */
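/* Example interleaving (illustrative) that the early release avoids:
 *   client A: got its lock from OSC2 first, still waiting on OSC1
 *   client B: got its lock from OSC1 first, still waiting on OSC2
 * With one OST failing, each client would pin its first lock for the whole
 * recovery window while stalling the other's sync requests; dropping each
 * lock as soon as it is obtained keeps that window short. */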
2649 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2650 struct obd_enqueue_info *einfo)
2652 struct ldlm_res_id res_id = { .name = {0} };
2653 struct obd_device *obd = exp->exp_obd;
2654 struct ldlm_reply *rep;
2655 struct ptlrpc_request *req = NULL;
2656 int intent = einfo->ei_flags & LDLM_FL_HAS_INTENT;
2660 res_id.name[0] = oinfo->oi_md->lsm_object_id;
2661 res_id.name[2] = oinfo->oi_md->lsm_object_gr;
2663 /* Filesystem lock extents are extended to page boundaries so that
2664 * dealing with the page cache is a little smoother. */
2665 oinfo->oi_policy.l_extent.start -=
2666 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
2667 oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
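/* e.g. with 4096-byte pages (so ~CFS_PAGE_MASK == 4095; illustrative):
 *   start 5000 -> 5000 - (5000 & 4095) = 4096  (rounded down to page start)
 *   end   9000 -> 9000 | 4095          = 12287 (rounded up to page end) */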
2669 if (oinfo->oi_md->lsm_oinfo->loi_kms_valid == 0)
2672 /* Next, search for already existing extent locks that will cover us */
2673 rc = ldlm_lock_match(obd->obd_namespace, einfo->ei_flags, &res_id,
2674 einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode,
2677 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
2680 /* I would like to be able to ASSERT here that rss <=
2681 * kms, but I can't, for reasons which are explained in
2685 /* We already have a lock, and it's referenced */
2686 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2688 /* For async requests, decref the lock. */
2689 if (einfo->ei_rqset)
2690 ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
2695 /* If we're trying to read, we also search for an existing PW lock. The
2696 * VFS and page cache already protect us locally, so lots of readers/
2697 * writers can share a single PW lock.
2699 * There are problems with conversion deadlocks, so instead of
2700 * converting a read lock to a write lock, we'll just enqueue a new
2703 * At some point we should cancel the read lock instead of making them
2704 * send us a blocking callback, but there are problems with canceling
2705 * locks out from other users right now, too. */
2707 if (einfo->ei_mode == LCK_PR) {
2708 rc = ldlm_lock_match(obd->obd_namespace, einfo->ei_flags,
2709 &res_id, einfo->ei_type, &oinfo->oi_policy,
2710 LCK_PW, oinfo->oi_lockh);
2712 /* FIXME: This is not incredibly elegant, but it might
2713 * be more elegant than adding another parameter to
2714 * lock_match. I want a second opinion. */
2715 /* addref the lock only if not async requests. */
2716 if (!einfo->ei_rqset)
2717 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
2718 osc_set_data_with_check(oinfo->oi_lockh,
2721 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2722 ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
2730 [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
2731 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request) };
2733 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
2734 LDLM_ENQUEUE, 2, size, NULL);
2738 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
2739 size[DLM_REPLY_REC_OFF] =
2740 sizeof(oinfo->oi_md->lsm_oinfo->loi_lvb);
2741 ptlrpc_req_set_repsize(req, 3, size);
2744 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2745 einfo->ei_flags &= ~LDLM_FL_BLOCK_GRANTED;
2747 rc = ldlm_cli_enqueue(exp, &req, res_id, einfo->ei_type,
2748 &oinfo->oi_policy, einfo->ei_mode,
2749 &einfo->ei_flags, einfo->ei_cb_bl,
2750 einfo->ei_cb_cp, einfo->ei_cb_gl,
2752 &oinfo->oi_md->lsm_oinfo->loi_lvb,
2753 sizeof(oinfo->oi_md->lsm_oinfo->loi_lvb),
2754 lustre_swab_ost_lvb, oinfo->oi_lockh,
2755 einfo->ei_rqset ? 1 : 0);
2756 if (einfo->ei_rqset) {
2758 struct osc_enqueue_args *aa;
2759 LASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2760 aa = (struct osc_enqueue_args *)&req->rq_async_args;
2765 req->rq_interpret_reply = osc_enqueue_interpret;
2766 ptlrpc_set_add_req(einfo->ei_rqset, req);
2767 } else if (intent) {
2768 ptlrpc_req_finished(req);
2773 rc = osc_enqueue_fini(req, oinfo, intent, rc);
2775 ptlrpc_req_finished(req);
2780 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
2781 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2782 int *flags, void *data, struct lustre_handle *lockh)
2784 struct ldlm_res_id res_id = { .name = {0} };
2785 struct obd_device *obd = exp->exp_obd;
2789 res_id.name[0] = lsm->lsm_object_id;
2790 res_id.name[2] = lsm->lsm_object_gr;
2792 OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
2794 /* Filesystem lock extents are extended to page boundaries so that
2795 * dealing with the page cache is a little smoother */
2796 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2797 policy->l_extent.end |= ~CFS_PAGE_MASK;
2799 /* Next, search for already existing extent locks that will cover us */
2800 rc = ldlm_lock_match(obd->obd_namespace, *flags, &res_id, type,
2801 policy, mode, lockh);
2803 //if (!(*flags & LDLM_FL_TEST_LOCK))
2804 osc_set_data_with_check(lockh, data, *flags);
2807 /* If we're trying to read, we also search for an existing PW lock. The
2808 * VFS and page cache already protect us locally, so lots of readers/
2809 * writers can share a single PW lock. */
2810 if (mode == LCK_PR) {
2811 rc = ldlm_lock_match(obd->obd_namespace, *flags, &res_id, type,
2812 policy, LCK_PW, lockh);
2813 if (rc == 1 && !(*flags & LDLM_FL_TEST_LOCK)) {
2814 /* FIXME: This is not incredibly elegant, but it might
2815 * be more elegant than adding another parameter to
2816 * lock_match. I want a second opinion. */
2817 osc_set_data_with_check(lockh, data, *flags);
2818 ldlm_lock_addref(lockh, LCK_PR);
2819 ldlm_lock_decref(lockh, LCK_PW);
2825 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2826 __u32 mode, struct lustre_handle *lockh)
2830 if (unlikely(mode == LCK_GROUP))
2831 ldlm_lock_decref_and_cancel(lockh, mode);
2833 ldlm_lock_decref(lockh, mode);
2838 static int osc_cancel_unused(struct obd_export *exp,
2839 struct lov_stripe_md *lsm,
2840 int flags, void *opaque)
2842 struct obd_device *obd = class_exp2obd(exp);
2843 struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
2846 res_id.name[0] = lsm->lsm_object_id;
2847 res_id.name[2] = lsm->lsm_object_gr;
2851 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags,
2855 static int osc_join_lru(struct obd_export *exp,
2856 struct lov_stripe_md *lsm, int join)
2858 struct obd_device *obd = class_exp2obd(exp);
2859 struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
2862 res_id.name[0] = lsm->lsm_object_id;
2863 res_id.name[2] = lsm->lsm_object_gr;
2867 return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
2870 static int osc_statfs_interpret(struct ptlrpc_request *req,
2871 struct osc_async_args *aa, int rc)
2873 struct obd_statfs *msfs;
2879 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
2880 lustre_swab_obd_statfs);
2882 CERROR("Can't unpack obd_statfs\n");
2883 GOTO(out, rc = -EPROTO);
2886 memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
2888 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2892 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
2893 __u64 max_age, struct ptlrpc_request_set *rqset)
2895 struct ptlrpc_request *req;
2896 struct osc_async_args *aa;
2897 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
2900 /* We could possibly pass max_age in the request (as an absolute
2901 * timestamp or a "seconds.usec ago") so the target can avoid doing
2902 * extra calls into the filesystem if that isn't necessary (e.g.
2903 * during mount that would help a bit). Having relative timestamps
2904 * is not so great if request processing is slow, while absolute
2905 * timestamps are not ideal because they need time synchronization. */
2906 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
2907 OST_STATFS, 1, NULL, NULL);
2911 ptlrpc_req_set_repsize(req, 2, size);
2912 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
2914 req->rq_interpret_reply = osc_statfs_interpret;
2915 LASSERT (sizeof (*aa) <= sizeof (req->rq_async_args));
2916 aa = (struct osc_async_args *)&req->rq_async_args;
2919 ptlrpc_set_add_req(rqset, req);
2923 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
2926 struct obd_statfs *msfs;
2927 struct ptlrpc_request *req;
2928 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
2931 /* We could possibly pass max_age in the request (as an absolute
2932 * timestamp or a "seconds.usec ago") so the target can avoid doing
2933 * extra calls into the filesystem if that isn't necessary (e.g.
2934 * during mount that would help a bit). Having relative timestamps
2935 * is not so great if request processing is slow, while absolute
2936 * timestamps are not ideal because they need time synchronization. */
2937 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
2938 OST_STATFS, 1, NULL, NULL);
2942 ptlrpc_req_set_repsize(req, 2, size);
2943 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
2945 rc = ptlrpc_queue_wait(req);
2949 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
2950 lustre_swab_obd_statfs);
2952 CERROR("Can't unpack obd_statfs\n");
2953 GOTO(out, rc = -EPROTO);
2956 memcpy(osfs, msfs, sizeof(*osfs));
2960 ptlrpc_req_finished(req);
2964 /* Retrieve object striping information.
2966 * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
2967 * the maximum number of OST indices which will fit in the user buffer.
2968 * lmm_magic must be LOV_USER_MAGIC (we only use 1 slot here).
2970 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2972 struct lov_user_md lum, *lumk;
2973 int rc = 0, lum_size;
2979 if (copy_from_user(&lum, lump, sizeof(lum)))
2982 if (lum.lmm_magic != LOV_USER_MAGIC)
2985 if (lum.lmm_stripe_count > 0) {
2986 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
2987 OBD_ALLOC(lumk, lum_size);
2991 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
2992 lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
2994 lum_size = sizeof(lum);
2998 lumk->lmm_object_id = lsm->lsm_object_id;
2999 lumk->lmm_object_gr = lsm->lsm_object_gr;
3000 lumk->lmm_stripe_count = 1;
3002 if (copy_to_user(lump, lumk, lum_size))
3006 OBD_FREE(lumk, lum_size);
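/*
 * A hedged userspace sketch of the protocol above (invented helper;
 * assumes an open Lustre file descriptor and the LL_IOC_LOV_GETSTRIPE
 * ioctl used elsewhere in this file):
 */
#include <stdlib.h>
#include <sys/ioctl.h>
#include <lustre/lustre_user.h>

static struct lov_user_md *toy_get_one_stripe(int fd)
{
        struct lov_user_md *lum;
        size_t len;

        len = sizeof(*lum) + sizeof(lum->lmm_objects[0]);
        lum = calloc(1, len);
        if (lum == NULL)
                return NULL;
        lum->lmm_magic = LOV_USER_MAGIC;  /* checked by osc_getstripe() */
        lum->lmm_stripe_count = 1;        /* room for one lmm_objects[] slot */
        if (ioctl(fd, LL_IOC_LOV_GETSTRIPE, lum) < 0) {
                free(lum);
                return NULL;
        }
        return lum;
}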
3012 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3013 void *karg, void *uarg)
3015 struct obd_device *obd = exp->exp_obd;
3016 struct obd_ioctl_data *data = karg;
3020 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3023 if (!try_module_get(THIS_MODULE)) {
3024 CERROR("Can't get module. Is it alive?");
3029 case OBD_IOC_LOV_GET_CONFIG: {
3031 struct lov_desc *desc;
3032 struct obd_uuid uuid;
3036 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3037 GOTO(out, err = -EINVAL);
3039 data = (struct obd_ioctl_data *)buf;
3041 if (sizeof(*desc) > data->ioc_inllen1) {
3042 obd_ioctl_freedata(buf, len);
3043 GOTO(out, err = -EINVAL);
3046 if (data->ioc_inllen2 < sizeof(uuid)) {
3047 obd_ioctl_freedata(buf, len);
3048 GOTO(out, err = -EINVAL);
3051 desc = (struct lov_desc *)data->ioc_inlbuf1;
3052 desc->ld_tgt_count = 1;
3053 desc->ld_active_tgt_count = 1;
3054 desc->ld_default_stripe_count = 1;
3055 desc->ld_default_stripe_size = 0;
3056 desc->ld_default_stripe_offset = 0;
3057 desc->ld_pattern = 0;
3058 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3060 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3062 err = copy_to_user((void *)uarg, buf, len);
3065 obd_ioctl_freedata(buf, len);
3068 case LL_IOC_LOV_SETSTRIPE:
3069 err = obd_alloc_memmd(exp, karg);
3073 case LL_IOC_LOV_GETSTRIPE:
3074 err = osc_getstripe(karg, uarg);
3076 case OBD_IOC_CLIENT_RECOVER:
3077 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3082 case IOC_OSC_SET_ACTIVE:
3083 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3086 case OBD_IOC_POLL_QUOTACHECK:
3087 err = lquota_poll_check(quota_interface, exp,
3088 (struct if_quotacheck *)karg);
3091 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3092 cmd, cfs_curproc_comm());
3093 GOTO(out, err = -ENOTTY);
3096 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3099 module_put(THIS_MODULE);
3104 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3105 void *key, __u32 *vallen, void *val)
3108 if (!vallen || !val)
3111 if (keylen > strlen("lock_to_stripe") &&
3112 strcmp(key, "lock_to_stripe") == 0) {
3113 __u32 *stripe = val;
3114 *vallen = sizeof(*stripe);
3117 } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
3118 struct ptlrpc_request *req;
3120 char *bufs[2] = { NULL, key };
3121 int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };
3123 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3124 OST_GET_INFO, 2, size, bufs);
3128 size[REPLY_REC_OFF] = *vallen;
3129 ptlrpc_req_set_repsize(req, 2, size);
3130 rc = ptlrpc_queue_wait(req);
3134 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3135 lustre_swab_ost_last_id);
3136 if (reply == NULL) {
3137 CERROR("Can't unpack OST last ID\n");
3138 GOTO(out, rc = -EPROTO);
3140 *((obd_id *)val) = *reply;
3142 ptlrpc_req_finished(req);
3148 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3151 struct llog_ctxt *ctxt;
3152 struct obd_import *imp = req->rq_import;
3158 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3161 rc = llog_initiator_connect(ctxt);
3163 CERROR("cannot establish connection for "
3164 "ctxt %p: %d\n", ctxt, rc);
3167 imp->imp_server_timeout = 1;
3168 CDEBUG(D_HA, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3169 imp->imp_pingable = 1;
3174 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3175 void *key, obd_count vallen, void *val,
3176 struct ptlrpc_request_set *set)
3178 struct ptlrpc_request *req;
3179 struct obd_device *obd = exp->exp_obd;
3180 struct obd_import *imp = class_exp2cliimp(exp);
3181 int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
3182 char *bufs[3] = { NULL, key, val };
3185 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3187 if (KEY_IS(KEY_NEXT_ID)) {
3188 if (vallen != sizeof(obd_id))
3190 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3191 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3192 exp->exp_obd->obd_name,
3193 obd->u.cli.cl_oscc.oscc_next_id);
3198 if (KEY_IS("unlinked")) {
3199 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3200 spin_lock(&oscc->oscc_lock);
3201 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3202 spin_unlock(&oscc->oscc_lock);
3206 if (KEY_IS(KEY_INIT_RECOV)) {
3207 if (vallen != sizeof(int))
3209 imp->imp_initial_recov = *(int *)val;
3210 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3211 exp->exp_obd->obd_name,
3212 imp->imp_initial_recov);
3216 if (KEY_IS("checksum")) {
3217 if (vallen != sizeof(int))
3219 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3223 if (KEY_IS(KEY_FLUSH_CTX)) {
3224 sptlrpc_import_flush_my_ctx(imp);
3231 /* We pass all other commands directly to OST. Since nobody calls osc
3232 methods directly and everybody is supposed to go through LOV, we
3233 assume lov checked invalid values for us.
3234 The only recognised values so far are evict_by_nid and mds_conn.
3235 Even if something bad goes through, we'd get a -EINVAL from OST anyway. */
3238 req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
3243 if (KEY_IS("mds_conn")) {
3244 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3246 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3247 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3248 LASSERT(oscc->oscc_oa.o_gr > 0);
3249 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3252 ptlrpc_req_set_repsize(req, 1, NULL);
3253 ptlrpc_set_add_req(set, req);
3254 ptlrpc_check_set(set);
3260 static struct llog_operations osc_size_repl_logops = {
3261 lop_cancel: llog_obd_repl_cancel
3264 static struct llog_operations osc_mds_ost_orig_logops;
3265 static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
3266 struct obd_device *tgt, int count,
3267 struct llog_catid *catid, struct obd_uuid *uuid)
3272 spin_lock(&obd->obd_dev_lock);
3273 if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3274 osc_mds_ost_orig_logops = llog_lvfs_ops;
3275 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3276 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3277 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3278 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3280 spin_unlock(&obd->obd_dev_lock);
3282 rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3283 &catid->lci_logid, &osc_mds_ost_orig_logops);
3285 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3289 rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3290 &osc_size_repl_logops);
3292 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3295 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3296 obd->obd_name, tgt->obd_name, count, catid, rc);
3297 CERROR("logid "LPX64":0x%x\n",
3298 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3303 static int osc_llog_finish(struct obd_device *obd, int count)
3305 struct llog_ctxt *ctxt;
3306 int rc = 0, rc2 = 0;
3309 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3311 rc = llog_cleanup(ctxt);
3313 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3315 rc2 = llog_cleanup(ctxt);
3322 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3323 struct obd_uuid *cluuid,
3324 struct obd_connect_data *data)
3326 struct client_obd *cli = &obd->u.cli;
3328 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3331 client_obd_list_lock(&cli->cl_loi_list_lock);
3332 data->ocd_grant = cli->cl_avail_grant ?:
3333 2 * cli->cl_max_pages_per_rpc << PAGE_SHIFT;
3334 lost_grant = cli->cl_lost_grant;
3335 cli->cl_lost_grant = 0;
3336 client_obd_list_unlock(&cli->cl_loi_list_lock);
3338 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3339 "cl_lost_grant: %ld\n", data->ocd_grant,
3340 cli->cl_avail_grant, lost_grant);
3341 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3342 " ocd_grant: %d\n", data->ocd_connect_flags,
3343 data->ocd_version, data->ocd_grant);
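/* Worked example (illustrative values): with 4096-byte pages and
 * cl_max_pages_per_rpc == 256, a client with no saved grant asks for
 * 2 * 256 * 4096 bytes = 2 MiB, enough to back two full-size write rpcs. */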
3349 static int osc_disconnect(struct obd_export *exp)
3351 struct obd_device *obd = class_exp2obd(exp);
3352 struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3355 if (obd->u.cli.cl_conn_count == 1)
3356 /* flush any remaining cancel messages out to the target */
3357 llog_sync(ctxt, exp);
3359 rc = client_disconnect_export(exp);
3363 static int osc_import_event(struct obd_device *obd,
3364 struct obd_import *imp,
3365 enum obd_import_event event)
3367 struct client_obd *cli;
3371 LASSERT(imp->imp_obd == obd);
3374 case IMP_EVENT_DISCON: {
3375 /* Only do this on the MDS OSCs */
3376 if (imp->imp_server_timeout) {
3377 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3379 spin_lock(&oscc->oscc_lock);
3380 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3381 spin_unlock(&oscc->oscc_lock);
3386 case IMP_EVENT_INACTIVE: {
3387 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3390 case IMP_EVENT_INVALIDATE: {
3391 struct ldlm_namespace *ns = obd->obd_namespace;
3395 client_obd_list_lock(&cli->cl_loi_list_lock);
3396 cli->cl_avail_grant = 0;
3397 cli->cl_lost_grant = 0;
3398 /* all pages go to failing rpcs due to the invalid import */
3399 osc_check_rpcs(cli);
3400 client_obd_list_unlock(&cli->cl_loi_list_lock);
3402 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3406 case IMP_EVENT_ACTIVE: {
3407 /* Only do this on the MDS OSCs */
3408 if (imp->imp_server_timeout) {
3409 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3411 spin_lock(&oscc->oscc_lock);
3412 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3413 spin_unlock(&oscc->oscc_lock);
3415 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3418 case IMP_EVENT_OCD: {
3419 struct obd_connect_data *ocd = &imp->imp_connect_data;
3421 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3422 osc_init_grant(&obd->u.cli, ocd);
3425 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3426 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3428 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3432 CERROR("Unknown import event %d\n", event);
3438 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3444 rc = ptlrpcd_addref();
3448 rc = client_obd_setup(obd, lcfg);
3452 struct lprocfs_static_vars lvars;
3453 struct client_obd *cli = &obd->u.cli;
3455 lprocfs_init_vars(osc, &lvars);
3456 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3457 lproc_osc_attach_seqstat(obd);
3458 ptlrpc_lprocfs_register_obd(obd);
3462 /* We need to allocate a few more requests, because
3463 brw_interpret_oap tries to create new requests before freeing
3464 previous ones. Ideally we want to have 2x max_rpcs_in_flight
3465 reserved, but I'm afraid that might be too much wasted RAM
3466 in fact, so adding 2 is just my guess and should still work. */
3467 cli->cl_import->imp_rq_pool =
3468 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3470 ptlrpc_add_rqs_to_pool);
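/* e.g. under the assumption cl_max_rpcs_in_flight == 8, the pool above is
 * primed with 8 + 2 = 10 requests. */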
3476 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3482 case OBD_CLEANUP_EARLY: {
3483 struct obd_import *imp;
3484 imp = obd->u.cli.cl_import;
3485 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3486 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3487 ptlrpc_deactivate_import(imp);
3490 case OBD_CLEANUP_EXPORTS:
3492 case OBD_CLEANUP_SELF_EXP:
3493 rc = obd_llog_finish(obd, 0);
3495 CERROR("failed to cleanup llogging subsystems\n");
3497 case OBD_CLEANUP_OBD:
3503 int osc_cleanup(struct obd_device *obd)
3505 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3509 ptlrpc_lprocfs_unregister_obd(obd);
3510 lprocfs_obd_cleanup(obd);
3512 spin_lock(&oscc->oscc_lock);
3513 oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3514 oscc->oscc_flags |= OSCC_FLAG_EXITING;
3515 spin_unlock(&oscc->oscc_lock);
3517 /* free memory of osc quota cache */
3518 lquota_cleanup(quota_interface, obd);
3520 rc = client_obd_cleanup(obd);
3526 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3528 struct lustre_cfg *lcfg = buf;
3529 struct lprocfs_static_vars lvars;
3532 lprocfs_init_vars(osc, &lvars);
3534 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
3538 struct obd_ops osc_obd_ops = {
3539 .o_owner = THIS_MODULE,
3540 .o_setup = osc_setup,
3541 .o_precleanup = osc_precleanup,
3542 .o_cleanup = osc_cleanup,
3543 .o_add_conn = client_import_add_conn,
3544 .o_del_conn = client_import_del_conn,
3545 .o_connect = client_connect_import,
3546 .o_reconnect = osc_reconnect,
3547 .o_disconnect = osc_disconnect,
3548 .o_statfs = osc_statfs,
3549 .o_statfs_async = osc_statfs_async,
3550 .o_packmd = osc_packmd,
3551 .o_unpackmd = osc_unpackmd,
3552 .o_create = osc_create,
3553 .o_destroy = osc_destroy,
3554 .o_getattr = osc_getattr,
3555 .o_getattr_async = osc_getattr_async,
3556 .o_setattr = osc_setattr,
3557 .o_setattr_async = osc_setattr_async,
3559 .o_brw_async = osc_brw_async,
3560 .o_prep_async_page = osc_prep_async_page,
3561 .o_queue_async_io = osc_queue_async_io,
3562 .o_set_async_flags = osc_set_async_flags,
3563 .o_queue_group_io = osc_queue_group_io,
3564 .o_trigger_group_io = osc_trigger_group_io,
3565 .o_teardown_async_page = osc_teardown_async_page,
3566 .o_punch = osc_punch,
3568 .o_enqueue = osc_enqueue,
3569 .o_match = osc_match,
3570 .o_change_cbdata = osc_change_cbdata,
3571 .o_cancel = osc_cancel,
3572 .o_cancel_unused = osc_cancel_unused,
3573 .o_join_lru = osc_join_lru,
3574 .o_iocontrol = osc_iocontrol,
3575 .o_get_info = osc_get_info,
3576 .o_set_info_async = osc_set_info_async,
3577 .o_import_event = osc_import_event,
3578 .o_llog_init = osc_llog_init,
3579 .o_llog_finish = osc_llog_finish,
3580 .o_process_config = osc_process_config,
3583 extern quota_interface_t osc_quota_interface;
3585 int __init osc_init(void)
3587 struct lprocfs_static_vars lvars;
3591 lprocfs_init_vars(osc, &lvars);
3593 request_module("lquota");
3594 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3595 lquota_init(quota_interface);
3596 init_obd_quota_ops(quota_interface, &osc_obd_ops);
3598 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3599 LUSTRE_OSC_NAME, NULL);
3601 if (quota_interface)
3602 PORTAL_SYMBOL_PUT(osc_quota_interface);
3610 static void /*__exit*/ osc_exit(void)
3612 lquota_exit(quota_interface);
3613 if (quota_interface)
3614 PORTAL_SYMBOL_PUT(osc_quota_interface);
3616 class_unregister_type(LUSTRE_OSC_NAME);
3619 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3620 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3621 MODULE_LICENSE("GPL");
3623 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);